
Workflow Snapshots and Orchestrators

Overview

FlowDrop provides a unified Workflow Snapshot system that enables:

  • Workflow Resume: Pause and continue workflow execution from any point
  • Orchestrator Handover: Transfer execution between different orchestrators (sync → async)
  • State Inspection: Debug and audit workflow execution history
  • Persistence: Durable storage of execution state across requests

This document explains how the snapshot system integrates with FlowDrop's orchestrators.


Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                         WorkflowSnapshot (DTO)                          │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │  executionId: "exec_abc123"                                      │  │
│  │  workflowId: "my_workflow"                                       │  │
│  │  workflowVersion: "a1b2c3d4e5f6"                                 │  │
│  │  status: "completed" | "running" | "paused" | "failed"           │  │
│  │  nodeStates: { nodeId → NodeSnapshot }                           │  │
│  │  initialInput: { ... }                                           │  │
│  │  messages: [ ... ]  (for chat/agent workflows)                   │  │
│  │  metadata: { ... }                                               │  │
│  └──────────────────────────────────────────────────────────────────┘  │
│                                                                         │
│  NodeSnapshot (per node):                                               │
│  ┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐     │
│  │ nodeId: "node_1" │  │ nodeId: "node_2" │  │ nodeId: "node_3" │     │
│  │ status: completed│  │ status: completed│  │ status: pending  │     │
│  │ output: {...}    │  │ output: {...}    │  │ output: null     │     │
│  │ injected: false  │  │ injected: true   │  │ injected: false  │     │
│  │ executionOrder: 1│  │ executionOrder: 2│  │ executionOrder: 0│     │
│  └──────────────────┘  └──────────────────┘  └──────────────────┘     │
└─────────────────────────────────────────────────────────────────────────┘

Key Concepts

1. WorkflowSnapshot

A complete representation of workflow execution state at a point in time.

use Drupal\flowdrop_orchestration\DTO\Snapshot\WorkflowSnapshot;

// Create a new snapshot
$snapshot = WorkflowSnapshot::create(
  workflowId: 'my_workflow',
  workflowVersion: 'abc123',
  initialInput: ['user_id' => 42],
);

// Access state
$status = $snapshot->status;           // e.g. 'pending', 'running', 'paused', 'completed', 'failed'
$nodeState = $snapshot->getNodeState('node_1');
$completedNodes = $snapshot->getCompletedNodes();

2. NodeSnapshot

Represents the state of a single node within a workflow.

use Drupal\flowdrop_orchestration\DTO\Snapshot\NodeSnapshot;

$nodeSnapshot = new NodeSnapshot(
  nodeId: 'process_data',
  status: 'completed',
  output: ['result' => 'processed'],
  error: null,
  injected: false,        // TRUE if state was pre-loaded, not executed
  executionOrder: 3,      // Order in which this node was executed
  metadata: [],
);

// Check node status
$nodeSnapshot->isCompleted();  // true
$nodeSnapshot->isPending();    // false
$nodeSnapshot->isFailed();     // false

3. The injected Flag

The injected flag is crucial for understanding execution history:

injected  Meaning
false     Node was actually executed by the orchestrator
true      Node state was pre-loaded (from handover, resume, or external source)

This enables:

  • Audit trails: Distinguish between executed and pre-loaded state
  • Side-effect safety: Know which nodes actually ran
  • Debugging: Understand the execution path
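The executed/pre-loaded distinction can be illustrated with a minimal, standalone sketch. It uses plain arrays shaped like the nodeStates entries in the diagram rather than the real NodeSnapshot objects, and the helper name partitionByInjected() is hypothetical, not part of FlowDrop.

```php
<?php
// Assumption: plain arrays stand in for NodeSnapshot objects here.
$nodeStates = [
  'node_1' => ['status' => 'completed', 'injected' => false],
  'node_2' => ['status' => 'completed', 'injected' => true],
  'node_3' => ['status' => 'pending', 'injected' => false],
];

/**
 * Splits completed nodes into those that ran and those that were pre-loaded.
 *
 * Hypothetical helper for illustration only.
 */
function partitionByInjected(array $nodeStates): array {
  $result = ['executed' => [], 'injected' => []];
  foreach ($nodeStates as $nodeId => $state) {
    if ($state['status'] !== 'completed') {
      // Skip nodes that have not completed.
      continue;
    }
    $bucket = $state['injected'] ? 'injected' : 'executed';
    $result[$bucket][] = $nodeId;
  }
  return $result;
}

$partition = partitionByInjected($nodeStates);
```

Only the nodes in the 'executed' bucket could have produced side effects during this execution, which is the property an audit would typically check.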


Orchestrators

FlowDrop provides three orchestrators, all supporting the snapshot system:

1. Synchronous Orchestrator

Executes workflows in a single request. Best for:

  • Short-running workflows
  • Real-time responses
  • Development/testing

use Drupal\flowdrop_runtime\DTO\Orchestrator\OrchestrationRequest;

$request = new OrchestrationRequest(
  workflowId: 'my_workflow',
  pipelineId: '',
  workflow: $workflowDefinition,
  initialData: ['input' => 'value'],
  options: ['orchestrator' => 'flowdrop_runtime:synchronous'],
);

$response = $syncOrchestrator->orchestrate($request);

// Get the generated snapshot from response metadata
$snapshotData = $response->getMetadata()['snapshot'];
$snapshot = WorkflowSnapshot::fromArray($snapshotData);

2. Asynchronous Orchestrator

Uses Drupal's queue system for background processing. Best for:

  • Long-running workflows
  • High-volume processing
  • Fault tolerance

$request = new OrchestrationRequest(
  workflowId: 'my_workflow',
  pipelineId: '',
  workflow: $workflowDefinition,
  initialData: ['input' => 'value'],
  options: ['orchestrator' => 'flowdrop_runtime:asynchronous'],
  initialSnapshot: $existingSnapshot,  // Optional: for resume/handover
);

$response = $asyncOrchestrator->orchestrate($request);
// Jobs are created and queued for processing

3. StateGraph Orchestrator

Stateful graph execution with checkpointing. Best for:

  • Chat/agent workflows
  • Conditional routing with loops
  • Human-in-the-loop workflows

$request = new OrchestrationRequest(
  workflowId: 'agent_workflow',
  pipelineId: '',
  workflow: $workflowDefinition,
  initialData: ['messages' => [...]],
  options: [
    'orchestrator' => 'stategraph',
    'stategraph' => [
      'threadId' => 'conversation_123',
      'maxIterations' => 10,
      'createCheckpoints' => true,
    ],
  ],
);

Orchestrator Handover

The snapshot system enables seamless transfer of execution between orchestrators.

Use Case: Sync to Async Handover

Execute initial nodes synchronously for fast response, then hand over to async for remaining work.

// Step 1: Execute first part synchronously
$syncRequest = new OrchestrationRequest(
  workflowId: 'my_workflow',
  pipelineId: '',
  workflow: $workflowDefinition,
  initialData: ['input' => 'value'],
  options: ['orchestrator' => 'flowdrop_runtime:synchronous'],
);

$syncResponse = $syncOrchestrator->orchestrate($syncRequest);

// Step 2: Extract snapshot from response
$snapshotData = $syncResponse->getMetadata()['snapshot'];
$snapshot = WorkflowSnapshot::fromArray($snapshotData);

// At this point, snapshot contains:
// - node_1: completed (output: {...})
// - node_2: completed (output: {...})
// - node_3: pending
// - node_4: pending

// Step 3: Hand over to async orchestrator
$asyncRequest = new OrchestrationRequest(
  workflowId: 'my_workflow',
  pipelineId: '',
  workflow: $workflowDefinition,
  initialData: [],  // Already in snapshot
  options: ['orchestrator' => 'flowdrop_runtime:asynchronous'],
  initialSnapshot: $snapshot,  // <-- Pre-loaded state
);

$asyncResponse = $asyncOrchestrator->orchestrate($asyncRequest);
// Jobs created:
// - node_1: status=completed, is_injected=true (pre-loaded)
// - node_2: status=completed, is_injected=true (pre-loaded)
// - node_3: status=pending (will execute)
// - node_4: status=pending (will execute)
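The job list in the comments above could be derived from a snapshot roughly as follows. This is a standalone sketch under stated assumptions: it uses plain arrays instead of WorkflowSnapshot and job entities, and planJobsFromSnapshot() is a hypothetical name. It is not the asynchronous orchestrator's actual initializeJobsFromSnapshot logic.

```php
<?php
/**
 * Plans one job per workflow node, pre-loading nodes the snapshot completed.
 *
 * Hypothetical helper; array shapes are assumptions for illustration.
 *
 * @param string[] $workflowNodeIds Node IDs in the workflow definition.
 * @param array $snapshotNodeStates nodeId => ['status' => ..., 'output' => ...].
 */
function planJobsFromSnapshot(array $workflowNodeIds, array $snapshotNodeStates): array {
  $jobs = [];
  foreach ($workflowNodeIds as $nodeId) {
    $state = $snapshotNodeStates[$nodeId] ?? null;
    if ($state !== null && $state['status'] === 'completed') {
      // Completed in the snapshot: carry the output over and do not re-run.
      $jobs[$nodeId] = [
        'status' => 'completed',
        'is_injected' => true,
        'output' => $state['output'],
      ];
    }
    else {
      // Anything else still has to execute.
      $jobs[$nodeId] = ['status' => 'pending', 'is_injected' => false, 'output' => null];
    }
  }
  return $jobs;
}

$jobs = planJobsFromSnapshot(
  ['node_1', 'node_2', 'node_3', 'node_4'],
  [
    'node_1' => ['status' => 'completed', 'output' => ['a' => 1]],
    'node_2' => ['status' => 'completed', 'output' => ['b' => 2]],
    'node_3' => ['status' => 'pending', 'output' => null],
  ]
);
```

Nodes missing from the snapshot (node_4 here) are treated as pending, so a snapshot taken mid-execution does not need an entry for every node.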

Sequence Diagram

┌─────────┐     ┌──────────────┐     ┌───────────────┐     ┌───────────────┐
│ Trigger │     │    Sync      │     │    Async      │     │    Queue      │
│         │     │ Orchestrator │     │ Orchestrator  │     │    Worker     │
└────┬────┘     └──────┬───────┘     └───────┬───────┘     └───────┬───────┘
     │                 │                     │                     │
     │ orchestrate()   │                     │                     │
     │────────────────>│                     │                     │
     │                 │                     │                     │
     │                 │ Execute nodes 1-2   │                     │
     │                 │──────────────────>  │                     │
     │                 │                     │                     │
     │                 │ generateSnapshot()  │                     │
     │                 │──────────────────>  │                     │
     │                 │                     │                     │
     │<────────────────│ Response + Snapshot │                     │
     │                 │                     │                     │
     │                 │                     │                     │
     │ orchestrate(snapshot)                 │                     │
     │──────────────────────────────────────>│                     │
     │                 │                     │                     │
     │                 │                     │ Create jobs         │
     │                 │                     │ (1-2 as injected)   │
     │                 │                     │────────────────────>│
     │                 │                     │                     │
     │<──────────────────────────────────────│ Pipeline ID         │
     │                 │                     │                     │
     │                 │                     │     Execute 3-4     │
     │                 │                     │<────────────────────│

Workflow State Manager

The WorkflowStateManager service provides the API for managing snapshots.

Creating and Managing Snapshots

use Drupal\flowdrop_runtime\Service\WorkflowState\WorkflowStateManagerInterface;

/** @var WorkflowStateManagerInterface $stateManager */
$stateManager = \Drupal::service('flowdrop_runtime.workflow_state_manager');

// Create a new snapshot
$snapshot = $stateManager->createSnapshot(
  workflowId: 'my_workflow',
  workflowVersion: $stateManager->computeWorkflowVersion($workflowDefinition),
  initialInput: ['key' => 'value'],
  threadId: 'conversation_123',  // Optional
);

// Save to in-memory cache
$snapshot = $stateManager->save($snapshot);

// Update node states
$snapshot = $stateManager->setNodeRunning($snapshot->executionId, 'node_1');
$snapshot = $stateManager->setNodeCompleted(
  $snapshot->executionId,
  'node_1',
  ['result' => 'success'],
);

// Mark multiple nodes as completed (for handover)
$snapshot = $stateManager->markNodesCompleted(
  $snapshot->executionId,
  [
    'node_1' => ['output' => 'data1'],
    'node_2' => ['output' => 'data2'],
  ],
);

// Retrieve a snapshot
$snapshot = $stateManager->getSnapshot('exec_abc123');

// Validate snapshot against workflow definition
$validation = $stateManager->validateSnapshot($snapshot, $workflowDefinition);
if (!$validation->isValid()) {
  foreach ($validation->getErrors() as $error) {
    \Drupal::logger('my_module')->error($error->message);
  }
}

Persistent Storage

For durable storage across requests, use the entity storage service:

use Drupal\flowdrop_runtime\Service\WorkflowState\EntitySnapshotStorage;

/** @var EntitySnapshotStorage $storage */
$storage = \Drupal::service('flowdrop_runtime.entity_snapshot_storage');

// Save to database
$entity = $storage->save($snapshot);

// Load from database
$snapshot = $storage->load('exec_abc123');

// List by workflow
$snapshots = $storage->listByWorkflow('my_workflow', limit: 50);

// List by thread (for conversations)
$snapshots = $storage->listByThread('conversation_123');

// Cleanup old snapshots
$deleted = $storage->cleanup(
  olderThan: time() - 604800,  // 7 days
  statuses: ['completed', 'failed'],
);

Pipeline ↔ Snapshot Linking

For pipeline-based orchestrators, the FlowDropPipeline entity includes a snapshot_id field that links to the associated WorkflowSnapshot entity:

use Drupal\flowdrop_pipeline\Entity\FlowDropPipelineInterface;

/** @var FlowDropPipelineInterface $pipeline */

// Get the linked snapshot ID
$snapshotId = $pipeline->getSnapshotId();

// Get the linked snapshot entity
$snapshotEntity = $pipeline->getSnapshot();

// Set a snapshot link (done automatically by orchestrators)
$pipeline->setSnapshotId($snapshotEntity->id());
$pipeline->save();

This linking provides:

Benefit           Description
Unified tracking  Access detailed state from Pipeline admin UI
Historical data   Pipelines retain reference to execution state
Debugging         View node-level details for any pipeline execution

Note: SynchronousPipelineOrchestrator automatically links the snapshot after execution. AsynchronousOrchestrator inherits this through delegation.


Validation

The snapshot system validates state consistency:

Version Validation

Detects workflow definition changes that could cause issues:

$validation = $stateManager->validateSnapshot($snapshot, $workflowDefinition);

// Check for version mismatch
if ($validation->hasError(ValidationResult::ERROR_VERSION_MISMATCH)) {
  // Workflow has changed since snapshot was created
}
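How computeWorkflowVersion() derives its value is not described here. One common way to build such a version fingerprint, shown purely as an assumption, is to hash a canonical form of the definition so that key order does not matter but any content change does. The function names and the 12-character length below are illustrative choices, not FlowDrop's implementation.

```php
<?php
/**
 * Recursively sorts associative arrays by key; list order is preserved.
 */
function sortKeysRecursive(array $data): array {
  foreach ($data as $key => $value) {
    if (is_array($value)) {
      $data[$key] = sortKeysRecursive($value);
    }
  }
  // Lists (keys 0..n-1) keep their order because it may be meaningful.
  $isList = array_keys($data) === range(0, count($data) - 1);
  if (!$isList) {
    ksort($data);
  }
  return $data;
}

/**
 * Hypothetical fingerprint: truncated SHA-256 of the canonical JSON form.
 */
function computeVersionSketch(array $definition): string {
  $canonical = json_encode(sortKeysRecursive($definition), JSON_THROW_ON_ERROR);
  return substr(hash('sha256', $canonical), 0, 12);
}

$original = ['nodes' => ['a' => 1, 'b' => 2], 'edges' => [['from' => 'a', 'to' => 'b']]];
$reordered = ['edges' => [['to' => 'b', 'from' => 'a']], 'nodes' => ['b' => 2, 'a' => 1]];
$changed = ['nodes' => ['a' => 1, 'b' => 3], 'edges' => [['from' => 'a', 'to' => 'b']]];

$versionOriginal = computeVersionSketch($original);
$versionReordered = computeVersionSketch($reordered);
$versionChanged = computeVersionSketch($changed);
```

With a scheme like this, reordering keys leaves the version unchanged, while editing a node's configuration produces a different version and would trigger a mismatch on validation.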

Dependency Validation

Ensures completed nodes have all dependencies satisfied:

$validation = $stateManager->validateDependencies($snapshot, $workflowDefinition);

if ($validation->hasError(ValidationResult::ERROR_DEPENDENCY_NOT_MET)) {
  // A node is marked complete but its dependencies aren't
}
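The rule being checked can be sketched in standalone form: every completed node must have only completed upstream nodes. The array shapes and the function name findUnmetDependencies() are assumptions for illustration; the real validator works on WorkflowSnapshot and the workflow definition.

```php
<?php
/**
 * Returns one message per completed node whose dependency is not completed.
 *
 * Hypothetical helper for illustration only.
 *
 * @param array<string, string[]> $dependencies nodeId => upstream node IDs.
 * @param array<string, string> $statuses nodeId => status string.
 */
function findUnmetDependencies(array $dependencies, array $statuses): array {
  $errors = [];
  foreach ($statuses as $nodeId => $status) {
    if ($status !== 'completed') {
      continue;
    }
    foreach ($dependencies[$nodeId] ?? [] as $upstream) {
      // A dependency with no recorded state is treated as pending.
      $upstreamStatus = $statuses[$upstream] ?? 'pending';
      if ($upstreamStatus !== 'completed') {
        $errors[] = sprintf(
          "Node '%s' is completed but dependency '%s' is %s",
          $nodeId,
          $upstream,
          $upstreamStatus
        );
      }
    }
  }
  return $errors;
}

$dependencyErrors = findUnmetDependencies(
  ['node_2' => ['node_1'], 'node_3' => ['node_2']],
  ['node_1' => 'completed', 'node_2' => 'pending', 'node_3' => 'completed']
);
```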

Node Existence Validation

Verifies all nodes in snapshot exist in workflow:

if ($validation->hasError(ValidationResult::ERROR_UNKNOWN_NODE)) {
  // Snapshot contains nodes that don't exist in current workflow
}
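Conceptually this check is a set difference between the node IDs stored in the snapshot and those in the current workflow. A minimal sketch, with a hypothetical function name and plain arrays as stand-ins:

```php
<?php
/**
 * Lists snapshot node IDs that are absent from the workflow definition.
 *
 * Hypothetical helper for illustration only.
 *
 * @param array $snapshotNodeStates nodeId => node state (shape not inspected).
 * @param string[] $workflowNodeIds Node IDs present in the workflow.
 */
function findUnknownNodes(array $snapshotNodeStates, array $workflowNodeIds): array {
  $snapshotIds = array_keys($snapshotNodeStates);
  // array_diff keeps original keys, so reindex for a clean list.
  return array_values(array_diff($snapshotIds, $workflowNodeIds));
}

$unknownNodes = findUnknownNodes(
  ['node_1' => [], 'legacy_node' => [], 'node_2' => []],
  ['node_1', 'node_2', 'node_3']
);
```

Nodes that exist in the workflow but not in the snapshot (node_3 here) are not an error in this sketch; they would presumably just be pending.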

REST API

Get Snapshot

GET /api/flowdrop-runtime/snapshot/{execution_id}
Authorization: Bearer {token}

Response:

{
  "success": true,
  "snapshot": {
    "executionId": "exec_abc123",
    "workflowId": "my_workflow",
    "status": "completed",
    "nodeStates": {
      "node_1": {
        "nodeId": "node_1",
        "status": "completed",
        "output": {"result": "data"},
        "injected": false,
        "executionOrder": 1
      }
    }
  }
}
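A client consuming this response might decode it and summarise node states as in the sketch below. It assumes only the response shape shown above; the body is inlined rather than fetched over HTTP, and the summary format is an arbitrary choice for illustration.

```php
<?php
// The response body from the example above, inlined for a standalone sketch.
$body = <<<'JSON'
{
  "success": true,
  "snapshot": {
    "executionId": "exec_abc123",
    "workflowId": "my_workflow",
    "status": "completed",
    "nodeStates": {
      "node_1": {
        "nodeId": "node_1",
        "status": "completed",
        "output": {"result": "data"},
        "injected": false,
        "executionOrder": 1
      }
    }
  }
}
JSON;

// Decode to associative arrays; invalid JSON raises a JsonException.
$payload = json_decode($body, true, 512, JSON_THROW_ON_ERROR);

if (($payload['success'] ?? false) !== true) {
  throw new RuntimeException('Snapshot request was not successful.');
}

// Build a nodeId => "status (origin)" summary.
$summary = [];
foreach ($payload['snapshot']['nodeStates'] as $nodeId => $state) {
  $origin = $state['injected'] ? 'injected' : 'executed';
  $summary[$nodeId] = sprintf('%s (%s)', $state['status'], $origin);
}
```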

Save Snapshot

POST /api/flowdrop-runtime/snapshot
Content-Type: application/json
Authorization: Bearer {token}

{
  "workflowId": "my_workflow",
  "workflowVersion": "abc123",
  "status": "running",
  "nodeStates": {...},
  "initialInput": {...}
}

List Snapshots

GET /api/flowdrop-runtime/snapshots?workflow_id=my_workflow&limit=50
Authorization: Bearer {token}
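When building this URL in PHP, http_build_query() takes care of encoding the parameters. The base URL and the helper name buildSnapshotListUrl() below are placeholders for illustration; only the path and the workflow_id and limit parameters come from the request above.

```php
<?php
/**
 * Builds the list-snapshots URL for a given site base URL.
 *
 * Hypothetical helper for illustration only.
 */
function buildSnapshotListUrl(string $baseUrl, string $workflowId, int $limit = 50): string {
  // Pass '&' explicitly so the result does not depend on php.ini settings.
  $query = http_build_query(['workflow_id' => $workflowId, 'limit' => $limit], '', '&');
  return rtrim($baseUrl, '/') . '/api/flowdrop-runtime/snapshots?' . $query;
}

// example.com is a placeholder host.
$listUrl = buildSnapshotListUrl('https://example.com/', 'my_workflow');
```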

Delete Snapshot

DELETE /api/flowdrop-runtime/snapshot/{execution_id}
Authorization: Bearer {token}

Admin UI

Access the snapshot management UI at /admin/flowdrop/snapshots:

  • List View: See all snapshots with status, workflow, node counts
  • Detail View: Inspect individual snapshot state, node outputs, messages
  • Cleanup Form: Delete old snapshots by age and status

Configuration

Cron Cleanup

Automatic cleanup runs during Drupal cron:

// In settings.php or config
$config['flowdrop_runtime.settings']['snapshot_cleanup'] = [
  'enabled' => TRUE,
  'max_age' => 604800,        // 7 days in seconds
  'statuses' => ['completed', 'failed'],
  'batch_size' => 100,
];
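To make the meaning of these settings concrete, the sketch below shows how a cleanup pass could select snapshots from such a configuration. It is an illustration under assumptions, not the module's cron implementation: snapshots are plain arrays, the 'changed' timestamp field is hypothetical, and selectForCleanup() is an invented name.

```php
<?php
/**
 * Picks snapshot IDs that are old enough and in a removable status.
 *
 * Hypothetical helper for illustration only.
 *
 * @param array $snapshots executionId => ['status' => string, 'changed' => int].
 * @param array $config The snapshot_cleanup settings array.
 * @param int $now Current Unix timestamp, passed in for testability.
 */
function selectForCleanup(array $snapshots, array $config, int $now): array {
  if (empty($config['enabled'])) {
    return [];
  }
  $cutoff = $now - $config['max_age'];
  $eligible = [];
  foreach ($snapshots as $executionId => $snapshot) {
    if (count($eligible) >= $config['batch_size']) {
      // Leave the remainder for the next cron run.
      break;
    }
    $oldEnough = $snapshot['changed'] < $cutoff;
    $removable = in_array($snapshot['status'], $config['statuses'], true);
    if ($oldEnough && $removable) {
      $eligible[] = $executionId;
    }
  }
  return $eligible;
}

$cleanupConfig = [
  'enabled' => true,
  'max_age' => 604800,
  'statuses' => ['completed', 'failed'],
  'batch_size' => 100,
];
$sampleSnapshots = [
  'exec_a' => ['status' => 'completed', 'changed' => 100000],
  'exec_b' => ['status' => 'completed', 'changed' => 900000],
  'exec_c' => ['status' => 'running', 'changed' => 100000],
  'exec_d' => ['status' => 'failed', 'changed' => 200000],
];
$toDelete = selectForCleanup($sampleSnapshots, $cleanupConfig, 1000000);
```

In this example exec_b is too recent and exec_c is still running, so only exec_a and exec_d are selected.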

Permissions

Permission                             Description
administer flowdrop_workflow_snapshot  Full admin access
view flowdrop_workflow_snapshot        View snapshots
edit flowdrop_workflow_snapshot        Edit metadata
delete flowdrop_workflow_snapshot      Delete snapshots

Best Practices

1. Use Snapshots for Long Workflows

For workflows that might be interrupted:

// Save snapshot periodically
$snapshot = $stateManager->save($snapshot);

// Or persist to database for durability
$storage->save($snapshot);

2. Validate Before Resume

Always validate snapshots before resuming:

$validation = $stateManager->validateSnapshot($snapshot, $workflowDefinition);
if (!$validation->isValid()) {
  throw new \Exception('Cannot resume: ' . $validation->getErrors()[0]->message);
}

3. Use Thread IDs for Conversations

For chat/agent workflows, use consistent thread IDs:

$snapshot = $stateManager->createSnapshot(
  workflowId: 'chat_agent',
  workflowVersion: '...',
  initialInput: $input,
  threadId: $conversationId,  // e.g., user session ID
);

4. Clean Up Old Snapshots

Configure automatic cleanup or run manually:

// Cleanup snapshots older than 30 days
$storage->cleanup(time() - 2592000, ['completed', 'failed']);

5. Monitor Injected Nodes

Track injected nodes for audit purposes:

foreach ($snapshot->nodeStates as $nodeId => $nodeState) {
  if ($nodeState->injected) {
    \Drupal::logger('audit')->info('Node @node was injected, not executed', [
      '@node' => $nodeId,
    ]);
  }
}

Troubleshooting

Snapshot Not Found

Workflow snapshot not found: exec_abc123

  • Check if snapshot was saved to persistent storage
  • In-memory snapshots are lost after request ends
  • Use EntitySnapshotStorage::save() for persistence

Version Mismatch

Workflow version mismatch. Expected: abc123, got: def456

  • Workflow definition changed since snapshot was created
  • Either update the snapshot or create a new execution

Dependency Not Met

Node 'node_3' is completed but dependency 'node_2' is pending

  • Invalid snapshot state (possibly from manual manipulation)
  • Re-execute the workflow or fix the dependency chain

Table Not Found

Table 'flowdrop_workflow_snapshot' doesn't exist

Run:

drush ev "\Drupal::entityDefinitionUpdateManager()->installEntityType(\Drupal::entityTypeManager()->getDefinition('flowdrop_workflow_snapshot'));"
drush cr



Snapshot Support Matrix

Component                        Generates Snapshot                Accepts Snapshot                   Status
SynchronousOrchestrator          ✅ Yes                             ✅ Yes                              Full support
SynchronousPipelineOrchestrator  ✅ Yes                             ⚠️ Via jobs                         Full support
AsynchronousOrchestrator         ✅ Via sync pipeline               ✅ Via initializeJobsFromSnapshot   Full support
StateGraphOrchestrator           ✅ Yes                             ✅ Yes                              Full support
TriggerManager                   N/A (receives from orchestrator)  ❌ Not yet                          Fresh executions only
PlaygroundService                N/A (receives from orchestrator)  ❌ Not yet                          Fresh executions only
WorkflowExecutionForm            N/A (receives from orchestrator)  ❌ Not yet                          Fresh executions only

Entry Points Analysis

Entry Point       Module               How it Triggers Orchestration                         Snapshot Handling
Entity triggers   flowdrop_trigger     TriggerManager::executeTrigger() → orchestrate()      Creates fresh OrchestrationRequest, gets snapshot in response
Form triggers     flowdrop_trigger     TriggerManager::executeFormTrigger() → orchestrate()  Creates fresh OrchestrationRequest, gets snapshot in response
Playground        flowdrop_playground  PlaygroundService::executeWorkflow() → orchestrate()  Creates fresh OrchestrationRequest, gets snapshot in response
Manual execution  flowdrop_runtime     WorkflowExecutionForm → executePipeline()             Direct pipeline execution, gets snapshot in response
Queue worker      flowdrop_runtime     PipelineExecutionWorker → executePipeline()           Inherits snapshot from sync pipeline execution
StateGraph test   flowdrop_stategraph  StateGraphTestForm → orchestrate()                    Creates fresh OrchestrationRequest, gets snapshot in response

Implementation Notes

All orchestrator plugins now delegate to their corresponding service from the Drupal container instead of instantiating them directly. This ensures:

  1. Proper dependency injection: Services like WorkflowStateManager are properly injected
  2. Snapshot generation: All orchestrators generate snapshots after execution
  3. Consistent configuration: Services are configured once in *.services.yml

The fix was applied to these plugins:

  • SynchronousOrchestratorPlugin → uses @flowdrop_runtime.synchronous_orchestrator
  • SynchronousPipelineOrchestratorPlugin → uses @flowdrop_runtime.synchronous_pipeline_orchestrator
  • AsynchronousOrchestratorPlugin → uses @flowdrop_runtime.asynchronous_orchestrator
  • StateGraphOrchestratorPlugin → uses @flowdrop_stategraph.orchestrator

Future Improvements

  1. Store Snapshots from Triggers: The TriggerManager could optionally store the returned snapshot for resume capabilities.

  2. Resume from Trigger: Add ability to resume a triggered workflow from a stored snapshot.

  3. Playground Conversation Memory: Use snapshots to maintain state across playground interactions.