"""Job-Queue mit Persistierung und paralleler Ausfuehrung""" import asyncio import json import logging import os import signal import time from collections import OrderedDict from decimal import Decimal from typing import Optional, TYPE_CHECKING import aiomysql from app.config import Config from app.models.job import ConversionJob, JobStatus from app.models.media import MediaFile from app.services.encoder import EncoderService from app.services.probe import ProbeService from app.services.progress import ProgressParser from app.services.scanner import ScannerService if TYPE_CHECKING: from app.routes.ws import WebSocketManager class QueueService: """Verwaltet die Konvertierungs-Queue mit Persistierung""" def __init__(self, config: Config, ws_manager: 'WebSocketManager'): self.config = config self.ws_manager = ws_manager self.encoder = EncoderService(config) self.scanner = ScannerService(config) self.jobs: OrderedDict[int, ConversionJob] = OrderedDict() self._active_count: int = 0 self._running: bool = False self._paused: bool = False self._encoding_suspended: bool = False self._queue_task: Optional[asyncio.Task] = None self._queue_file = str(config.data_path / "queue.json") self._db_pool: Optional[aiomysql.Pool] = None async def start(self) -> None: """Startet den Queue-Worker und initialisiert die Datenbank""" await self._init_db() pending = self._load_queue() self._running = True self._queue_task = asyncio.create_task(self._process_loop()) logging.info( f"Queue gestartet ({len(self.jobs)} Jobs geladen, " f"max {self.config.max_parallel_jobs} parallel)" ) # Gespeicherte Jobs asynchron wieder einreihen (mit allen Optionen) if pending: asyncio.create_task(self._restore_jobs(pending)) async def _restore_jobs(self, job_data: list[dict]) -> None: """Stellt Jobs aus gespeicherten Daten wieder her""" for item in job_data: media = await ProbeService.analyze(item["source_path"]) if media: await self.add_job( media, preset_name=item.get("preset_name"), delete_source=item.get("delete_source", False) ) async def stop(self) -> None: """Stoppt den Queue-Worker""" self._running = False if self._queue_task: self._queue_task.cancel() try: await self._queue_task except asyncio.CancelledError: pass if self._db_pool is not None: self._db_pool.close() await self._db_pool.wait_closed() logging.info("Queue gestoppt") async def add_job(self, media: MediaFile, preset_name: Optional[str] = None, delete_source: bool = False) -> Optional[ConversionJob]: """Fuegt neuen Job zur Queue hinzu""" if self._is_duplicate(media.source_path): logging.info(f"Duplikat uebersprungen: {media.source_filename}") return None if not preset_name: preset_name = self.config.default_preset_name job = ConversionJob(media=media, preset_name=preset_name) job.delete_source = delete_source job.build_target_path(self.config) self.jobs[job.id] = job self._save_queue() logging.info( f"Job hinzugefuegt: {media.source_filename} " f"-> {job.target_filename} (Preset: {preset_name})" f"{' [delete_source]' if delete_source else ''}" ) await self.ws_manager.broadcast_queue_update() return job async def add_paths(self, paths: list[str], preset_name: Optional[str] = None, recursive: Optional[bool] = None, delete_source: bool = False) -> list[ConversionJob]: """Fuegt mehrere Pfade hinzu (Dateien und Ordner)""" jobs = [] all_files = [] for path in paths: path = path.strip() if not path: continue scanned = self.scanner.scan_path(path, recursive) all_files.extend(scanned) logging.info(f"{len(all_files)} Dateien aus {len(paths)} Pfaden gefunden") for file_path in all_files: media = await ProbeService.analyze(file_path) if media: job = await self.add_job(media, preset_name, delete_source) if job: jobs.append(job) return jobs async def remove_job(self, job_id: int) -> bool: """Entfernt Job aus Queue, bricht laufende Konvertierung ab""" job = self.jobs.get(job_id) if not job: return False if job.status == JobStatus.ACTIVE and job.process: try: job.process.terminate() await asyncio.sleep(0.5) if job.process.returncode is None: job.process.kill() except ProcessLookupError: pass if job.task and not job.task.done(): job.task.cancel() del self.jobs[job_id] self._save_queue() await self.ws_manager.broadcast_queue_update() logging.info(f"Job entfernt: {job.media.source_filename}") return True async def cancel_job(self, job_id: int) -> bool: """Bricht laufenden Job ab""" job = self.jobs.get(job_id) if not job or job.status != JobStatus.ACTIVE: return False if job.process: try: job.process.terminate() except ProcessLookupError: pass job.status = JobStatus.CANCELLED job.finished_at = time.time() self._active_count = max(0, self._active_count - 1) self._save_queue() await self.ws_manager.broadcast_queue_update() logging.info(f"Job abgebrochen: {job.media.source_filename}") return True async def pause_queue(self) -> bool: """Pausiert die Queue - laufende Jobs werden fertig, keine neuen gestartet""" if self._paused: return False self._paused = True logging.info("Queue pausiert - keine neuen Jobs werden gestartet") await self.ws_manager.broadcast_queue_update() return True async def resume_queue(self) -> bool: """Setzt die Queue fort""" if not self._paused: return False self._paused = False logging.info("Queue fortgesetzt") await self.ws_manager.broadcast_queue_update() return True @property def is_paused(self) -> bool: return self._paused @property def encoding_suspended(self) -> bool: """Sind aktive ffmpeg-Prozesse gerade per SIGSTOP eingefroren?""" return self._encoding_suspended def suspend_encoding(self) -> int: """Friert alle aktiven ffmpeg-Konvertierungen per SIGSTOP ein. Wird aufgerufen wenn ein HLS-Stream startet, damit der Server volle Ressourcen fuers Streaming hat. Gibt die Anzahl pausierter Prozesse zurueck.""" count = 0 for job in self.jobs.values(): if (job.status == JobStatus.ACTIVE and job.process and job.process.returncode is None): try: os.kill(job.process.pid, signal.SIGSTOP) count += 1 except (ProcessLookupError, PermissionError) as e: logging.warning(f"SIGSTOP fehlgeschlagen fuer PID " f"{job.process.pid}: {e}") if count: self._encoding_suspended = True logging.info(f"Encoding pausiert: {count} ffmpeg-Prozess(e) " f"per SIGSTOP eingefroren (HLS-Stream aktiv)") return count def resume_encoding(self) -> int: """Setzt alle eingefrorenen ffmpeg-Konvertierungen per SIGCONT fort. Wird aufgerufen wenn der letzte HLS-Stream endet. Gibt die Anzahl fortgesetzter Prozesse zurueck.""" count = 0 for job in self.jobs.values(): if (job.status == JobStatus.ACTIVE and job.process and job.process.returncode is None): try: os.kill(job.process.pid, signal.SIGCONT) count += 1 except (ProcessLookupError, PermissionError) as e: logging.warning(f"SIGCONT fehlgeschlagen fuer PID " f"{job.process.pid}: {e}") if count: logging.info(f"Encoding fortgesetzt: {count} ffmpeg-Prozess(e) " f"per SIGCONT aufgeweckt") self._encoding_suspended = False return count async def retry_job(self, job_id: int) -> bool: """Setzt fehlgeschlagenen Job zurueck auf QUEUED""" job = self.jobs.get(job_id) if not job or job.status not in (JobStatus.FAILED, JobStatus.CANCELLED): return False job.status = JobStatus.QUEUED job.progress_percent = 0.0 job.progress_frames = 0 job.progress_fps = 0.0 job.progress_speed = 0.0 job.progress_size_bytes = 0 job.progress_time_sec = 0.0 job.progress_eta_sec = 0.0 job.started_at = None job.finished_at = None job._stat_fps = [0.0, 0] job._stat_speed = [0.0, 0] job._stat_bitrate = [0, 0] self._save_queue() await self.ws_manager.broadcast_queue_update() logging.info(f"Job wiederholt: {job.media.source_filename}") return True def get_queue_state(self) -> dict: """Queue-Status fuer WebSocket""" queue = {} for job_id, job in self.jobs.items(): if job.status in (JobStatus.QUEUED, JobStatus.ACTIVE, JobStatus.FAILED, JobStatus.CANCELLED): queue[job_id] = job.to_dict_queue() return {"data_queue": queue, "queue_paused": self._paused, "encoding_suspended": self._encoding_suspended} def get_active_jobs(self) -> dict: """Aktive Jobs fuer WebSocket""" active = {} for job_id, job in self.jobs.items(): if job.status == JobStatus.ACTIVE: active[job_id] = job.to_dict_active() return {"data_convert": active} def get_all_jobs(self) -> list[dict]: """Alle Jobs als Liste fuer API""" return [ {"id": jid, **job.to_dict_active(), "status_name": job.status.name} for jid, job in self.jobs.items() ] # --- Interner Queue-Worker --- async def _process_loop(self) -> None: """Hauptschleife: Startet neue Jobs wenn Kapazitaet frei""" while self._running: try: if (not self._paused and not self._encoding_suspended and self._active_count < self.config.max_parallel_jobs): next_job = self._get_next_queued() if next_job: asyncio.create_task(self._execute_job(next_job)) except Exception as e: logging.error(f"Queue-Worker Fehler: {e}") await asyncio.sleep(0.5) async def _execute_job(self, job: ConversionJob) -> None: """Fuehrt einzelnen Konvertierungs-Job aus""" self._active_count += 1 job.status = JobStatus.ACTIVE job.started_at = time.time() await self.ws_manager.broadcast_queue_update() command = self.encoder.build_command(job) logging.info( f"Starte Konvertierung: {job.media.source_filename}\n" f" Befehl: {' '.join(command)}" ) try: job.process = await asyncio.create_subprocess_exec( *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) progress = ProgressParser(self.ws_manager.broadcast_progress) await progress.monitor(job) await job.process.wait() if job.process.returncode == 0: job.status = JobStatus.FINISHED # Tatsaechliche Dateigroesse von Disk lesen if os.path.exists(job.target_path): job.progress_size_bytes = os.path.getsize(job.target_path) logging.info( f"Konvertierung abgeschlossen: {job.media.source_filename} " f"({MediaFile.format_time(time.time() - job.started_at)})" ) await self._post_conversion_cleanup(job) else: job.status = JobStatus.FAILED error_output = progress.get_error_output() logging.error( f"Konvertierung fehlgeschlagen (Code {job.process.returncode}): " f"{job.media.source_filename}\n" f" ffmpeg stderr:\n{error_output}" ) except asyncio.CancelledError: job.status = JobStatus.CANCELLED logging.info(f"Konvertierung abgebrochen: {job.media.source_filename}") except Exception as e: job.status = JobStatus.FAILED logging.error(f"Fehler bei Konvertierung: {e}") finally: job.finished_at = time.time() self._active_count = max(0, self._active_count - 1) self._save_queue() await self._save_stats(job) await self.ws_manager.broadcast_queue_update() # Library-Seite zum Reload auffordern (Badges aktualisieren) if job.status == JobStatus.FINISHED: await self.ws_manager.broadcast({ "data_library_scan": { "status": "idle", "current": "", "total": 0, "done": 0 } }) async def _post_conversion_cleanup(self, job: ConversionJob) -> None: """Cleanup nach erfolgreicher Konvertierung. WICHTIG: Nur die Quelldatei dieses Jobs loeschen, NICHT andere Dateien im Ordner die noch in der Queue warten!""" files_cfg = self.config.files_config # Quelldatei loeschen: Global per Config ODER per Job-Option should_delete = files_cfg.get("delete_source", False) or \ job.delete_source if should_delete: target_exists = os.path.exists(job.target_path) target_size = (os.path.getsize(job.target_path) if target_exists else 0) if target_exists and target_size > 0: try: os.remove(job.media.source_path) logging.info( f"Quelldatei geloescht: {job.media.source_path}") except OSError as e: logging.error( f"Quelldatei loeschen fehlgeschlagen: {e}") else: logging.warning( f"Quelldatei NICHT geloescht " f"(Zieldatei fehlt/leer): " f"{job.media.source_path}") # SICHERHEIT: Ordner-Cleanup nur wenn KEINE weiteren # Jobs aus diesem Ordner in der Queue warten! cleanup_cfg = self.config.cleanup_config if cleanup_cfg.get("enabled", False): source_dir = job.media.source_dir pending = [ j for j in self.jobs.values() if j.media.source_dir == source_dir and j.status in (JobStatus.QUEUED, JobStatus.ACTIVE) and j.id != job.id ] if pending: logging.info( f"Ordner-Cleanup uebersprungen " f"({len(pending)} Jobs wartend): {source_dir}") else: deleted = self.scanner.cleanup_directory(source_dir) if deleted: logging.info( f"{len(deleted)} Dateien bereinigt " f"in {source_dir}" ) def _get_next_queued(self) -> Optional[ConversionJob]: """Naechster Job mit Status QUEUED (FIFO)""" for job in self.jobs.values(): if job.status == JobStatus.QUEUED: return job return None def _is_duplicate(self, source_path: str) -> bool: """Prueft ob Pfad bereits in Queue (nur aktive/wartende)""" for job in self.jobs.values(): if (job.media.source_path == source_path and job.status in (JobStatus.QUEUED, JobStatus.ACTIVE)): return True return False def _save_queue(self) -> None: """Persistiert Queue nach queue.json""" queue_data = [] for job in self.jobs.values(): if job.status in (JobStatus.QUEUED, JobStatus.FAILED): queue_data.append(job.to_json()) try: with open(self._queue_file, "w", encoding="utf-8") as f: json.dump(queue_data, f, indent=2) except Exception as e: logging.error(f"Queue speichern fehlgeschlagen: {e}") def _load_queue(self) -> list[dict]: """Laedt Queue aus queue.json, gibt Job-Daten zurueck""" if not os.path.exists(self._queue_file): return [] try: with open(self._queue_file, "r", encoding="utf-8") as f: queue_data = json.load(f) pending = [ { "source_path": item["source_path"], "preset_name": item.get("preset_name"), "delete_source": item.get("delete_source", False), } for item in queue_data if item.get("status", 0) == JobStatus.QUEUED ] if pending: logging.info(f"{len(pending)} Jobs aus Queue geladen") return pending except Exception as e: logging.error(f"Queue laden fehlgeschlagen: {e}") return [] # --- MariaDB Statistik-Datenbank --- def _get_db_config(self) -> dict: """DB-Konfiguration aus Settings""" db_cfg = self.config.settings.get("database", {}) return { "host": db_cfg.get("host", "192.168.155.11"), "port": db_cfg.get("port", 3306), "user": db_cfg.get("user", "video"), "password": db_cfg.get("password", "8715"), "db": db_cfg.get("database", "video_converter"), } async def _get_pool(self) -> Optional[aiomysql.Pool]: """Gibt den Connection-Pool zurueck, erstellt ihn bei Bedarf""" if self._db_pool is not None: return self._db_pool db_cfg = self._get_db_config() self._db_pool = await aiomysql.create_pool( host=db_cfg["host"], port=db_cfg["port"], user=db_cfg["user"], password=db_cfg["password"], db=db_cfg["db"], charset="utf8mb4", autocommit=True, minsize=1, maxsize=5, connect_timeout=10, ) return self._db_pool async def _init_db(self) -> None: """Erstellt MariaDB-Tabelle falls nicht vorhanden""" db_cfg = self._get_db_config() try: pool = await self._get_pool() async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(""" CREATE TABLE IF NOT EXISTS conversions ( id INT AUTO_INCREMENT PRIMARY KEY, source_path VARCHAR(1024) NOT NULL, source_filename VARCHAR(512) NOT NULL, source_size_bytes BIGINT, source_duration_sec DOUBLE, source_frame_rate DOUBLE, source_frames_total INT, target_path VARCHAR(1024), target_filename VARCHAR(512), target_size_bytes BIGINT, target_container VARCHAR(10), preset_name VARCHAR(64), status INT, started_at DOUBLE, finished_at DOUBLE, duration_sec DOUBLE, avg_fps DOUBLE, avg_speed DOUBLE, avg_bitrate INT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, INDEX idx_created_at (created_at), INDEX idx_status (status) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """) logging.info( f"MariaDB verbunden: {db_cfg['host']}:{db_cfg['port']}/" f"{db_cfg['db']}" ) except Exception as e: logging.error( f"MariaDB Initialisierung fehlgeschlagen " f"({db_cfg['host']}:{db_cfg['port']}): {e}" ) logging.warning("Statistiken werden ohne Datenbank ausgefuehrt") self._db_pool = None async def _save_stats(self, job: ConversionJob) -> None: """Speichert Konvertierungs-Ergebnis in MariaDB""" if self._db_pool is None: return stats = job.to_dict_stats() try: pool = await self._get_pool() async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(""" INSERT INTO conversions ( source_path, source_filename, source_size_bytes, source_duration_sec, source_frame_rate, source_frames_total, target_path, target_filename, target_size_bytes, target_container, preset_name, status, started_at, finished_at, duration_sec, avg_fps, avg_speed, avg_bitrate ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, ( stats["source_path"], stats["source_filename"], stats["source_size_bytes"], stats["source_duration_sec"], stats["source_frame_rate"], stats["source_frames_total"], stats["target_path"], stats["target_filename"], stats["target_size_bytes"], stats["target_container"], stats["preset_name"], stats["status"], stats["started_at"], stats["finished_at"], stats["duration_sec"], stats["avg_fps"], stats["avg_speed"], stats["avg_bitrate"], )) # Max-Eintraege bereinigen max_entries = self.config.settings.get( "statistics", {} ).get("max_entries", 5000) await cur.execute( "DELETE FROM conversions WHERE id NOT IN (" "SELECT id FROM (SELECT id FROM conversions " "ORDER BY created_at DESC LIMIT %s) AS tmp)", (max_entries,) ) except Exception as e: logging.error(f"Statistik speichern fehlgeschlagen: {e}") async def get_statistics(self, limit: int = 50, offset: int = 0) -> list[dict]: """Liest Statistiken aus MariaDB""" if self._db_pool is None: return [] try: pool = await self._get_pool() async with pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: await cur.execute( "SELECT * FROM conversions ORDER BY created_at DESC " "LIMIT %s OFFSET %s", (limit, offset), ) rows = await cur.fetchall() # MariaDB-Typen JSON-kompatibel machen result = [] for row in rows: entry = {} for k, v in row.items(): if isinstance(v, Decimal): entry[k] = float(v) elif hasattr(v, "isoformat"): entry[k] = str(v) else: entry[k] = v result.append(entry) return result except Exception as e: logging.error(f"Statistik lesen fehlgeschlagen: {e}") return [] async def get_statistics_summary(self) -> dict: """Zusammenfassung der Statistiken""" if self._db_pool is None: return {} try: pool = await self._get_pool() async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(""" SELECT COUNT(*) as total, SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END), SUM(CASE WHEN status = 3 THEN 1 ELSE 0 END), SUM(source_size_bytes), SUM(target_size_bytes), SUM(duration_sec), AVG(avg_fps), AVG(avg_speed) FROM conversions """) row = await cur.fetchone() if row: # Decimal -> float/int fuer JSON def _n(v, as_int=False): v = v or 0 return int(v) if as_int else float(v) return { "total": _n(row[0], True), "finished": _n(row[1], True), "failed": _n(row[2], True), "total_source_size": _n(row[3], True), "total_target_size": _n(row[4], True), "space_saved": _n(row[3], True) - _n(row[4], True), "total_duration": _n(row[5]), "avg_fps": round(float(row[6] or 0), 1), "avg_speed": round(float(row[7] or 0), 2), } return {} except Exception as e: logging.error(f"Statistik-Zusammenfassung fehlgeschlagen: {e}") return {}