FlowDrop Runtime¶
Execution engine for FlowDrop workflows with synchronous and asynchronous modes, real-time monitoring, snapshots, and JSONPath data extraction.
Overview¶
The flowdrop_runtime module is the primary execution engine for FlowDrop workflows. It provides three orchestrators (synchronous, synchronous-pipeline, and asynchronous), a node runtime service for executing individual nodes, a workflow compiler for validation and optimization, and real-time monitoring capabilities.
The module also manages workflow snapshots — serialized execution state that enables orchestrator handover, resumption after interrupts, and audit trails. A built-in JSONPath service (RFC 9535 compliant) handles data extraction and mapping between nodes.
Dependencies¶
- flowdrop
- flowdrop_orchestration (orchestrator plugin system)
- flowdrop_interrupt (human-in-loop support)
- flowdrop_workflow (workflow definitions)
- flowdrop_pipeline (execution pipelines)
- flowdrop_job (job entities)
- flowdrop_node_type (node type definitions)
Configuration¶
Admin Pages¶
| Path | Description |
|---|---|
/admin/flowdrop/execute |
Execution overview |
/admin/flowdrop/execute/workflow |
Execute a workflow form |
/admin/flowdrop/execute/pipeline |
Execute a pipeline form |
/admin/flowdrop/execute/interact |
Pipeline interaction form |
/admin/flowdrop/snapshots/cleanup |
Cleanup old snapshots |
/admin/flowdrop/snapshots/{id}/view |
View a specific snapshot |
Permissions¶
| Permission | Description |
|---|---|
administer flowdrop_runtime |
Full access to runtime configuration and execution |
Queue Configuration¶
The module creates two queues for background processing:
| Queue | Description |
|---|---|
flowdrop_runtime_pipeline_execution |
Processes pipeline execution requests |
flowdrop_runtime_job_execution |
Processes individual job execution requests |
Tips and Tricks¶
Choosing an Orchestrator¶
| Orchestrator | Best For |
|---|---|
| Synchronous | Quick workflows where you need immediate results (e.g., form processing, API responses) |
| Synchronous Pipeline | Workflows that need per-node tracking but run within a single request |
| Asynchronous | Long-running workflows, background processing, workflows with many nodes |
JSONPath Data Extraction¶
The runtime includes an RFC 9535 compliant JSONPath service for extracting data from complex structures:
$jsonPath = \Drupal::service('flowdrop_runtime.json_path');
// Extract a single value
$name = $jsonPath->extract($data, '$.users[0].name');
// Extract all matching values
$allNames = $jsonPath->extractAll($data, '$.users[*].name');
// Check if a path exists
if ($jsonPath->exists($data, '$.config.enabled')) {
// ...
}
// Apply multiple mappings at once
$result = $jsonPath->applyMappings($sourceData, [
'user_email' => '$.entity.email',
'all_tags' => '$.entity.tags[*].name',
'static_value' => 'literal:my_value',
]);
Common JSONPath expressions:
| Expression | Description |
|---|---|
$.store.name |
Single value access |
$.users[*].email |
All elements of an array |
$.items[?(@.active)] |
Filter by condition |
$..name |
Recursive descent — all "name" fields |
$.books[-1] |
Last element |
$.items[0:5] |
Array slice |
Processing Queues¶
For asynchronous workflows, process the queues with drush:
drush queue:run flowdrop_runtime_pipeline_execution
drush queue:run flowdrop_runtime_job_execution
Snapshot Cleanup¶
Snapshots accumulate over time. Use the cleanup form at /admin/flowdrop/snapshots/cleanup or schedule periodic cleanup via cron to manage storage.
Pipeline vs WorkflowSnapshot¶
The runtime uses two complementary tracking systems:
- Pipeline + Jobs (user-facing) — Content entities for admin UI progress tracking, queue management, retry mechanics. Each node gets its own job entity.
- WorkflowSnapshot (internal) — Single entity with embedded node states for state serialization, orchestrator handover, and debugging. Includes an
injectedflag to track whether node state was actually executed or pre-loaded from a handover.
Developer API¶
All PHP classes in this module are
@internaland not part of the stable public API. They may change without notice in any release. See the BC Policy for details.
Services¶
Core Runtime:
| Service ID | Class | Description |
|---|---|---|
flowdrop_runtime.node_runtime |
NodeRuntime |
Executes individual workflow nodes |
flowdrop_runtime.execution_context |
ExecutionContext |
Manages execution state and data flow |
flowdrop_runtime.workflow_compiler |
WorkflowCompiler |
Validates and optimizes workflow definitions |
Real-Time Monitoring:
| Service ID | Class | Description |
|---|---|---|
flowdrop_runtime.real_time_manager |
RealTimeManager |
Manages real-time event streams |
flowdrop_runtime.status_tracker |
StatusTracker |
Tracks execution and node status |
flowdrop_runtime.event_broadcaster |
EventBroadcaster |
Broadcasts execution events |
Storage:
| Service ID | Class | Description |
|---|---|---|
flowdrop_runtime.storage.drupal_pipeline |
DrupalPipelineStorage |
Entity-based pipeline storage |
flowdrop_runtime.storage.drupal_job |
DrupalJobStorage |
Entity-based job storage |
flowdrop_runtime.storage.inmemory_pipeline |
InMemoryPipelineStorage |
In-memory pipeline storage for testing |
flowdrop_runtime.storage.inmemory_job |
InMemoryJobStorage |
In-memory job storage for testing |
Orchestrators:
| Service ID | Class | Description |
|---|---|---|
flowdrop_runtime.synchronous_orchestrator |
SynchronousOrchestrator |
Direct node execution |
flowdrop_runtime.synchronous_pipeline_orchestrator |
SynchronousPipelineOrchestrator |
Pipeline-based synchronous execution |
flowdrop_runtime.asynchronous_orchestrator |
AsynchronousOrchestrator |
Queue-based background execution |
Utilities:
| Service ID | Class | Description |
|---|---|---|
flowdrop_runtime.json_path |
JsonPathService |
RFC 9535 JSONPath querying |
flowdrop_runtime.parameter_resolver |
ParameterResolver |
Resolves node parameters |
flowdrop_runtime.data_flow_manager |
DataFlowManager |
Manages data flow between nodes |
flowdrop_runtime.workflow_state_manager |
WorkflowStateManager |
Unified state representation |
flowdrop_runtime.entity_snapshot_storage |
EntitySnapshotStorage |
Persistent snapshot storage |
flowdrop_runtime.error_handler |
ErrorHandler |
Error handling and recovery |
flowdrop_runtime.execution_monitor |
ExecutionMonitor |
Execution monitoring |
Entities¶
FlowDropWorkflowSnapshot (Content Entity)¶
Stores serialized execution state for resumption, handover, and audit.
Events¶
| Event | Description |
|---|---|
flowdrop.pipeline.created |
Pipeline created |
flowdrop.pipeline.started |
Pipeline execution started |
flowdrop.pipeline.completed |
Pipeline completed successfully |
flowdrop.pipeline.failed |
Pipeline execution failed |
flowdrop.pipeline.paused |
Pipeline paused (e.g., for interrupt) |
flowdrop.pipeline.cancelled |
Pipeline cancelled |
flowdrop.job.created |
Job created |
flowdrop.job.started |
Job execution started |
flowdrop.job.completed |
Job completed successfully |
flowdrop.job.failed |
Job execution failed |
Key DTOs¶
| DTO | Description |
|---|---|
OrchestrationRequest |
Request object containing workflow, pipeline, and input data |
OrchestrationResponse |
Response with execution results |
NodeExecutionContext |
Context passed to node processors during execution |
NodeExecutionResult |
Result from individual node execution |
ExecutionStatus |
Real-time execution status |
WorkflowSnapshot |
Complete serialized workflow execution state |
NodeSnapshot |
Individual node execution state within a snapshot |
API Endpoints¶
Execution Status:
| Method | Path | Description |
|---|---|---|
| GET | /api/flowdrop-runtime/status/{executionId} |
Get execution status |
| GET | /api/flowdrop-runtime/status/{executionId}/nodes |
Get node statuses |
| GET | /api/flowdrop-runtime/status/{executionId}/details |
Get execution details |
| GET | /api/flowdrop-runtime/health |
Health check |
| GET | /api/flowdrop-runtime/metrics |
System metrics |
Snapshots:
| Method | Path | Description |
|---|---|---|
| GET | /api/flowdrop-runtime/snapshot/{execution_id} |
Get snapshot |
| POST | /api/flowdrop-runtime/snapshot |
Save snapshot |
| DELETE | /api/flowdrop-runtime/snapshot/{execution_id} |
Delete snapshot |
| GET | /api/flowdrop-runtime/snapshots |
List snapshots |
References¶
- flowdrop_orchestration — orchestrator plugin system
- flowdrop_stategraph — alternative stateful graph orchestrator
- flowdrop_pipeline — pipeline entities
- flowdrop_job — job entities
- Workflow Snapshots and Orchestrators
- Execution Dependency Rules