"""HLS Session Manager - HTTP Live Streaming per ffmpeg Verwaltet HLS-Sessions: ffmpeg erzeugt m3u8 + fMP4-Segmente (.m4s), die dann per HTTP ausgeliefert werden. Vorteile gegenueber Pipe-Streaming: - Sofortiger Playback-Start (erstes Segment in ~1s verfuegbar) - Natives Seeking ueber Segmente - fMP4 statt mpegts (bessere Codec-Kompatibilitaet) - Automatische Codec-Erkennung: Client meldet unterstuetzte Codecs, Server entscheidet copy vs. H.264-Transcoding (CPU oder GPU/VAAPI) - hls.js Polyfill fuer Browser ohne native HLS-Unterstuetzung - Samsung Tizen hat native HLS-Unterstuetzung """ import asyncio import json import logging import os import shutil import time import uuid from pathlib import Path from typing import Optional import aiomysql from app.services.library import LibraryService # Browser-kompatible Audio-Codecs (kein Transcoding noetig) BROWSER_AUDIO_CODECS = {"aac", "mp3", "opus", "vorbis", "flac"} # Video-Codec-Normalisierung (DB-Werte -> einfache Namen fuer Client-Abgleich) CODEC_NORMALIZE = { "av1": "av1", "libaom-av1": "av1", "libsvtav1": "av1", "av1_vaapi": "av1", "hevc": "hevc", "h265": "hevc", "libx265": "hevc", "hevc_vaapi": "hevc", "h264": "h264", "avc": "h264", "libx264": "h264", "h264_vaapi": "h264", "vp9": "vp9", "libvpx-vp9": "vp9", "vp8": "vp8", "mpeg4": "mpeg4", "mpeg2video": "mpeg2", } # HLS Konfiguration (Defaults, werden von Config ueberschrieben) HLS_BASE_DIR = Path("/tmp/hls") # Qualitaets-Stufen (Ziel-Hoehe) QUALITY_HEIGHTS = {"uhd": 2160, "hd": 1080, "sd": 720, "low": 480} class HLSSession: """Einzelne HLS-Streaming-Session""" def __init__(self, session_id: str, video_id: int, quality: str, audio_idx: int, sound_mode: str, seek_sec: float): self.session_id = session_id self.video_id = video_id self.quality = quality self.audio_idx = audio_idx self.sound_mode = sound_mode self.seek_sec = seek_sec self.process: Optional[asyncio.subprocess.Process] = None self.created_at = time.time() self.last_access = time.time() self.ready = False self.error: Optional[str] = None @property def dir(self) -> Path: return HLS_BASE_DIR / self.session_id @property def playlist_path(self) -> Path: return self.dir / "stream.m3u8" def touch(self): """Letzten Zugriff aktualisieren""" self.last_access = time.time() def is_expired(self, timeout_sec: int = 300) -> bool: return (time.time() - self.last_access) > timeout_sec class HLSSessionManager: """Verwaltet alle HLS-Sessions mit Auto-Cleanup""" def __init__(self, library_service: LibraryService, gpu_device: str = "/dev/dri/renderD128", queue_service=None, config=None): self._library = library_service self._sessions: dict[str, HLSSession] = {} self._cleanup_task: Optional[asyncio.Task] = None self._gpu_device = gpu_device self._gpu_available = os.path.exists(gpu_device) self._queue_service = queue_service self._config = config def _tv_setting(self, key: str, default): """TV-Einstellung aus Config lesen (mit Fallback)""" if self._config: return self._config.tv_config.get(key, default) return default async def start(self): """Cleanup-Task starten und Verzeichnis vorbereiten""" HLS_BASE_DIR.mkdir(parents=True, exist_ok=True) self._cleanup_task = asyncio.create_task(self._cleanup_loop()) gpu_status = (f"GPU verfuegbar ({self._gpu_device})" if self._gpu_available else "Nur CPU-Transcoding") logging.info(f"HLS Session Manager gestartet - {gpu_status}") async def stop(self): """Alle Sessions beenden und aufraumen""" if self._cleanup_task: self._cleanup_task.cancel() try: await self._cleanup_task except asyncio.CancelledError: pass for sid in list(self._sessions): await self.destroy_session(sid) logging.info("HLS Session Manager gestoppt") async def create_session(self, video_id: int, quality: str = "hd", audio_idx: int = 0, sound_mode: str = "stereo", seek_sec: float = 0, client_codecs: list[str] = None, ) -> Optional[HLSSession]: """Neue HLS-Session erstellen und ffmpeg starten client_codecs: Liste der vom Client unterstuetzten Video-Codecs (z.B. ["h264", "hevc", "av1"]). Wenn der Quell-Codec nicht drin ist, wird automatisch zu H.264 transkodiert (GPU wenn verfuegbar). """ # Max. gleichzeitige Sessions pruefen max_sessions = self._tv_setting("hls_max_sessions", 5) if len(self._sessions) >= max_sessions: logging.warning(f"HLS: Max. Sessions ({max_sessions}) erreicht - " f"neue Session abgelehnt") return None pool = await self._library._get_pool() if not pool: return None # Video-Info aus DB laden try: async with pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: await cur.execute( "SELECT file_path, width, height, video_codec, " "audio_tracks, container, duration_sec " "FROM library_videos WHERE id = %s", (video_id,)) video = await cur.fetchone() if not video: return None except Exception as e: logging.error(f"HLS Session DB-Fehler: {e}") return None file_path = video["file_path"] if not os.path.isfile(file_path): logging.error(f"HLS: Datei nicht gefunden: {file_path}") return None # Session erstellen session_id = uuid.uuid4().hex[:16] session = HLSSession(session_id, video_id, quality, audio_idx, sound_mode, seek_sec) session.dir.mkdir(parents=True, exist_ok=True) # Audio-Tracks parsen audio_tracks = video.get("audio_tracks") or "[]" if isinstance(audio_tracks, str): audio_tracks = json.loads(audio_tracks) if audio_idx >= len(audio_tracks): audio_idx = 0 audio_info = audio_tracks[audio_idx] if audio_tracks else {} audio_codec = audio_info.get("codec", "unknown") audio_channels = audio_info.get("channels", 2) # Video-Codec: Kann der Client das direkt abspielen? video_codec = video.get("video_codec", "unknown") codec_name = CODEC_NORMALIZE.get(video_codec, video_codec) # Fallback: wenn keine Codecs gemeldet -> nur H.264 annehmen supported = client_codecs or ["h264"] client_can_play = codec_name in supported # Ziel-Aufloesung orig_h = video.get("height") or 1080 target_h = QUALITY_HEIGHTS.get(quality, 1080) needs_video_scale = orig_h > target_h and quality != "uhd" needs_video_transcode = not client_can_play # Audio-Transcoding noetig? needs_audio_transcode = audio_codec not in BROWSER_AUDIO_CODECS # Force-Transcode: Immer transcodieren fuer maximale Kompatibilitaet if self._tv_setting("force_transcode", False): needs_video_transcode = True if audio_codec not in BROWSER_AUDIO_CODECS: needs_audio_transcode = True # Sound-Modus if sound_mode == "stereo": out_channels = 2 elif sound_mode == "surround": out_channels = min(audio_channels, 8) else: out_channels = audio_channels if out_channels != audio_channels: needs_audio_transcode = True # ffmpeg starten (GPU-Versuch mit CPU-Fallback) use_gpu = (self._gpu_available and (needs_video_transcode or needs_video_scale)) cmd = self._build_ffmpeg_cmd( file_path, session, seek_sec, audio_idx, quality, needs_video_transcode, needs_video_scale, target_h, needs_audio_transcode, out_channels, use_gpu) vmode = "copy" if not (needs_video_transcode or needs_video_scale) else ( "h264_vaapi" if use_gpu else "libx264") logging.info(f"HLS Session {session_id}: starte ffmpeg fuer " f"Video {video_id} (q={quality}, v={vmode}, " f"src={video_codec}, audio={audio_idx})") logging.debug(f"HLS ffmpeg cmd: {' '.join(cmd)}") try: session.process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.PIPE, ) except Exception as e: logging.error(f"HLS ffmpeg Start fehlgeschlagen: {e}") shutil.rmtree(session.dir, ignore_errors=True) return None # GPU-Fallback: Wenn ffmpeg sofort scheitert (z.B. h264_vaapi nicht # unterstuetzt) -> automatisch mit CPU-Encoding neu starten if use_gpu: await asyncio.sleep(1.0) if session.process.returncode is not None: stderr = await session.process.stderr.read() err_msg = stderr.decode("utf-8", errors="replace")[:300] logging.warning( f"HLS GPU-Encoding fehlgeschlagen (Code " f"{session.process.returncode}): {err_msg}") logging.info("HLS: Fallback auf CPU-Encoding (libx264)") self._gpu_available = False # GPU fuer HLS deaktivieren # Dateien aufraumen und neu starten for f in session.dir.iterdir(): f.unlink(missing_ok=True) cmd = self._build_ffmpeg_cmd( file_path, session, seek_sec, audio_idx, quality, needs_video_transcode, needs_video_scale, target_h, needs_audio_transcode, out_channels, use_gpu=False) logging.debug(f"HLS CPU-Fallback cmd: {' '.join(cmd)}") try: session.process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.PIPE, ) except Exception as e: logging.error(f"HLS CPU-Fallback Start fehlgeschlagen: {e}") shutil.rmtree(session.dir, ignore_errors=True) return None self._sessions[session_id] = session # Laufende Batch-Konvertierungen einfrieren (Ressourcen fuer Stream) if self._queue_service and self._tv_setting("pause_batch_on_stream", True): count = self._queue_service.suspend_encoding() logging.info(f"HLS: {count} Konvertierung(en) eingefroren") # Kurz warten ob erstes Segment schnell kommt (Copy-Modus: <1s) # Bei Transcoding nicht lange blockieren - hls.js/native HLS # haben eigene Retry-Logik fuer noch nicht verfuegbare Segmente timeout = 3.0 if (needs_video_transcode or needs_video_scale) else 2.0 await self._wait_for_ready(session, timeout=timeout) return session def _build_ffmpeg_cmd(self, file_path: str, session: HLSSession, seek_sec: float, audio_idx: int, quality: str, needs_video_transcode: bool, needs_video_scale: bool, target_h: int, needs_audio_transcode: bool, out_channels: int, use_gpu: bool) -> list[str]: """Baut das ffmpeg-Kommando fuer HLS-Streaming. GPU-Modus: Software-Decode -> NV12 -> hwupload -> h264_vaapi (zuverlaessiger als Full-HW-Pipeline, funktioniert mit allen Quell-Codecs)""" cmd = ["ffmpeg", "-hide_banner", "-loglevel", "error"] # Schnellere Datei-Analyse cmd += ["-analyzeduration", "3000000", # 3 Sekunden "-probesize", "3000000"] # 3 MB # VAAPI-Device (KEIN hwaccel - Software-Decode ist zuverlaessiger # fuer beliebige Quell-Codecs wie AV1/VP9/HEVC) if use_gpu: cmd += ["-vaapi_device", self._gpu_device] if seek_sec > 0: cmd += ["-ss", str(seek_sec)] cmd += ["-i", file_path] # Video-Codec Entscheidung cmd += ["-map", "0:v:0"] if needs_video_scale or needs_video_transcode: crf = {"sd": "23", "low": "28"}.get(quality, "20") if use_gpu: # VAAPI Hardware-Encoding (Intel A380): # format=nv12 (CPU) -> hwupload (VAAPI) -> h264_vaapi vf_parts = ["format=nv12"] if needs_video_scale: # CPU-seitig skalieren, dann hochladen vf_parts.insert(0, f"scale=-2:{target_h}") vf_parts.append("hwupload") cmd += ["-vf", ",".join(vf_parts), "-c:v", "h264_vaapi", "-qp", crf, "-low_power", "1"] else: # CPU Software-Encoding vf_parts = [] if needs_video_scale: vf_parts.append(f"scale=-2:{target_h}") cmd += ["-c:v", "libx264", "-preset", "veryfast", "-crf", crf] if vf_parts: cmd += ["-vf", ",".join(vf_parts)] else: cmd += ["-c:v", "copy"] # Audio cmd += ["-map", f"0:a:{audio_idx}"] if needs_audio_transcode: bitrate = {1: "96k", 2: "192k"}.get( out_channels, f"{out_channels * 64}k") cmd += ["-c:a", "aac", "-ac", str(out_channels), "-b:a", bitrate] else: cmd += ["-c:a", "copy"] # HLS Output - fMP4 Segmente seg_dur = self._tv_setting("hls_segment_duration", 4) init_dur = self._tv_setting("hls_init_duration", 1) cmd += [ "-f", "hls", "-hls_time", str(seg_dur), "-hls_init_time", str(init_dur), "-hls_list_size", "0", "-hls_segment_type", "fmp4", "-hls_fmp4_init_filename", "init.mp4", "-hls_flags", "append_list+independent_segments", "-hls_segment_filename", str(session.dir / "seg%05d.m4s"), "-start_number", "0", str(session.playlist_path), ] return cmd async def _wait_for_ready(self, session: HLSSession, timeout: float = 15.0): """Warten bis Playlist und erstes Segment existieren""" start = time.time() while (time.time() - start) < timeout: if session.playlist_path.exists(): # Pruefen ob Init-Segment + mindestens ein Media-Segment da ist init_seg = session.dir / "init.mp4" segments = list(session.dir.glob("seg*.m4s")) if init_seg.exists() and segments: session.ready = True logging.info( f"HLS Session {session.session_id}: bereit " f"({len(segments)} Segmente, " f"{time.time() - start:.1f}s)") return # ffmpeg beendet? if session.process and session.process.returncode is not None: stderr = await session.process.stderr.read() session.error = stderr.decode("utf-8", errors="replace") logging.error( f"HLS ffmpeg beendet mit Code " f"{session.process.returncode}: {session.error[:500]}") return await asyncio.sleep(0.3) logging.warning( f"HLS Session {session.session_id}: Timeout nach {timeout}s") def get_session(self, session_id: str) -> Optional[HLSSession]: """Session anhand ID holen und Zugriff aktualisieren""" session = self._sessions.get(session_id) if session: session.touch() return session async def destroy_session(self, session_id: str): """Session beenden: ffmpeg stoppen + Dateien loeschen""" session = self._sessions.pop(session_id, None) if not session: return # ffmpeg-Prozess beenden if session.process and session.process.returncode is None: session.process.terminate() try: await asyncio.wait_for(session.process.wait(), timeout=5) except asyncio.TimeoutError: session.process.kill() await session.process.wait() # Dateien aufraumen if session.dir.exists(): shutil.rmtree(session.dir, ignore_errors=True) logging.info(f"HLS Session {session_id} beendet") # Wenn keine Sessions mehr aktiv -> Batch-Konvertierung fortsetzen if (not self._sessions and self._queue_service and self._tv_setting("pause_batch_on_stream", True)): count = self._queue_service.resume_encoding() logging.info(f"HLS: Alle Sessions beendet, " f"{count} Konvertierung(en) fortgesetzt") async def _cleanup_loop(self): """Periodisch abgelaufene Sessions entfernen""" while True: try: await asyncio.sleep(30) timeout_sec = self._tv_setting("hls_session_timeout_min", 5) * 60 expired = [ sid for sid, s in self._sessions.items() if s.is_expired(timeout_sec) ] for sid in expired: logging.info( f"HLS Session {sid} abgelaufen (Timeout)") await self.destroy_session(sid) # Verwaiste Verzeichnisse aufraumen if HLS_BASE_DIR.exists(): for d in HLS_BASE_DIR.iterdir(): if d.is_dir() and d.name not in self._sessions: shutil.rmtree(d, ignore_errors=True) except asyncio.CancelledError: raise except Exception as e: logging.error(f"HLS Cleanup Fehler: {e}") @property def active_sessions(self) -> int: return len(self._sessions) def get_all_sessions_info(self) -> list[dict]: """Alle Sessions als Info-Liste (fuer Debug/Admin)""" return [ { "session_id": s.session_id, "video_id": s.video_id, "quality": s.quality, "ready": s.ready, "age_sec": int(time.time() - s.created_at), "idle_sec": int(time.time() - s.last_access), } for s in self._sessions.values() ]