# Workflow Events Tutorial
This tutorial shows how to configure the workflow engine in settings.py, trigger workflow events from manager updates, and route events using human-readable names and predicates.
## 1. Configure the workflow engine
Add workflow settings in settings.py:
```python
GENERAL_MANAGER = {
    "WORKFLOW_MODE": "production",
    "WORKFLOW_ENGINE": {
        "class": "general_manager.workflow.backends.celery.CeleryWorkflowEngine",
        "options": {},
    },
    "WORKFLOW_EVENT_REGISTRY": {
        "class": "general_manager.workflow.event_registry.DatabaseEventRegistry",
        "options": {},
    },
    "WORKFLOW_SIGNAL_BRIDGE": True,
    "WORKFLOW_ASYNC": True,
    "WORKFLOW_OUTBOX_BATCH_SIZE": 100,
    "WORKFLOW_OUTBOX_PROCESS_CHUNK_SIZE": 50,
    "WORKFLOW_BEAT_ENABLED": True,
    "WORKFLOW_BEAT_OUTBOX_INTERVAL_SECONDS": 5,
    "WORKFLOW_BEAT_MAX_JITTER_SECONDS": 2,
    "WORKFLOW_DELIVERY_RUNNING_TIMEOUT_SECONDS": 300,
    "WORKFLOW_MAX_RETRIES": 3,
    "WORKFLOW_DEAD_LETTER_ENABLED": True,
}
```
- `WORKFLOW_ENGINE` selects the orchestration backend.
- `WORKFLOW_MODE` controls defaults (`local` vs `production`).
- `WORKFLOW_EVENT_REGISTRY` selects in-memory vs durable DB routing.
- `WORKFLOW_SIGNAL_BRIDGE=True` enables automatic event creation from manager mutation signals.
- `WORKFLOW_BEAT_*` controls periodic outbox draining via Celery Beat.
- `WORKFLOW_OUTBOX_PROCESS_CHUNK_SIZE` controls the per-task batch size when processing claimed outbox entries.
- `WORKFLOW_DELIVERY_RUNNING_TIMEOUT_SECONDS` defines the stale-running takeover threshold for delivery attempts.
Important async contract:

- With `WORKFLOW_ASYNC=True`, workflow handlers must be importable top-level callables.
- Nested/local handlers are marked failed with an explicit error instead of executing inline.
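Why nested handlers are rejected can be seen with plain Python: a module-level function has a dotted path that `importlib` can resolve, while a closure's qualified name contains `<locals>` and has no importable path. This is a minimal illustration, not the library's actual resolution code; `resolve_handler` here is a hypothetical helper.

```python
import importlib

def resolve_handler(path: str):
    """Hypothetical resolver: dotted 'module.attr' path to a top-level callable."""
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)

# A top-level callable is reachable by its dotted path.
assert callable(resolve_handler("os.path.join"))

def outer():
    def nested(input_data):
        return input_data
    return nested

# A nested handler's qualified name contains '<locals>', so a separate
# async worker process has no way to import and call it.
handler = outer()
assert "<locals>" in handler.__qualname__
```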
Execution state contract:

- `resume(...)` is only valid for executions in `waiting`.
- `cancel(...)` is only valid for active executions in `pending`, `running`, or `waiting`.
- `completed`, `failed`, and `cancelled` executions are treated as terminal and are not overwritten by later resume/cancel requests.
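The state contract above can be sketched as two predicates. This is illustrative only, assuming the state names listed above; it is not the engine's actual implementation:

```python
TERMINAL = {"completed", "failed", "cancelled"}

def can_resume(state: str) -> bool:
    # resume(...) is only valid for executions in 'waiting'
    return state == "waiting"

def can_cancel(state: str) -> bool:
    # cancel(...) is only valid for active executions
    return state in {"pending", "running", "waiting"}

assert can_resume("waiting") and not can_resume("running")
assert can_cancel("pending") and not can_cancel("completed")
# Terminal executions are never overwritten by later resume/cancel calls.
assert all(not can_resume(s) and not can_cancel(s) for s in TERMINAL)
```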
For local zero-setup mode, use:
```python
GENERAL_MANAGER = {
    "WORKFLOW_MODE": "local",
    "WORKFLOW_SIGNAL_BRIDGE": True,
}
```
## 2. Register a workflow trigger with a readable event name
You can register against either:

- the canonical type (for example `general_manager.manager.updated`), or
- the human-readable event name (for example `manager_updated`).
Use a when filter to route only relevant updates:
```python
from general_manager.workflow.event_registry import InMemoryEventRegistry

# start_project_status_workflow is defined in step 4 below
event_registry = InMemoryEventRegistry()
event_registry.register(
    "manager_updated",
    handler=start_project_status_workflow,
    when=lambda event: (
        event.payload.get("manager") == "Project"
        and event.payload.get("changes", {}).get("status", {}).get("new") == "active"
    ),
)
```
## 3. Use typed event constructors
GeneralManager workflow helpers provide compact, readable event creation:
```python
from general_manager.workflow.events import manager_updated_event

event = manager_updated_event(
    manager="Project",
    identification={"id": 42},
    changes={"status": "active"},
    old_values={"status": "draft"},
    event_name="project_status_changed",
)
```
The resulting payload stores per-field diffs:
```json
{
    "changes": {
        "status": {"old": "draft", "new": "active"}
    }
}
```
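The diff shape can be reproduced in a few lines; `build_changes` below is a hypothetical helper that mirrors the payload shape shown above, not the library's internal function:

```python
def build_changes(changes: dict, old_values: dict) -> dict:
    """Hypothetical helper: combine new and old values into per-field diffs."""
    return {
        field: {"old": old_values.get(field), "new": new_value}
        for field, new_value in changes.items()
    }

payload_changes = build_changes({"status": "active"}, {"status": "draft"})
assert payload_changes == {"status": {"old": "draft", "new": "active"}}
```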
Available helpers:

- `manager_created_event(...)`
- `manager_updated_event(...)`
- `manager_deleted_event(...)`
## 4. Start a workflow and call an action
Inside your handler, start a workflow and execute an action:
```python
from general_manager.workflow.backend_registry import get_workflow_engine
from general_manager.workflow.engine import WorkflowDefinition


def project_status_workflow_handler(input_data):
    # Replace with your action registry call, e.g. send_email
    return {"sent": True, "to": "ops@example.test"}


def start_project_status_workflow(event):
    workflow = WorkflowDefinition(
        workflow_id="project_status_email",
        handler=project_status_workflow_handler,
    )
    engine = get_workflow_engine()
    engine.start(
        workflow,
        input_data={
            "event_id": event.event_id,
            "project_id": event.payload["identification"]["id"],
            "old_status": event.payload["changes"]["status"]["old"],
            "new_status": event.payload["changes"]["status"]["new"],
        },
        correlation_id=event.event_id,
    )
```
Correlation behavior:

- `correlation_id` is a durable deduplication key per `workflow_id`.
- Starting the same workflow with the same `correlation_id` reuses the existing execution record instead of creating a second one while the original execution is still active or already completed.
- After a failed execution, the same `correlation_id` can start a fresh execution; failed attempts are not treated as the durable winning record.
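A minimal in-memory sketch of this deduplication rule, assuming the states named above; `MiniEngine` is hypothetical, not the library's engine, which persists executions durably:

```python
class MiniEngine:
    def __init__(self):
        # (workflow_id, correlation_id) -> execution record
        self._executions = {}

    def start(self, workflow_id, correlation_id):
        key = (workflow_id, correlation_id)
        existing = self._executions.get(key)
        # Reuse the durable record while it is active or completed;
        # only a failed execution allows a fresh start.
        if existing is not None and existing["state"] != "failed":
            return existing
        attempt = existing["attempt"] + 1 if existing else 1
        execution = {"state": "running", "attempt": attempt}
        self._executions[key] = execution
        return execution

engine = MiniEngine()
first = engine.start("project_status_email", "evt-1")
assert engine.start("project_status_email", "evt-1") is first  # reused while active

first["state"] = "completed"
assert engine.start("project_status_email", "evt-1") is first  # reused when completed

first["state"] = "failed"
second = engine.start("project_status_email", "evt-1")
assert second is not first and second["attempt"] == 2  # fresh start after failure
```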
## 5. Trigger without signals (explicit publish)
If you do not want signal-based triggering, publish events directly from your service layer, mutation, or view:
```python
from general_manager.workflow.event_registry import get_event_registry
from general_manager.workflow.events import manager_updated_event

event = manager_updated_event(
    manager="Project",
    identification=project.identification,
    changes={"status": "active"},
    old_values={"status": "draft"},
    event_name="project_status_changed",
)
get_event_registry().publish(event)
```
This keeps trigger logic explicit while using the same routing and workflow engine contracts.
## 6. Handle trigger failures with retries and dead-letter hooks
`InMemoryEventRegistry.register(...)` supports per-handler retries and dead-letter callbacks:
```python
dead_letters: list[tuple[str, str]] = []

event_registry.register(
    "manager_updated",
    handler=start_project_status_workflow,
    when=lambda event: event.payload.get("manager") == "Project",
    retries=2,
    dead_letter_handler=lambda event, exc: dead_letters.append(
        (event.event_id, str(exc))
    ),
)
```
Behavior:

- Handler exceptions are isolated and do not crash event publishing for other handlers.
- Failed handlers are retried up to `retries` times.
- After the final failure, the dead-letter hook receives `(event, exception)`.
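The retry contract can be sketched as a small dispatch loop. This assumes `retries` counts additional attempts after the first try; the sketch is illustrative, not the registry's actual dispatch code:

```python
def dispatch(handler, event, retries, dead_letter_handler):
    """Call handler, retrying on failure; on final failure invoke the hook."""
    last_exc = None
    for _ in range(retries + 1):
        try:
            return handler(event)
        except Exception as exc:  # isolate the failure instead of raising
            last_exc = exc
    dead_letter_handler(event, last_exc)

attempts = []
dead_letters = []

def flaky(event):
    attempts.append(event["event_id"])
    raise RuntimeError("boom")

dispatch(
    flaky,
    {"event_id": "evt-1"},
    retries=2,
    dead_letter_handler=lambda event, exc: dead_letters.append(
        (event["event_id"], str(exc))
    ),
)
assert attempts == ["evt-1"] * 3          # one initial try plus two retries
assert dead_letters == [("evt-1", "boom")]
```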
Registration behavior:

- Identical registrations for the same event are deduplicated at registration time.
- "Identical" means the event key and all routing-relevant options resolve to the same registration identity.
- Different `when`, `validator`, retry, or dead-letter settings still create separate registrations.
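A sketch of registration-time deduplication under these rules, using a hypothetical `MiniRegistry` whose identity is just the event key plus a couple of routing options; the real registry's identity rules cover more fields:

```python
class MiniRegistry:
    def __init__(self):
        self._registrations = set()

    def register(self, event_key, handler, retries=0):
        identity = (event_key, handler, retries)
        if identity in self._registrations:
            return False  # identical registration: deduplicated
        self._registrations.add(identity)
        return True

def handle(event):
    return event

registry = MiniRegistry()
assert registry.register("manager_updated", handle, retries=2) is True
assert registry.register("manager_updated", handle, retries=2) is False  # duplicate
assert registry.register("manager_updated", handle, retries=3) is True   # new options
```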