Skip to content

Advanced Features

Plugins

CLI plugins

CLI plugins are particularly useful if you want to pack up your workflow and share it with others.

Demo plugin

There is a demo plugin at /PROJECT_ROOT/plugins/labtasker_plugin_task_count.

It creates a new custom command labtasker task count, which shows how many tasks are at each state.

To install officially bundled plugins:

pip install 'labtasker[plugins]'
pip install 'labtasker[plugins] @ git+https://github.com/luocfprime/labtasker.git'

To install other plugins, simply install it like a regular Python package.

pip install labtasker-plugin-task-count

Note

Behind the hood, it uses Typer command registry and setuptools entry points to implement custom CLI commands.

To write your own CLI plugin, see Setuptools Doc and Typer Doc for details.

Workflow plugins [WIP]

Custom resolvers

Sometimes after we fetched task args from the server, we need to convert it into other types (such as dataclasses) for further processing.

Suppose you have a set of tasks submitted like this:

demo/advanced/custom_resolver/submit_job.py
import labtasker

if __name__ == "__main__":
    for i in range(5):
        print(f"Submitting i={i}")
        labtasker.submit_task(
            args={
                "args_a": {"a": i, "b": "boy"},
                "args_b": {"foo": 2 * i, "bar": "baz"},
            }
        )

You can manually specify the required_fields and convert them into your own dataclass manually:

demo/advanced/custom_resolver/wo.py
import time
from dataclasses import dataclass

import labtasker


@dataclass
class ArgsGroupA:
    a: int
    b: str


@dataclass
class ArgsGroupB:
    foo: int
    bar: str


@labtasker.loop(required_fields=["args_a", "args_b"], pass_args_dict=True)
def main(args):
    args_a = ArgsGroupA(**args["args_a"])
    args_b = ArgsGroupB(**args["args_b"])
    print(f"got args_a: {args_a}")
    print(f"got args_b: {args_b}")
    time.sleep(0.5)


if __name__ == "__main__":
    main()

Now, you can achieve a more elegant solution by using a custom resolver:

demo/advanced/custom_resolver/w.py
import time
from dataclasses import dataclass
from typing import Any, Dict

from typing_extensions import Annotated

import labtasker
from labtasker import Required


@dataclass
class ArgsGroupA:
    a: int
    b: str


@dataclass
class ArgsGroupB:
    foo: int
    bar: str


@labtasker.loop()
def main(
    # use type annotation/default values to automatically resolve the required_fields
    # use the self-defined resolver to convert the task args into custom types
    args_a: Annotated[
        Dict[str, Any], Required(resolver=lambda a: ArgsGroupA(**a))
    ],  # option1. use Annotated
    args_b=Required(resolver=lambda b: ArgsGroupB(**b)),  # option2. use default kwarg
):
    print(f"got args_a: {args_a}")
    print(f"got args_b: {args_b}")
    time.sleep(0.5)


if __name__ == "__main__":
    main()

Event system

Labtasker implements a simple event notification system based on Server Sent Events (SSE). This is particularly useful for real-time notifications, workflows, and other use cases.

Demo: labtasker event listen

We use the labtasker event listen command as a demo.

It will listen to the FSM state transition events from the server and print them out.

labtasker event listen

Demo: email notification on task failure

Using the event listener, it is very easy to implement a simple email notification system on various events.

For example, you can listen for pending -> failed state transition events and send notification email.

demo/advanced/event_system/email_on_task_failure.py
import subprocess

import typer

from labtasker import connect_events

app = typer.Typer()


def send(
    recipient: str,
    subject: str,
    content: str,
):
    try:
        mail_cmd = ["mail", "-s", subject, recipient]

        process = subprocess.run(
            mail_cmd,
            input=content.encode(),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

        if process.returncode == 0:
            print("Email sent successfully!")
        else:
            error_message = process.stderr.decode().strip()
            print(f"Error sending email: {error_message}")
            raise typer.Exit(process.returncode)
    except Exception as e:
        print(f"Error sending email: {str(e)}")
        raise typer.Exit(1)


@app.command()
def email_on_task_failure(
    recipient: str = typer.Option(
        ...,
        envvar="RECIPIENT",
        help="Recipient email address (defaults to RECIPIENT environment variable)",
    ),
):
    """
    Send an email if received task failed event.

    The recipient defaults to the RECIPIENT environment variable if not specified.
    """
    listener = connect_events()
    print(f"Connected. Client listener ID: {listener.get_client_id()}")
    for event_resp in listener.iter_events():
        if not event_resp.event.type == "state_transition":
            continue

        fsm_event = event_resp.event
        if fsm_event.old_state == "running" and fsm_event.new_state == "failed":
            # running -> failed
            print(f"Task {fsm_event.entity_id} failed. Attempt to send email...")
            send(
                recipient=recipient,
                subject="Task failed",
                content=f"Task {fsm_event.entity_id} failed.",
            )


if __name__ == "__main__":
    app()

Below is a recorded demo running a simulated unstable job with 50% chance of crashing.

demo/advanced/event_system/sim_unstable_job.py
"""
Simulate running an unstable job.
"""

import random
import time

import labtasker
from labtasker import Required


def job(arg1: int, arg2: int):
    """Simulate a long-running job"""
    time.sleep(1.5)  # simulate a long-running job
    if random.uniform(0, 1) < 0.5:  # simulate unstable events
        raise Exception("Random exception")
    return arg1 + arg2


@labtasker.loop()
def main(arg1: int = Required(), arg2: int = Required()):
    result = job(arg1, arg2)
    print(f"The result is {result}")


if __name__ == "__main__":
    main()

The notification script successfully captures the event and sends email.

Email notification on task failure