Creating Custom Orchestrators¶
An orchestrator controls how FlowDrop executes a workflow — whether synchronously in the current request, asynchronously via a queue, or with state management and checkpointing. You can create a custom orchestrator when none of the built-in strategies fit your needs.
When to create a custom orchestrator
Most use cases are covered by the four built-in orchestrators (Synchronous, Synchronous Pipeline, Asynchronous, StateGraph). Build a custom one if you need: a different execution backend (e.g., AWS Lambda, a job runner), custom retry/circuit-breaker logic, or integration with an external orchestration system.
API Stability
OrchestratorPluginInterface, OrchestratorPluginBase, and the Orchestrator attribute are @api in the 1.x release line and covered by the BC policy.
Plugin Discovery¶
FlowDrop discovers orchestrator plugins from any enabled module. Place your class in src/Plugin/Orchestrator/ within your module and annotate it with #[Orchestrator].
Step 1: Create the Plugin Class¶
<?php
declare(strict_types=1);
namespace Drupal\my_module\Plugin\Orchestrator;
use Drupal\Core\StringTranslation\TranslatableMarkup;
use Drupal\flowdrop_orchestration\Attribute\Orchestrator;
use Drupal\flowdrop_orchestration\DTO\Orchestrator\OrchestrationRequest;
use Drupal\flowdrop_orchestration\DTO\Orchestrator\OrchestrationResponse;
use Drupal\flowdrop_orchestration\Plugin\Orchestrator\OrchestratorPluginBase;
/**
* A custom orchestrator that logs every execution to a remote webhook.
*
* @internal This class is not part of the public API.
*/
#[Orchestrator(
id: 'my_module:webhook_async',
label: new TranslatableMarkup('Webhook Async'),
description: 'Executes workflows asynchronously and posts results to a webhook.',
capabilities: [
'async_execution',
'pipeline_based',
'job_tracking',
],
)]
class WebhookAsyncOrchestrator extends OrchestratorPluginBase {
/**
* {@inheritdoc}
*/
public function orchestrate(OrchestrationRequest $request): OrchestrationResponse {
// $request->workflowSnapshot — the compiled workflow (nodes + edges)
// $request->initialData — data passed to the first node
// $request->options — orchestrator settings from trigger config
// Your execution logic here. This example delegates to the built-in
// async orchestrator and adds a webhook notification.
// Typically you would:
// 1. Create a Pipeline entity.
// 2. Queue each node as a Job.
// 3. Return the pipeline ID for status tracking.
return new OrchestrationResponse(
success: TRUE,
executionId: $this->generateExecutionId(),
data: [],
);
}
}
#[Orchestrator] Attribute Parameters¶
| Parameter | Type | Required | Description |
|---|---|---|---|
id |
string | Yes | Unique plugin ID. Namespace with your module: my_module:my_orchestrator |
label |
TranslatableMarkup | Yes | Shown in the orchestrator selector dropdown |
description |
string | No | Describes the execution strategy |
capabilities |
array | No | String list of capabilities (see below) |
Capabilities¶
Capabilities are string flags that advertise what your orchestrator supports. FlowDrop uses them for compatibility checks.
| Capability | Meaning |
|---|---|
synchronous_execution |
Runs in the current HTTP request and returns results immediately |
async_execution |
Runs via background queue |
parallel_execution |
Can run multiple jobs simultaneously |
pipeline_based |
Creates Pipeline and Job entities (enables admin UI tracking) |
job_tracking |
Tracks individual job statuses |
stateful |
Maintains execution state with checkpointing |
retry_support |
Can retry failed jobs |
interrupt_support |
Supports human-in-the-loop pauses |
checkpointing |
Creates state checkpoints during execution |
Step 2: Understand OrchestrationRequest¶
// Key properties of OrchestrationRequest:
$request->workflowSnapshot; // WorkflowSnapshot DTO — the compiled workflow graph
$request->initialData; // array — input data for the workflow
$request->options; // array — orchestrator settings (priority, timeout, etc.)
$request->sessionId; // string|null — session ID for interactive workflows
The workflowSnapshot contains the resolved node graph. Access nodes and edges:
$snapshot = $request->workflowSnapshot;
$nodes = $snapshot->getNodes(); // array of NodeSnapshot objects
$edges = $snapshot->getEdges(); // array of edge definitions
Step 3: Understand OrchestrationResponse¶
use Drupal\flowdrop_orchestration\DTO\Orchestrator\OrchestrationResponse;
// Successful execution with data:
return new OrchestrationResponse(
success: TRUE,
executionId: $this->generateExecutionId(),
data: ['result' => 'value'],
);
// Failed execution:
return new OrchestrationResponse(
success: FALSE,
executionId: $this->generateExecutionId(),
error: 'Reason the orchestration failed',
data: [],
);
Step 4: Add a Configuration Form (Optional)¶
Override buildConfigurationForm() to expose settings in the trigger configuration UI:
use Drupal\Core\Form\FormStateInterface;
public function buildConfigurationForm(
array $form,
FormStateInterface $form_state,
array $settings,
): array {
// Include the base form fields (priority, timeout).
$form = parent::buildConfigurationForm($form, $form_state, $settings);
$form['webhook_url'] = [
'#type' => 'url',
'#title' => $this->t('Webhook URL'),
'#description' => $this->t('URL to POST results to on completion.'),
'#default_value' => $settings['webhook_url'] ?? '',
'#required' => TRUE,
];
return $form;
}
public function submitConfigurationForm(
array $form,
FormStateInterface $form_state,
): array {
$values = parent::submitConfigurationForm($form, $form_state);
$formValues = $form_state->getValue('orchestrator_settings') ?? [];
$values['webhook_url'] = $formValues['webhook_url'] ?? '';
return $values;
}
public function getDefaultConfiguration(): array {
return array_merge(parent::getDefaultConfiguration(), [
'webhook_url' => '',
]);
}
Step 5: Handling Interrupts¶
If your orchestrator supports interrupt_support, catch FlowDropInterruptException during execution:
use Drupal\flowdrop_orchestration\Exception\FlowDropInterruptException;
use Drupal\flowdrop_orchestration\Exception\InterruptExceptionInterface;
public function orchestrate(OrchestrationRequest $request): OrchestrationResponse {
try {
// ... execute nodes ...
}
catch (InterruptExceptionInterface $e) {
// Workflow paused for user input. Save checkpoint and return.
return new OrchestrationResponse(
success: TRUE,
executionId: $e->getExecutionId(),
data: ['status' => 'paused', 'interrupt_id' => $e->getInterruptId()],
);
}
}
Verifying Your Orchestrator Is Discovered¶
After enabling your module:
drush php:eval "
\$manager = \Drupal::service('flowdrop_orchestration.plugin_manager');
print_r(array_keys(\$manager->getDefinitions()));
"
Your orchestrator ID should appear in the list. It will also appear in the orchestrator selector when configuring trigger nodes in the visual editor.
Next Steps¶
- Create a Custom Trigger — Fire workflows on custom events
- Node Processor Plugin System — Create custom nodes
- BC Policy — What is and isn't covered by backward compatibility guarantees