Skip to Content

External Tasks

External Tasks sind ein zentrales Muster in BPMN 2.0 zur Auslagerung von Arbeitsschritten an externe Worker.

Konzept

Dieses Muster entkoppelt die Prozessausführung von der eigentlichen Geschäftslogik und ermöglicht:

  • Skalierung: Beliebig viele Worker können dieselben Topics verarbeiten
  • Sprachunabhängigkeit: Worker können in jeder Sprache implementiert werden
  • Fehlertoleranz: Locks verfallen automatisch, sodass fehlgeschlagene Tasks erneut verarbeitet werden
  • Langläufige Aufgaben: Locks werden automatisch verlängert

Lebenszyklus eines ExternalTasks

StateBeschreibung
pendingTask wartet auf Verarbeitung oder wird gerade verarbeitet
finishedTask wurde erfolgreich abgeschlossen oder mit Fehler beendet
expiredLock ist abgelaufen (nur bei Single-Try Tasks)

Single-Try vs. Multi-Try:

  • Multi-Try (Standard): Wenn der Lock abläuft, geht der Task zurück in pending und kann von einem anderen Worker abgeholt werden.
  • Single-Try: Wenn der Lock abläuft, geht der Task in expired und wird nicht erneut angeboten.

ExternalTaskWorker

Minimalbeispiel

import { ExternalTaskWorker } from '@5minds/processcube_engine_client'; const worker = new ExternalTaskWorker<MyPayload, MyResult>( 'http://localhost:10560', 'my_topic', async (payload) => { return { result: payload.value * 2 }; }, ); worker.start();

Vollständiges Beispiel mit Typen

import { ExternalTaskWorker, DataModels } from '@5minds/processcube_engine_client'; interface OrderPayload { orderId: string; items: Array<{ productId: string; quantity: number }>; } interface OrderResult { confirmationNumber: string; totalPrice: number; } const worker = new ExternalTaskWorker<OrderPayload, OrderResult>( 'http://localhost:10560', 'process_order', async (payload, externalTask, abortSignal) => { const order = await processOrder(payload.orderId, payload.items); if (abortSignal?.aborted) { throw new Error('Task wurde abgebrochen'); } return { confirmationNumber: order.confirmation, totalPrice: order.total, }; }, { maxTasks: 5, lockDuration: 60000, longpollingTimeout: 15000, }, ); worker.start();

Interner Ablauf des Workers

Worker-Konfiguration

import { ExternalTaskWorker, IExternalTaskWorkerConfig } from '@5minds/processcube_engine_client'; const config: IExternalTaskWorkerConfig = { workerId: 'order-worker-1', lockDuration: 30000, maxTasks: 10, idleTimeout: 500, longpollingTimeout: 10000, identity: { token: 'Bearer ...', userId: 'worker-user' }, payloadFilter: /urgent/, };
OptionTypStandardBeschreibung
workerIdstringUUIDEindeutige Worker-ID
lockDurationnumber30000Dauer des Locks in ms
maxTasksnumber10Max. Tasks pro Fetch-and-Lock
idleTimeoutnumber500Wartezeit in ms, wenn keine Tasks vorhanden
longpollingTimeoutnumber10000Long-Polling-Timeout in ms
identityIdentityLike-Authentifizierung
payloadFilterRegExp-Regex-Filter auf Task-Payload
loggerILogger-Eigener Logger

Processing Function

Die Processing Function erhält drei Parameter:

type HandleExternalTaskAction<TPayload, TResult> = ( payload: TPayload, externalTask?: ExternalTask<TPayload>, signal?: AbortSignal, ) => TResult | Promise<TResult>;
ParameterBeschreibung
payloadDie Nutzdaten des Tasks (generisch typisiert)
externalTaskDas vollständige ExternalTask-Objekt mit Metadaten
signalAbortSignal zum Reagieren auf Abbruch

Fehler gezielt melden

