Kompletter Video-Konverter mit Web-UI, GPU-Beschleunigung (Intel VAAPI), Video-Bibliothek mit Serien/Film-Erkennung und TVDB-Integration. Features: - AV1/HEVC/H.264 Encoding (GPU + CPU) - Video-Bibliothek mit ffprobe-Analyse und Filtern - TVDB-Integration mit Review-Modal und Sprachkonfiguration - Film-Scanning und TVDB-Zuordnung - Import- und Clean-Service (Grundgeruest) - WebSocket Live-Updates, Queue-Management - Docker mit GPU/CPU-Profilen Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
121 lines
4.2 KiB
Python
121 lines
4.2 KiB
Python
"""WebSocket Handler fuer Echtzeit-Updates"""
|
|
import json
|
|
import logging
|
|
from typing import Optional, Set, TYPE_CHECKING
|
|
from aiohttp import web
|
|
|
|
if TYPE_CHECKING:
|
|
from app.services.queue import QueueService
|
|
|
|
|
|
class WebSocketManager:
    """Manage WebSocket connections and broadcast messages to them.

    Connected clients are tracked in ``self.clients``. A queue service
    (attached via :meth:`set_queue_service`) supplies the state that is
    pushed to clients and receives the commands clients send.
    """

    def __init__(self):
        # All currently connected client sockets.
        self.clients: Set[web.WebSocketResponse] = set()
        # Stays None until set_queue_service() is called; while it is None,
        # no state is sent and incoming messages are ignored.
        self.queue_service: Optional['QueueService'] = None

    def set_queue_service(self, queue_service: 'QueueService') -> None:
        """Store the reference to the queue service."""
        self.queue_service = queue_service

    async def handle_websocket(self, request: web.Request) -> web.WebSocketResponse:
        """Handle one WebSocket connection for its whole lifetime.

        Registers the client, sends the current job/queue state (if a queue
        service is attached), then processes incoming text messages until
        the connection ends. The client is always unregistered on exit.

        Args:
            request: The incoming HTTP request to upgrade.

        Returns:
            The WebSocket response object for this connection.
        """
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        self.clients.add(ws)

        client_ip = request.remote
        logging.info(f"WebSocket Client verbunden: {client_ip} "
                     f"({len(self.clients)} aktiv)")

        # Send the initial state. This is best effort: a failure here must
        # not abort the handler, but it is logged instead of being hidden.
        if self.queue_service:
            try:
                await ws.send_json(self.queue_service.get_active_jobs())
                await ws.send_json(self.queue_service.get_queue_state())
            except Exception as e:
                logging.warning(
                    f"Initialer Status konnte nicht gesendet werden: {e}")

        try:
            async for msg in ws:
                if msg.type == web.WSMsgType.TEXT:
                    try:
                        data = json.loads(msg.data)
                    except json.JSONDecodeError:
                        logging.warning(f"Ungueltige JSON-Nachricht: {msg.data[:100]}")
                        continue
                    try:
                        await self._handle_message(data)
                    except Exception as e:
                        # A single failing message must not tear down the
                        # whole connection; log it and keep receiving.
                        logging.error(
                            f"Fehler bei Verarbeitung der Nachricht: {e}")
                elif msg.type == web.WSMsgType.ERROR:
                    logging.error(f"WebSocket Fehler: {ws.exception()}")
        except Exception as e:
            logging.error(f"WebSocket Handler Fehler: {e}")
        finally:
            self.clients.discard(ws)
            logging.info(f"WebSocket Client getrennt: {client_ip} "
                         f"({len(self.clients)} aktiv)")

        return ws

    async def broadcast(self, message: dict) -> None:
        """Send ``message`` (JSON-encoded) to every connected client.

        Clients whose send raises are removed from ``self.clients``.
        """
        if not self.clients:
            return

        msg_json = json.dumps(message)
        dead_clients = set()

        # Iterate over a copy: the set can change while we await a send.
        for client in self.clients.copy():
            try:
                await client.send_str(msg_json)
            except Exception:
                dead_clients.add(client)

        self.clients -= dead_clients

    async def broadcast_queue_update(self) -> None:
        """Broadcast the active jobs and the queue state to all clients.

        Does nothing while no queue service is attached.
        """
        if not self.queue_service:
            return
        await self.broadcast(self.queue_service.get_active_jobs())
        await self.broadcast(self.queue_service.get_queue_state())

    async def broadcast_progress(self, job) -> None:
        """Broadcast a progress update for ``job``.

        The payload is ``{"data_flow": job.to_dict_progress()}``.
        """
        await self.broadcast({"data_flow": job.to_dict_progress()})

    async def _handle_message(self, data: dict) -> None:
        """Process one decoded incoming WebSocket message.

        Recognised top-level keys (checked in this order):

        * ``data_path``: a path string, a list of paths, or the legacy form
          ``{"paths": [...]}``; forwarded to ``queue_service.add_paths``.
        * ``data_command``: a dict with ``cmd`` (``delete``, ``cancel`` or
          ``retry``) and ``id``; dispatched to the matching queue method.
        * ``data_message``: logged only.

        Malformed input (non-dict payload, non-dict command, non-numeric
        job id) is logged and ignored rather than raised, because this data
        comes straight from the client.
        """
        if not self.queue_service:
            return

        if not isinstance(data, dict):
            logging.warning(f"Ungueltige Nachricht (kein Objekt): {data!r}")
            return

        if "data_path" in data:
            # Accept paths (legacy compatibility + new format).
            paths = data["data_path"]
            if isinstance(paths, str):
                # Single path given as a string.
                paths = [paths]
            elif isinstance(paths, dict) and "paths" in paths:
                # Old format: {"paths": [...]}
                paths = paths["paths"]
            elif not isinstance(paths, list):
                paths = [str(paths)]

            await self.queue_service.add_paths(paths)

        elif "data_command" in data:
            cmd = data["data_command"]
            if not isinstance(cmd, dict):
                logging.warning(f"Ungueltiges Kommando: {cmd!r}")
                return

            cmd_type = cmd.get("cmd", "")
            raw_id = cmd.get("id")

            # Falsy ids (missing, "", 0) are ignored, as before.
            if not raw_id:
                return
            # Unknown commands are ignored silently, as before.
            if cmd_type not in ("delete", "cancel", "retry"):
                return

            try:
                job_id = int(raw_id)
            except (TypeError, ValueError):
                logging.warning(f"Ungueltige Job-ID: {raw_id!r}")
                return

            if cmd_type == "delete":
                await self.queue_service.remove_job(job_id)
            elif cmd_type == "cancel":
                await self.queue_service.cancel_job(job_id)
            else:  # "retry"
                await self.queue_service.retry_job(job_id)

        elif "data_message" in data:
            logging.info(f"Client-Nachricht: {data['data_message']}")
|