Advanced Features¶
Plugins¶
CLI plugins¶
CLI plugins are particularly useful if you want to pack up your workflow and share it with others.
Demo plugin
There is a demo plugin at /PROJECT_ROOT/plugins/labtasker_plugin_task_count.
It adds a custom command, labtasker task count, which shows how many tasks are in each state.
To install officially bundled plugins:
To install other plugins, install them like any regular Python package.
Note
Under the hood, it uses the Typer command registry and setuptools entry points to implement custom CLI commands.
To write your own CLI plugin, see Setuptools Doc and Typer Doc for details.
There is a cookiecutter plugin template here.
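To make the entry point mechanism mentioned in the note more concrete, here is a minimal sketch of how a host application could discover installed plugins. It uses only the standard library. The group name `example_host.plugins` and the helper `discover_plugins` are hypothetical; the group that Labtasker actually reads is defined by the project and is not shown here.

```python
from importlib.metadata import entry_points
from typing import Any, Dict

# Hypothetical group name, for illustration only.
PLUGIN_GROUP = "example_host.plugins"


def discover_plugins(group: str = PLUGIN_GROUP) -> Dict[str, Any]:
    """Return {entry point name: loaded object} for every plugin in `group`."""
    eps = entry_points()
    if hasattr(eps, "select"):
        # Python 3.10+: selectable entry points
        selected = eps.select(group=group)
    else:
        # Python 3.8/3.9: a plain dict keyed by group name
        selected = eps.get(group, [])

    plugins: Dict[str, Any] = {}
    for ep in selected:
        # ep.load() imports the target module and returns the referenced
        # object, e.g. a function that registers extra CLI commands.
        plugins[ep.name] = ep.load()
    return plugins


if __name__ == "__main__":
    found = discover_plugins()
    print(f"Discovered {len(found)} plugin(s): {sorted(found)}")
```

A plugin package would advertise itself by declaring an entry point in the matching group in its packaging metadata, which is why installing the package is enough to make its commands available.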
Workflow plugins [WIP]¶
Custom resolvers¶
Sometimes, after fetching task args from the server, we need to convert them into other types (such as dataclasses) for further processing.
Suppose you have a set of tasks submitted like this:
import labtasker
if __name__ == "__main__":
    for i in range(5):
        print(f"Submitting i={i}")
        labtasker.submit_task(
            args={
                "args_a": {"a": i, "b": "boy"},
                "args_b": {"foo": 2 * i, "bar": "baz"},
            }
        )
You can specify the required_fields manually and convert the fetched args into your own dataclasses:
import time
from dataclasses import dataclass
import labtasker
@dataclass
class ArgsGroupA:
    a: int
    b: str
@dataclass
class ArgsGroupB:
    foo: int
    bar: str
@labtasker.loop(required_fields=["args_a", "args_b"], pass_args_dict=True)
def main(args):
    args_a = ArgsGroupA(**args["args_a"])
    args_b = ArgsGroupB(**args["args_b"])
    print(f"got args_a: {args_a}")
    print(f"got args_b: {args_b}")
    time.sleep(0.5)
if __name__ == "__main__":
    main()
Now, you can achieve a more elegant solution by using a custom resolver:
import time
from dataclasses import dataclass
from typing import Any, Dict
from typing_extensions import Annotated
import labtasker
from labtasker import Required
@dataclass
class ArgsGroupA:
    a: int
    b: str
@dataclass
class ArgsGroupB:
    foo: int
    bar: str
@labtasker.loop()
def main(
    # use type annotation/default values to automatically resolve the required_fields
    # use the self-defined resolver to convert the task args into custom types
    args_a: Annotated[
        Dict[str, Any], Required(resolver=lambda a: ArgsGroupA(**a))
    ],  # option1. use Annotated
    args_b=Required(resolver=lambda b: ArgsGroupB(**b)),  # option2. use default kwarg
):
    print(f"got args_a: {args_a}")
    print(f"got args_b: {args_b}")
    time.sleep(0.5)
if __name__ == "__main__":
    main()
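When several argument groups need the same treatment, the lambdas can be replaced by a small reusable factory. The sketch below is independent of Labtasker and only shows the conversion step. The helper `dataclass_resolver` is hypothetical, and it assumes, as the lambdas above suggest, that a resolver is any callable taking the raw value and returning the converted one.

```python
from dataclasses import dataclass, fields, is_dataclass
from typing import Any, Callable, Dict, Type, TypeVar

T = TypeVar("T")


def dataclass_resolver(cls: Type[T]) -> Callable[[Dict[str, Any]], T]:
    """Build a resolver that turns a dict of task args into an instance of `cls`."""
    if not is_dataclass(cls):
        raise TypeError(f"{cls!r} is not a dataclass")
    allowed = {f.name for f in fields(cls)}

    def resolve(raw: Dict[str, Any]) -> T:
        unknown = set(raw) - allowed
        if unknown:
            # Fail early with a readable message instead of a bare TypeError
            raise ValueError(
                f"Unexpected keys for {cls.__name__}: {sorted(unknown)}"
            )
        return cls(**raw)

    return resolve


@dataclass
class ArgsGroupA:
    a: int
    b: str


resolve_a = dataclass_resolver(ArgsGroupA)
print(resolve_a({"a": 1, "b": "boy"}))  # ArgsGroupA(a=1, b='boy')
```

Under that assumption, `Required(resolver=dataclass_resolver(ArgsGroupA))` could stand in for the lambda in either option.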
Event system¶
Labtasker implements a simple event notification system based on Server-Sent Events (SSE). It is particularly useful for real-time notifications and event-driven workflows.
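For readers unfamiliar with SSE, the sketch below parses the generic SSE text format: `field: value` lines, comment lines starting with `:`, and a blank line ending each message. It is a simplified illustration of the wire format, not Labtasker's client code, and the JSON payload in the sample stream is made up for the example.

```python
from typing import Dict, Iterable, Iterator, List


def parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Yield one dict per SSE message from an iterable of text lines."""
    fields: Dict[str, str] = {}
    data: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current message; messages
            # without any data lines are dropped.
            if data:
                fields["data"] = "\n".join(data)
                yield fields
            fields, data = {}, []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space is not part of the value
        if name == "data":
            data.append(value)  # multiple data lines are joined with "\n"
        else:
            fields[name] = value


# Hypothetical stream contents, for illustration only.
stream = [
    ": keep-alive\n",
    "event: state_transition\n",
    'data: {"old_state": "running", "new_state": "failed"}\n',
    "\n",
]

for message in parse_sse(stream):
    print(message)
```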
Demo: labtasker event listen¶
We use the labtasker event listen command as a demo.
It listens for FSM state transition events from the server and prints them out.
labtasker event listen
Demo: email notification on task failure¶
With the event listener, it is easy to build a simple email notification system that reacts to specific events.
For example, you can listen for running -> failed state transition events and
send a notification email.
import subprocess
import typer
from labtasker import connect_events
app = typer.Typer()
def send(
    recipient: str,
    subject: str,
    content: str,
):
    try:
        mail_cmd = ["mail", "-s", subject, recipient]
        process = subprocess.run(
            mail_cmd,
            input=content.encode(),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if process.returncode == 0:
            print("Email sent successfully!")
        else:
            error_message = process.stderr.decode().strip()
            print(f"Error sending email: {error_message}")
            raise typer.Exit(process.returncode)
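    except typer.Exit:
        # typer.Exit is itself an Exception subclass; re-raise it untouched
        # so the original return code is kept
        raise
    except Exception as e:
        print(f"Error sending email: {str(e)}")
        raise typer.Exit(1)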
@app.command()
def email_on_task_failure(
    recipient: str = typer.Option(
        ...,
        envvar="RECIPIENT",
        help="Recipient email address (defaults to RECIPIENT environment variable)",
    ),
):
    """
    Send an email when a task failure event is received.
    The recipient defaults to the RECIPIENT environment variable if not specified.
    """
    listener = connect_events()
    print(f"Connected. Client listener ID: {listener.get_client_id()}")
    for event_resp in listener.iter_events():
        if event_resp.event.type != "state_transition":
            continue
        fsm_event = event_resp.event
        if fsm_event.old_state == "running" and fsm_event.new_state == "failed":
            # running -> failed
            print(f"Task {fsm_event.entity_id} failed. Attempt to send email...")
            send(
                recipient=recipient,
                subject="Task failed",
                content=f"Task {fsm_event.entity_id} failed.",
            )
if __name__ == "__main__":
    app()
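The filtering logic in the loop above can be pulled into a small predicate, which makes it easy to watch more than one transition and to test the filter without a server. This is a sketch only: `matches_transition` is a hypothetical helper, and the `SimpleNamespace` objects are stand-ins carrying the same attributes the script reads (`type`, `entity_id`, `old_state`, `new_state`), not real event objects.

```python
from types import SimpleNamespace
from typing import Any, Iterable, Tuple


def matches_transition(event: Any, watched: Iterable[Tuple[str, str]]) -> bool:
    """Return True if `event` is a state transition listed in `watched`."""
    if getattr(event, "type", None) != "state_transition":
        return False
    return (event.old_state, event.new_state) in set(watched)


# Transitions to be notified about, as (old_state, new_state) pairs.
WATCHED = [("running", "failed")]

# Stand-in events; real ones would come from listener.iter_events().
failed = SimpleNamespace(
    type="state_transition",
    entity_id="task-1",
    old_state="running",
    new_state="failed",
)
started = SimpleNamespace(
    type="state_transition",
    entity_id="task-2",
    old_state="pending",
    new_state="running",
)

for ev in (failed, started):
    if matches_transition(ev, WATCHED):
        print(f"Would notify about task {ev.entity_id}")
```

Inside the listener loop, the two `if` checks could then collapse into a single `matches_transition(event_resp.event, WATCHED)` call.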
Below is a recorded demo that runs a simulated unstable job with a 50% chance of crashing.
"""
Simulate running an unstable job.
"""
import random
import time
import labtasker
from labtasker import Required
def job(arg1: int, arg2: int):
    """Simulate a long-running job"""
    time.sleep(1.5)  # simulate a long-running job
    if random.uniform(0, 1) < 0.5:  # simulate unstable events
        raise Exception("Random exception")
    return arg1 + arg2
@labtasker.loop()
def main(arg1: int = Required(), arg2: int = Required()):
    result = job(arg1, arg2)
    print(f"The result is {result}")
if __name__ == "__main__":
    main()
The notification script successfully captures the event and sends an email.
Email notification on task failure