Skip to Content
DocsClientsPythonExternal Tasks

External Tasks

Das External Task Pattern ermöglicht die Integration externer Systeme über ein Worker-Modell. Der Python Client bietet einen ExternalTaskClient mit integriertem Event-Loop.

Grundlegendes Beispiel

from processcube_client.external_task import ExternalTaskClient import logging logger = logging.getLogger(__name__) # Handler-Funktion def handle_payment(payload, task): """Verarbeitet Zahlungs-Tasks.""" logger.info(f"Verarbeite Zahlung: {payload}") # Business-Logik amount = payload.get('amount', 0) customer = payload.get('customer', 'Unknown') # Zahlung verarbeiten transaction_id = process_payment(customer, amount) # Ergebnis zurückgeben return { 'success': True, 'transactionId': transaction_id } # Worker erstellen client = ExternalTaskClient('http://localhost:8000') # Topic registrieren client.subscribe_to_external_task_for_topic('ProcessPayment', handle_payment) # Worker starten client.start()

Der ExternalTaskClient startet einen integrierten Event-Loop und pollt die Engine automatisch nach neuen Tasks.

Worker-Konfiguration

Mit Custom Config

from processcube_client.external_task import ExternalTaskClient, ExternalTaskConfig # Konfiguration erstellen config = ExternalTaskConfig( max_tasks=25, # Max. Tasks pro Poll lock_duration=60000, # Lock-Dauer in ms poll_interval=5000, # Poll-Intervall in ms worker_id='payment-worker' # Worker-ID ) # Client mit Config client = ExternalTaskClient('http://localhost:8000', config=config) client.subscribe_to_external_task_for_topic('ProcessPayment', handle_payment) client.start()

Mit Authentifizierung

# Client mit OAuth 2.0 client = ExternalTaskClient( 'http://localhost:8000', client_id='payment-worker', client_secret='secret-key', authority_url='http://localhost:11235' ) client.subscribe_to_external_task_for_topic('ProcessPayment', handle_payment) client.start()

Mehrere Worker

Sie können mehrere Topics gleichzeitig verarbeiten:

from processcube_client.external_task import ExternalTaskClient client = ExternalTaskClient('http://localhost:8000') # Email-Handler def handle_email(payload, task): recipient = payload['recipient'] subject = payload['subject'] send_email(recipient, subject, payload.get('body', '')) return {'sent': True} # SMS-Handler def handle_sms(payload, task): phone = payload['phone'] message = payload['message'] send_sms(phone, message) return {'sent': True} # Payment-Handler def handle_payment(payload, task): amount = payload['amount'] transaction_id = process_payment(amount) return {'transactionId': transaction_id} # Alle Topics registrieren client.subscribe_to_external_task_for_topic('SendEmail', handle_email) client.subscribe_to_external_task_for_topic('SendSMS', handle_sms) client.subscribe_to_external_task_for_topic('ProcessPayment', handle_payment) # Worker starten client.start()

Fehlerbehandlung

Standard Exception

def handle_external_task(payload, task): try: # Business-Logik result = process_data(payload) return {'success': True, 'result': result} except Exception as e: # Exception wird als technischer Fehler gemeldet raise Exception(f"Verarbeitung fehlgeschlagen: {str(e)}")

BPMN Fehler (FunctionalError)

from processcube_client.external_task import FunctionalError def handle_approval_task(payload, task): amount = payload.get('amount', 0) # Geschäftsregel prüfen if amount > 10000: # BPMN-Fehler werfen (für Error Boundary Events) raise FunctionalError( 'AMOUNT_TOO_HIGH', f'Betrag {amount} überschreitet Limit von 10000' ) # Normal verarbeiten return {'approved': True}

FunctionalError wird als BPMN-Fehler behandelt und kann von Error Boundary Events abgefangen werden.

Async Handler

Der External Task Client unterstützt auch asynchrone Handler:

import asyncio from processcube_client.external_task import ExternalTaskClient # Async Handler async def handle_async_task(payload, task): """Async Handler für I/O-intensive Operationen.""" # Async I/O-Operation result = await fetch_data_from_api(payload['url']) await asyncio.sleep(1) # Simulate async work return {'data': result} # Client erstellen client = ExternalTaskClient('http://localhost:8000') # Async Handler registrieren client.subscribe_to_external_task_for_topic('FetchData', handle_async_task) # Worker starten client.start()

Worker mit eigenem Event-Loop

Wenn Sie den Event-Loop selbst verwalten möchten:

import asyncio from processcube_client.external_task import ExternalTaskClient async def run_worker(): """Läuft in eigenem Event-Loop.""" client = ExternalTaskClient('http://localhost:8000') client.subscribe_to_external_task_for_topic('ProcessOrder', handle_order) # Starten ohne internen Event-Loop await client.start_async() # Eigener Event-Loop loop = asyncio.get_event_loop() loop.run_until_complete(run_worker())

