External Tasks
Das External Task Pattern ermöglicht die Integration externer Systeme über ein Worker-Modell. Der Python Client bietet einen ExternalTaskClient mit integriertem Event-Loop.
Grundlegendes Beispiel
from processcube_client.external_task import ExternalTaskClient
import logging
logger = logging.getLogger(__name__)
# Handler-Funktion
def handle_payment(payload, task):
    """Handle a payment task and report the resulting transaction.

    Args:
        payload: Mapping with the task input. ``amount`` falls back to ``0``
            and ``customer`` to ``'Unknown'`` when the key is absent.
        task: The external task being handled (not used by this handler).

    Returns:
        A dict with ``success`` set to ``True`` and ``transactionId`` set to
        whatever ``process_payment`` returned.
    """
    logger.info(f"Verarbeite Zahlung: {payload}")

    # Business inputs, with defaults for missing keys.
    payer = payload.get('customer', 'Unknown')
    total = payload.get('amount', 0)

    # Process the payment and assemble the returned result.
    outcome = {'success': True}
    outcome['transactionId'] = process_payment(payer, total)
    return outcome
# Worker erstellen
client = ExternalTaskClient('http://localhost:8000')
# Topic registrieren
client.subscribe_to_external_task_for_topic('ProcessPayment', handle_payment)
# Worker starten
client.start()
Der ExternalTaskClient startet einen integrierten Event-Loop und pollt die Engine automatisch nach neuen Tasks.
Worker-Konfiguration
Mit Custom Config
from processcube_client.external_task import ExternalTaskClient, ExternalTaskConfig
# Konfiguration erstellen
config = ExternalTaskConfig(
max_tasks=25, # Max. Tasks pro Poll
lock_duration=60000, # Lock-Dauer in ms
poll_interval=5000, # Poll-Intervall in ms
worker_id='payment-worker' # Worker-ID
)
# Client mit Config
client = ExternalTaskClient('http://localhost:8000', config=config)
client.subscribe_to_external_task_for_topic('ProcessPayment', handle_payment)
client.start()
Mit Authentifizierung
# Client mit OAuth 2.0
client = ExternalTaskClient(
'http://localhost:8000',
client_id='payment-worker',
client_secret='secret-key',
authority_url='http://localhost:11235'
)
client.subscribe_to_external_task_for_topic('ProcessPayment', handle_payment)
client.start()
Mehrere Worker
Sie können mehrere Topics gleichzeitig verarbeiten:
from processcube_client.external_task import ExternalTaskClient
client = ExternalTaskClient('http://localhost:8000')
# Email-Handler
def handle_email(payload, task):
    """Send the e-mail described by the payload.

    ``recipient`` and ``subject`` are required keys (a missing one raises
    ``KeyError``); ``body`` falls back to an empty string.

    Returns:
        ``{'sent': True}`` once ``send_email`` has returned.
    """
    to_address = payload['recipient']
    title = payload['subject']
    text = payload.get('body', '')
    send_email(to_address, title, text)
    return dict(sent=True)
# SMS-Handler
def handle_sms(payload, task):
    """Send the text message described by the payload.

    ``phone`` and ``message`` are required keys (a missing one raises
    ``KeyError``).

    Returns:
        ``{'sent': True}`` once ``send_sms`` has returned.
    """
    number = payload['phone']
    text = payload['message']
    send_sms(number, text)
    return dict(sent=True)
# Payment-Handler
def handle_payment(payload, task):
    """Process the payment described by the payload.

    ``process_payment`` is called as ``(customer, amount)``, the same call
    shape the other payment handler in this document uses, so both examples
    work against one helper.

    Args:
        payload: Mapping with the task input. ``amount`` is required;
            ``customer`` falls back to ``'Unknown'``.
        task: The external task being handled (not used by this handler).

    Returns:
        A dict with the ``transactionId`` returned by ``process_payment``.

    Raises:
        KeyError: If ``amount`` is missing from the payload.
    """
    amount = payload['amount']
    customer = payload.get('customer', 'Unknown')
    transaction_id = process_payment(customer, amount)
    return {'transactionId': transaction_id}
# Alle Topics registrieren
client.subscribe_to_external_task_for_topic('SendEmail', handle_email)
client.subscribe_to_external_task_for_topic('SendSMS', handle_sms)
client.subscribe_to_external_task_for_topic('ProcessPayment', handle_payment)
# Worker starten
client.start()
Fehlerbehandlung
Standard Exception
def handle_external_task(payload, task):
try:
# Business-Logik
result = process_data(payload)
return {'success': True, 'result': result}
except Exception as e:
# Exception wird als technischer Fehler gemeldet
raise Exception(f"Verarbeitung fehlgeschlagen: {str(e)}")
BPMN Fehler (FunctionalError)
from processcube_client.external_task import FunctionalError
def handle_approval_task(payload, task):
amount = payload.get('amount', 0)
# Geschäftsregel prüfen
if amount > 10000:
# BPMN-Fehler werfen (für Error Boundary Events)
raise FunctionalError(
'AMOUNT_TOO_HIGH',
f'Betrag {amount} überschreitet Limit von 10000'
)
# Normal verarbeiten
return {'approved': True}
FunctionalError wird als BPMN-Fehler behandelt und kann von Error Boundary Events abgefangen werden.
Async Handler
Der External Task Client unterstützt auch asynchrone Handler:
import asyncio
from processcube_client.external_task import ExternalTaskClient
# Async Handler
async def handle_async_task(payload, task):
    """Async handler for I/O-heavy operations.

    Awaits ``fetch_data_from_api`` for the required ``url`` key of the
    payload and returns the fetched value under ``data``.
    """
    target = payload['url']
    fetched = await fetch_data_from_api(target)
    await asyncio.sleep(1)  # stand-in for further asynchronous work
    return {'data': fetched}
