"""REST API Endpoints""" import asyncio import logging import os from pathlib import Path from aiohttp import web from app.config import Config from app.services.queue import QueueService from app.services.scanner import ScannerService from app.services.encoder import EncoderService from app.routes.ws import WebSocketManager def setup_api_routes(app: web.Application, config: Config, queue_service: QueueService, scanner: ScannerService, ws_manager: WebSocketManager = None) -> None: """Registriert alle API-Routes""" # --- Job-Management --- async def post_convert(request: web.Request) -> web.Response: """ POST /api/convert Body: {"files": ["/pfad/datei.mkv", "/pfad/ordner/"]} Optional: {"files": [...], "preset": "gpu_av1", "recursive": true} Hauptendpoint fuer KDE Dolphin Integration. """ try: data = await request.json() except Exception: return web.json_response( {"error": "Ungueltiges JSON"}, status=400 ) files = data.get("files", []) if not files: return web.json_response( {"error": "Keine Dateien angegeben"}, status=400 ) preset = data.get("preset") recursive = data.get("recursive") logging.info(f"POST /api/convert: {len(files)} Pfade empfangen") jobs = await queue_service.add_paths(files, preset, recursive) return web.json_response({ "message": f"{len(jobs)} Jobs erstellt", "jobs": [{"id": j.id, "file": j.media.source_filename} for j in jobs], }) async def get_jobs(request: web.Request) -> web.Response: """GET /api/jobs - Alle Jobs mit Status""" return web.json_response({"jobs": queue_service.get_all_jobs()}) async def delete_job(request: web.Request) -> web.Response: """DELETE /api/jobs/{job_id}""" job_id = int(request.match_info["job_id"]) success = await queue_service.remove_job(job_id) if success: return web.json_response({"message": "Job geloescht"}) return web.json_response({"error": "Job nicht gefunden"}, status=404) async def post_cancel(request: web.Request) -> web.Response: """POST /api/jobs/{job_id}/cancel""" job_id = int(request.match_info["job_id"]) success = await queue_service.cancel_job(job_id) if success: return web.json_response({"message": "Job abgebrochen"}) return web.json_response({"error": "Job nicht aktiv"}, status=400) async def post_retry(request: web.Request) -> web.Response: """POST /api/jobs/{job_id}/retry""" job_id = int(request.match_info["job_id"]) success = await queue_service.retry_job(job_id) if success: return web.json_response({"message": "Job wiederholt"}) return web.json_response({"error": "Job nicht fehlgeschlagen"}, status=400) # --- Queue Pause/Resume --- async def post_queue_pause(request: web.Request) -> web.Response: """POST /api/queue/pause - Queue pausieren""" success = await queue_service.pause_queue() if success: return web.json_response({"message": "Queue pausiert", "paused": True}) return web.json_response({"message": "Queue bereits pausiert", "paused": True}) async def post_queue_resume(request: web.Request) -> web.Response: """POST /api/queue/resume - Queue fortsetzen""" success = await queue_service.resume_queue() if success: return web.json_response({"message": "Queue fortgesetzt", "paused": False}) return web.json_response({"message": "Queue laeuft bereits", "paused": False}) async def get_queue_status(request: web.Request) -> web.Response: """GET /api/queue/status - Pause-Status abfragen""" return web.json_response({"paused": queue_service.is_paused}) # --- Settings --- async def get_settings(request: web.Request) -> web.Response: """GET /api/settings""" return web.json_response(config.settings) async def put_settings(request: web.Request) -> web.Response: """PUT /api/settings - Settings aktualisieren""" try: new_settings = await request.json() except Exception: return web.json_response( {"error": "Ungueltiges JSON"}, status=400 ) # Settings zusammenfuehren (deep merge) _deep_merge(config.settings, new_settings) config.save_settings() logging.info("Settings aktualisiert via API") return web.json_response({"message": "Settings gespeichert"}) # --- Presets --- async def get_presets(request: web.Request) -> web.Response: """GET /api/presets""" return web.json_response(config.presets) async def put_preset(request: web.Request) -> web.Response: """PUT /api/presets/{preset_name}""" preset_name = request.match_info["preset_name"] try: preset_data = await request.json() except Exception: return web.json_response( {"error": "Ungueltiges JSON"}, status=400 ) config.presets[preset_name] = preset_data config.save_presets() logging.info(f"Preset '{preset_name}' aktualisiert") return web.json_response({"message": f"Preset '{preset_name}' gespeichert"}) async def post_preset(request: web.Request) -> web.Response: """POST /api/presets - Neues Preset erstellen""" try: data = await request.json() except Exception: return web.json_response( {"error": "Ungueltiges JSON"}, status=400 ) key = data.get("key", "").strip() if not key: return web.json_response( {"error": "Preset-Key fehlt"}, status=400 ) if key in config.presets: return web.json_response( {"error": f"Preset '{key}' existiert bereits"}, status=409 ) import re if not re.match(r'^[a-z][a-z0-9_]*$', key): return web.json_response( {"error": "Key: nur Kleinbuchstaben, Zahlen, Unterstriche"}, status=400 ) preset = data.get("preset", { "name": key, "video_codec": "libx264", "container": "mp4", "quality_param": "crf", "quality_value": 23, "gop_size": 240, "video_filter": "", "hw_init": False, "extra_params": {} }) config.presets[key] = preset config.save_presets() logging.info(f"Neues Preset '{key}' erstellt") return web.json_response({"message": f"Preset '{key}' erstellt"}) async def delete_preset(request: web.Request) -> web.Response: """DELETE /api/presets/{preset_name} - Preset loeschen""" preset_name = request.match_info["preset_name"] if preset_name == config.default_preset_name: return web.json_response( {"error": "Standard-Preset kann nicht geloescht werden"}, status=400 ) if preset_name not in config.presets: return web.json_response( {"error": f"Preset '{preset_name}' nicht gefunden"}, status=404 ) del config.presets[preset_name] config.save_presets() logging.info(f"Preset '{preset_name}' geloescht") return web.json_response( {"message": f"Preset '{preset_name}' geloescht"} ) # --- Statistics --- async def get_statistics(request: web.Request) -> web.Response: """GET /api/statistics?limit=50&offset=0""" limit = int(request.query.get("limit", 50)) offset = int(request.query.get("offset", 0)) stats = await queue_service.get_statistics(limit, offset) summary = await queue_service.get_statistics_summary() return web.json_response({"entries": stats, "summary": summary}) # --- System --- async def get_system_info(request: web.Request) -> web.Response: """GET /api/system - GPU-Status, verfuegbare Devices""" gpu_available = EncoderService.detect_gpu_available() devices = EncoderService.get_available_render_devices() return web.json_response({ "encoding_mode": config.encoding_mode, "gpu_available": gpu_available, "gpu_devices": devices, "gpu_device_configured": config.gpu_device, "default_preset": config.default_preset_name, "max_parallel_jobs": config.max_parallel_jobs, "active_jobs": len([ j for j in queue_service.jobs.values() if j.status.value == 1 ]), "queued_jobs": len([ j for j in queue_service.jobs.values() if j.status.value == 0 ]), }) async def get_ws_config(request: web.Request) -> web.Response: """GET /api/ws-config - WebSocket-URL fuer Client""" srv = config.server_config ext_url = srv.get("external_url", "") use_https = srv.get("use_https", False) port = srv.get("port", 8080) if ext_url: ws_url = ext_url else: ws_url = f"{request.host}" return web.json_response({ "websocket_url": ws_url, "websocket_path": srv.get("websocket_path", "/ws"), "use_https": use_https, "port": port, }) # --- Filebrowser --- # Erlaubte Basispfade (Sicherheit: nur unter /mnt navigierbar) _BROWSE_ROOTS = ["/mnt"] def _is_path_allowed(path: str) -> bool: """Prueft ob Pfad unter einem erlaubten Root liegt""" real = os.path.realpath(path) return any(real.startswith(root) for root in _BROWSE_ROOTS) async def get_browse(request: web.Request) -> web.Response: """ GET /api/browse?path=/mnt Gibt Ordner und Videodateien im Verzeichnis zurueck. """ path = request.query.get("path", "/mnt") if not _is_path_allowed(path): return web.json_response( {"error": "Zugriff verweigert"}, status=403 ) if not os.path.isdir(path): return web.json_response( {"error": "Verzeichnis nicht gefunden"}, status=404 ) scan_ext = set(config.files_config.get("scan_extensions", [])) dirs = [] files = [] try: for entry in sorted(os.listdir(path)): # Versteckte Dateien ueberspringen if entry.startswith("."): continue full = os.path.join(path, entry) if os.path.isdir(full): # Anzahl Videodateien im Unterordner zaehlen video_count = 0 try: for f in os.listdir(full): if os.path.splitext(f)[1].lower() in scan_ext: video_count += 1 except PermissionError: pass dirs.append({ "name": entry, "path": full, "video_count": video_count, }) elif os.path.isfile(full): ext = os.path.splitext(entry)[1].lower() if ext in scan_ext: size = os.path.getsize(full) files.append({ "name": entry, "path": full, "size": size, "size_human": _format_size(size), }) except PermissionError: return web.json_response( {"error": "Keine Leseberechtigung"}, status=403 ) # Eltern-Pfad (zum Navigieren nach oben) parent = os.path.dirname(path) if not _is_path_allowed(parent): parent = None return web.json_response({ "path": path, "parent": parent, "dirs": dirs, "files": files, "total_files": len(files), }) def _format_size(size_bytes: int) -> str: """Kompakte Groessenangabe""" if size_bytes < 1024 * 1024: return f"{size_bytes / 1024:.0f} KiB" if size_bytes < 1024 * 1024 * 1024: return f"{size_bytes / (1024 * 1024):.1f} MiB" return f"{size_bytes / (1024 * 1024 * 1024):.2f} GiB" # --- Upload --- # Upload-Verzeichnis _UPLOAD_DIR = "/mnt/uploads" async def post_upload(request: web.Request) -> web.Response: """ POST /api/upload (multipart/form-data) Laedt eine Videodatei hoch und startet die Konvertierung. """ os.makedirs(_UPLOAD_DIR, exist_ok=True) reader = await request.multipart() preset = None saved_files = [] while True: part = await reader.next() if part is None: break if part.name == "preset": preset = (await part.text()).strip() or None continue if part.name == "files": filename = part.filename if not filename: continue # Sicherheit: Nur Dateiname ohne Pfad filename = os.path.basename(filename) ext = os.path.splitext(filename)[1].lower() scan_ext = set(config.files_config.get("scan_extensions", [])) if ext not in scan_ext: logging.warning(f"Upload abgelehnt (Extension {ext}): {filename}") continue # Datei speichern dest = os.path.join(_UPLOAD_DIR, filename) # Konfliktvermeidung if os.path.exists(dest): base, extension = os.path.splitext(filename) counter = 1 while os.path.exists(dest): dest = os.path.join( _UPLOAD_DIR, f"{base}_{counter}{extension}" ) counter += 1 size = 0 with open(dest, "wb") as f: while True: chunk = await part.read_chunk() if not chunk: break f.write(chunk) size += len(chunk) saved_files.append(dest) logging.info(f"Upload: {filename} ({_format_size(size)})") if not saved_files: return web.json_response( {"error": "Keine gueltigen Videodateien hochgeladen"}, status=400 ) jobs = await queue_service.add_paths(saved_files, preset) return web.json_response({ "message": f"{len(saved_files)} Datei(en) hochgeladen, {len(jobs)} Jobs erstellt", "jobs": [{"id": j.id, "file": j.media.source_filename} for j in jobs], }) # --- Logs via WebSocket --- class WebSocketLogHandler(logging.Handler): """Pusht Logs direkt per WebSocket an alle Clients""" def __init__(self, ws_mgr: WebSocketManager): super().__init__() self._ws_manager = ws_mgr def emit(self, record): if not self._ws_manager or not self._ws_manager.clients: return try: loop = asyncio.get_running_loop() loop.create_task( self._ws_manager.broadcast_log( record.levelname, record.getMessage() ) ) except RuntimeError: pass if ws_manager: ws_log_handler = WebSocketLogHandler(ws_manager) ws_log_handler.setLevel(logging.INFO) logging.getLogger().addHandler(ws_log_handler) # --- Server-Log lesen --- async def get_log(request: web.Request) -> web.Response: """ GET /api/log?lines=100&level=INFO Gibt die letzten N Zeilen des Server-Logs zurueck. """ lines = int(request.query.get("lines", 100)) level_filter = request.query.get("level", "").upper() lines = min(lines, 5000) # Max 5000 Zeilen log_dir = Path(__file__).parent.parent.parent / "logs" log_file = log_dir / "server.log" # Fallback: Aus dem logging-Handler lesen log_entries = [] if log_file.exists(): try: with open(log_file, "r", encoding="utf-8", errors="replace") as f: all_lines = f.readlines() # Letzte N Zeilen recent = all_lines[-lines:] if len(all_lines) > lines else all_lines for line in recent: line = line.rstrip() if level_filter and level_filter not in line: continue log_entries.append(line) except Exception as e: return web.json_response( {"error": f"Log lesen fehlgeschlagen: {e}"}, status=500 ) else: # Kein Log-File: aus dem MemoryHandler lesen (falls vorhanden) for handler in logging.getLogger().handlers: if isinstance(handler, _MemoryLogHandler): entries = handler.get_entries(lines) for entry in entries: if level_filter and level_filter not in entry: continue log_entries.append(entry) break if not log_entries: log_entries.append("Keine Log-Datei gefunden unter: " + str(log_file)) return web.json_response({ "lines": log_entries, "count": len(log_entries), "source": str(log_file) if log_file.exists() else "memory", }) # In-Memory Log-Handler (fuer Zugriff ohne Datei) class _MemoryLogHandler(logging.Handler): """Speichert die letzten N Log-Eintraege im Speicher""" def __init__(self, max_entries: int = 2000): super().__init__() self._entries = [] self._max = max_entries def emit(self, record): msg = self.format(record) self._entries.append(msg) if len(self._entries) > self._max: self._entries = self._entries[-self._max:] def get_entries(self, n: int = 100) -> list[str]: return self._entries[-n:] # Memory-Handler installieren _mem_handler = _MemoryLogHandler(2000) _mem_handler.setLevel(logging.DEBUG) _mem_handler.setFormatter(logging.Formatter( "%(asctime)s - %(levelname)s - %(message)s" )) logging.getLogger().addHandler(_mem_handler) # --- Routes registrieren --- app.router.add_get("/api/log", get_log) app.router.add_get("/api/browse", get_browse) app.router.add_post("/api/upload", post_upload) app.router.add_post("/api/convert", post_convert) app.router.add_get("/api/jobs", get_jobs) app.router.add_delete("/api/jobs/{job_id}", delete_job) app.router.add_post("/api/jobs/{job_id}/cancel", post_cancel) app.router.add_post("/api/jobs/{job_id}/retry", post_retry) app.router.add_post("/api/queue/pause", post_queue_pause) app.router.add_post("/api/queue/resume", post_queue_resume) app.router.add_get("/api/queue/status", get_queue_status) app.router.add_get("/api/settings", get_settings) app.router.add_put("/api/settings", put_settings) app.router.add_get("/api/presets", get_presets) app.router.add_post("/api/presets", post_preset) app.router.add_put("/api/presets/{preset_name}", put_preset) app.router.add_delete("/api/presets/{preset_name}", delete_preset) app.router.add_get("/api/statistics", get_statistics) app.router.add_get("/api/system", get_system_info) app.router.add_get("/api/ws-config", get_ws_config) def _deep_merge(base: dict, override: dict) -> None: """Rekursives Zusammenfuehren zweier Dicts""" for key, value in override.items(): if key in base and isinstance(base[key], dict) and isinstance(value, dict): _deep_merge(base[key], value) else: base[key] = value