Filter für Topics

Sie können Tasks nach Kriterien filtern:

from processcube_client.external_task import ExternalTaskClient def handle_high_priority(payload, task): """Handler nur für High-Priority Tasks.""" return {'processed': True} client = ExternalTaskClient('http://localhost:8000') # Mit Filter subscriben client.subscribe_to_external_task_for_topic( 'ProcessOrder', handle_high_priority, filter={'priority': 'high'} ) client.start()

Logging und Monitoring

import logging from processcube_client.external_task import ExternalTaskClient # Logging konfigurieren logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def handle_task_with_logging(payload, task): """Handler mit umfangreichem Logging.""" logger.info(f"[{task['id']}] Start - Topic: {task['topic']}") try: result = process_task(payload) logger.info(f"[{task['id']}] Erfolg") return result except Exception as e: logger.error(f"[{task['id']}] Fehler: {str(e)}") raise client = ExternalTaskClient('http://localhost:8000') client.subscribe_to_external_task_for_topic('ProcessOrder', handle_task_with_logging) client.start()

Graceful Shutdown

import signal import sys from processcube_client.external_task import ExternalTaskClient client = ExternalTaskClient('http://localhost:8000') def shutdown_handler(sig, frame): """Sauberes Herunterfahren.""" print('Stoppe Worker...') client.stop() sys.exit(0) # Signal Handler registrieren signal.signal(signal.SIGINT, shutdown_handler) signal.signal(signal.SIGTERM, shutdown_handler) # Worker starten client.subscribe_to_external_task_for_topic('ProcessPayment', handle_payment) client.start()

Praktische Beispiele

Email-Worker mit SMTP

import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from processcube_client.external_task import ExternalTaskClient def handle_email(payload, task): """Sendet E-Mails über SMTP.""" # SMTP-Verbindung smtp_server = 'smtp.example.com' smtp_port = 587 msg = MIMEMultipart() msg['From'] = 'noreply@example.com' msg['To'] = payload['recipient'] msg['Subject'] = payload['subject'] msg.attach(MIMEText(payload['body'], 'html')) # E-Mail senden with smtplib.SMTP(smtp_server, smtp_port) as server: server.starttls() server.login('user@example.com', 'password') server.send_message(msg) return { 'sent': True, 'recipient': payload['recipient'] } client = ExternalTaskClient('http://localhost:8000') client.subscribe_to_external_task_for_topic('SendEmail', handle_email) client.start()

HTTP-Request-Worker

import requests from processcube_client.external_task import ExternalTaskClient def handle_http_request(payload, task): """Führt HTTP-Requests aus.""" method = payload.get('method', 'GET') url = payload['url'] headers = payload.get('headers', {}) data = payload.get('data') # Request ausführen response = requests.request( method, url, headers=headers, json=data, timeout=30 ) return { 'status': response.status_code, 'data': response.json() if response.ok else None, 'success': response.ok } client = ExternalTaskClient('http://localhost:8000') client.subscribe_to_external_task_for_topic('HttpRequest', handle_http_request) client.start()

Datenbank-Worker

import psycopg2 from processcube_client.external_task import ExternalTaskClient def handle_database_query(payload, task): """Führt Datenbank-Queries aus.""" # DB-Verbindung conn = psycopg2.connect( host='localhost', database='mydb', user='user', password='password' ) cursor = conn.cursor() try: # Query ausführen cursor.execute(payload['query'], payload.get('params', [])) # Ergebnis abrufen if cursor.description: columns = [desc[0] for desc in cursor.description] rows = cursor.fetchall() result = [dict(zip(columns, row)) for row in rows] else: result = [] conn.commit() return { 'success': True, 'rows': result, 'rowCount': len(result) } finally: cursor.close() conn.close() client = ExternalTaskClient('http://localhost:8000') client.subscribe_to_external_task_for_topic('DatabaseQuery', handle_database_query) client.start()

Best Practices

Idempotenz sicherstellen

def handle_idempotent_task(payload, task): """Idempotenter Task-Handler.""" task_id = task['id'] # Prüfen, ob bereits verarbeitet if is_already_processed(task_id): return {'skipped': True, 'reason': 'Already processed'} # Verarbeiten result = process_task(payload) # Als verarbeitet markieren mark_as_processed(task_id) return result

Timeout-Handling

import time from processcube_client.external_task import ExternalTaskClient def handle_with_timeout(payload, task): """Handler mit Timeout-Check.""" start_time = time.time() max_duration = 50 # Sekunden result = process_task(payload) # Prüfen, ob noch Zeit übrig ist elapsed = time.time() - start_time if elapsed > max_duration: raise Exception(f"Task exceeded timeout: {elapsed}s") return result

Nächste Schritte

Tipp: Für Production-Worker empfehlen wir Monitoring, Health-Checks und automatisches Retry-Handling.