Skip to content

FlowDrop Runtime

Execution engine for FlowDrop workflows with synchronous and asynchronous modes, real-time monitoring, snapshots, and JSONPath data extraction.

Overview

The flowdrop_runtime module is the primary execution engine for FlowDrop workflows. It provides three orchestrators (synchronous, synchronous-pipeline, and asynchronous), a node runtime service for executing individual nodes, a workflow compiler for validation and optimization, and real-time monitoring capabilities.

The module also manages workflow snapshots — serialized execution state that enables orchestrator handover, resumption after interrupts, and audit trails. A built-in JSONPath service (RFC 9535 compliant) handles data extraction and mapping between nodes.

Dependencies

Configuration

Admin Pages

Path Description
/admin/flowdrop/execute Execution overview
/admin/flowdrop/execute/workflow Execute a workflow form
/admin/flowdrop/execute/pipeline Execute a pipeline form
/admin/flowdrop/execute/interact Pipeline interaction form
/admin/flowdrop/snapshots/cleanup Cleanup old snapshots
/admin/flowdrop/snapshots/{id}/view View a specific snapshot

Permissions

Permission Description
administer flowdrop_runtime Full access to runtime configuration and execution

Queue Configuration

The module creates two queues for background processing:

Queue Description
flowdrop_runtime_pipeline_execution Processes pipeline execution requests
flowdrop_runtime_job_execution Processes individual job execution requests

Tips and Tricks

Choosing an Orchestrator

Orchestrator Best For
Synchronous Quick workflows where you need immediate results (e.g., form processing, API responses)
Synchronous Pipeline Workflows that need per-node tracking but run within a single request
Asynchronous Long-running workflows, background processing, workflows with many nodes

JSONPath Data Extraction

The runtime includes an RFC 9535 compliant JSONPath service for extracting data from complex structures:

$jsonPath = \Drupal::service('flowdrop_runtime.json_path');

// Extract a single value
$name = $jsonPath->extract($data, '$.users[0].name');

// Extract all matching values
$allNames = $jsonPath->extractAll($data, '$.users[*].name');

// Check if a path exists
if ($jsonPath->exists($data, '$.config.enabled')) {
  // ...
}

// Apply multiple mappings at once
$result = $jsonPath->applyMappings($sourceData, [
  'user_email' => '$.entity.email',
  'all_tags' => '$.entity.tags[*].name',
  'static_value' => 'literal:my_value',
]);

Common JSONPath expressions:

Expression Description
$.store.name Single value access
$.users[*].email All elements of an array
$.items[?(@.active)] Filter by condition
$..name Recursive descent — all "name" fields
$.books[-1] Last element
$.items[0:5] Array slice

Processing Queues

For asynchronous workflows, process the queues with drush:

drush queue:run flowdrop_runtime_pipeline_execution
drush queue:run flowdrop_runtime_job_execution

Snapshot Cleanup

Snapshots accumulate over time. Use the cleanup form at /admin/flowdrop/snapshots/cleanup or schedule periodic cleanup via cron to manage storage.

Pipeline vs WorkflowSnapshot

The runtime uses two complementary tracking systems:

  • Pipeline + Jobs (user-facing) — Content entities for admin UI progress tracking, queue management, retry mechanics. Each node gets its own job entity.
  • WorkflowSnapshot (internal) — Single entity with embedded node states for state serialization, orchestrator handover, and debugging. Includes an injected flag to track whether node state was actually executed or pre-loaded from a handover.

Developer API

All PHP classes in this module are @internal and not part of the stable public API. They may change without notice in any release. See the BC Policy for details.

Services

Core Runtime:

Service ID Class Description
flowdrop_runtime.node_runtime NodeRuntime Executes individual workflow nodes
flowdrop_runtime.execution_context ExecutionContext Manages execution state and data flow
flowdrop_runtime.workflow_compiler WorkflowCompiler Validates and optimizes workflow definitions

Real-Time Monitoring:

Service ID Class Description
flowdrop_runtime.real_time_manager RealTimeManager Manages real-time event streams
flowdrop_runtime.status_tracker StatusTracker Tracks execution and node status
flowdrop_runtime.event_broadcaster EventBroadcaster Broadcasts execution events

Storage:

Service ID Class Description
flowdrop_runtime.storage.drupal_pipeline DrupalPipelineStorage Entity-based pipeline storage
flowdrop_runtime.storage.drupal_job DrupalJobStorage Entity-based job storage
flowdrop_runtime.storage.inmemory_pipeline InMemoryPipelineStorage In-memory pipeline storage for testing
flowdrop_runtime.storage.inmemory_job InMemoryJobStorage In-memory job storage for testing

Orchestrators:

Service ID Class Description
flowdrop_runtime.synchronous_orchestrator SynchronousOrchestrator Direct node execution
flowdrop_runtime.synchronous_pipeline_orchestrator SynchronousPipelineOrchestrator Pipeline-based synchronous execution
flowdrop_runtime.asynchronous_orchestrator AsynchronousOrchestrator Queue-based background execution

Utilities:

Service ID Class Description
flowdrop_runtime.json_path JsonPathService RFC 9535 JSONPath querying
flowdrop_runtime.parameter_resolver ParameterResolver Resolves node parameters
flowdrop_runtime.data_flow_manager DataFlowManager Manages data flow between nodes
flowdrop_runtime.workflow_state_manager WorkflowStateManager Unified state representation
flowdrop_runtime.entity_snapshot_storage EntitySnapshotStorage Persistent snapshot storage
flowdrop_runtime.error_handler ErrorHandler Error handling and recovery
flowdrop_runtime.execution_monitor ExecutionMonitor Execution monitoring

Entities

FlowDropWorkflowSnapshot (Content Entity)

Stores serialized execution state for resumption, handover, and audit.

Events

Event Description
flowdrop.pipeline.created Pipeline created
flowdrop.pipeline.started Pipeline execution started
flowdrop.pipeline.completed Pipeline completed successfully
flowdrop.pipeline.failed Pipeline execution failed
flowdrop.pipeline.paused Pipeline paused (e.g., for interrupt)
flowdrop.pipeline.cancelled Pipeline cancelled
flowdrop.job.created Job created
flowdrop.job.started Job execution started
flowdrop.job.completed Job completed successfully
flowdrop.job.failed Job execution failed

Key DTOs

DTO Description
OrchestrationRequest Request object containing workflow, pipeline, and input data
OrchestrationResponse Response with execution results
NodeExecutionContext Context passed to node processors during execution
NodeExecutionResult Result from individual node execution
ExecutionStatus Real-time execution status
WorkflowSnapshot Complete serialized workflow execution state
NodeSnapshot Individual node execution state within a snapshot

API Endpoints

Execution Status:

Method Path Description
GET /api/flowdrop-runtime/status/{executionId} Get execution status
GET /api/flowdrop-runtime/status/{executionId}/nodes Get node statuses
GET /api/flowdrop-runtime/status/{executionId}/details Get execution details
GET /api/flowdrop-runtime/health Health check
GET /api/flowdrop-runtime/metrics System metrics

Snapshots:

Method Path Description
GET /api/flowdrop-runtime/snapshot/{execution_id} Get snapshot
POST /api/flowdrop-runtime/snapshot Save snapshot
DELETE /api/flowdrop-runtime/snapshot/{execution_id} Delete snapshot
GET /api/flowdrop-runtime/snapshots List snapshots

References