Kompletter Video-Konverter mit Web-UI, GPU-Beschleunigung (Intel VAAPI), Video-Bibliothek mit Serien/Film-Erkennung und TVDB-Integration. Features: - AV1/HEVC/H.264 Encoding (GPU + CPU) - Video-Bibliothek mit ffprobe-Analyse und Filtern - TVDB-Integration mit Review-Modal und Sprachkonfiguration - Film-Scanning und TVDB-Zuordnung - Import- und Clean-Service (Grundgeruest) - WebSocket Live-Updates, Queue-Management - Docker mit GPU/CPU-Profilen Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
166 lines
5.1 KiB
Python
166 lines
5.1 KiB
Python
"""Media-Datei-Modell mit Stream-Informationen"""
|
|
import os
|
|
import math
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional
|
|
|
|
|
|
@dataclass
|
|
class VideoStream:
|
|
"""Einzelner Video-Stream"""
|
|
index: int
|
|
codec_name: str
|
|
width: int = 0
|
|
height: int = 0
|
|
pix_fmt: str = ""
|
|
frame_rate: float = 0.0
|
|
level: Optional[int] = None
|
|
bit_rate: Optional[int] = None
|
|
|
|
@property
|
|
def is_10bit(self) -> bool:
|
|
"""Erkennt ob der Stream 10-Bit ist"""
|
|
return "10" in self.pix_fmt or "p010" in self.pix_fmt
|
|
|
|
@property
|
|
def resolution(self) -> str:
|
|
return f"{self.width}x{self.height}"
|
|
|
|
|
|
@dataclass
|
|
class AudioStream:
|
|
"""Einzelner Audio-Stream"""
|
|
index: int
|
|
codec_name: str
|
|
channels: int = 2
|
|
sample_rate: int = 48000
|
|
language: Optional[str] = None
|
|
bit_rate: Optional[int] = None
|
|
|
|
@property
|
|
def channel_layout(self) -> str:
|
|
"""Menschenlesbares Kanal-Layout"""
|
|
layouts = {1: "Mono", 2: "Stereo", 3: "2.1", 6: "5.1", 8: "7.1"}
|
|
return layouts.get(self.channels, f"{self.channels}ch")
|
|
|
|
|
|
@dataclass
|
|
class SubtitleStream:
|
|
"""Einzelner Untertitel-Stream"""
|
|
index: int
|
|
codec_name: str
|
|
language: Optional[str] = None
|
|
|
|
|
|
@dataclass
|
|
class MediaFile:
|
|
"""Analysierte Mediendatei mit allen Stream-Informationen"""
|
|
source_path: str
|
|
source_dir: str = field(init=False)
|
|
source_filename: str = field(init=False)
|
|
source_extension: str = field(init=False)
|
|
source_size_bytes: int = 0
|
|
source_duration_sec: float = 0.0
|
|
source_bitrate: int = 0
|
|
|
|
video_streams: list[VideoStream] = field(default_factory=list)
|
|
audio_streams: list[AudioStream] = field(default_factory=list)
|
|
subtitle_streams: list[SubtitleStream] = field(default_factory=list)
|
|
|
|
def __post_init__(self) -> None:
|
|
self.source_dir = os.path.dirname(self.source_path)
|
|
self.source_filename = os.path.basename(self.source_path)
|
|
self.source_extension = os.path.splitext(self.source_filename)[1].lower()
|
|
|
|
@property
|
|
def frame_rate(self) -> float:
|
|
"""Framerate des ersten Video-Streams"""
|
|
if self.video_streams:
|
|
return self.video_streams[0].frame_rate
|
|
return 0.0
|
|
|
|
@property
|
|
def total_frames(self) -> int:
|
|
"""Gesamtanzahl Frames"""
|
|
return int(self.frame_rate * self.source_duration_sec)
|
|
|
|
@property
|
|
def is_10bit(self) -> bool:
|
|
"""Prueft ob der erste Video-Stream 10-Bit ist"""
|
|
if self.video_streams:
|
|
return self.video_streams[0].is_10bit
|
|
return False
|
|
|
|
@property
|
|
def source_size_human(self) -> tuple[float, str]:
|
|
"""Menschenlesbare Groesse"""
|
|
return self.format_size(self.source_size_bytes)
|
|
|
|
def to_dict(self) -> dict:
|
|
"""Serialisiert fuer WebSocket/API"""
|
|
size = self.source_size_human
|
|
return {
|
|
"source_path": self.source_path,
|
|
"source_dir": self.source_dir,
|
|
"source_filename": self.source_filename,
|
|
"source_extension": self.source_extension,
|
|
"source_size": [size[0], size[1]],
|
|
"source_duration": self.source_duration_sec,
|
|
"source_duration_human": self.format_time(self.source_duration_sec),
|
|
"source_frame_rate": self.frame_rate,
|
|
"source_frames_total": self.total_frames,
|
|
"video_streams": len(self.video_streams),
|
|
"audio_streams": [
|
|
{"index": a.index, "codec": a.codec_name,
|
|
"channels": a.channels, "layout": a.channel_layout,
|
|
"language": a.language}
|
|
for a in self.audio_streams
|
|
],
|
|
"subtitle_streams": [
|
|
{"index": s.index, "codec": s.codec_name,
|
|
"language": s.language}
|
|
for s in self.subtitle_streams
|
|
],
|
|
}
|
|
|
|
@staticmethod
|
|
def format_size(size_bytes: int) -> tuple[float, str]:
|
|
"""Konvertiert Bytes in menschenlesbare Groesse"""
|
|
units = ["B", "KiB", "MiB", "GiB", "TiB"]
|
|
size = float(size_bytes)
|
|
unit_idx = 0
|
|
while size >= 1024.0 and unit_idx < len(units) - 1:
|
|
size /= 1024.0
|
|
unit_idx += 1
|
|
return round(size, 1), units[unit_idx]
|
|
|
|
@staticmethod
|
|
def format_time(seconds: float) -> str:
|
|
"""Formatiert Sekunden in lesbares Format"""
|
|
if seconds <= 0:
|
|
return "0 Min"
|
|
days = int(seconds // 86400)
|
|
seconds %= 86400
|
|
hours = int(seconds // 3600)
|
|
seconds %= 3600
|
|
minutes = math.ceil(seconds / 60)
|
|
|
|
parts = []
|
|
if days:
|
|
parts.append(f"{days} Tage")
|
|
if hours:
|
|
parts.append(f"{hours} Std")
|
|
if minutes:
|
|
parts.append(f"{minutes} Min")
|
|
return " ".join(parts) if parts else "< 1 Min"
|
|
|
|
@staticmethod
|
|
def time_to_seconds(time_str: str) -> float:
|
|
"""Konvertiert HH:MM:SS oder Sekunden-String in float"""
|
|
parts = time_str.split(":")
|
|
if len(parts) == 1:
|
|
return float(parts[0])
|
|
if len(parts) == 3:
|
|
h, m, s = map(float, parts)
|
|
return h * 3600 + m * 60 + s
|
|
return 0.0
|