Skip to Content

User Tasks

Das User Task Pattern ermöglicht die Verarbeitung von Benutzer-Interaktionen in BPMN-Prozessen. Der Python Client bietet einen asynchronen UserTaskClient für effiziente I/O-Operationen.

Grundlegendes Beispiel

import asyncio from processcube_client.user_task import UserTaskClient async def process_user_tasks(engine_url: str): """Verarbeitet User Tasks asynchron.""" async with UserTaskClient(engine_url) as client: # User Tasks abrufen user_tasks = await client.get_user_tasks() for task in user_tasks: print(f"User Task: {task['userTaskInstanceId']}") print(f"Task-Name: {task['flowNodeId']}") print(f"Token: {task['token']}") # Task abschließen result = await client.finish_user_task( task['userTaskInstanceId'], {'approved': True, 'comment': 'Genehmigt'} ) print(f"Task abgeschlossen: {result}") # Event Loop starten asyncio.run(process_user_tasks('http://localhost:8000'))

Der UserTaskClient ist vollständig asynchron und nutzt async/await für nicht-blockierende I/O-Operationen.

User Tasks abrufen

Alle User Tasks

import asyncio from processcube_client.user_task import UserTaskClient async def get_all_user_tasks(): async with UserTaskClient('http://localhost:8000') as client: # Alle verfügbaren User Tasks user_tasks = await client.get_user_tasks() if not user_tasks: print("Keine User Tasks verfügbar") return for task in user_tasks: print(f"Task-ID: {task['userTaskInstanceId']}") print(f"FlowNode: {task['flowNodeId']}") print(f"Prozess: {task['processInstanceId']}") print(f"Token: {task['token']}") print("---") asyncio.run(get_all_user_tasks())

Mit Filter

async def get_filtered_tasks(): async with UserTaskClient('http://localhost:8000') as client: # Alle Tasks abrufen all_tasks = await client.get_user_tasks() # Nach Prozess filtern approval_tasks = [ task for task in all_tasks if task['processModelId'] == 'ApprovalProcess' ] for task in approval_tasks: print(f"Approval Task: {task['userTaskInstanceId']}") asyncio.run(get_filtered_tasks())

User Task reservieren

import asyncio from processcube_client.user_task import UserTaskClient async def reserve_user_task(): async with UserTaskClient('http://localhost:8000') as client: # Tasks abrufen user_tasks = await client.get_user_tasks() if not user_tasks: print("Keine User Tasks verfügbar") return # Ersten Task reservieren task = user_tasks[0] owner_id = 'user@example.com' result = await client.reserve_user_task( task['userTaskInstanceId'], owner_id ) print(f"Task reserviert für {owner_id}") print(f"Result: {result}") asyncio.run(reserve_user_task())

Reservierte Tasks können nur vom gleichen Benutzer abgeschlossen oder die Reservierung muss aufgehoben werden.

Reservierung aufheben

async def cancel_reservation(): async with UserTaskClient('http://localhost:8000') as client: # Tasks abrufen user_tasks = await client.get_user_tasks() # Reservierung aufheben for task in user_tasks: if task.get('reservedFor'): await client.cancel_reservation_user_task( task['userTaskInstanceId'] ) print(f"Reservierung aufgehoben: {task['userTaskInstanceId']}") asyncio.run(cancel_reservation())

User Task abschließen

Einfaches Beispiel

async def finish_user_task(): async with UserTaskClient('http://localhost:8000') as client: # Tasks abrufen user_tasks = await client.get_user_tasks() if not user_tasks: return task = user_tasks[0] # Task-Ergebnis result = { 'approved': True, 'approver': 'manager@example.com', 'comment': 'Genehmigt am 2024-01-15', 'amount': 1500 } # Task abschließen await client.finish_user_task( task['userTaskInstanceId'], result ) print(f"Task abgeschlossen: {task['userTaskInstanceId']}") asyncio.run(finish_user_task())

Mit Validierung

async def finish_with_validation(): async with UserTaskClient('http://localhost:8000') as client: user_tasks = await client.get_user_tasks() for task in user_tasks: # Token-Daten prüfen token = task.get('token', {}) # Validierung if 'amount' in token and token['amount'] > 10000: result = { 'approved': False, 'reason': 'Betrag zu hoch' } else: result = { 'approved': True } # Task abschließen await client.finish_user_task( task['userTaskInstanceId'], result ) print(f"Task verarbeitet: {task['flowNodeId']}") asyncio.run(finish_with_validation())

Mehrere Tasks parallel verarbeiten

import asyncio from processcube_client.user_task import UserTaskClient async def process_task(client: UserTaskClient, task: dict): """Verarbeitet einen einzelnen Task.""" print(f"Verarbeite Task: {task['userTaskInstanceId']}") # Business-Logik result = { 'processed': True, 'timestamp': '2024-01-15T10:30:00Z' } # Task abschließen await client.finish_user_task( task['userTaskInstanceId'], result ) print(f"Task abgeschlossen: {task['userTaskInstanceId']}") async def process_all_tasks(): async with UserTaskClient('http://localhost:8000') as client: # Alle Tasks abrufen user_tasks = await client.get_user_tasks() # Parallel verarbeiten tasks = [ process_task(client, task) for task in user_tasks ] # Alle Tasks awaiten await asyncio.gather(*tasks) print(f"{len(user_tasks)} Tasks verarbeitet") asyncio.run(process_all_tasks())

Mit Authentifizierung

