User Tasks
Das User Task Pattern ermöglicht die Verarbeitung von Benutzer-Interaktionen in BPMN-Prozessen. Der Python Client bietet einen asynchronen UserTaskClient für effiziente I/O-Operationen.
Grundlegendes Beispiel
import asyncio
from processcube_client.user_task import UserTaskClient
async def process_user_tasks(engine_url: str):
"""Verarbeitet User Tasks asynchron."""
async with UserTaskClient(engine_url) as client:
# User Tasks abrufen
user_tasks = await client.get_user_tasks()
for task in user_tasks:
print(f"User Task: {task['userTaskInstanceId']}")
print(f"Task-Name: {task['flowNodeId']}")
print(f"Token: {task['token']}")
# Task abschließen
result = await client.finish_user_task(
task['userTaskInstanceId'],
{'approved': True, 'comment': 'Genehmigt'}
)
print(f"Task abgeschlossen: {result}")
# Event Loop starten
asyncio.run(process_user_tasks('http://localhost:8000'))Der UserTaskClient ist vollständig asynchron und nutzt async/await für nicht-blockierende I/O-Operationen.
User Tasks abrufen
Alle User Tasks
import asyncio
from processcube_client.user_task import UserTaskClient
async def get_all_user_tasks():
async with UserTaskClient('http://localhost:8000') as client:
# Alle verfügbaren User Tasks
user_tasks = await client.get_user_tasks()
if not user_tasks:
print("Keine User Tasks verfügbar")
return
for task in user_tasks:
print(f"Task-ID: {task['userTaskInstanceId']}")
print(f"FlowNode: {task['flowNodeId']}")
print(f"Prozess: {task['processInstanceId']}")
print(f"Token: {task['token']}")
print("---")
asyncio.run(get_all_user_tasks())Mit Filter
async def get_filtered_tasks():
async with UserTaskClient('http://localhost:8000') as client:
# Alle Tasks abrufen
all_tasks = await client.get_user_tasks()
# Nach Prozess filtern
approval_tasks = [
task for task in all_tasks
if task['processModelId'] == 'ApprovalProcess'
]
for task in approval_tasks:
print(f"Approval Task: {task['userTaskInstanceId']}")
asyncio.run(get_filtered_tasks())User Task reservieren
import asyncio
from processcube_client.user_task import UserTaskClient
async def reserve_user_task():
async with UserTaskClient('http://localhost:8000') as client:
# Tasks abrufen
user_tasks = await client.get_user_tasks()
if not user_tasks:
print("Keine User Tasks verfügbar")
return
# Ersten Task reservieren
task = user_tasks[0]
owner_id = 'user@example.com'
result = await client.reserve_user_task(
task['userTaskInstanceId'],
owner_id
)
print(f"Task reserviert für {owner_id}")
print(f"Result: {result}")
asyncio.run(reserve_user_task())Reservierte Tasks können nur vom gleichen Benutzer abgeschlossen oder die Reservierung muss aufgehoben werden.
Reservierung aufheben
async def cancel_reservation():
async with UserTaskClient('http://localhost:8000') as client:
# Tasks abrufen
user_tasks = await client.get_user_tasks()
# Reservierung aufheben
for task in user_tasks:
if task.get('reservedFor'):
await client.cancel_reservation_user_task(
task['userTaskInstanceId']
)
print(f"Reservierung aufgehoben: {task['userTaskInstanceId']}")
asyncio.run(cancel_reservation())User Task abschließen
Einfaches Beispiel
async def finish_user_task():
async with UserTaskClient('http://localhost:8000') as client:
# Tasks abrufen
user_tasks = await client.get_user_tasks()
if not user_tasks:
return
task = user_tasks[0]
# Task-Ergebnis
result = {
'approved': True,
'approver': 'manager@example.com',
'comment': 'Genehmigt am 2024-01-15',
'amount': 1500
}
# Task abschließen
await client.finish_user_task(
task['userTaskInstanceId'],
result
)
print(f"Task abgeschlossen: {task['userTaskInstanceId']}")
asyncio.run(finish_user_task())Mit Validierung
async def finish_with_validation():
async with UserTaskClient('http://localhost:8000') as client:
user_tasks = await client.get_user_tasks()
for task in user_tasks:
# Token-Daten prüfen
token = task.get('token', {})
# Validierung
if 'amount' in token and token['amount'] > 10000:
result = {
'approved': False,
'reason': 'Betrag zu hoch'
}
else:
result = {
'approved': True
}
# Task abschließen
await client.finish_user_task(
task['userTaskInstanceId'],
result
)
print(f"Task verarbeitet: {task['flowNodeId']}")
asyncio.run(finish_with_validation())Mehrere Tasks parallel verarbeiten
import asyncio
from processcube_client.user_task import UserTaskClient
async def process_task(client: UserTaskClient, task: dict):
"""Verarbeitet einen einzelnen Task."""
print(f"Verarbeite Task: {task['userTaskInstanceId']}")
# Business-Logik
result = {
'processed': True,
'timestamp': '2024-01-15T10:30:00Z'
}
# Task abschließen
await client.finish_user_task(
task['userTaskInstanceId'],
result
)
print(f"Task abgeschlossen: {task['userTaskInstanceId']}")
async def process_all_tasks():
async with UserTaskClient('http://localhost:8000') as client:
# Alle Tasks abrufen
user_tasks = await client.get_user_tasks()
# Parallel verarbeiten
tasks = [
process_task(client, task)
for task in user_tasks
]
# Alle Tasks awaiten
await asyncio.gather(*tasks)
print(f"{len(user_tasks)} Tasks verarbeitet")
asyncio.run(process_all_tasks())Mit Authentifizierung
import asyncio
from processcube_client.user_task import UserTaskClient
async def process_with_auth():
# Client mit OAuth 2.0
async with UserTaskClient(
'http://localhost:8000',
client_id='user-task-worker',
client_secret='secret-key',
authority_url='http://localhost:11235'
) as client:
user_tasks = await client.get_user_tasks()
for task in user_tasks:
await client.finish_user_task(
task['userTaskInstanceId'],
{'approved': True}
)
asyncio.run(process_with_auth())Event Loop Management
Mit bestehendem Event Loop
import asyncio
from processcube_client.user_task import UserTaskClient
from processcube_client.core.loop_helper import get_or_create_loop
async def main():
async with UserTaskClient('http://localhost:8000') as client:
user_tasks = await client.get_user_tasks()
for task in user_tasks:
print(f"Task: {task['userTaskInstanceId']}")
# Event Loop holen oder erstellen
loop = get_or_create_loop()
loop.run_until_complete(main())
loop.close()In bestehendem Async-Context
from fastapi import FastAPI
from processcube_client.user_task import UserTaskClient
app = FastAPI()
@app.post("/process-tasks")
async def process_tasks():
"""FastAPI Endpoint - läuft bereits in Event Loop."""
async with UserTaskClient('http://localhost:8000') as client:
user_tasks = await client.get_user_tasks()
results = []
for task in user_tasks:
await client.finish_user_task(
task['userTaskInstanceId'],
{'approved': True}
)
results.append(task['userTaskInstanceId'])
return {
'processed': len(results),
'tasks': results
}Fehlerbehandlung
import asyncio
from processcube_client.user_task import UserTaskClient
from processcube_client.exceptions import ProcessCubeException
async def safe_process_tasks():
try:
async with UserTaskClient('http://localhost:8000') as client:
user_tasks = await client.get_user_tasks()
for task in user_tasks:
try:
await client.finish_user_task(
task['userTaskInstanceId'],
{'approved': True}
)
except ProcessCubeException as e:
print(f"Fehler bei Task {task['userTaskInstanceId']}: {e}")
continue
except Exception as e:
print(f"Allgemeiner Fehler: {e}")
asyncio.run(safe_process_tasks())Praktische Beispiele
Approval-Workflow
import asyncio
from processcube_client.user_task import UserTaskClient
async def approval_workflow():
"""Automatisierter Approval-Workflow."""
async with UserTaskClient('http://localhost:8000') as client:
user_tasks = await client.get_user_tasks()
for task in user_tasks:
token = task.get('token', {})
# Geschäftsregeln anwenden
amount = token.get('amount', 0)
customer_type = token.get('customerType', 'regular')
# Auto-Approve für kleine Beträge
if amount < 1000:
result = {
'approved': True,
'autoApproved': True,
'reason': 'Betrag unter Schwellwert'
}
# VIP-Kunden bevorzugt behandeln
elif customer_type == 'vip':
result = {
'approved': True,
'priority': 'high',
'reason': 'VIP-Kunde'
}
# Manuelle Review erforderlich
else:
result = {
'approved': False,
'requiresReview': True,
'reason': 'Manuelle Prüfung erforderlich'
}
await client.finish_user_task(
task['userTaskInstanceId'],
result
)
print(f"Task {task['flowNodeId']} verarbeitet: {result['approved']}")
asyncio.run(approval_workflow())Mit Datenbank-Integration
import asyncio
import asyncpg
from processcube_client.user_task import UserTaskClient
async def process_with_database():
"""User Tasks mit Datenbank-Integration."""
# DB-Pool erstellen
pool = await asyncpg.create_pool(
'postgresql://user:password@localhost/mydb'
)
async with UserTaskClient('http://localhost:8000') as client:
user_tasks = await client.get_user_tasks()
for task in user_tasks:
token = task.get('token', {})
# Daten aus DB laden
async with pool.acquire() as conn:
customer = await conn.fetchrow(
'SELECT * FROM customers WHERE id = $1',
token.get('customerId')
)
# Business-Logik
result = {
'approved': customer['credit_limit'] > token.get('amount', 0),
'customerName': customer['name']
}
# Task abschließen
await client.finish_user_task(
task['userTaskInstanceId'],
result
)
await pool.close()
asyncio.run(process_with_database())Best Practices
Context Manager verwenden
# ✅ Empfohlen
async with UserTaskClient('http://localhost:8000') as client:
user_tasks = await client.get_user_tasks()
# ❌ Nicht empfohlen (manuelle Ressourcen-Verwaltung)
client = UserTaskClient('http://localhost:8000')
user_tasks = await client.get_user_tasks()
await client.close()Logging hinzufügen
import logging
import asyncio
from processcube_client.user_task import UserTaskClient
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
async def process_with_logging():
async with UserTaskClient('http://localhost:8000') as client:
user_tasks = await client.get_user_tasks()
logger.info(f"Gefundene Tasks: {len(user_tasks)}")
for task in user_tasks:
logger.debug(f"Verarbeite Task: {task['userTaskInstanceId']}")
await client.finish_user_task(
task['userTaskInstanceId'],
{'approved': True}
)
logger.info(f"Task abgeschlossen: {task['flowNodeId']}")
asyncio.run(process_with_logging())Retry-Logic implementieren
import asyncio
from processcube_client.user_task import UserTaskClient
async def finish_with_retry(client, task_id, result, max_retries=3):
"""Finish mit automatischem Retry."""
for attempt in range(max_retries):
try:
await client.finish_user_task(task_id, result)
return True
except Exception as e:
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt) # Exponential backoff
continue
raise
return FalseNächste Schritte
- Getting Started - Grundlagen
- REST API - Synchrone API
- External Tasks - Worker-Pattern
Tipp: Der UserTaskClient ist ideal für asynchrone Verarbeitung und kann problemlos in FastAPI, aiohttp oder anderen async Frameworks verwendet werden.