# Workflow Snapshots and Orchestrators

## Overview
FlowDrop provides a unified Workflow Snapshot system that enables:
- Workflow Resume: Pause and continue workflow execution from any point
- Orchestrator Handover: Transfer execution between different orchestrators (sync → async)
- State Inspection: Debug and audit workflow execution history
- Persistence: Durable storage of execution state across requests
This document explains how the snapshot system integrates with FlowDrop's orchestrators.
## Architecture

```text
┌──────────────────────────────────────────────────────────────────────┐
│  WorkflowSnapshot (DTO)                                              │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │ executionId: "exec_abc123"                                     │  │
│  │ workflowId: "my_workflow"                                      │  │
│  │ workflowVersion: "a1b2c3d4e5f6"                                │  │
│  │ status: "completed" | "running" | "paused" | "failed"          │  │
│  │ nodeStates: { nodeId → NodeSnapshot }                          │  │
│  │ initialInput: { ... }                                          │  │
│  │ messages: [ ... ] (for chat/agent workflows)                   │  │
│  │ metadata: { ... }                                              │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
│  NodeSnapshot (per node):                                            │
│  ┌──────────────────┐   ┌──────────────────┐   ┌──────────────────┐  │
│  │ nodeId: "node_1" │   │ nodeId: "node_2" │   │ nodeId: "node_3" │  │
│  │ status: completed│   │ status: completed│   │ status: pending  │  │
│  │ output: {...}    │   │ output: {...}    │   │ output: null     │  │
│  │ injected: false  │   │ injected: true   │   │ injected: false  │  │
│  │ executionOrder: 1│   │ executionOrder: 2│   │ executionOrder: 0│  │
│  └──────────────────┘   └──────────────────┘   └──────────────────┘  │
└──────────────────────────────────────────────────────────────────────┘
```
## Key Concepts

### 1. WorkflowSnapshot

A complete representation of workflow execution state at a point in time.

```php
use Drupal\flowdrop_orchestration\DTO\Snapshot\WorkflowSnapshot;

// Create a new snapshot
$snapshot = WorkflowSnapshot::create(
  workflowId: 'my_workflow',
  workflowVersion: 'abc123',
  initialInput: ['user_id' => 42],
);

// Access state
$status = $snapshot->status; // 'pending', 'running', 'completed', 'failed'
$nodeState = $snapshot->getNodeState('node_1');
$completedNodes = $snapshot->getCompletedNodes();
```
### 2. NodeSnapshot

Represents the state of a single node within a workflow.

```php
use Drupal\flowdrop_orchestration\DTO\Snapshot\NodeSnapshot;

$nodeSnapshot = new NodeSnapshot(
  nodeId: 'process_data',
  status: 'completed',
  output: ['result' => 'processed'],
  error: null,
  injected: false,   // TRUE if state was pre-loaded, not executed
  executionOrder: 3, // Order in which this node was executed
  metadata: [],
);

// Check node status
$nodeSnapshot->isCompleted(); // true
$nodeSnapshot->isPending();   // false
$nodeSnapshot->isFailed();    // false
```
### 3. The `injected` Flag

The `injected` flag is crucial for understanding execution history:

| `injected` | Meaning |
|---|---|
| `false` | Node was actually executed by the orchestrator |
| `true` | Node state was pre-loaded (from handover, resume, or an external source) |

This enables:

- Audit trails: Distinguish between executed and pre-loaded state
- Side-effect safety: Know which nodes actually ran
- Debugging: Understand the execution path
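
As a minimal sketch of how the flag supports audits, the helper below splits completed nodes into those the orchestrator executed and those whose state was pre-loaded. It works on plain arrays shaped like the `nodeStates` entries in the REST API response rather than on the `NodeSnapshot` DTO, and `partitionByInjection()` is a hypothetical name used only for illustration.

```php
<?php

declare(strict_types=1);

/**
 * Splits completed nodes by how their state was obtained (hypothetical helper).
 *
 * @param array<string, array{status: string, injected: bool}> $nodeStates
 *
 * @return array{executed: list<string>, injected: list<string>}
 */
function partitionByInjection(array $nodeStates): array {
  $result = ['executed' => [], 'injected' => []];
  foreach ($nodeStates as $nodeId => $state) {
    // Only completed nodes are considered; pending and failed ones are skipped.
    if ($state['status'] !== 'completed') {
      continue;
    }
    $bucket = $state['injected'] ? 'injected' : 'executed';
    $result[$bucket][] = (string) $nodeId;
  }
  return $result;
}

// Node states matching the architecture diagram.
$partition = partitionByInjection([
  'node_1' => ['status' => 'completed', 'injected' => FALSE],
  'node_2' => ['status' => 'completed', 'injected' => TRUE],
  'node_3' => ['status' => 'pending', 'injected' => FALSE],
]);
// $partition === ['executed' => ['node_1'], 'injected' => ['node_2']]
```

Only the `executed` list describes nodes whose side effects happened in this run; `injected` nodes ran elsewhere or were supplied externally.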
## Orchestrators

FlowDrop provides three orchestrators, all supporting the snapshot system:

### 1. Synchronous Orchestrator

Executes workflows in a single request. Best for:

- Short-running workflows
- Real-time responses
- Development/testing

```php
use Drupal\flowdrop_orchestration\DTO\Snapshot\WorkflowSnapshot;
use Drupal\flowdrop_runtime\DTO\Orchestrator\OrchestrationRequest;

// $syncOrchestrator (e.g. the flowdrop_runtime.synchronous_orchestrator
// service) and $workflowDefinition are assumed to be available.
$request = new OrchestrationRequest(
  workflowId: 'my_workflow',
  pipelineId: '',
  workflow: $workflowDefinition,
  initialData: ['input' => 'value'],
  options: ['orchestrator' => 'flowdrop_runtime:synchronous'],
);

$response = $syncOrchestrator->orchestrate($request);

// Get the generated snapshot from response metadata
$snapshotData = $response->getMetadata()['snapshot'];
$snapshot = WorkflowSnapshot::fromArray($snapshotData);
```
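
The `executionOrder` values in a snapshot follow from running each node only after its dependencies. The sketch below illustrates that ordering with Kahn's topological sort over a plain adjacency list, numbering nodes from 1 as in the architecture diagram. It shows the general technique under that assumption, not FlowDrop's actual scheduler, and `computeExecutionOrder()` is hypothetical.

```php
<?php

declare(strict_types=1);

/**
 * Orders nodes so each runs after its dependencies (Kahn's algorithm).
 *
 * Hypothetical helper. Every upstream node must also appear as a key.
 *
 * @param array<string, list<string>> $dependencies nodeId => upstream nodeIds
 *
 * @return array<string, int> nodeId => 1-based execution order
 */
function computeExecutionOrder(array $dependencies): array {
  $inDegree = [];
  $dependents = [];
  foreach ($dependencies as $nodeId => $upstream) {
    $inDegree[$nodeId] = count($upstream);
    foreach ($upstream as $dep) {
      $dependents[$dep][] = $nodeId;
    }
  }

  // Start with the nodes that have no dependencies.
  $ready = array_keys(array_filter($inDegree, fn (int $d): bool => $d === 0));
  $order = [];
  while ($ready !== []) {
    $nodeId = array_shift($ready);
    $order[$nodeId] = count($order) + 1;
    // Release dependents whose last unmet dependency just finished.
    foreach ($dependents[$nodeId] ?? [] as $next) {
      if (--$inDegree[$next] === 0) {
        $ready[] = $next;
      }
    }
  }

  if (count($order) !== count($dependencies)) {
    throw new RuntimeException('Workflow graph contains a cycle.');
  }
  return $order;
}

$order = computeExecutionOrder([
  'node_1' => [],
  'node_2' => ['node_1'],
  'node_3' => ['node_2'],
  'node_4' => ['node_2'],
]);
// $order === ['node_1' => 1, 'node_2' => 2, 'node_3' => 3, 'node_4' => 4]
```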
### 2. Asynchronous Orchestrator

Uses Drupal's queue system for background processing. Best for:

- Long-running workflows
- High-volume processing
- Fault tolerance

```php
$request = new OrchestrationRequest(
  workflowId: 'my_workflow',
  pipelineId: '',
  workflow: $workflowDefinition,
  initialData: ['input' => 'value'],
  options: ['orchestrator' => 'flowdrop_runtime:asynchronous'],
  initialSnapshot: $existingSnapshot, // Optional: for resume/handover
);

$response = $asyncOrchestrator->orchestrate($request);
// Jobs are created and queued for processing
```
### 3. StateGraph Orchestrator

Stateful graph execution with checkpointing. Best for:

- Chat/agent workflows
- Conditional routing with loops
- Human-in-the-loop workflows

```php
$request = new OrchestrationRequest(
  workflowId: 'agent_workflow',
  pipelineId: '',
  workflow: $workflowDefinition,
  // $messages holds the conversation so far.
  initialData: ['messages' => $messages],
  options: [
    'orchestrator' => 'stategraph',
    'stategraph' => [
      'threadId' => 'conversation_123',
      'maxIterations' => 10,
      'createCheckpoints' => true,
    ],
  ],
);
```
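
The `maxIterations` and `createCheckpoints` options suggest a loop of this shape: run the current node, record a checkpoint, ask a router for the next node, and stop at an end marker or at the iteration cap. A plain-PHP sketch of that pattern follows; `runGraph()`, the `'__end__'` marker, and the checkpoint array format are illustrative assumptions, not the StateGraph orchestrator's API.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical stateful graph loop with an iteration cap and checkpoints.
 *
 * @param array<string, callable> $nodes nodeId => fn (array $state): array
 * @param callable $router fn (string $nodeId, array $state): string, returning the next nodeId or '__end__'
 */
function runGraph(array $nodes, callable $router, string $start, array $state, int $maxIterations): array {
  $checkpoints = [];
  $current = $start;
  $iterations = 0;
  while ($current !== '__end__') {
    if ($iterations >= $maxIterations) {
      throw new RuntimeException("Stopped after $maxIterations iterations.");
    }
    $state = $nodes[$current]($state);
    $iterations++;
    // Checkpoint after every step so the thread could resume from here.
    $checkpoints[] = ['node' => $current, 'state' => $state];
    $current = $router($current, $state);
  }
  return ['state' => $state, 'checkpoints' => $checkpoints, 'iterations' => $iterations];
}

// An "agent" node that loops on itself until it has produced three messages.
$nodes = [
  'agent' => function (array $s): array {
    $n = count($s['messages']) + 1;
    $s['messages'][] = "step $n";
    return $s;
  },
];
$router = fn (string $node, array $s): string => count($s['messages']) < 3 ? 'agent' : '__end__';

$result = runGraph($nodes, $router, 'agent', ['messages' => []], 10);
// $result['iterations'] === 3
```

Without the cap, a router that never returns `'__end__'` would loop forever, which is presumably why a limit such as `maxIterations` exists.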
## Orchestrator Handover

The snapshot system enables seamless transfer of execution between orchestrators.

### Use Case: Sync to Async Handover

Execute initial nodes synchronously for a fast response, then hand over to async for the remaining work.

```php
// Step 1: Execute first part synchronously
$syncRequest = new OrchestrationRequest(
  workflowId: 'my_workflow',
  pipelineId: '',
  workflow: $workflowDefinition,
  initialData: ['input' => 'value'],
  options: ['orchestrator' => 'flowdrop_runtime:synchronous'],
);
$syncResponse = $syncOrchestrator->orchestrate($syncRequest);

// Step 2: Extract snapshot from response
$snapshotData = $syncResponse->getMetadata()['snapshot'];
$snapshot = WorkflowSnapshot::fromArray($snapshotData);

// At this point, snapshot contains:
// - node_1: completed (output: {...})
// - node_2: completed (output: {...})
// - node_3: pending
// - node_4: pending

// Step 3: Hand over to async orchestrator
$asyncRequest = new OrchestrationRequest(
  workflowId: 'my_workflow',
  pipelineId: '',
  workflow: $workflowDefinition,
  initialData: [], // Already in snapshot
  options: ['orchestrator' => 'flowdrop_runtime:asynchronous'],
  initialSnapshot: $snapshot, // <-- Pre-loaded state
);
$asyncResponse = $asyncOrchestrator->orchestrate($asyncRequest);

// Jobs created:
// - node_1: status=completed, is_injected=true (pre-loaded)
// - node_2: status=completed, is_injected=true (pre-loaded)
// - node_3: status=pending (will execute)
// - node_4: status=pending (will execute)
```
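
The job list in that example can be derived mechanically from the snapshot: each node the snapshot reports as completed becomes an injected job carrying its stored output, and every other node becomes a pending job. Below is a hypothetical sketch of that mapping using plain arrays with the `status` and `is_injected` keys from the comments above; it is not `initializeJobsFromSnapshot` itself, whose internals this page does not describe, and treating failed nodes as pending is an assumption.

```php
<?php

declare(strict_types=1);

/**
 * Builds async jobs from a snapshot's node states (hypothetical helper).
 *
 * @param list<string> $workflowNodeIds All node IDs in the workflow definition.
 * @param array<string, array{status: string, output: mixed}> $nodeStates
 */
function buildJobsFromSnapshot(array $workflowNodeIds, array $nodeStates): array {
  $jobs = [];
  foreach ($workflowNodeIds as $nodeId) {
    $state = $nodeStates[$nodeId] ?? NULL;
    if ($state !== NULL && $state['status'] === 'completed') {
      // Pre-loaded: reuse the stored output instead of re-executing.
      $jobs[$nodeId] = ['status' => 'completed', 'is_injected' => TRUE, 'output' => $state['output']];
    }
    else {
      // Missing, pending, or failed nodes are queued for execution.
      $jobs[$nodeId] = ['status' => 'pending', 'is_injected' => FALSE, 'output' => NULL];
    }
  }
  return $jobs;
}

$jobs = buildJobsFromSnapshot(
  ['node_1', 'node_2', 'node_3', 'node_4'],
  [
    'node_1' => ['status' => 'completed', 'output' => ['a' => 1]],
    'node_2' => ['status' => 'completed', 'output' => ['b' => 2]],
    'node_3' => ['status' => 'pending', 'output' => NULL],
  ],
);

// Only pending jobs are handed to the queue, in workflow order.
$queue = new SplQueue();
foreach ($jobs as $nodeId => $job) {
  if ($job['status'] === 'pending') {
    $queue->enqueue($nodeId);
  }
}
```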
### Sequence Diagram

```text
┌─────────┐          ┌──────────────┐          ┌───────────────┐          ┌───────────────┐
│ Trigger │          │     Sync     │          │     Async     │          │     Queue     │
│         │          │ Orchestrator │          │  Orchestrator │          │    Worker     │
└────┬────┘          └──────┬───────┘          └───────┬───────┘          └───────┬───────┘
     │                      │                          │                          │
     │ orchestrate()        │                          │                          │
     │─────────────────────>│                          │                          │
     │                      │                          │                          │
     │                      │ Execute nodes 1-2        │                          │
     │                      │───┐                      │                          │
     │                      │<──┘                      │                          │
     │                      │                          │                          │
     │                      │ generateSnapshot()       │                          │
     │                      │───┐                      │                          │
     │                      │<──┘                      │                          │
     │                      │                          │                          │
     │ Response + Snapshot  │                          │                          │
     │<─────────────────────│                          │                          │
     │                      │                          │                          │
     │ orchestrate(snapshot)│                          │                          │
     │────────────────────────────────────────────────>│                          │
     │                      │                          │                          │
     │                      │                          │ Create jobs              │
     │                      │                          │ (1-2 as injected)        │
     │                      │                          │─────────────────────────>│
     │                      │                          │                          │
     │ Pipeline ID          │                          │                          │
     │<────────────────────────────────────────────────│                          │
     │                      │                          │                          │
     │                      │                          │ Execute 3-4              │
     │                      │                          │<─────────────────────────│
```
## Workflow State Manager

The `WorkflowStateManager` service provides the API for managing snapshots.

### Creating and Managing Snapshots

```php
use Drupal\flowdrop_runtime\Service\WorkflowState\WorkflowStateManagerInterface;

/** @var WorkflowStateManagerInterface $stateManager */
$stateManager = \Drupal::service('flowdrop_runtime.workflow_state_manager');

// Create a new snapshot
$snapshot = $stateManager->createSnapshot(
  workflowId: 'my_workflow',
  workflowVersion: $stateManager->computeWorkflowVersion($workflowDefinition),
  initialInput: ['key' => 'value'],
  threadId: 'conversation_123', // Optional
);

// Save to in-memory cache
$snapshot = $stateManager->save($snapshot);

// Update node states
$snapshot = $stateManager->setNodeRunning($snapshot->executionId, 'node_1');
$snapshot = $stateManager->setNodeCompleted(
  $snapshot->executionId,
  'node_1',
  ['result' => 'success'],
);

// Mark multiple nodes as completed (for handover)
$snapshot = $stateManager->markNodesCompleted(
  $snapshot->executionId,
  [
    'node_1' => ['output' => 'data1'],
    'node_2' => ['output' => 'data2'],
  ],
);

// Retrieve a snapshot
$snapshot = $stateManager->getSnapshot('exec_abc123');

// Validate snapshot against workflow definition
$validation = $stateManager->validateSnapshot($snapshot, $workflowDefinition);
if (!$validation->isValid()) {
  foreach ($validation->getErrors() as $error) {
    \Drupal::logger('my_module')->error($error->message);
  }
}
```
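
`setNodeRunning()`, `setNodeCompleted()`, and `markNodesCompleted()` imply a small per-node lifecycle. The sketch below models it as an explicit transition table so that invalid updates, such as restarting a completed node, are rejected. The allowed transitions are an assumption for illustration (`pending → completed` stands in for pre-loaded state); this page does not say which transitions `WorkflowStateManager` enforces, and `transitionNode()` is hypothetical.

```php
<?php

declare(strict_types=1);

// Hypothetical transition table for a node's status.
const NODE_TRANSITIONS = [
  // 'completed' directly from 'pending' covers pre-loaded (injected) state.
  'pending' => ['running', 'completed'],
  'running' => ['completed', 'failed'],
  'completed' => [],
  'failed' => [],
];

/**
 * Returns the new status, or throws if the transition is not allowed.
 */
function transitionNode(string $from, string $to): string {
  if (!array_key_exists($from, NODE_TRANSITIONS)) {
    throw new InvalidArgumentException("Unknown status '$from'.");
  }
  if (!in_array($to, NODE_TRANSITIONS[$from], TRUE)) {
    throw new LogicException("Cannot move a node from '$from' to '$to'.");
  }
  return $to;
}

$status = 'pending';
// Comparable to setNodeRunning().
$status = transitionNode($status, 'running');
// Comparable to setNodeCompleted().
$status = transitionNode($status, 'completed');
```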
### Persistent Storage

For durable storage across requests, use the entity storage service:

```php
use Drupal\flowdrop_runtime\Service\WorkflowState\EntitySnapshotStorage;

/** @var EntitySnapshotStorage $storage */
$storage = \Drupal::service('flowdrop_runtime.entity_snapshot_storage');

// Save to database
$entity = $storage->save($snapshot);

// Load from database
$snapshot = $storage->load('exec_abc123');

// List by workflow
$snapshots = $storage->listByWorkflow('my_workflow', limit: 50);

// List by thread (for conversations)
$snapshots = $storage->listByThread('conversation_123');

// Cleanup old snapshots
$deleted = $storage->cleanup(
  olderThan: time() - 604800, // 7 days
  statuses: ['completed', 'failed'],
);
```
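
`olderThan` is a Unix timestamp, so `604800` is 7 × 24 × 3600 seconds subtracted from `time()`. The sketch below spells out the selection rule the call describes: a snapshot qualifies only when it is older than the cutoff *and* its status is listed, so running executions are never selected. The `updated` timestamp field and `selectForCleanup()` are hypothetical stand-ins; which timestamp the entity actually compares against is not stated here.

```php
<?php

declare(strict_types=1);

const SECONDS_PER_DAY = 24 * 60 * 60;

/**
 * Picks the execution IDs a cleanup pass would delete (hypothetical helper).
 *
 * @param list<array{executionId: string, status: string, updated: int}> $snapshots
 * @param list<string> $statuses
 *
 * @return list<string>
 */
function selectForCleanup(array $snapshots, int $olderThan, array $statuses): array {
  $ids = [];
  foreach ($snapshots as $snapshot) {
    $isOld = $snapshot['updated'] < $olderThan;
    $isFinished = in_array($snapshot['status'], $statuses, TRUE);
    if ($isOld && $isFinished) {
      $ids[] = $snapshot['executionId'];
    }
  }
  return $ids;
}

// Fixed "current time" so the example is reproducible.
$now = 1_700_000_000;
// Same cutoff as time() - 604800.
$olderThan = $now - 7 * SECONDS_PER_DAY;

$ids = selectForCleanup([
  ['executionId' => 'exec_old_done', 'status' => 'completed', 'updated' => $now - 8 * SECONDS_PER_DAY],
  ['executionId' => 'exec_old_running', 'status' => 'running', 'updated' => $now - 8 * SECONDS_PER_DAY],
  ['executionId' => 'exec_recent_failed', 'status' => 'failed', 'updated' => $now - SECONDS_PER_DAY],
], $olderThan, ['completed', 'failed']);
// $ids === ['exec_old_done']
```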
### Pipeline ↔ Snapshot Linking

For pipeline-based orchestrators, the `FlowDropPipeline` entity includes a `snapshot_id` field that links to the associated `WorkflowSnapshot` entity:

```php
use Drupal\flowdrop_pipeline\Entity\FlowDropPipelineInterface;

/** @var FlowDropPipelineInterface $pipeline */

// Get the linked snapshot ID
$snapshotId = $pipeline->getSnapshotId();

// Get the linked snapshot entity
$snapshotEntity = $pipeline->getSnapshot();

// Set a snapshot link (done automatically by orchestrators)
$pipeline->setSnapshotId($snapshotEntity->id());
$pipeline->save();
```
This linking provides:
| Benefit | Description |
|---|---|
| Unified tracking | Access detailed state from Pipeline admin UI |
| Historical data | Pipelines retain reference to execution state |
| Debugging | View node-level details for any pipeline execution |
Note: `SynchronousPipelineOrchestrator` automatically links the snapshot after execution. `AsynchronousOrchestrator` inherits this behavior because its queue worker executes pipelines through the synchronous pipeline execution path.
## Validation

The snapshot system validates state consistency:

### Version Validation

Detects changes to the workflow definition made after the snapshot was created:

```php
$validation = $stateManager->validateSnapshot($snapshot, $workflowDefinition);

// Check for version mismatch
if ($validation->hasError(ValidationResult::ERROR_VERSION_MISMATCH)) {
  // Workflow has changed since snapshot was created
}
```
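
A version check only needs a deterministic fingerprint of the workflow definition. One way to build one, assuming a SHA-256 hash of the definition serialized as JSON with map keys sorted recursively and truncated to 12 hex characters to resemble the `a1b2c3d4e5f6` example, is sketched below. This is not necessarily how `computeWorkflowVersion()` works; `canonicalize()` and `workflowVersion()` are hypothetical.

```php
<?php

declare(strict_types=1);

/**
 * Recursively sorts map keys so key order does not affect the hash.
 */
function canonicalize(mixed $value): mixed {
  if (!is_array($value)) {
    return $value;
  }
  $isList = $value === [] || array_keys($value) === range(0, count($value) - 1);
  if (!$isList) {
    ksort($value);
  }
  return array_map('canonicalize', $value);
}

/**
 * Hypothetical version fingerprint: truncated SHA-256 of canonical JSON.
 */
function workflowVersion(array $definition): string {
  $json = json_encode(canonicalize($definition), JSON_THROW_ON_ERROR);
  return substr(hash('sha256', $json), 0, 12);
}

$stored = workflowVersion(['nodes' => ['node_1' => ['type' => 'input']], 'edges' => []]);
$reordered = workflowVersion(['edges' => [], 'nodes' => ['node_1' => ['type' => 'input']]]);
$edited = workflowVersion(['nodes' => ['node_1' => ['type' => 'llm']], 'edges' => []]);

// Same definition in a different key order: versions match.
// Changed node configuration: versions differ, which is a version mismatch.
```

Sorting keys matters because two saves of an unchanged workflow could otherwise serialize in different orders and report a spurious mismatch.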
### Dependency Validation

Ensures completed nodes have all dependencies satisfied:

```php
$validation = $stateManager->validateDependencies($snapshot, $workflowDefinition);

if ($validation->hasError(ValidationResult::ERROR_DEPENDENCY_NOT_MET)) {
  // A node is marked complete but its dependencies aren't
}
```
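
The rule can be stated as: every node marked completed must have all of its upstream nodes completed too. A minimal sketch of that check over plain arrays follows; `findUnmetDependencies()` is hypothetical, and its message format simply mirrors the one listed under Troubleshooting.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical dependency check over plain snapshot data.
 *
 * @param array<string, string> $statuses nodeId => status from the snapshot
 * @param array<string, list<string>> $dependencies nodeId => upstream nodeIds
 *
 * @return list<string> Error messages; empty when the snapshot is consistent.
 */
function findUnmetDependencies(array $statuses, array $dependencies): array {
  $errors = [];
  foreach ($statuses as $nodeId => $status) {
    if ($status !== 'completed') {
      continue;
    }
    foreach ($dependencies[$nodeId] ?? [] as $dep) {
      // A dependency absent from the snapshot counts as unmet.
      $depStatus = $statuses[$dep] ?? 'missing';
      if ($depStatus !== 'completed') {
        $errors[] = "Node '$nodeId' is completed but dependency '$dep' is $depStatus";
      }
    }
  }
  return $errors;
}

$errors = findUnmetDependencies(
  ['node_1' => 'completed', 'node_2' => 'pending', 'node_3' => 'completed'],
  ['node_2' => ['node_1'], 'node_3' => ['node_2']],
);
// $errors === ["Node 'node_3' is completed but dependency 'node_2' is pending"]
```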
### Node Existence Validation

Verifies all nodes in snapshot exist in workflow:

```php
if ($validation->hasError(ValidationResult::ERROR_UNKNOWN_NODE)) {
  // Snapshot contains nodes that don't exist in current workflow
}
```
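
Node existence validation amounts to a set difference between the node IDs in the snapshot and those in the current definition. A minimal sketch, assuming both are available as plain ID lists and using the hypothetical helper `findUnknownNodes()`:

```php
<?php

declare(strict_types=1);

/**
 * Returns node IDs present in the snapshot but absent from the workflow.
 *
 * @param list<string> $snapshotNodeIds
 * @param list<string> $workflowNodeIds
 *
 * @return list<string>
 */
function findUnknownNodes(array $snapshotNodeIds, array $workflowNodeIds): array {
  // array_values() re-indexes the result so it is a plain list.
  return array_values(array_diff($snapshotNodeIds, $workflowNodeIds));
}

$unknown = findUnknownNodes(['node_1', 'node_2', 'old_node'], ['node_1', 'node_2', 'node_3']);
// $unknown === ['old_node']
```

The reverse difference (workflow nodes missing from the snapshot) is normally not an error: those nodes simply have not run yet.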
## REST API

### Get Snapshot

```http
GET /api/flowdrop-runtime/snapshot/{execution_id}
Authorization: Bearer {token}
```

Response:

```json
{
  "success": true,
  "snapshot": {
    "executionId": "exec_abc123",
    "workflowId": "my_workflow",
    "status": "completed",
    "nodeStates": {
      "node_1": {
        "nodeId": "node_1",
        "status": "completed",
        "output": {"result": "data"},
        "injected": false,
        "executionOrder": 1
      }
    }
  }
}
```
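
A client consuming this endpoint mostly needs to walk `snapshot.nodeStates`. The sketch below decodes the example response with `json_decode()` and counts nodes per status; the HTTP call is omitted, so `$body` simply stands in for the response body a client would receive.

```php
<?php

declare(strict_types=1);

// Stand-in for the body returned by GET /api/flowdrop-runtime/snapshot/{execution_id}.
$body = <<<'JSON'
{
  "success": true,
  "snapshot": {
    "executionId": "exec_abc123",
    "workflowId": "my_workflow",
    "status": "completed",
    "nodeStates": {
      "node_1": {"nodeId": "node_1", "status": "completed", "output": {"result": "data"}, "injected": false, "executionOrder": 1}
    }
  }
}
JSON;

$data = json_decode($body, TRUE, 512, JSON_THROW_ON_ERROR);
if ($data['success'] !== TRUE) {
  throw new RuntimeException('Snapshot request failed.');
}

// Count nodes per status, e.g. ['completed' => 1].
$statusCounts = [];
foreach ($data['snapshot']['nodeStates'] as $node) {
  $statusCounts[$node['status']] = ($statusCounts[$node['status']] ?? 0) + 1;
}
```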
### Save Snapshot

```http
POST /api/flowdrop-runtime/snapshot
Content-Type: application/json
Authorization: Bearer {token}

{
  "workflowId": "my_workflow",
  "workflowVersion": "abc123",
  "status": "running",
  "nodeStates": {...},
  "initialInput": {...}
}
```
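
When building this body in PHP, one detail matters: `json_encode()` turns an empty PHP array into a JSON array (`[]`), while `nodeStates` and `initialInput` are objects in the examples above. Casting those values to `object` keeps the shape correct even when they are empty. A sketch of the payload only, without an HTTP client:

```php
<?php

declare(strict_types=1);

$payload = [
  'workflowId' => 'my_workflow',
  'workflowVersion' => 'abc123',
  'status' => 'running',
  // Cast so empty maps serialize as {} rather than [].
  'nodeStates' => (object) [],
  'initialInput' => (object) ['user_id' => 42],
];

$body = json_encode($payload, JSON_THROW_ON_ERROR);
// {"workflowId":"my_workflow","workflowVersion":"abc123","status":"running","nodeStates":{},"initialInput":{"user_id":42}}
```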
### List Snapshots

```http
GET /api/flowdrop-runtime/snapshots?workflow_id=my_workflow&limit=50
Authorization: Bearer {token}
```

### Delete Snapshot

```http
DELETE /api/flowdrop-runtime/snapshot/{execution_id}
Authorization: Bearer {token}
```
## Admin UI

Access the snapshot management UI at `/admin/flowdrop/snapshots`:
- List View: See all snapshots with status, workflow, node counts
- Detail View: Inspect individual snapshot state, node outputs, messages
- Cleanup Form: Delete old snapshots by age and status
## Configuration

### Cron Cleanup

Automatic cleanup runs during Drupal cron:

```php
// In settings.php or config
$config['flowdrop_runtime.settings']['snapshot_cleanup'] = [
  'enabled' => TRUE,
  'max_age' => 604800, // 7 days in seconds
  'statuses' => ['completed', 'failed'],
  'batch_size' => 100,
];
```
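
`batch_size` bounds how much work a single cron run does. A minimal sketch of that idea follows, assuming the cron task already has the IDs of eligible snapshots (filtered by `max_age` and `statuses`) and deletes at most one batch per run, leaving the rest for the next run. Both the one-batch-per-run policy and `cleanupBatch()` are assumptions for illustration, not FlowDrop's cron implementation.

```php
<?php

declare(strict_types=1);

/**
 * Deletes at most $batchSize snapshots per cron run (hypothetical helper).
 *
 * @param list<string> $eligibleIds IDs already filtered by max_age and statuses.
 * @param callable $delete Receives the list of IDs to delete in this run.
 *
 * @return list<string> IDs left for the next cron run.
 */
function cleanupBatch(array $eligibleIds, int $batchSize, callable $delete): array {
  $batch = array_slice($eligibleIds, 0, $batchSize);
  if ($batch !== []) {
    $delete($batch);
  }
  return array_slice($eligibleIds, $batchSize);
}

$deleted = [];
$remaining = cleanupBatch(
  ['exec_1', 'exec_2', 'exec_3'],
  2,
  function (array $ids) use (&$deleted): void {
    array_push($deleted, ...$ids);
  },
);
// $deleted === ['exec_1', 'exec_2'], $remaining === ['exec_3']
```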
### Permissions

| Permission | Description |
|---|---|
| `administer flowdrop_workflow_snapshot` | Full admin access |
| `view flowdrop_workflow_snapshot` | View snapshots |
| `edit flowdrop_workflow_snapshot` | Edit metadata |
| `delete flowdrop_workflow_snapshot` | Delete snapshots |
## Best Practices

### 1. Use Snapshots for Long Workflows

For workflows that might be interrupted:

```php
// Save to the in-memory cache (lost when the request ends)
$snapshot = $stateManager->save($snapshot);

// Or persist to the database for durability
$storage->save($snapshot);
```

### 2. Validate Before Resume

Always validate snapshots before resuming:

```php
$validation = $stateManager->validateSnapshot($snapshot, $workflowDefinition);
if (!$validation->isValid()) {
  throw new \Exception('Cannot resume: ' . $validation->getErrors()[0]->message);
}
```

### 3. Use Thread IDs for Conversations

For chat/agent workflows, use consistent thread IDs:

```php
$snapshot = $stateManager->createSnapshot(
  workflowId: 'chat_agent',
  workflowVersion: $stateManager->computeWorkflowVersion($workflowDefinition),
  initialInput: $input,
  threadId: $conversationId, // e.g., user session ID
);
```

### 4. Clean Up Old Snapshots

Configure automatic cleanup or run it manually:

```php
// Clean up snapshots older than 30 days
$storage->cleanup(time() - 2592000, ['completed', 'failed']);
```

### 5. Monitor Injected Nodes

Track injected nodes for audit purposes:

```php
foreach ($snapshot->nodeStates as $nodeId => $nodeState) {
  if ($nodeState->injected) {
    \Drupal::logger('audit')->info('Node @node was injected, not executed', [
      '@node' => $nodeId,
    ]);
  }
}
```
## Troubleshooting

### Snapshot Not Found

```text
Workflow snapshot not found: exec_abc123
```

- Check whether the snapshot was saved to persistent storage
- In-memory snapshots are lost when the request ends
- Use `EntitySnapshotStorage::save()` for persistence

### Version Mismatch

```text
Workflow version mismatch. Expected: abc123, got: def456
```

- The workflow definition changed after the snapshot was created
- Either update the snapshot or create a new execution

### Dependency Not Met

```text
Node 'node_3' is completed but dependency 'node_2' is pending
```

- Invalid snapshot state (possibly from manual manipulation)
- Re-execute the workflow or fix the dependency chain

### Table Not Found

```text
Table 'flowdrop_workflow_snapshot' doesn't exist
```

The snapshot entity type has not been installed. Run:

```bash
drush ev "\Drupal::entityDefinitionUpdateManager()->installEntityType(\Drupal::entityTypeManager()->getDefinition('flowdrop_workflow_snapshot'));"
drush cr
```
## Snapshot Support Matrix
| Component | Generates Snapshot | Accepts Snapshot | Status |
|---|---|---|---|
| SynchronousOrchestrator | ✅ Yes | ✅ Yes | Full support |
| SynchronousPipelineOrchestrator | ✅ Yes | ⚠️ Via jobs | Full support |
| AsynchronousOrchestrator | ✅ Via sync pipeline | ✅ Via `initializeJobsFromSnapshot` | Full support |
| StateGraphOrchestrator | ✅ Yes | ✅ Yes | Full support |
| TriggerManager | N/A (receives from orchestrator) | ❌ Not yet | Fresh executions only |
| PlaygroundService | N/A (receives from orchestrator) | ❌ Not yet | Fresh executions only |
| WorkflowExecutionForm | N/A (receives from orchestrator) | ❌ Not yet | Fresh executions only |
## Entry Points Analysis

| Entry Point | Module | How it Triggers Orchestration | Snapshot Handling |
|---|---|---|---|
| Entity triggers | `flowdrop_trigger` | `TriggerManager::executeTrigger()` → `orchestrate()` | Creates fresh `OrchestrationRequest`, gets snapshot in response |
| Form triggers | `flowdrop_trigger` | `TriggerManager::executeFormTrigger()` → `orchestrate()` | Creates fresh `OrchestrationRequest`, gets snapshot in response |
| Playground | `flowdrop_playground` | `PlaygroundService::executeWorkflow()` → `orchestrate()` | Creates fresh `OrchestrationRequest`, gets snapshot in response |
| Manual execution | `flowdrop_runtime` | `WorkflowExecutionForm` → `executePipeline()` | Direct pipeline execution, gets snapshot in response |
| Queue worker | `flowdrop_runtime` | `PipelineExecutionWorker` → `executePipeline()` | Inherits snapshot from sync pipeline execution |
| StateGraph test | `flowdrop_stategraph` | `StateGraphTestForm` → `orchestrate()` | Creates fresh `OrchestrationRequest`, gets snapshot in response |
## Implementation Notes

All orchestrator plugins now delegate to their corresponding service from the Drupal container instead of instantiating it directly. This ensures:

- Proper dependency injection: services such as `WorkflowStateManager` are injected correctly
- Snapshot generation: all orchestrators generate snapshots after execution
- Consistent configuration: services are configured once in `*.services.yml`

Delegation is implemented in these plugins:

- `SynchronousOrchestratorPlugin` → uses `@flowdrop_runtime.synchronous_orchestrator`
- `SynchronousPipelineOrchestratorPlugin` → uses `@flowdrop_runtime.synchronous_pipeline_orchestrator`
- `AsynchronousOrchestratorPlugin` → uses `@flowdrop_runtime.asynchronous_orchestrator`
- `StateGraphOrchestratorPlugin` → uses `@flowdrop_stategraph.orchestrator`
## Future Improvements

- Store Snapshots from Triggers: The `TriggerManager` could optionally store the returned snapshot to enable resume.
- Resume from Trigger: Add the ability to resume a triggered workflow from a stored snapshot.
- Playground Conversation Memory: Use snapshots to maintain state across playground interactions.