import asyncio from processcube_client.user_task import UserTaskClient async def process_with_auth(): # Client mit OAuth 2.0 async with UserTaskClient( 'http://localhost:8000', client_id='user-task-worker', client_secret='secret-key', authority_url='http://localhost:11235' ) as client: user_tasks = await client.get_user_tasks() for task in user_tasks: await client.finish_user_task( task['userTaskInstanceId'], {'approved': True} ) asyncio.run(process_with_auth())

Event Loop Management

Mit bestehendem Event Loop

import asyncio from processcube_client.user_task import UserTaskClient from processcube_client.core.loop_helper import get_or_create_loop async def main(): async with UserTaskClient('http://localhost:8000') as client: user_tasks = await client.get_user_tasks() for task in user_tasks: print(f"Task: {task['userTaskInstanceId']}") # Event Loop holen oder erstellen loop = get_or_create_loop() loop.run_until_complete(main()) loop.close()

In bestehendem Async-Context

from fastapi import FastAPI from processcube_client.user_task import UserTaskClient app = FastAPI() @app.post("/process-tasks") async def process_tasks(): """FastAPI Endpoint - läuft bereits in Event Loop.""" async with UserTaskClient('http://localhost:8000') as client: user_tasks = await client.get_user_tasks() results = [] for task in user_tasks: await client.finish_user_task( task['userTaskInstanceId'], {'approved': True} ) results.append(task['userTaskInstanceId']) return { 'processed': len(results), 'tasks': results }

Fehlerbehandlung

import asyncio from processcube_client.user_task import UserTaskClient from processcube_client.exceptions import ProcessCubeException async def safe_process_tasks(): try: async with UserTaskClient('http://localhost:8000') as client: user_tasks = await client.get_user_tasks() for task in user_tasks: try: await client.finish_user_task( task['userTaskInstanceId'], {'approved': True} ) except ProcessCubeException as e: print(f"Fehler bei Task {task['userTaskInstanceId']}: {e}") continue except Exception as e: print(f"Allgemeiner Fehler: {e}") asyncio.run(safe_process_tasks())

Praktische Beispiele

Approval-Workflow

import asyncio from processcube_client.user_task import UserTaskClient async def approval_workflow(): """Automatisierter Approval-Workflow.""" async with UserTaskClient('http://localhost:8000') as client: user_tasks = await client.get_user_tasks() for task in user_tasks: token = task.get('token', {}) # Geschäftsregeln anwenden amount = token.get('amount', 0) customer_type = token.get('customerType', 'regular') # Auto-Approve für kleine Beträge if amount < 1000: result = { 'approved': True, 'autoApproved': True, 'reason': 'Betrag unter Schwellwert' } # VIP-Kunden bevorzugt behandeln elif customer_type == 'vip': result = { 'approved': True, 'priority': 'high', 'reason': 'VIP-Kunde' } # Manuelle Review erforderlich else: result = { 'approved': False, 'requiresReview': True, 'reason': 'Manuelle Prüfung erforderlich' } await client.finish_user_task( task['userTaskInstanceId'], result ) print(f"Task {task['flowNodeId']} verarbeitet: {result['approved']}") asyncio.run(approval_workflow())

Mit Datenbank-Integration

import asyncio import asyncpg from processcube_client.user_task import UserTaskClient async def process_with_database(): """User Tasks mit Datenbank-Integration.""" # DB-Pool erstellen pool = await asyncpg.create_pool( 'postgresql://user:password@localhost/mydb' ) async with UserTaskClient('http://localhost:8000') as client: user_tasks = await client.get_user_tasks() for task in user_tasks: token = task.get('token', {}) # Daten aus DB laden async with pool.acquire() as conn: customer = await conn.fetchrow( 'SELECT * FROM customers WHERE id = $1', token.get('customerId') ) # Business-Logik result = { 'approved': customer['credit_limit'] > token.get('amount', 0), 'customerName': customer['name'] } # Task abschließen await client.finish_user_task( task['userTaskInstanceId'], result ) await pool.close() asyncio.run(process_with_database())

Best Practices

Context Manager verwenden

# ✅ Empfohlen async with UserTaskClient('http://localhost:8000') as client: user_tasks = await client.get_user_tasks() # ❌ Nicht empfohlen (manuelle Ressourcen-Verwaltung) client = UserTaskClient('http://localhost:8000') user_tasks = await client.get_user_tasks() await client.close()

Logging hinzufügen

import logging import asyncio from processcube_client.user_task import UserTaskClient logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) async def process_with_logging(): async with UserTaskClient('http://localhost:8000') as client: user_tasks = await client.get_user_tasks() logger.info(f"Gefundene Tasks: {len(user_tasks)}") for task in user_tasks: logger.debug(f"Verarbeite Task: {task['userTaskInstanceId']}") await client.finish_user_task( task['userTaskInstanceId'], {'approved': True} ) logger.info(f"Task abgeschlossen: {task['flowNodeId']}") asyncio.run(process_with_logging())

Retry-Logic implementieren

import asyncio from processcube_client.user_task import UserTaskClient async def finish_with_retry(client, task_id, result, max_retries=3): """Finish mit automatischem Retry.""" for attempt in range(max_retries): try: await client.finish_user_task(task_id, result) return True except Exception as e: if attempt < max_retries - 1: await asyncio.sleep(2 ** attempt) # Exponential backoff continue raise return False

Nächste Schritte

Tipp: Der UserTaskClient ist ideal für asynchrone Verarbeitung und kann problemlos in FastAPI, aiohttp oder anderen async Frameworks verwendet werden.