import { ExternalTaskWorker, DataModels } from '@5minds/processcube_engine_client'; const worker = new ExternalTaskWorker<MyPayload, MyResult>( 'http://localhost:10560', 'validate_data', async (payload) => { if (!payload.email) { return new DataModels.ExternalTasks.ExternalTaskError( 'VALIDATION_ERROR', 'E-Mail-Adresse fehlt', { field: 'email' }, ); } return { valid: true }; }, );

Fehlerbehandlung

Error-Handler registrieren

worker.onWorkerError((errorType, error, externalTask) => { console.error(`[${errorType}]`, error.message); });
ErrorTypeBeschreibung
fetchAndLockFehler beim Abholen von Tasks
extendLockFehler bei der Lock-Verlängerung
processExternalTaskFehler in der Processing Function
finishExternalTaskFehler beim Abschließen des Tasks
unprocessableExternalTaskTask konnte nicht verarbeitet werden
identityRefreshFehler bei der Token-Erneuerung
criticalIdentityFailureToken-Erneuerung endgültig fehlgeschlagen
permanentAuthorizationErrorDauerhafter 401/403-Fehler
unexpectedLoopErrorUnerwarteter Fehler in der Polling-Schleife

Lock-Verlängerung

Die Lock-Verlängerung wird 5 Sekunden vor Ablauf des aktuellen Locks ausgelöst und verlängert jeweils um die konfigurierte lockDuration.

Health Monitoring

const health = worker.getHealth();
EigenschaftTypBeschreibung
isPollingActivebooleanOb die Polling-Schleife aktiv ist
consecutiveCriticalErrorsnumberAnzahl aufeinanderfolgender kritischer Fehler
maxConsecutiveCriticalErrorsnumberMaximum (5), danach stoppt der Worker
activeTaskCountnumberAnzahl aktuell verarbeiteter Tasks
identityRefreshFailedCriticallybooleanOb die Token-Erneuerung fehlgeschlagen ist
if (!worker.isHealthy()) { // Worker neu starten oder Alarm auslösen }

Heartbeat Monitoring

worker.onHeartbeat((operation, externalTaskId) => { metrics.recordHeartbeat(operation, externalTaskId); });

Gemeldete Operationen: fetchAndLock, processExternalTask, extendLock, finishExternalTask, handleError

Graceful Shutdown

process.on('SIGTERM', () => { worker.stop(); });

stop() bewirkt:

  1. Polling-Schleife wird beendet
  2. Alle laufenden Tasks erhalten ein Abort-Signal
  3. Lock-Verlängerungen werden gestoppt

Für vollständiges Aufräumen:

worker.dispose();

Multi-Topic Worker

import { ClientFactory } from '@5minds/processcube_engine_client'; const externalTaskApiClient = ClientFactory.createExternalTaskClient('http://localhost:10560'); const tasks = await externalTaskApiClient.fetchAndLockExternalTasks( 'my-worker-id', ['topic_a', 'topic_b', 'topic_c'], 10, 10000, 30000, );

Payload-Filter

const worker = new ExternalTaskWorker<MyPayload, MyResult>( 'http://localhost:10560', 'notifications', handleNotification, { payloadFilter: /priority.*high/ }, );

ExternalTask REST API

MethodeEndpunktBeschreibung
GET/atlas_engine/api/v1/external_tasks/deployed_topicsAlle aktiven Topics abrufen
POST/atlas_engine/api/v1/external_tasks/fetch_and_lockTasks abholen und sperren
PUT/atlas_engine/api/v1/external_tasks/{id}/extend_lockLock verlängern
PUT/atlas_engine/api/v1/external_tasks/{id}/finishTask abschließen
PUT/atlas_engine/api/v1/external_tasks/{id}/errorFehler melden

ExternalTask-Datenmodell

type ExternalTask<TPayload> = { id: string; workerId: string; topic: string; isSingleTry: boolean; flowNodeInstanceId: string; correlationId: string; processDefinitionId: string; processModelId?: string; processInstanceId: string; ownerId: string; payload: TPayload; lockExpirationTime: Date; state: ExternalTaskState; // 'pending' | 'finished' | 'expired' finishedAt?: Date; result?: any; error?: any; createdAt?: Date; };