"""WebSocket Handler fuer Echtzeit-Updates""" import json import logging from typing import Optional, Set, TYPE_CHECKING from aiohttp import web if TYPE_CHECKING: from app.services.queue import QueueService class WebSocketManager: """Verwaltet WebSocket-Verbindungen und Broadcasts""" def __init__(self): self.clients: Set[web.WebSocketResponse] = set() self.queue_service: Optional['QueueService'] = None def set_queue_service(self, queue_service: 'QueueService') -> None: """Setzt Referenz auf QueueService""" self.queue_service = queue_service async def handle_websocket(self, request: web.Request) -> web.WebSocketResponse: """WebSocket-Endpoint Handler""" ws = web.WebSocketResponse() await ws.prepare(request) self.clients.add(ws) client_ip = request.remote logging.info(f"WebSocket Client verbunden: {client_ip} " f"({len(self.clients)} aktiv)") # Initialen Status senden if self.queue_service: try: await ws.send_json(self.queue_service.get_active_jobs()) await ws.send_json(self.queue_service.get_queue_state()) except Exception: pass try: async for msg in ws: if msg.type == web.WSMsgType.TEXT: try: data = json.loads(msg.data) await self._handle_message(data) except json.JSONDecodeError: logging.warning(f"Ungueltige JSON-Nachricht: {msg.data[:100]}") elif msg.type == web.WSMsgType.ERROR: logging.error(f"WebSocket Fehler: {ws.exception()}") except Exception as e: logging.error(f"WebSocket Handler Fehler: {e}") finally: self.clients.discard(ws) logging.info(f"WebSocket Client getrennt: {client_ip} " f"({len(self.clients)} aktiv)") return ws async def broadcast(self, message: dict) -> None: """Sendet Nachricht an alle verbundenen Clients""" if not self.clients: return msg_json = json.dumps(message) dead_clients = set() for client in self.clients.copy(): try: await client.send_str(msg_json) except Exception: dead_clients.add(client) self.clients -= dead_clients async def broadcast_queue_update(self) -> None: """Sendet aktuelle Queue an alle Clients""" if not self.queue_service: return await self.broadcast(self.queue_service.get_active_jobs()) await self.broadcast(self.queue_service.get_queue_state()) async def broadcast_progress(self, job) -> None: """Sendet Fortschritts-Update fuer einen Job""" await self.broadcast({"data_flow": job.to_dict_progress()}) async def broadcast_log(self, level: str, message: str) -> None: """Sendet Log-Nachricht an alle Clients""" await self.broadcast({ "data_log": {"level": level, "message": message} }) async def _handle_message(self, data: dict) -> None: """Verarbeitet eingehende WebSocket-Nachrichten""" if not self.queue_service: return if "data_path" in data: # Pfade empfangen (Legacy-Kompatibilitaet + neues Format) paths = data["data_path"] if isinstance(paths, str): # Einzelner Pfad als String paths = [paths] elif isinstance(paths, dict) and "paths" in paths: # Altes Format: {"paths": [...]} paths = paths["paths"] elif not isinstance(paths, list): paths = [str(paths)] await self.queue_service.add_paths(paths) elif "data_command" in data: cmd = data["data_command"] cmd_type = cmd.get("cmd", "") job_id = cmd.get("id") if not job_id: return if cmd_type == "delete": await self.queue_service.remove_job(int(job_id)) elif cmd_type == "cancel": await self.queue_service.cancel_job(int(job_id)) elif cmd_type == "retry": await self.queue_service.retry_job(int(job_id)) elif "data_message" in data: logging.info(f"Client-Nachricht: {data['data_message']}")