Task Progress Tracking
Emit progress updates and step definitions from Celery tasks.
Kanchi can display progress bars and step lists for long-running tasks. Emit custom progress events through your Celery broker and Kanchi will store them, stream them to the UI, and expose them via the API.
Progress events are optional and do not change task state. Kanchi still relies on Celery lifecycle events for task status.
Event types
Kanchi listens for two custom events:
kanchi-task-steps(optional) defines the steps a task will run.kanchi-task-progressreports progress between 0 and 100.
{
"type": "kanchi-task-steps",
"task_id": "c2a6f1d4-0f4d-4e0d-8c3e-5a9f6d1b2c3d",
"task_name": "myapp.tasks.import_data",
"steps": [
{"key": "download", "label": "Download file", "order": 1},
{"key": "process", "label": "Process data", "order": 2, "description": "Validate rows"}
],
"timestamp": 1735342199.456
}{
"type": "kanchi-task-progress",
"task_id": "c2a6f1d4-0f4d-4e0d-8c3e-5a9f6d1b2c3d",
"task_name": "myapp.tasks.import_data",
"progress": 42.5,
"step_key": "download",
"message": "Downloading file",
"meta": {"bytes": 1048576},
"timestamp": 1735342200.123
}Notes:
stepsrequire akeyandlabel. Optional fields aredescription,total, andorder.step_keyshould match one of the step keys if you want step highlighting.metamust be JSON-serializable.- Celery will attach the
typefield automatically when events are emitted.
Use the Kanchi SDK
kanchi-sdk is a lightweight helper for emitting the same events.
pip install kanchi-sdkClass-based tasks (mixin)
from celery import Celery
from kanchi_sdk import KanchiTaskMixin, StepDef
app = Celery("myapp")
class ImportTask(KanchiTaskMixin, app.Task):
name = "myapp.tasks.import_data"
def run(self, file_id: str) -> None:
steps: list[StepDef] = [
{"key": "download", "label": "Download"},
{"key": "process", "label": "Process"},
{"key": "finalize", "label": "Finalize"},
]
self.define_kanchi_steps(steps)
self.send_kanchi_progress(0, step_key="download", message="Starting download")
# ...
self.send_kanchi_progress(60, step_key="process", message="Processing")
# ...
self.send_kanchi_progress(100, step_key="finalize", message="Done")Function tasks
from kanchi_sdk import define_kanchi_steps, send_kanchi_progress
@app.task(bind=True)
def import_data(self, file_id: str):
define_kanchi_steps(
[
{"key": "download", "label": "Download"},
{"key": "process", "label": "Process"},
],
task_id=self.request.id,
task_name=self.name,
)
send_kanchi_progress(
25,
step_key="download",
message="Fetching",
task_id=self.request.id,
task_name=self.name,
)Set KANCHI_SDK_VERBOSE=1 to log debug output from the SDK during development.
Without the SDK
If you prefer not to install the SDK, emit the same events manually with Celery's event dispatcher:
import time
from celery import current_app, current_task
def send_progress(progress, *, step_key=None, message=None, meta=None):
if not current_task or not current_task.request:
return
payload = {
"task_id": current_task.request.id,
"task_name": current_task.name,
"progress": float(progress),
"timestamp": time.time(),
}
if step_key:
payload["step_key"] = step_key
if message:
payload["message"] = message
if meta:
payload["meta"] = meta
with current_app.connection_for_write() as conn:
dispatcher = current_app.events.Dispatcher(conn)
dispatcher.send("kanchi-task-progress", **payload)
def define_steps(steps):
if not current_task or not current_task.request:
return
payload = {
"task_id": current_task.request.id,
"task_name": current_task.name,
"steps": steps,
"timestamp": time.time(),
}
with current_app.connection_for_write() as conn:
dispatcher = current_app.events.Dispatcher(conn)
dispatcher.send("kanchi-task-steps", **payload)The manual approach gives you the same results as the SDK. Keep the payloads JSON-serializable so they can travel through your broker.
API access
Kanchi exposes the latest snapshot and recent history at:
GET /api/tasks/{task_id}/progressThe response includes latest, steps, and history so you can build custom UI
or analytics on top of progress updates.