# Client erstellen
client = ExternalTaskClient('http://localhost:8000')
# Async Handler registrieren
client.subscribe_to_external_task_for_topic('FetchData', handle_async_task)
# Worker starten
client.start()
Worker mit eigenem Event-Loop
Wenn Sie den Event-Loop selbst verwalten möchten:
import asyncio
from processcube_client.external_task import ExternalTaskClient
async def run_worker():
    """Run the worker inside an event loop managed by the caller.

    Creates a client, subscribes ``handle_order`` to the ``ProcessOrder``
    topic and awaits ``start_async``.
    """
    worker = ExternalTaskClient('http://localhost:8000')
    worker.subscribe_to_external_task_for_topic('ProcessOrder', handle_order)

    # Start without the client's internal event loop.
    await worker.start_async()
# Eigener Event-Loop
loop = asyncio.get_event_loop()
loop.run_until_complete(run_worker())
Filter für Topics
Sie können Tasks nach Kriterien filtern:
from processcube_client.external_task import ExternalTaskClient
def handle_high_priority(payload, task):
    """Handler meant for high-priority tasks only.

    Neither argument is inspected; the handler always reports the task as
    processed.

    Returns:
        ``{'processed': True}``
    """
    return dict(processed=True)
client = ExternalTaskClient('http://localhost:8000')
# Mit Filter subscriben
client.subscribe_to_external_task_for_topic(
'ProcessOrder',
handle_high_priority,
filter={'priority': 'high'}
)
client.start()
Logging und Monitoring
import logging
from processcube_client.external_task import ExternalTaskClient
# Logging konfigurieren
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def handle_task_with_logging(payload, task):
    """Handler with extensive logging around the actual processing.

    Logs the start (task id and topic), then either success or failure of
    ``process_task``. Failures are logged with their traceback and
    re-raised unchanged.

    Args:
        payload: Task input, passed through to ``process_task``.
        task: Mapping that provides at least ``id`` and ``topic``.

    Returns:
        Whatever ``process_task`` returns.

    Raises:
        Exception: Any exception raised by ``process_task``.
    """
    task_id = task['id']
    logger.info(f"[{task_id}] Start - Topic: {task['topic']}")
    try:
        result = process_task(payload)
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # which a plain logger.error call would drop.
        logger.exception(f"[{task_id}] Fehler: {str(e)}")
        raise
    else:
        logger.info(f"[{task_id}] Erfolg")
        return result
client = ExternalTaskClient('http://localhost:8000')
client.subscribe_to_external_task_for_topic('ProcessOrder', handle_task_with_logging)
client.start()
Graceful Shutdown
import signal
import sys
from processcube_client.external_task import ExternalTaskClient
client = ExternalTaskClient('http://localhost:8000')
def shutdown_handler(sig, frame):
    """Shut the worker down cleanly.

    Announces the shutdown on stdout, stops the module-level ``client`` and
    exits the process with status 0. The ``(sig, frame)`` signature is the
    one ``signal.signal`` handlers are called with; neither value is used.
    """
    print('Stoppe Worker...')
    client.stop()
    sys.exit(0)
# Signal Handler registrieren
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
# Worker starten
client.subscribe_to_external_task_for_topic('ProcessPayment', handle_payment)
client.start()
Praktische Beispiele
Email-Worker mit SMTP
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from processcube_client.external_task import ExternalTaskClient
def handle_email(payload, task):
    """Send an HTML e-mail over SMTP.

    Args:
        payload: Mapping with the required keys ``recipient`` and
            ``subject``; ``body`` is optional and defaults to an empty
            string.
        task: The external task being handled (not used by this handler).

    Returns:
        A dict with ``sent`` set to ``True`` and the ``recipient`` address.

    Raises:
        KeyError: If ``recipient`` or ``subject`` is missing.
        smtplib.SMTPException: If the SMTP conversation fails.
    """
    import os  # local import so the snippet stays self-contained

    # SMTP connection settings.
    smtp_server = 'smtp.example.com'
    smtp_port = 587
    # Credentials are read from the environment when set, so real secrets
    # do not have to live in the source; the fallbacks are placeholders.
    smtp_user = os.environ.get('SMTP_USER', 'user@example.com')
    smtp_password = os.environ.get('SMTP_PASSWORD', 'password')

    recipient = payload['recipient']

    msg = MIMEMultipart()
    msg['From'] = 'noreply@example.com'
    msg['To'] = recipient
    msg['Subject'] = payload['subject']
    msg.attach(MIMEText(payload.get('body', ''), 'html'))

    # Send the e-mail; the context manager closes the connection.
    with smtplib.SMTP(smtp_server, smtp_port) as server:
        server.starttls()
        server.login(smtp_user, smtp_password)
        server.send_message(msg)

    return {
        'sent': True,
        'recipient': recipient,
    }
