External Tasks
ExternalTasks sind Service Tasks in einem BPMN-Prozess, deren Logik außerhalb der Engine ausgeführt wird. Die Engine stellt die Aufgabe bereit, ein externer Worker holt sie ab, verarbeitet sie und meldet das Ergebnis zurück.
Dieses Muster ermöglicht:
- Entkopplung — Die Geschäftslogik lebt im Python-Code, nicht in der Engine
- Skalierung — Mehrere Worker können dasselbe Topic parallel bedienen
- Technologiefreiheit — Der Worker kann beliebige Bibliotheken und Services nutzen
Fetch-and-Lock-Zyklus
Der ExternalTaskClient verwendet Long Polling, um auf neue Aufgaben zu warten.
Lock-Verlängerung
Wenn die Verarbeitung länger dauert als die Lock-Duration, verlängert der Worker den Lock automatisch. Der Timer wird bei 90% der Lock-Duration ausgelöst.
ExternalTask-Lebenszyklus
API
ExternalTaskClient
from processcube_client import ExternalTaskClient
client = ExternalTaskClient(url, session=None, identity=None, loop=None, **kwargs)| Parameter | Beschreibung |
|---|---|
url | Engine-URL, z.B. "http://localhost:56100" |
identity | Optional: Dict {"token": "..."} oder Callable |
loop | Optional: Eigener asyncio Event Loop |
worker_id | Optional (in kwargs): Eigene Worker-ID (Default: UUID) |
Topic abonnieren
client.subscribe_to_external_task_topic(topic, handler, **options)| Option | Beschreibung | Default |
|---|---|---|
max_tasks | Maximale Anzahl gleichzeitig abgeholter Tasks | 10 |
long_polling_timeout_in_ms | Timeout für Long Polling in Millisekunden | 10000 |
lock_duration_in_ms | Dauer des Locks in Millisekunden | 100000 |
payload_filter | Filter für den Task-Payload | None |
Starten und Stoppen
client.start() # Startet den Event Loop (blockierend)
client.stop() # Stoppt den ClientHandler
Der Handler ist eine Funktion, die für jeden ExternalTask aufgerufen wird.
Einfacher Handler (nur Payload)
def handle_task(payload):
name = payload.get("name", "Welt")
return {"greeting": f"Hallo {name}!"}Erweiterter Handler (Payload + ExternalTask)
Wenn der Handler einen zweiten Parameter akzeptiert, erhält er das gesamte ExternalTask-Objekt:
def handle_task(payload, external_task):
task_id = external_task["id"]
correlation_id = external_task["correlationId"]
process_instance_id = external_task["processInstanceId"]
print(f"Task {task_id} in Prozess {process_instance_id}")
return {"status": "verarbeitet"}Async-Handler
import aiohttp
async def handle_task(payload):
async with aiohttp.ClientSession() as session:
async with session.get(f"https://api.example.com/{payload['id']}") as resp:
data = await resp.json()
return {"result": data}Fehlerbehandlung
FunctionalError (BPMN-Fehler)
Ein FunctionalError signalisiert einen fachlichen Fehler, der im BPMN-Prozess behandelt werden kann (z.B. über einen Error Boundary Event):
from processcube_client.external_task import FunctionalError
def handle_task(payload):
if not payload.get("email"):
raise FunctionalError(
code="VALIDATION_ERROR",
message="E-Mail-Adresse fehlt",
details="Das Feld 'email' ist erforderlich"
)
return {"status": "ok"}Technische Fehler
Jede andere Exception wird als technischer Fehler an die Engine gemeldet:
def handle_task(payload):
result = 1 / 0 # ZeroDivisionError → technischer Fehler
return resultVollständiges Beispiel
import logging
from processcube_client import ExternalTaskClient
from processcube_client.external_task import FunctionalError
logging.basicConfig(level=logging.INFO)
ENGINE_URL = "http://localhost:56100"
def bestellung_pruefen(payload):
artikel = payload.get("artikel", [])
if not artikel:
raise FunctionalError(
code="LEERE_BESTELLUNG",
message="Bestellung enthält keine Artikel"
)
gesamtpreis = sum(a.get("preis", 0) * a.get("menge", 1) for a in artikel)
return {
"gesamtpreis": gesamtpreis,
"artikelanzahl": len(artikel),
"status": "geprueft"
}
def versand_vorbereiten(payload, external_task):
print(f"Versand für Prozess: {external_task['processInstanceId']}")
return {"versand_status": "vorbereitet"}
client = ExternalTaskClient(ENGINE_URL)
client.subscribe_to_external_task_topic(
"BestellungPruefen",
bestellung_pruefen,
max_tasks=5,
long_polling_timeout_in_ms=30000,
lock_duration_in_ms=60000
)
client.subscribe_to_external_task_topic(
"VersandVorbereiten",
versand_vorbereiten
)
client.start()