Task Progress Tracking

Emit progress updates and step definitions from Celery tasks.

Kanchi can display progress bars and step lists for long-running tasks. Emit custom progress events through your Celery broker and Kanchi will store them, stream them to the UI, and expose them via the API.

Progress events are optional and do not change task state. Kanchi still relies on Celery lifecycle events for task status.

Event types

Kanchi listens for two custom events:

  • kanchi-task-steps (optional) defines the steps a task will run.
  • kanchi-task-progress reports progress between 0 and 100.

Example kanchi-task-steps payload:

{
  "type": "kanchi-task-steps",
  "task_id": "c2a6f1d4-0f4d-4e0d-8c3e-5a9f6d1b2c3d",
  "task_name": "myapp.tasks.import_data",
  "steps": [
    {"key": "download", "label": "Download file", "order": 1},
    {"key": "process", "label": "Process data", "order": 2, "description": "Validate rows"}
  ],
  "timestamp": 1735342199.456
}

Example kanchi-task-progress payload:

{
  "type": "kanchi-task-progress",
  "task_id": "c2a6f1d4-0f4d-4e0d-8c3e-5a9f6d1b2c3d",
  "task_name": "myapp.tasks.import_data",
  "progress": 42.5,
  "step_key": "download",
  "message": "Downloading file",
  "meta": {"bytes": 1048576},
  "timestamp": 1735342200.123
}

Notes:

  • Each step requires a key and a label. Optional fields are description, total, and order.
  • step_key should match one of the defined step keys if you want the UI to highlight the current step.
  • meta must be JSON-serializable.
  • Celery sets the type field from the event name when the event is emitted, so you do not add it to the payload yourself.
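
The rules in these notes can be checked before an event is emitted. The following is a minimal sketch and is not part of Kanchi or kanchi-sdk: validate_steps and validate_progress are hypothetical helper names, and how Kanchi itself handles an invalid payload is not covered here.

```python
from typing import Any, Optional

# Fields every step must carry, per the notes above.
REQUIRED_STEP_FIELDS = ("key", "label")


def validate_steps(steps: list[dict[str, Any]]) -> list[str]:
    """Return a list of problems found in a steps definition (empty if none)."""
    problems: list[str] = []
    for index, step in enumerate(steps):
        for field in REQUIRED_STEP_FIELDS:
            # A missing or empty value is treated as a problem.
            if not step.get(field):
                problems.append(f"step {index}: missing required field '{field}'")
    return problems


def validate_progress(
    progress: float,
    step_key: Optional[str],
    steps: list[dict[str, Any]],
) -> list[str]:
    """Check the 0-100 range and that step_key names a defined step."""
    problems: list[str] = []
    if not 0 <= progress <= 100:
        problems.append(f"progress {progress} is outside 0-100")
    known_keys = {step.get("key") for step in steps}
    if step_key is not None and step_key not in known_keys:
        problems.append(f"step_key '{step_key}' does not match a defined step")
    return problems
```

Both helpers return messages rather than raising, so a task can log the problems and keep running; progress reporting is optional and should not be a reason for a task to fail.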

Use the Kanchi SDK

kanchi-sdk is a lightweight helper for emitting the same events.

pip install kanchi-sdk

Class-based tasks (mixin)

from celery import Celery
from kanchi_sdk import KanchiTaskMixin, StepDef

app = Celery("myapp")

class ImportTask(KanchiTaskMixin, app.Task):
    name = "myapp.tasks.import_data"

    def run(self, file_id: str) -> None:
        steps: list[StepDef] = [
            {"key": "download", "label": "Download"},
            {"key": "process", "label": "Process"},
            {"key": "finalize", "label": "Finalize"},
        ]
        self.define_kanchi_steps(steps)

        self.send_kanchi_progress(0, step_key="download", message="Starting download")
        # ...
        self.send_kanchi_progress(60, step_key="process", message="Processing")
        # ...
        self.send_kanchi_progress(100, step_key="finalize", message="Done")

Function tasks

from kanchi_sdk import define_kanchi_steps, send_kanchi_progress

# `app` is the Celery application created in the class-based example above.
@app.task(bind=True)
def import_data(self, file_id: str):
    define_kanchi_steps(
        [
            {"key": "download", "label": "Download"},
            {"key": "process", "label": "Process"},
        ],
        task_id=self.request.id,
        task_name=self.name,
    )

    send_kanchi_progress(
        25,
        step_key="download",
        message="Fetching",
        task_id=self.request.id,
        task_name=self.name,
    )

Set KANCHI_SDK_VERBOSE=1 to log debug output from the SDK during development.
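
The progress values in the examples above (0, 25, 60, 100) are hard-coded. One way to derive them instead is to weight every step equally and scale by how far the current step has advanced. This is only a sketch under that equal-weight assumption; overall_progress is a hypothetical helper and is not provided by kanchi-sdk.

```python
def overall_progress(step_index: int, step_count: int, fraction: float = 0.0) -> float:
    """Map progress within one step to an overall 0-100 value.

    step_index is zero-based; fraction is how much of that step is done (0.0-1.0).
    All steps are assumed to take an equal share of the total.
    """
    if step_count <= 0:
        raise ValueError("step_count must be positive")
    if not 0 <= step_index < step_count:
        raise ValueError("step_index out of range")
    # Clamp so a slightly overshooting fraction never leaves the step's share.
    fraction = min(max(fraction, 0.0), 1.0)
    return round((step_index + fraction) / step_count * 100, 2)
```

With three steps, halfway through the second step gives overall_progress(1, 3, 0.5), which is 50.0. The result can be passed as the first argument to send_kanchi_progress together with the matching step_key.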

Without the SDK

If you prefer not to install the SDK, emit the same events manually with Celery's event dispatcher:

import time
from celery import current_app, current_task


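def send_progress(progress, *, step_key=None, message=None, meta=None):
    """Emit a kanchi-task-progress event for the currently running task."""
    # Do nothing when called outside a running task.
    if not current_task or not current_task.request:
        return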
    payload = {
        "task_id": current_task.request.id,
        "task_name": current_task.name,
        "progress": float(progress),
        "timestamp": time.time(),
    }
    if step_key:
        payload["step_key"] = step_key
    if message:
        payload["message"] = message
    if meta:
        payload["meta"] = meta

    # The first argument to send() becomes the event's type field.
    with current_app.connection_for_write() as conn:
        dispatcher = current_app.events.Dispatcher(conn)
        dispatcher.send("kanchi-task-progress", **payload)


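def define_steps(steps):
    """Emit a kanchi-task-steps event describing the steps of the current task."""
    # Do nothing when called outside a running task.
    if not current_task or not current_task.request:
        return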
    payload = {
        "task_id": current_task.request.id,
        "task_name": current_task.name,
        "steps": steps,
        "timestamp": time.time(),
    }
    with current_app.connection_for_write() as conn:
        dispatcher = current_app.events.Dispatcher(conn)
        dispatcher.send("kanchi-task-steps", **payload)

The manual approach gives you the same results as the SDK. Keep the payloads JSON-serializable so they can travel through your broker.
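
Values such as dates or paths in meta are not JSON-serializable by default. A simple way to guard against this is to round-trip the payload through the standard json module before sending it. The sketch below assumes that falling back to str() is acceptable for your data; json_safe is a hypothetical helper name.

```python
import json
from datetime import date
from typing import Any


def json_safe(value: Any) -> Any:
    """Round-trip a value through JSON.

    Values the json module cannot encode are converted with str().
    Tuples come back as lists, which is how JSON represents them.
    """
    return json.loads(json.dumps(value, default=str))


# Example: the date is converted to its string form, the integer is unchanged.
example_meta = json_safe({"bytes": 1048576, "day": date(2025, 1, 2)})
```

The result can be passed as meta to send_progress. Note that default=str applies to values only; dictionary keys of unsupported types still raise TypeError.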

API access

Kanchi exposes the latest snapshot and recent history at:

GET /api/tasks/{task_id}/progress

The response includes latest, steps, and history so you can build custom UI or analytics on top of progress updates.
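
As an illustration, the endpoint can be read with the Python standard library. This is a sketch under several assumptions that you should check against your deployment: the base URL (http://localhost:8765 in the usage note is a placeholder), the absence of authentication, and the shape of latest, which is assumed here to carry the same progress and step_key fields as the progress event.

```python
import json
from typing import Any
from urllib.parse import quote
from urllib.request import urlopen


def progress_url(base_url: str, task_id: str) -> str:
    """Build the progress endpoint URL for a task."""
    return f"{base_url.rstrip('/')}/api/tasks/{quote(task_id, safe='')}/progress"


def fetch_progress(base_url: str, task_id: str, timeout: float = 5.0) -> dict[str, Any]:
    """Fetch and decode the progress snapshot (performs an HTTP GET)."""
    with urlopen(progress_url(base_url, task_id), timeout=timeout) as response:
        return json.load(response)


def summarize(snapshot: dict[str, Any]) -> str:
    """Render a one-line summary from the latest and history fields.

    Assumes latest mirrors the progress event; missing fields are tolerated.
    """
    latest = snapshot.get("latest") or {}
    history = snapshot.get("history") or []
    progress = latest.get("progress")
    percent = "unknown" if progress is None else f"{float(progress):.1f}%"
    step = latest.get("step_key") or "n/a"
    return f"{percent} (step: {step}, {len(history)} updates)"
```

For example, summarize(fetch_progress("http://localhost:8765", task_id)) returns a string such as "42.5% (step: download, 2 updates)" when the snapshot contains those values.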