client = ExternalTaskClient('http://localhost:8000')
client.subscribe_to_external_task_for_topic('SendEmail', handle_email)
client.start()
HTTP-Request-Worker
import requests
from processcube_client.external_task import ExternalTaskClient
def handle_http_request(payload, task):
    """Perform the HTTP request described by the payload.

    Args:
        payload: Mapping with the required key ``url`` and the optional keys
            ``method`` (default ``'GET'``), ``headers`` (default ``{}``) and
            ``data`` (sent as the JSON body).
        task: The external task being handled (not used by this handler).

    Returns:
        A dict with the HTTP ``status``, the decoded JSON ``data`` (``None``
        for error responses and for responses without a JSON body) and a
        ``success`` flag mirroring ``response.ok``.

    Raises:
        KeyError: If ``url`` is missing.
        requests.RequestException: If the request itself fails.
    """
    method = payload.get('method', 'GET')
    url = payload['url']
    headers = payload.get('headers', {})
    data = payload.get('data')

    # Perform the request.
    response = requests.request(
        method,
        url,
        headers=headers,
        json=data,
        timeout=30,
    )

    body = None
    if response.ok:
        try:
            body = response.json()
        except ValueError:
            # A successful response may carry no JSON (e.g. 204 No Content);
            # report it as success without data instead of failing the task.
            body = None

    return {
        'status': response.status_code,
        'data': body,
        'success': response.ok,
    }
client = ExternalTaskClient('http://localhost:8000')
client.subscribe_to_external_task_for_topic('HttpRequest', handle_http_request)
client.start()
Datenbank-Worker
import psycopg2
from processcube_client.external_task import ExternalTaskClient
def handle_database_query(payload, task):
    """Run the database query described by the payload.

    Args:
        payload: Mapping with the required key ``query`` and the optional
            key ``params`` (default ``[]``), passed to ``cursor.execute``.
        task: The external task being handled (not used by this handler).

    Returns:
        A dict with ``success`` set to ``True``, ``rows`` as a list of
        column-name -> value dicts (empty for statements without a result
        set) and ``rowCount``: the number of returned rows, or the cursor's
        affected-row count for statements without a result set.

    Raises:
        KeyError: If ``query`` is missing.
        psycopg2.Error: If connecting or executing fails.
    """
    # NOTE(review): payload['query'] is executed verbatim. Only values in
    # ``params`` are bound safely, so the query text itself must come from
    # a trusted process model - confirm before production use.
    conn = psycopg2.connect(
        host='localhost',
        database='mydb',
        user='user',
        password='password',
    )
    try:
        # The cursor is closed by the context manager even if execute fails.
        with conn.cursor() as cursor:
            cursor.execute(payload['query'], payload.get('params', []))

            if cursor.description:
                # The statement produced a result set: map rows to dicts.
                columns = [desc[0] for desc in cursor.description]
                result = [dict(zip(columns, row)) for row in cursor.fetchall()]
                row_count = len(result)
            else:
                # INSERT/UPDATE/DELETE etc.: report affected rows; rowcount
                # is -1 when it cannot be determined, so clamp to 0.
                result = []
                row_count = max(cursor.rowcount, 0)

        conn.commit()
        return {
            'success': True,
            'rows': result,
            'rowCount': row_count,
        }
    finally:
        conn.close()
client = ExternalTaskClient('http://localhost:8000')
client.subscribe_to_external_task_for_topic('DatabaseQuery', handle_database_query)
client.start()
Best Practices
Idempotenz sicherstellen
def handle_idempotent_task(payload, task):
"""Idempotenter Task-Handler."""
task_id = task['id']
# Prüfen, ob bereits verarbeitet
if is_already_processed(task_id):
return {'skipped': True, 'reason': 'Already processed'}
# Verarbeiten
result = process_task(payload)
# Als verarbeitet markieren
mark_as_processed(task_id)
return result
Timeout-Handling
import time
from processcube_client.external_task import ExternalTaskClient
def handle_with_timeout(payload, task):
"""Handler mit Timeout-Check."""
start_time = time.time()
max_duration = 50 # Sekunden
result = process_task(payload)
# Prüfen, ob noch Zeit übrig ist
elapsed = time.time() - start_time
if elapsed > max_duration:
raise Exception(f"Task exceeded timeout: {elapsed}s")
return result
Nächste Schritte
- Getting Started - Grundlagen
- REST API - Synchrone API
- User Tasks - User Task Pattern
Tipp: Für Production-Worker empfehlen wir Monitoring, Health-Checks und automatisches Retry-Handling.