Skip to Content

External Tasks

ExternalTasks sind Service Tasks in einem BPMN-Prozess, deren Logik außerhalb der Engine ausgeführt wird. Die Engine stellt die Aufgabe bereit, ein externer Worker holt sie ab, verarbeitet sie und meldet das Ergebnis zurück.

Dieses Muster ermöglicht:

  • Entkopplung — Die Geschäftslogik lebt im Python-Code, nicht in der Engine
  • Skalierung — Mehrere Worker können dasselbe Topic parallel bedienen
  • Technologiefreiheit — Der Worker kann beliebige Bibliotheken und Services nutzen

Fetch-and-Lock-Zyklus

Der ExternalTaskClient verwendet Long Polling, um auf neue Aufgaben zu warten.

Lock-Verlängerung

Wenn die Verarbeitung länger dauert als die Lock-Duration, verlängert der Worker den Lock automatisch. Der Timer wird bei 90% der Lock-Duration ausgelöst.

ExternalTask-Lebenszyklus

API

ExternalTaskClient

from processcube_client import ExternalTaskClient client = ExternalTaskClient(url, session=None, identity=None, loop=None, **kwargs)
ParameterBeschreibung
urlEngine-URL, z.B. "http://localhost:56100"
identityOptional: Dict {"token": "..."} oder Callable
loopOptional: Eigener asyncio Event Loop
worker_idOptional (in kwargs): Eigene Worker-ID (Default: UUID)

Topic abonnieren

client.subscribe_to_external_task_topic(topic, handler, **options)
OptionBeschreibungDefault
max_tasksMaximale Anzahl gleichzeitig abgeholter Tasks10
long_polling_timeout_in_msTimeout für Long Polling in Millisekunden10000
lock_duration_in_msDauer des Locks in Millisekunden100000
payload_filterFilter für den Task-PayloadNone

Starten und Stoppen

client.start() # Startet den Event Loop (blockierend) client.stop() # Stoppt den Client

Handler

Der Handler ist eine Funktion, die für jeden ExternalTask aufgerufen wird.

Einfacher Handler (nur Payload)

def handle_task(payload): name = payload.get("name", "Welt") return {"greeting": f"Hallo {name}!"}

Erweiterter Handler (Payload + ExternalTask)

Wenn der Handler einen zweiten Parameter akzeptiert, erhält er das gesamte ExternalTask-Objekt:

def handle_task(payload, external_task): task_id = external_task["id"] correlation_id = external_task["correlationId"] process_instance_id = external_task["processInstanceId"] print(f"Task {task_id} in Prozess {process_instance_id}") return {"status": "verarbeitet"}

Async-Handler

import aiohttp async def handle_task(payload): async with aiohttp.ClientSession() as session: async with session.get(f"https://api.example.com/{payload['id']}") as resp: data = await resp.json() return {"result": data}

Fehlerbehandlung

FunctionalError (BPMN-Fehler)

Ein FunctionalError signalisiert einen fachlichen Fehler, der im BPMN-Prozess behandelt werden kann (z.B. über einen Error Boundary Event):

from processcube_client.external_task import FunctionalError def handle_task(payload): if not payload.get("email"): raise FunctionalError( code="VALIDATION_ERROR", message="E-Mail-Adresse fehlt", details="Das Feld 'email' ist erforderlich" ) return {"status": "ok"}

Technische Fehler

Jede andere Exception wird als technischer Fehler an die Engine gemeldet:

def handle_task(payload): result = 1 / 0 # ZeroDivisionError → technischer Fehler return result

Vollständiges Beispiel

import logging from processcube_client import ExternalTaskClient from processcube_client.external_task import FunctionalError logging.basicConfig(level=logging.INFO) ENGINE_URL = "http://localhost:56100" def bestellung_pruefen(payload): artikel = payload.get("artikel", []) if not artikel: raise FunctionalError( code="LEERE_BESTELLUNG", message="Bestellung enthält keine Artikel" ) gesamtpreis = sum(a.get("preis", 0) * a.get("menge", 1) for a in artikel) return { "gesamtpreis": gesamtpreis, "artikelanzahl": len(artikel), "status": "geprueft" } def versand_vorbereiten(payload, external_task): print(f"Versand für Prozess: {external_task['processInstanceId']}") return {"versand_status": "vorbereitet"} client = ExternalTaskClient(ENGINE_URL) client.subscribe_to_external_task_topic( "BestellungPruefen", bestellung_pruefen, max_tasks=5, long_polling_timeout_in_ms=30000, lock_duration_in_ms=60000 ) client.subscribe_to_external_task_topic( "VersandVorbereiten", versand_vorbereiten ) client.start()