External Tasks
External Tasks sind ein zentrales Muster in BPMN 2.0 zur Auslagerung von Arbeitsschritten an externe Worker.
Konzept
Dieses Muster entkoppelt die Prozessausführung von der eigentlichen Geschäftslogik und ermöglicht:
- Skalierung: Beliebig viele Worker können dieselben Topics verarbeiten
- Sprachunabhängigkeit: Worker können in jeder Sprache implementiert werden
- Fehlertoleranz: Locks verfallen automatisch, sodass fehlgeschlagene Tasks erneut verarbeitet werden
- Langläufige Aufgaben: Locks werden automatisch verlängert
Lebenszyklus eines ExternalTasks
| State | Beschreibung |
|---|---|
pending | Task wartet auf Verarbeitung oder wird gerade verarbeitet |
finished | Task wurde erfolgreich abgeschlossen oder mit Fehler beendet |
expired | Lock ist abgelaufen (nur bei Single-Try Tasks) |
Single-Try vs. Multi-Try:
- Multi-Try (Standard): Wenn der Lock abläuft, geht der Task zurück in
pendingund kann von einem anderen Worker abgeholt werden. - Single-Try: Wenn der Lock abläuft, geht der Task in
expiredund wird nicht erneut angeboten.
ExternalTaskWorker
Minimalbeispiel
import { ExternalTaskWorker } from '@5minds/processcube_engine_client';
const worker = new ExternalTaskWorker<MyPayload, MyResult>(
'http://localhost:10560',
'my_topic',
async (payload) => {
return { result: payload.value * 2 };
},
);
worker.start();Vollständiges Beispiel mit Typen
import { ExternalTaskWorker, DataModels } from '@5minds/processcube_engine_client';
interface OrderPayload {
orderId: string;
items: Array<{ productId: string; quantity: number }>;
}
interface OrderResult {
confirmationNumber: string;
totalPrice: number;
}
const worker = new ExternalTaskWorker<OrderPayload, OrderResult>(
'http://localhost:10560',
'process_order',
async (payload, externalTask, abortSignal) => {
const order = await processOrder(payload.orderId, payload.items);
if (abortSignal?.aborted) {
throw new Error('Task wurde abgebrochen');
}
return {
confirmationNumber: order.confirmation,
totalPrice: order.total,
};
},
{
maxTasks: 5,
lockDuration: 60000,
longpollingTimeout: 15000,
},
);
worker.start();Interner Ablauf des Workers
Worker-Konfiguration
import { ExternalTaskWorker, IExternalTaskWorkerConfig } from '@5minds/processcube_engine_client';
const config: IExternalTaskWorkerConfig = {
workerId: 'order-worker-1',
lockDuration: 30000,
maxTasks: 10,
idleTimeout: 500,
longpollingTimeout: 10000,
identity: { token: 'Bearer ...', userId: 'worker-user' },
payloadFilter: /urgent/,
};| Option | Typ | Standard | Beschreibung |
|---|---|---|---|
workerId | string | UUID | Eindeutige Worker-ID |
lockDuration | number | 30000 | Dauer des Locks in ms |
maxTasks | number | 10 | Max. Tasks pro Fetch-and-Lock |
idleTimeout | number | 500 | Wartezeit in ms, wenn keine Tasks vorhanden |
longpollingTimeout | number | 10000 | Long-Polling-Timeout in ms |
identity | IdentityLike | - | Authentifizierung |
payloadFilter | RegExp | - | Regex-Filter auf Task-Payload |
logger | ILogger | - | Eigener Logger |
Processing Function
Die Processing Function erhält drei Parameter:
type HandleExternalTaskAction<TPayload, TResult> = (
payload: TPayload,
externalTask?: ExternalTask<TPayload>,
signal?: AbortSignal,
) => TResult | Promise<TResult>;| Parameter | Beschreibung |
|---|---|
payload | Die Nutzdaten des Tasks (generisch typisiert) |
externalTask | Das vollständige ExternalTask-Objekt mit Metadaten |
signal | AbortSignal zum Reagieren auf Abbruch |
Fehler gezielt melden
import { ExternalTaskWorker, DataModels } from '@5minds/processcube_engine_client';
const worker = new ExternalTaskWorker<MyPayload, MyResult>(
'http://localhost:10560',
'validate_data',
async (payload) => {
if (!payload.email) {
return new DataModels.ExternalTasks.ExternalTaskError(
'VALIDATION_ERROR',
'E-Mail-Adresse fehlt',
{ field: 'email' },
);
}
return { valid: true };
},
);Fehlerbehandlung
Error-Handler registrieren
worker.onWorkerError((errorType, error, externalTask) => {
console.error(`[${errorType}]`, error.message);
});| ErrorType | Beschreibung |
|---|---|
fetchAndLock | Fehler beim Abholen von Tasks |
extendLock | Fehler bei der Lock-Verlängerung |
processExternalTask | Fehler in der Processing Function |
finishExternalTask | Fehler beim Abschließen des Tasks |
unprocessableExternalTask | Task konnte nicht verarbeitet werden |
identityRefresh | Fehler bei der Token-Erneuerung |
criticalIdentityFailure | Token-Erneuerung endgültig fehlgeschlagen |
permanentAuthorizationError | Dauerhafter 401/403-Fehler |
unexpectedLoopError | Unerwarteter Fehler in der Polling-Schleife |
Lock-Verlängerung
Die Lock-Verlängerung wird 5 Sekunden vor Ablauf des aktuellen Locks ausgelöst und verlängert jeweils um die konfigurierte lockDuration.
Health Monitoring
const health = worker.getHealth();| Eigenschaft | Typ | Beschreibung |
|---|---|---|
isPollingActive | boolean | Ob die Polling-Schleife aktiv ist |
consecutiveCriticalErrors | number | Anzahl aufeinanderfolgender kritischer Fehler |
maxConsecutiveCriticalErrors | number | Maximum (5), danach stoppt der Worker |
activeTaskCount | number | Anzahl aktuell verarbeiteter Tasks |
identityRefreshFailedCritically | boolean | Ob die Token-Erneuerung fehlgeschlagen ist |
if (!worker.isHealthy()) {
// Worker neu starten oder Alarm auslösen
}Heartbeat Monitoring
worker.onHeartbeat((operation, externalTaskId) => {
metrics.recordHeartbeat(operation, externalTaskId);
});Gemeldete Operationen: fetchAndLock, processExternalTask, extendLock, finishExternalTask, handleError
Graceful Shutdown
process.on('SIGTERM', () => {
worker.stop();
});stop() bewirkt:
- Polling-Schleife wird beendet
- Alle laufenden Tasks erhalten ein Abort-Signal
- Lock-Verlängerungen werden gestoppt
Für vollständiges Aufräumen:
worker.dispose();Multi-Topic Worker
import { ClientFactory } from '@5minds/processcube_engine_client';
const externalTaskApiClient = ClientFactory.createExternalTaskClient('http://localhost:10560');
const tasks = await externalTaskApiClient.fetchAndLockExternalTasks(
'my-worker-id',
['topic_a', 'topic_b', 'topic_c'],
10,
10000,
30000,
);Payload-Filter
const worker = new ExternalTaskWorker<MyPayload, MyResult>(
'http://localhost:10560',
'notifications',
handleNotification,
{ payloadFilter: /priority.*high/ },
);ExternalTask REST API
| Methode | Endpunkt | Beschreibung |
|---|---|---|
GET | /atlas_engine/api/v1/external_tasks/deployed_topics | Alle aktiven Topics abrufen |
POST | /atlas_engine/api/v1/external_tasks/fetch_and_lock | Tasks abholen und sperren |
PUT | /atlas_engine/api/v1/external_tasks/{id}/extend_lock | Lock verlängern |
PUT | /atlas_engine/api/v1/external_tasks/{id}/finish | Task abschließen |
PUT | /atlas_engine/api/v1/external_tasks/{id}/error | Fehler melden |
ExternalTask-Datenmodell
type ExternalTask<TPayload> = {
id: string;
workerId: string;
topic: string;
isSingleTry: boolean;
flowNodeInstanceId: string;
correlationId: string;
processDefinitionId: string;
processModelId?: string;
processInstanceId: string;
ownerId: string;
payload: TPayload;
lockExpirationTime: Date;
state: ExternalTaskState; // 'pending' | 'finished' | 'expired'
finishedAt?: Date;
result?: any;
error?: any;
createdAt?: Date;
};