Creating Custom Orchestrators

An orchestrator controls how FlowDrop executes a workflow — whether synchronously in the current request, asynchronously via a queue, or with state management and checkpointing. You can create a custom orchestrator when none of the built-in strategies fit your needs.

When to Create a Custom Orchestrator

The four built-in orchestrators (Synchronous, Synchronous Pipeline, Asynchronous, StateGraph) cover most use cases. Build a custom one if you need a different execution backend (e.g., AWS Lambda or a job runner), custom retry or circuit-breaker logic, or integration with an external orchestration system.

API Stability

OrchestratorPluginInterface, OrchestratorPluginBase, and the Orchestrator attribute are marked @api in the 1.x release line and are covered by the backward-compatibility (BC) policy.

Plugin Discovery

FlowDrop discovers orchestrator plugins from any enabled module. Place your class in src/Plugin/Orchestrator/ within your module and apply the #[Orchestrator] attribute to it.

Step 1: Create the Plugin Class

<?php

declare(strict_types=1);

namespace Drupal\my_module\Plugin\Orchestrator;

use Drupal\Core\StringTranslation\TranslatableMarkup;
use Drupal\flowdrop_orchestration\Attribute\Orchestrator;
use Drupal\flowdrop_orchestration\DTO\Orchestrator\OrchestrationRequest;
use Drupal\flowdrop_orchestration\DTO\Orchestrator\OrchestrationResponse;
use Drupal\flowdrop_orchestration\Plugin\Orchestrator\OrchestratorPluginBase;

/**
 * A custom orchestrator that posts execution results to a remote webhook.
 *
 * @internal This class is not part of the public API.
 */
#[Orchestrator(
  id: 'my_module:webhook_async',
  label: new TranslatableMarkup('Webhook Async'),
  description: 'Executes workflows asynchronously and posts results to a webhook.',
  capabilities: [
    'async_execution',
    'pipeline_based',
    'job_tracking',
  ],
)]
class WebhookAsyncOrchestrator extends OrchestratorPluginBase {

  /**
   * {@inheritdoc}
   */
  public function orchestrate(OrchestrationRequest $request): OrchestrationResponse {
    // $request->workflowSnapshot — the compiled workflow (nodes + edges)
    // $request->initialData     — data passed to the first node
    // $request->options         — orchestrator settings from trigger config

    // Your execution logic goes here. This skeleton only returns an empty
    // response; a real implementation would run the workflow and post the
    // results to the configured webhook.

    // Typically you would:
    // 1. Create a Pipeline entity.
    // 2. Queue each node as a Job.
    // 3. Return the pipeline ID for status tracking.

    return new OrchestrationResponse(
      success: TRUE,
      executionId: $this->generateExecutionId(),
      data: [],
    );
  }

}
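
The three steps listed in the comments above (create a pipeline, queue a job per node, return the pipeline ID) can be sketched in plain PHP. This is a minimal illustration only: example_queue_pipeline() and the array keys are hypothetical, and an in-memory SplQueue stands in for FlowDrop's real Pipeline and Job entities.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical in-memory stand-in for the pipeline/job steps.
 *
 * It does not use FlowDrop's real Pipeline or Job entities.
 *
 * @param string[] $nodeIds
 *   Node IDs in the order they should run.
 *
 * @return array{pipeline_id: string, jobs: \SplQueue}
 */
function example_queue_pipeline(array $nodeIds): array {
  // 1. Create a pipeline record with a unique ID.
  $pipelineId = 'pipeline_' . bin2hex(random_bytes(8));

  // 2. Queue one pending job per node, preserving order.
  $jobs = new \SplQueue();
  foreach ($nodeIds as $nodeId) {
    $jobs->enqueue([
      'pipeline_id' => $pipelineId,
      'node_id' => $nodeId,
      'status' => 'pending',
    ]);
  }

  // 3. Return the pipeline ID so callers can track status later.
  return ['pipeline_id' => $pipelineId, 'jobs' => $jobs];
}

$result = example_queue_pipeline(['fetch', 'transform', 'notify']);
printf("%s has %d pending jobs\n", $result['pipeline_id'], count($result['jobs']));
```

In a real orchestrator the returned ID would most likely be what you pass as executionId in the OrchestrationResponse, so that callers can poll for status.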

#[Orchestrator] Attribute Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| id | string | Yes | Unique plugin ID. Namespace it with your module: my_module:my_orchestrator |
| label | TranslatableMarkup | Yes | Shown in the orchestrator selector dropdown |
| description | string | No | Describes the execution strategy |
| capabilities | array | No | List of capability strings (see below) |

Capabilities

Capabilities are string flags that advertise what your orchestrator supports. FlowDrop uses them for compatibility checks.

| Capability | Meaning |
| --- | --- |
| synchronous_execution | Runs in the current HTTP request and returns results immediately |
| async_execution | Runs via background queue |
| parallel_execution | Can run multiple jobs simultaneously |
| pipeline_based | Creates Pipeline and Job entities (enables admin UI tracking) |
| job_tracking | Tracks individual job statuses |
| stateful | Maintains execution state with checkpointing |
| retry_support | Can retry failed jobs |
| interrupt_support | Supports human-in-the-loop pauses |
| checkpointing | Creates state checkpoints during execution |
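
To make the idea of a compatibility check concrete, here is a rough sketch that compares a plugin's declared capabilities against a list of required ones. The helper name example_missing_capabilities() is hypothetical, and the sketch assumes the plugin definition exposes its flags under a capabilities array key; FlowDrop's own check may work differently.

```php
<?php

declare(strict_types=1);

/**
 * Returns the capabilities from $required that $definition does not declare.
 *
 * Hypothetical helper; FlowDrop's own compatibility check may differ.
 *
 * @param array $definition
 *   Plugin definition, assumed to carry a 'capabilities' list.
 * @param string[] $required
 *   Capability flags the caller needs.
 *
 * @return string[]
 */
function example_missing_capabilities(array $definition, array $required): array {
  $declared = $definition['capabilities'] ?? [];
  return array_values(array_diff($required, $declared));
}

$definition = [
  'id' => 'my_module:webhook_async',
  'capabilities' => ['async_execution', 'pipeline_based', 'job_tracking'],
];

$missing = example_missing_capabilities($definition, ['async_execution', 'retry_support']);
echo $missing === [] ? "compatible\n" : 'missing: ' . implode(', ', $missing) . "\n";
```

Here the example orchestrator would be reported as missing retry_support, because it declares async_execution but not retries.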

Step 2: Understand OrchestrationRequest

// Key properties of OrchestrationRequest:
$request->workflowSnapshot;  // WorkflowSnapshot DTO — the compiled workflow graph
$request->initialData;       // array — input data for the workflow
$request->options;           // array — orchestrator settings (priority, timeout, etc.)
$request->sessionId;         // string|null — session ID for interactive workflows

The workflowSnapshot contains the resolved node graph. Access nodes and edges:

$snapshot = $request->workflowSnapshot;
$nodes    = $snapshot->getNodes();   // array of NodeSnapshot objects
$edges    = $snapshot->getEdges();   // array of edge definitions
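
An orchestrator usually needs to decide the order in which nodes run. The sketch below derives one valid order from a node list and an edge list using Kahn's algorithm. It works on plain arrays rather than the real snapshot objects, and it assumes each edge is an array with source and target keys; the actual edge definitions returned by getEdges() may be shaped differently.

```php
<?php

declare(strict_types=1);

/**
 * Orders node IDs so each node runs after the nodes that feed into it.
 *
 * Uses Kahn's algorithm and throws if the graph contains a cycle.
 *
 * @param string[] $nodeIds
 *   All node IDs in the workflow.
 * @param array<int, array{source: string, target: string}> $edges
 *   Assumed edge shape; the real edge definitions may use other keys.
 *
 * @return string[]
 */
function example_execution_order(array $nodeIds, array $edges): array {
  $incoming = array_fill_keys($nodeIds, 0);
  $outgoing = array_fill_keys($nodeIds, []);
  foreach ($edges as $edge) {
    $outgoing[$edge['source']][] = $edge['target'];
    $incoming[$edge['target']]++;
  }

  // Start with the nodes that have no incoming edges.
  $ready = array_keys(array_filter($incoming, static fn (int $count): bool => $count === 0));
  $order = [];
  while ($ready !== []) {
    $nodeId = array_shift($ready);
    $order[] = $nodeId;
    foreach ($outgoing[$nodeId] as $target) {
      // A node becomes ready once all of its inputs have been scheduled.
      if (--$incoming[$target] === 0) {
        $ready[] = $target;
      }
    }
  }

  if (count($order) !== count($nodeIds)) {
    throw new \RuntimeException('Workflow graph contains a cycle.');
  }
  return $order;
}

$order = example_execution_order(
  ['input', 'transform', 'output'],
  [
    ['source' => 'input', 'target' => 'transform'],
    ['source' => 'transform', 'target' => 'output'],
  ],
);
echo implode(' -> ', $order), "\n";
```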

Step 3: Understand OrchestrationResponse

use Drupal\flowdrop_orchestration\DTO\Orchestrator\OrchestrationResponse;

// Successful execution with data:
return new OrchestrationResponse(
  success: TRUE,
  executionId: $this->generateExecutionId(),
  data: ['result' => 'value'],
);

// Failed execution:
return new OrchestrationResponse(
  success: FALSE,
  executionId: $this->generateExecutionId(),
  error: 'Reason the orchestration failed',
  data: [],
);
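
One way to keep the success and failure paths consistent is to wrap execution in a single helper that turns exceptions into a failed response. The sketch below is self-contained, so it uses a hypothetical ExampleResponse class that only mirrors the named arguments shown above; it is not FlowDrop's OrchestrationResponse, and example_run() is likewise an illustrative name.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical stand-in that mirrors the named arguments used above.
 *
 * It is not FlowDrop's OrchestrationResponse class.
 */
final class ExampleResponse {

  public function __construct(
    public readonly bool $success,
    public readonly string $executionId,
    public readonly array $data = [],
    public readonly ?string $error = NULL,
  ) {}

}

/**
 * Runs $execute and maps its outcome to a success or failure response.
 */
function example_run(callable $execute, string $executionId): ExampleResponse {
  try {
    return new ExampleResponse(success: TRUE, executionId: $executionId, data: $execute());
  }
  catch (\Throwable $e) {
    // Report the failure instead of letting the exception escape.
    return new ExampleResponse(success: FALSE, executionId: $executionId, error: $e->getMessage());
  }
}

$ok = example_run(static fn (): array => ['result' => 'value'], 'exec-1');
$failed = example_run(static function (): array {
  throw new \RuntimeException('Webhook endpoint unreachable');
}, 'exec-2');

var_dump($ok->success, $failed->success, $failed->error);
```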

Step 4: Add a Configuration Form (Optional)

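Override buildConfigurationForm() to expose settings in the trigger configuration UI, submitConfigurationForm() to extract the submitted values, and getDefaultConfiguration() to declare defaults: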

use Drupal\Core\Form\FormStateInterface;

public function buildConfigurationForm(
  array $form,
  FormStateInterface $form_state,
  array $settings,
): array {
  // Include the base form fields (priority, timeout).
  $form = parent::buildConfigurationForm($form, $form_state, $settings);

  $form['webhook_url'] = [
    '#type'          => 'url',
    '#title'         => $this->t('Webhook URL'),
    '#description'   => $this->t('URL to POST results to on completion.'),
    '#default_value' => $settings['webhook_url'] ?? '',
    '#required'      => TRUE,
  ];

  return $form;
}

public function submitConfigurationForm(
  array $form,
  FormStateInterface $form_state,
): array {
  $values = parent::submitConfigurationForm($form, $form_state);

  $formValues = $form_state->getValue('orchestrator_settings') ?? [];
  $values['webhook_url'] = $formValues['webhook_url'] ?? '';

  return $values;
}

public function getDefaultConfiguration(): array {
  return array_merge(parent::getDefaultConfiguration(), [
    'webhook_url' => '',
  ]);
}
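
At execution time the saved settings typically need to be combined with the defaults and checked before use. The following is a minimal sketch of that step in plain PHP. The helper example_resolve_settings() is hypothetical, and the priority and timeout default values are placeholders, not the values the base class actually provides.

```php
<?php

declare(strict_types=1);

/**
 * Merges saved settings over defaults and validates the webhook URL.
 *
 * Hypothetical helper; not part of the FlowDrop API.
 *
 * @return array{settings: array, errors: string[]}
 */
function example_resolve_settings(array $defaults, array $saved): array {
  // Saved values win; missing keys fall back to the defaults.
  $settings = array_merge($defaults, $saved);

  $errors = [];
  $url = (string) ($settings['webhook_url'] ?? '');
  $scheme = parse_url($url, PHP_URL_SCHEME);
  if (filter_var($url, FILTER_VALIDATE_URL) === FALSE || !in_array($scheme, ['http', 'https'], TRUE)) {
    $errors[] = 'webhook_url must be an absolute http(s) URL.';
  }

  return ['settings' => $settings, 'errors' => $errors];
}

// Placeholder defaults, standing in for getDefaultConfiguration().
$defaults = ['priority' => 'normal', 'timeout' => 300, 'webhook_url' => ''];
$resolved = example_resolve_settings($defaults, ['webhook_url' => 'https://example.com/hook']);

echo $resolved['errors'] === [] ? "settings ok\n" : implode("\n", $resolved['errors']) . "\n";
```

Checking the scheme in addition to FILTER_VALIDATE_URL is a deliberate choice here, because that filter also accepts non-HTTP schemes such as ftp.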

Step 5: Handle Interrupts

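If your orchestrator declares the interrupt_support capability, catch interrupt exceptions such as FlowDropInterruptException during execution. The example catches them through InterruptExceptionInterface: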

use Drupal\flowdrop_orchestration\Exception\FlowDropInterruptException;
use Drupal\flowdrop_orchestration\Exception\InterruptExceptionInterface;

public function orchestrate(OrchestrationRequest $request): OrchestrationResponse {
  try {
    // ... execute nodes ...

    // No interrupt occurred: report normal completion.
    return new OrchestrationResponse(
      success: TRUE,
      executionId: $this->generateExecutionId(),
      data: [],
    );
  }
  catch (InterruptExceptionInterface $e) {
    // Workflow paused for user input. Save checkpoint and return.
    return new OrchestrationResponse(
      success: TRUE,
      executionId: $e->getExecutionId(),
      data: ['status' => 'paused', 'interrupt_id' => $e->getInterruptId()],
    );
  }
}
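
The "save checkpoint and return" step can be illustrated with a small, self-contained sketch of pausing and resuming. It records which steps have completed, stops when a step raises an interrupt, and skips the completed steps on the next run. ExampleInterrupt and example_run_steps() are hypothetical stand-ins; the sketch does not use FlowDrop's exception classes or its checkpoint storage.

```php
<?php

declare(strict_types=1);

/** Hypothetical stand-in for an interrupt raised by a node. */
final class ExampleInterrupt extends \RuntimeException {}

/**
 * Runs steps in order, resuming from a checkpoint of completed step names.
 *
 * @param array<string, callable> $steps
 *   Step callables keyed by step name.
 * @param string[] $completed
 *   Step names finished in an earlier run.
 *
 * @return array{status: string, paused_at: ?string, completed: string[]}
 */
function example_run_steps(array $steps, array $completed = []): array {
  foreach ($steps as $name => $step) {
    if (in_array($name, $completed, TRUE)) {
      // Already finished before the pause; skip on resume.
      continue;
    }
    try {
      $step();
    }
    catch (ExampleInterrupt $e) {
      // Hand back the checkpoint so a later call can pick up from here.
      return ['status' => 'paused', 'paused_at' => $name, 'completed' => $completed];
    }
    $completed[] = $name;
  }
  return ['status' => 'completed', 'paused_at' => NULL, 'completed' => $completed];
}

$approved = FALSE;
$steps = [
  'draft' => static fn () => NULL,
  'approval' => static function () use (&$approved): void {
    if (!$approved) {
      throw new ExampleInterrupt('Waiting for approval');
    }
  },
  'publish' => static fn () => NULL,
];

$first = example_run_steps($steps);
$approved = TRUE;
$second = example_run_steps($steps, $first['completed']);
echo $first['status'], ' then ', $second['status'], "\n";
```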

Verifying Your Orchestrator Is Discovered

After enabling your module (or rebuilding caches with drush cr if it was already enabled), list the discovered plugin IDs:

drush php:eval "
  \$manager = \Drupal::service('flowdrop_orchestration.plugin_manager');
  print_r(array_keys(\$manager->getDefinitions()));
"

Your orchestrator ID should appear in the list. It will also appear in the orchestrator selector when configuring trigger nodes in the visual editor.

Next Steps