Claude Code Skill¶
Labtasker provides a skill definition for Claude Code. With it installed, Claude understands how to write submit scripts, run scripts, and task-management scripts — in both the CLI and the Python API — including metadata, filtering, updating, deleting, and reading task summaries.
Install¶
Create ~/.claude/skills/labtasker/SKILL.md with the following content:
---
name: labtasker
description: Use when helping users distribute ML experiment tasks, convert for-loop scripts into task queues, manage parallel GPU workers, handle failures/retries, or query/filter/update/delete experiment results with Labtasker
---
# Labtasker
## Overview
Labtasker replaces `for` loops in ML experiment scripts with a task queue, enabling parallelization, failure handling, and task management with minimal code changes.
**Links**: [GitHub](https://github.com/luocfprime/labtasker) | [Docs](https://luocfprime.github.io/labtasker/)
## Core Concepts
- **Queue**: Named task bucket; all tasks belong to one queue; workers pull from it.
- **Task fields**: `task_id`, `task_name`, `status` (pending/running/success/failed/cancelled), `args` (dict), `metadata` (dict), `summary` (dict, always a dict — never None — even if no metrics written), `priority`, `max_retries`, `retries`, `created_at`, `last_modified`, `start_time` (None until task starts running), `worker_id` (None until a worker claims the task).
- **Loop**: Continuously fetches pending tasks in descending priority order (highest `priority` value first); exits automatically when no more pending tasks match.
- **Retries**: `retries` starts at `0` on the first attempt. The loop auto-increments it on each task failure. When `retries` reaches `max_retries`, the task is permanently marked `failed`. Use `reset_pending=True` to reset both status and retries when re-queuing.
- **No More, No Less**: Submitted `args` keys must EXACTLY match what the run script/function declares as required. Extra = inconsistent records. Missing = fetch failure. Exception: with `pass_args_dict=True` + `required_fields`, only the listed keys are required — extra keys in `args` are accessible via `args.get(key, default)` for variable-arg tasks.
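The No More, No Less rule can be sanity-checked before submission. The helper below is an illustrative pure-Python sketch, not part of the labtasker API; it mirrors the rule's semantics, with `allow_extra` standing in for the `pass_args_dict=True` + `required_fields` exception:

```python
def check_args(submitted: dict, required: set, allow_extra: bool = False) -> list:
    """Illustrative helper (not labtasker API): report key mismatches
    between submitted args and the run script's required fields."""
    problems = []
    missing = required - submitted.keys()
    if missing:
        problems.append(f"missing keys (fetch would fail): {sorted(missing)}")
    extra = submitted.keys() - required
    if extra and not allow_extra:  # allow_extra mirrors pass_args_dict=True + required_fields
        problems.append(f"extra keys (inconsistent records): {sorted(extra)}")
    return problems

print(check_args({"lr": 0.01, "model": "vit"}, {"lr", "model"}))       # → []
print(check_args({"lr": 0.01}, {"lr", "model"}))                      # missing 'model'
print(check_args({"lr": 0.01, "seed": 1}, {"lr"}, allow_extra=True))  # → []
```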
## Setup
```bash
pip install labtasker
labtasker-server serve & # start server
labtasker init # interactive config — creates .labtasker/config.toml
labtasker queue create-from-config
```
---
## 1. Submitting Tasks
### CLI
```bash
# Basic
labtasker task submit -- --lr=0.001 --model=resnet
# With name, metadata, priority, max-retries
labtasker task submit \
  --name grid_search \
  --metadata '{"tags": ["v1", "experimental"], "note": "baseline run"}' \
  --priority 10 \
  --max-retries 5 \
  -- --lr=0.001 --model=resnet
# Batch submit in a loop
for lr in 0.001 0.01 0.1; do
  for model in resnet vit; do
    labtasker task submit \
      --name grid_search \
      --metadata '{"tags": ["v1"]}' \
      -- --lr=$lr --model=$model
  done
done
# Alternative: pass args as JSON dict string
labtasker task submit --args '{"arg1": 0, "arg2": 3}'
# Nested args via dot-notation — --foo.bar=1 becomes args["foo"]["bar"] = 1
labtasker task submit -- --optimizer.lr=0.01 --optimizer.momentum=0.9
```
When arg values contain special characters (negative numbers, empty strings, spaces), always use the `--key=value` form:
```bash
labtasker task submit -- --value=-1 --label="" --desc="hello world"
```
### Python
```python
import labtasker
for lr in [0.001, 0.01, 0.1]:
    for model in ["resnet", "vit"]:
        labtasker.submit_task(
            task_name="grid_search",  # optional
            args={"lr": lr, "model": model},  # keys must match the run script exactly
            metadata={  # optional — use for filtering/tagging
                "tags": ["v1", "experimental"],
                "note": "baseline run",
            },
            max_retries=3,  # optional, default 3
            priority=0,  # optional, default 0 (medium)
        )

# Nested args as dict (use the Python loop to access nested values)
labtasker.submit_task(
    args={
        "args_a": {"a": 1, "b": "boy"},
        "args_b": {"foo": 2, "bar": "baz"},
    },
    metadata={"tags": ["sweep-v2"]},
)
```
---
## 2. Running Tasks (Loop)
### CLI
```bash
# Basic — %(key) interpolates TOP-LEVEL task args at runtime
labtasker loop -- python train.py --lr '%(lr)' --model '%(model)'
# With metadata filter (only run tasks tagged "v1")
labtasker loop \
  --extra-filter '"v1" in list(metadata.tags)' \
  -- python train.py --lr '%(lr)' --model '%(model)'
# Env vars MUST go outside the loop command
CUDA_VISIBLE_DEVICES=0 labtasker loop -- python train.py --lr '%(lr)'
# Multiple parallel GPU workers — run the same command in the background per GPU
CUDA_VISIBLE_DEVICES=0 labtasker loop -- python train.py --lr '%(lr)' --model '%(model)' &
CUDA_VISIBLE_DEVICES=1 labtasker loop -- python train.py --lr '%(lr)' --model '%(model)' &
CUDA_VISIBLE_DEVICES=2 labtasker loop -- python train.py --lr '%(lr)' --model '%(model)' &
CUDA_VISIBLE_DEVICES=3 labtasker loop -- python train.py --lr '%(lr)' --model '%(model)' &
wait
# Complex scripts where %(var) appears inside multi-line logic — use --script-path
LABTASKER_TASK_SCRIPT=$(mktemp)
cat <<'EOF' > "$LABTASKER_TASK_SCRIPT"
LOG_DIR=/logs/%(dataset)/%(model)
python train.py --dataset %(dataset) --model %(model) --log-dir $LOG_DIR
EOF
labtasker loop --script-path $LABTASKER_TASK_SCRIPT
```
**`%(key)` accesses top-level args keys only.** If args has a nested dict (e.g., `{"optimizer": {"lr": 0.01}}`), use the Python loop instead — CLI cannot interpolate nested values.
### Python
```python
import labtasker
from labtasker import Required

# Required() fields are auto-filled from task args; types are cast automatically.
# Parameter name MUST match the args key exactly.
@labtasker.loop()
def main(arg1: int = Required(), arg2: int = Required()):
    result = arg1 + arg2
    print(f"The result is {result}")

if __name__ == "__main__":
    main()

# With extra_filter (Python query string)
@labtasker.loop(
    extra_filter='"v1" in list(metadata.tags)'
)
def main(lr: float = Required(), model: str = Required()):
    train(lr=lr, model=model)

# Access args as a dict instead of keyword args
@labtasker.loop(required_fields=["lr", "model"], pass_args_dict=True)
def main(args):
    train(lr=args["lr"], model=args["model"])

# Nested args — param name must match the args key; resolver receives args[param_name]
from typing import Any, Dict
from typing_extensions import Annotated
from dataclasses import dataclass

@dataclass
class ArgsGroupA:
    a: int
    b: str

@dataclass
class ArgsGroupB:
    foo: int
    bar: str

@labtasker.loop()
def main(
    args_a: Annotated[Dict[str, Any], Required(resolver=lambda a: ArgsGroupA(**a))],
    args_b=Required(resolver=lambda b: ArgsGroupB(**b)),
):
    print(f"got args_a: {args_a}")
    print(f"got args_b: {args_b}")

# Simple nested dict access — use an identity resolver to receive the raw dict
# (no dataclass needed; resolver receives args[param_name])
@labtasker.loop()
def main(optimizer=Required(resolver=lambda x: x)):
    # optimizer is the raw dict: {"lr": 0.001, "momentum": 0.9}
    lr = optimizer["lr"]
    momentum = optimizer.get("momentum", 0.9)
```
The loop exits automatically when no more pending tasks match the filter.
**Dot-separated keys are nested**: `--foo.bar=1` in CLI becomes `args["foo"]["bar"] = 1` in Python. Access nested args correctly: `task_info().args["foo"]["bar"]`, not `task_info().args["foo.bar"]`.
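As a mental model (plain Python, not labtasker internals), the dot-to-nested expansion works like this:

```python
def nest_args(flat: dict) -> dict:
    """Illustrative sketch (not labtasker internals): expand dot-separated
    keys like {"optimizer.lr": 0.01} into nested dicts."""
    nested = {}
    for dotted_key, value in flat.items():
        node = nested
        *parents, leaf = dotted_key.split(".")
        for part in parents:
            node = node.setdefault(part, {})
        node[leaf] = value
    return nested

print(nest_args({"optimizer.lr": 0.01, "optimizer.momentum": 0.9, "model": "vit"}))
# → {'optimizer': {'lr': 0.01, 'momentum': 0.9}, 'model': 'vit'}
```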
---
## 3. Querying (Listing) Tasks
### CLI
```bash
# By status
labtasker task ls -s pending
labtasker task ls -s running
labtasker task ls -s failed
labtasker task ls -s success
# Python-native filter syntax
labtasker task ls -f 'args.lr > 0.01' -q --no-pager
labtasker task ls -f 'task_name == "grid_search"' -q --no-pager
labtasker task ls -f '"v1" in list(metadata.tags)' -q --no-pager
labtasker task ls -f 'created_at >= date("3 hours ago")' -q --no-pager
labtasker task ls -f 'args.lr > 0.01 and "v1" in list(metadata.tags)' -q --no-pager
labtasker task ls -f 'regex(task_name, "^grid-.*")' -q --no-pager
# Filter by summary fields — same dot-notation works on any field
labtasker task ls -s success -f 'summary.acc > 0.9' --no-pager
# Filter with OR
labtasker task ls -f 'status == "failed" or status == "pending"' --no-pager
# Sort results (CLI only — Python requires manual sorting)
labtasker task ls -s success -S 'created_at:desc' --no-pager
labtasker task ls -s success -S 'summary.acc:desc' --no-pager
# -q prints only task IDs (useful for piping); --no-pager disables pager
```
**Filter operators** — Python syntax: `==`, `>`, `<`, `>=`, `<=`, `in`, `and`, `or`, `regex()`, `date()`
**NOT supported**: `!=`, `not`, `not in` (a consequence of the query backend's three-valued logic). Workaround for status negation: use the `-s STATUS` flag. For string negation: use `regex()`.
**`-s` and `-f` are ANDed**: using both narrows results to tasks matching both conditions.
**Default `--limit` is 100.** For batch operations (cancel/delete/update all matching tasks), always pass `--limit 10000` or a large value to avoid silently missing tasks beyond 100.
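Since `not` and `!=` are unavailable, string negation goes through `regex()` with a negative lookahead. The sketch below demonstrates the pattern using Python's `re` module; it is an assumption, worth verifying against your Labtasker version, that the filter's `regex()` accepts the same lookahead syntax:

```python
import re

# Negative lookahead: match task names that do NOT start with "debug-"
pattern = r"^(?!debug-).*"

names = ["debug-run1", "grid_search", "debug-tmp", "baseline"]
kept = [n for n in names if re.match(pattern, n)]
print(kept)  # → ['grid_search', 'baseline']
```

The equivalent CLI filter would then be `-f 'regex(task_name, "^(?!debug-).*")'`.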
### Python
```python
import labtasker

# List by status
response = labtasker.ls_tasks(status="failed")
tasks = response.content  # List[Task]

# With filter (Python query string)
response = labtasker.ls_tasks(
    extra_filter='args.lr > 0.01 and "v1" in list(metadata.tags)',
    limit=100,
)

# Filter by summary field (same dot-notation)
response = labtasker.ls_tasks(
    status="success",
    extra_filter='summary.acc > 0.9',
    limit=1000,
)

# ls_tasks() has NO sort parameter — sort results manually in Python
tasks_sorted = sorted(
    response.content,
    key=lambda t: t.summary.get("acc") or 0,
    reverse=True,
)

# Access task fields
for task in response.content:
    print(task.task_id, task.status, task.args, task.metadata, task.summary)
    print(task.created_at, task.worker_id, task.retries)
```
---
## 4. Task Summary (Write and Read)
`task.summary` is a dict stored per-task. Jobs write metrics into it; you read it back after tasks complete.
### Writing summary inside the loop
Use `labtasker.report_task_status()` to persist metrics. **This call is optional**: if you return from the function without calling it, the loop auto-marks the task `success` with an empty summary.
```python
import labtasker
from labtasker import Required

@labtasker.loop()
def main(lr: float = Required(), model: str = Required()):
    acc, loss = train(lr=lr, model=model)
    # Optional: write metrics into the task summary.
    # status="running" keeps the task alive; the loop auto-marks it
    # "success" on normal return, after this call completes.
    labtasker.report_task_status(
        task_id=labtasker.task_info().task_id,
        status="running",
        summary={"acc": acc, "loss": loss},
    )
```
To cancel a task mid-run (e.g., on NaN loss), report `status="cancelled"` and then `return`. The loop preserves the cancelled status and will NOT retry the task:
```python
import math

@labtasker.loop()
def main(lr: float = Required(), model: str = Required()):
    task = labtasker.task_info()
    if task.retries == 0:
        data = expensive_preprocess()  # only on first attempt
    else:
        data = load_cached_data()  # skip on retries
    loss = train_step(data)
    if math.isnan(loss):
        labtasker.report_task_status(
            task_id=task.task_id,
            status="cancelled",  # preserves cancelled — loop will not overwrite
        )
        return  # stop this task; loop moves to next pending task
```
### Reading summaries after tasks complete
```python
import labtasker

# Fetch all successful tasks and print their summaries
response = labtasker.ls_tasks(status="success", limit=1000)
for task in response.content:
    print(
        f"task_id={task.task_id} "
        f"args={task.args} "
        f"summary={task.summary}"
    )

# Aggregate: collect a metric from each summary and rank
results = [
    {"lr": t.args["lr"], "model": t.args["model"], "acc": t.summary.get("acc")}
    for t in response.content
]
results.sort(key=lambda x: x["acc"] or 0, reverse=True)
for r in results:
    print(r)
```
---
## 5. Updating Tasks
### Update semantics
**CLI `-u 'field=value'`** patches individual dot-nested fields: `-u 'args.lr=0.005'` sets only `args.lr`, all other args untouched.
**Python `TaskUpdateRequest`** merges by default: only the keys you include inside a dict field are written; other existing keys inside that dict are preserved. Unspecified top-level optional fields (`args`, `metadata`, `summary`, `priority`, etc.) are not touched. To **replace** a dict field entirely (overwrite, not merge), add it to `replace_fields`: `TaskUpdateRequest(**{"_id": id, "args": {...}, "replace_fields": ["args"]})`.
**`reset_pending=True`** sets `status → pending` AND resets `retries → 0`. Setting `"status": "pending"` in `TaskUpdateRequest` alone changes status but does **not** reset the retry counter. Always prefer `reset_pending=True` when re-queuing failed tasks.
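As a mental model (plain Python dicts, not labtasker internals), merge vs. replace on a dict field behaves like this:

```python
existing_args = {"lr": 0.01, "model": "resnet", "epochs": 10}
update = {"lr": 0.005}

# Default MERGE: only the keys present in the update are written;
# "model" and "epochs" survive untouched
merged = {**existing_args, **update}
print(merged)    # → {'lr': 0.005, 'model': 'resnet', 'epochs': 10}

# replace_fields=["args"]: the whole dict is overwritten with the update
replaced = dict(update)
print(replaced)  # → {'lr': 0.005}
```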
### CLI
```bash
# Update a specific task's args (patches only args.lr; other args unchanged)
labtasker task update --id <task_id> -u 'args.lr=0.005'
# Update multiple fields
labtasker task update --id <task_id> -u 'args.lr=0.005' -u 'metadata.tags=["v2"]'
# Batch update: fix args and reset to pending (also resets retries)
# NOTE: add --limit 10000 to avoid silently missing tasks beyond the default 100
labtasker task ls -f 'status == "failed" and args.model == "resnet"' -q --limit 10000 \
  | xargs -I{} labtasker task update --id {} -u 'args.lr=0.005' --reset-pending --quiet
# Alternative: task update supports filter flags directly (no pipe needed)
labtasker task update -s failed -f 'args.model == "resnet"' -u 'args.lr=0.005' --reset-pending --quiet
# Cancel pending tasks (keep records, mark as cancelled)
labtasker task ls -s pending -f 'args.lr > 0.05' -q --limit 10000 \
  | xargs -I{} labtasker task update --id {} -u 'status=cancelled' --quiet
# Bump priority on urgent pending tasks
labtasker task ls -f '"urgent" in list(metadata.tags)' -s pending -q --limit 10000 \
  | xargs -I{} labtasker task update --id {} -u 'priority=100' --quiet
```
### Python
```python
import labtasker
from labtasker import Required
from labtasker.api_models import TaskUpdateRequest

# Update a single task — only specified keys are merged/updated
labtasker.update_tasks([
    TaskUpdateRequest(**{"_id": task_id, "args": {"lr": 0.005}})
])

# Batch: fix lr and reset to pending (also resets retries=0)
# NOTE: pass limit=10000 to avoid silently missing tasks beyond the default 100
response = labtasker.ls_tasks(status="failed", limit=10000)
updates = [
    TaskUpdateRequest(**{
        "_id": task.task_id,
        "args": {"lr": 0.005},  # merges into existing args; other keys preserved
    })
    for task in response.content
]
if updates:
    labtasker.update_tasks(updates, reset_pending=True)

# Cancel pending tasks matching a filter
response = labtasker.ls_tasks(
    status="pending",
    extra_filter='args.lr > 0.05',
    limit=10000,
)
updates = [
    TaskUpdateRequest(**{"_id": task.task_id, "status": "cancelled"})
    for task in response.content
]
if updates:
    labtasker.update_tasks(updates)

# Update metadata from inside a loop (merges the new key without losing existing ones)
@labtasker.loop()
def main(lr: float = Required(), model: str = Required()):
    train(lr=lr, model=model)
    task = labtasker.task_info()
    labtasker.update_tasks([
        TaskUpdateRequest(**{"_id": task.task_id, "metadata": {"run_note": "done"}})
    ])  # merges "run_note" into existing metadata; other metadata keys preserved
```
---
## 6. Deleting Tasks
### CLI
```bash
# Delete by id (positional argument)
labtasker task delete <task_id>
# Batch delete via pipe: -q prints only ids, -y skips confirmation
# NOTE: add --limit 10000 to avoid silently missing tasks beyond the default 100
labtasker task ls -f 'created_at < date("10 minutes ago")' -q --limit 10000 | labtasker task delete -y
# Delete all failed tasks
labtasker task ls -s failed -q --limit 10000 | labtasker task delete -y
# Delete by task name pattern
labtasker task ls -f 'regex(task_name, "^debug-.*")' -q --limit 10000 | labtasker task delete -y
```
### Python
```python
import labtasker

# Delete a single task
labtasker.delete_task(task_id="<task_id>")

# Batch delete matching a filter
# NOTE: pass limit=10000 to avoid silently missing tasks beyond the default 100
response = labtasker.ls_tasks(
    extra_filter='created_at < date("10 minutes ago")',
    limit=10000,
)
for task in response.content:
    labtasker.delete_task(task_id=task.task_id)
```
---
## Quick Reference
| Operation | CLI | Python |
|-----------|-----|--------|
| Submit | `labtasker task submit -- --k=v` | `labtasker.submit_task(args={...}, metadata={...})` |
| Submit with tags | `--metadata '{"tags": ["v1"]}'` | `metadata={"tags": ["v1"]}` |
| Run loop | `labtasker loop -- cmd '%(arg)'` | `@labtasker.loop()` + `Required()` |
| Run with filter | `--extra-filter 'filter'` | `@labtasker.loop(extra_filter='...')` |
| Parallel workers | `CUDA_VISIBLE_DEVICES=N labtasker loop ... &` | run multiple processes |
| List by status | `labtasker task ls -s failed` | `labtasker.ls_tasks(status="failed")` |
| List with filter | `labtasker task ls -f 'expr'` | `labtasker.ls_tasks(extra_filter='expr')` |
| Sort | `labtasker task ls -S created_at:desc` | `sorted(response.content, key=lambda t: ...)` |
| Write summary | — | `labtasker.report_task_status(task_id=..., status="running", summary={...})` |
| Update | `labtasker task update --id X -u 'args.k=v'` | `labtasker.update_tasks([TaskUpdateRequest(...)])` |
| Cancel | `... -q \| xargs ... -u 'status=cancelled'` | `TaskUpdateRequest(**{"_id": id, "status": "cancelled"})` |
| Retry failed | `--reset-pending` | `update_tasks(updates, reset_pending=True)` |
| Delete | `labtasker task ls -s failed -q \| labtasker task delete -y` | `labtasker.delete_task(task_id=...)` |
| Current task | — | `labtasker.task_info()` |
---
## API Signatures
For queue and worker management, see the [full documentation](https://luocfprime.github.io/labtasker/latest/).
### CLI
```
labtasker task submit
    [ARGS...]                      # task args as CLI flags after --  e.g. -- --lr=0.01 --model=vit
    [--args JSON_STR]              # alternative: pass args as a JSON dict string
    [--name NAME]                  # task name for identification
    [--metadata JSON_STR]          # e.g. '{"tags": ["v1"], "note": "baseline"}'
    [--max-retries INT]            # retry attempts on failure (default: 3)
    [--priority INT]               # higher = higher priority (default: 0)

labtasker loop
    [-- CMD ARGS]                  # command with %(key) placeholders; %(key) = top-level args only
    [--script-path FILE]           # script file with %(key) placeholders (for multi-line logic)
    [-f / --extra-filter EXPR]     # Python query string to filter which tasks to run
    # For nested dict args, use the Python @labtasker.loop() instead of the CLI loop

labtasker task ls
    [--id TASK_ID]                 # filter by task ID
    [--name TASK_NAME]             # filter by task name
    [-s / --status STATUS]         # pending|running|success|failed|cancelled
    [-f / --extra-filter EXPR]     # Python query string (supports args.field, metadata.field, summary.field)
    [-q / --quiet]                 # print task IDs only (for piping)
    [--no-pager]                   # disable pager output
    [--limit INT]                  # max results (default: 100)
    [-S / --sort FIELD:asc|desc]   # e.g. -S created_at:desc or -S summary.acc:desc

labtasker task update
    [--id TASK_ID]                 # filter by task ID
    [--name TASK_NAME]             # filter by task name
    [-s / --status STATUS]         # filter by status
    [-f / --extra-filter EXPR]     # filter by Python query
    [-u / --update 'field=value']  # patch a field (repeatable); dot-notation for nested: -u args.lr=0.01
    [-- field=value ...]           # positional update syntax (alternative to -u)
    [--reset-pending]              # set status=pending AND reset retries=0
    [-q / --quiet]                 # skip confirmations (for scripts/pipes)

labtasker task delete
    [TASK_IDS...]                  # task IDs to delete (positional, or piped from stdin)
    [-y / --yes]                   # skip confirmation prompt
```
### Python
```python
labtasker.submit_task(
    task_name: str = None,
    args: Dict = None,       # task args — keys must match Required() params exactly
    metadata: Dict = None,   # e.g. {"tags": ["v1"], "note": "baseline"}
    max_retries: int = 3,
    priority: int = 0,       # higher = higher priority
) -> TaskSubmitResponse      # .task_id: str

@labtasker.loop(
    extra_filter: str = None,           # Python query string
    required_fields: List[str] = None,  # explicit field list (auto-inferred from Required() if unset)
    pass_args_dict: bool = False,       # pass args as a dict instead of keyword args
)
def main(
    param: Type = Required(),                          # param name must match args key; type-cast automatically
    param: Annotated[T, Required(resolver=fn)] = ...,  # resolver receives args[param_name]
    param=Required(resolver=fn),                       # shorthand form without Annotated
):
    ...
# Loop exits automatically when no more pending tasks match.

labtasker.task_info() -> Task  # available inside @labtasker.loop() only
# .task_id .task_name .args .metadata .summary
# .status .retries .priority .max_retries
# .created_at .last_modified .start_time .worker_id

labtasker.report_task_status(
    task_id: str,
    status: str,           # "running"|"success"|"failed"|"cancelled"
    summary: Dict = None,  # merges into existing summary; None = no change
) -> None
# Inside loop: status="running" writes metrics (loop auto-marks success/failed on return/exception,
# UNLESS you already called this with "cancelled" — that terminal status is preserved).
# status="cancelled" inside loop: cancels the task; it will not be retried. Return immediately after.
# Outside loop (post-processing): call with the task's current status to update summary only,
# e.g. report_task_status(task_id=t.task_id, status="success", summary={"rank": 1})

labtasker.ls_tasks(
    task_id: str = None,
    task_name: str = None,
    status: str = None,        # pending|running|success|failed|cancelled
    extra_filter: str = None,  # Python query string; dot-notation on any field
    limit: int = 100,          # WARNING: default 100 — set limit=10000 for batch ops to avoid silent truncation
) -> TaskLsResponse            # .content: List[Task]
# NOTE: no sort parameter — sort in Python: sorted(response.content, key=lambda t: ...)
# NOTE: summary is always a dict (never None) — t.summary.get("key") is always safe
# Task fields (on objects from ls_tasks and task_info):
#   task_id, task_name, status, args, metadata, summary
#   priority, max_retries, retries
#   created_at, last_modified, start_time, worker_id

labtasker.update_tasks(
    task_updates: List[TaskUpdateRequest],
    reset_pending: bool = False,  # sets status=pending AND resets retries=0
) -> TaskLsResponse
# Default: MERGE semantics — only keys you include in args/metadata/summary are changed;
# other existing keys inside those dicts are preserved.

TaskUpdateRequest(**{
    "_id": str,                   # task_id — use the "_id" alias key (or keyword: task_id=...)
    "status": str,                # optional — "cancelled", "pending", etc.
    "args": Dict,                 # optional — merged by default; add to replace_fields to overwrite entirely
    "metadata": Dict,             # optional — merged by default
    "summary": Dict,              # optional — merged by default; use to update summary from outside the loop
    "priority": int,              # optional
    "max_retries": int,           # optional
    "replace_fields": List[str],  # optional — fields to REPLACE entirely instead of merge,
                                  # e.g. replace_fields=["args"] replaces the whole args dict
})

labtasker.delete_task(task_id: str) -> None

# To force-cancel a running task from OUTSIDE the loop (e.g., hung-job detection):
labtasker.update_tasks([
    TaskUpdateRequest(**{"_id": task_id, "status": "cancelled"})
])
```
### Filter Query Syntax
```
# Comparison
field == value    field > value    field >= value
field < value     field <= value

# Membership (array field)
"val" in list(field)

# Logic
expr and expr    expr or expr

# Functions
date("3 hours ago")          # datetime: "X ago" means X before now
regex(field, "^pattern.*")   # regex match on a string field

# Dot-notation works on any field:
args.lr > 0.01
metadata.group == "A"
summary.acc > 0.9
created_at >= date("7 days ago")

# NOT supported:  !=   not   not in
# Workaround for status negation: use the -s STATUS flag (CLI) or the status= param (Python)
# Workaround for OR over statuses: status == "pending" or status == "failed"
```
> **Event system (SSE)**: For real-time task lifecycle events and workflow automation,
> see [Advanced Features](https://luocfprime.github.io/labtasker/latest/guide/advanced/).
---
## Workflow
1. Identify the serial loop in the user's script; choose CLI or Python based on their code style.
2. **Submit script**: iterate parameter grid, call `submit_task`/`labtasker task submit` with matching `args` and any `metadata` tags needed.
3. **Run script**: declare exactly the same keys as `Required()` params or `%(key)` placeholders — no more, no less.
4. For parallel runs: each worker independently executes the run script; all pull from the same queue.
5. Env vars must go outside the `labtasker loop` command (CLI).
6. To retry failures: use `update_tasks(updates, reset_pending=True)` (resets status AND retries); or `--reset-pending` in CLI.
7. To review results: `ls_tasks(status="success")` and inspect `.summary` and `.args` on each task.
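Step 2's grid iteration is plain Python; `itertools.product` keeps a submit script flat even as the grid grows (illustrative sketch — the `submit_task` call itself is covered in section 1, and the grid values here are placeholders):

```python
from itertools import product

grid = {
    "lr": [0.001, 0.01, 0.1],
    "model": ["resnet", "vit"],
    "seed": [0, 1],
}

# One args dict per combination — keys must match the run script exactly
tasks = [dict(zip(grid, combo)) for combo in product(*grid.values())]
print(len(tasks))  # → 12
print(tasks[0])    # → {'lr': 0.001, 'model': 'resnet', 'seed': 0}

# Then, per section 1 (not run here):
# for args in tasks:
#     labtasker.submit_task(args=args, metadata={"tags": ["v1"]})
```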