docker.videokonverter/video-konverter/app/services/probe.py
data 37dff4de69 feat: VideoKonverter v2.9 - Projekt-Reset aus Docker-Image
Projekt aus Docker-Image videoconverter:2.9 extrahiert.
Enthält zweiphasigen Import-Workflow mit Serien-Zuordnung.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-27 11:41:48 +01:00

177 lines
6.5 KiB
Python

"""Asynchrone ffprobe Media-Analyse"""
import asyncio
import json
import logging
import os
from typing import Optional
from app.models.media import MediaFile, VideoStream, AudioStream, SubtitleStream
class ProbeService:
    """ffprobe-based media analysis, implemented fully asynchronously.

    All methods are static; the class only serves as a namespace.
    """

    @staticmethod
    async def analyze(file_path: str) -> "Optional[MediaFile]":
        """Analyze a media file with ffprobe.

        Runs three ffprobe calls concurrently (video, audio, subtitle
        streams) and assembles the results into a ``MediaFile``.

        Args:
            file_path: Path of the media file to inspect.

        Returns:
            The populated ``MediaFile``, or ``None`` when the path does not
            exist or an exception is raised during the analysis.
        """
        if not os.path.exists(file_path):
            logging.error(f"Datei nicht gefunden: {file_path}")
            return None

        try:
            # Query all three stream types concurrently.
            video_data, audio_data, subtitle_data = await asyncio.gather(
                ProbeService._probe_streams(file_path, "v"),
                ProbeService._probe_streams(file_path, "a"),
                ProbeService._probe_streams(file_path, "s"),
            )

            video_streams = ProbeService._parse_video_streams(video_data)
            audio_streams = ProbeService._parse_audio_streams(audio_data)
            subtitle_streams = ProbeService._parse_subtitle_streams(subtitle_data)

            # Every probe call requests the same "format" entries. Prefer the
            # video call's result, but fall back to another call's result if
            # the video call produced no format section.
            format_source = next(
                (
                    probe
                    for probe in (video_data, audio_data, subtitle_data)
                    if probe.get("format")
                ),
                video_data,
            )
            size_bytes, duration_sec, bitrate = ProbeService._parse_format(
                format_source
            )

            media = MediaFile(
                source_path=file_path,
                source_size_bytes=size_bytes,
                source_duration_sec=duration_sec,
                source_bitrate=bitrate,
                video_streams=video_streams,
                audio_streams=audio_streams,
                subtitle_streams=subtitle_streams,
            )
            logging.info(
                f"Analysiert: {media.source_filename} "
                f"({media.source_size_human[0]} {media.source_size_human[1]}, "
                f"{MediaFile.format_time(duration_sec)}, "
                f"{len(video_streams)}V/{len(audio_streams)}A/{len(subtitle_streams)}S)"
            )
            return media
        except Exception as e:
            # Top-level boundary: callers only distinguish "MediaFile" from
            # "None", so any failure is logged and mapped to None.
            logging.error(f"ffprobe Analyse fehlgeschlagen fuer {file_path}: {e}")
            return None

    @staticmethod
    async def _probe_streams(file_path: str, stream_type: str) -> dict:
        """Run a single ffprobe call and return its JSON output as a dict.

        Args:
            file_path: Path of the media file to inspect.
            stream_type: Stream selector passed to ``-select_streams``:
                'v' (video), 'a' (audio) or 's' (subtitle).

        Returns:
            The decoded ffprobe JSON object. When ffprobe exits with a
            non-zero code or its output is not a JSON object, a dict with an
            empty "streams" list and an empty "format" dict is returned.
        """
        command = [
            "ffprobe", "-v", "error",
            "-select_streams", stream_type,
            "-show_entries",
            "stream=index,channels,codec_name,codec_type,pix_fmt,level,"
            "r_frame_rate,bit_rate,sample_rate,width,height"
            ":stream_tags=language",
            "-show_entries",
            "format=size,bit_rate,nb_streams,duration",
            "-of", "json",
            file_path,
        ]
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, stderr = await process.communicate()
        except asyncio.CancelledError:
            # Do not leave the child process running when the awaiting task
            # is cancelled.
            if process.returncode is None:
                try:
                    process.kill()
                except ProcessLookupError:
                    pass  # Process exited between the check and the kill.
                await process.wait()
            raise

        if process.returncode != 0:
            logging.warning(
                f"ffprobe Fehler (stream={stream_type}): "
                f"{stderr.decode(errors='replace').strip()}"
            )
            return {"streams": [], "format": {}}

        try:
            parsed = json.loads(stdout.decode())
        except (UnicodeDecodeError, json.JSONDecodeError):
            parsed = None

        # The parsers call .get() on the result, so anything that is not a
        # JSON object is treated like a parse failure.
        if not isinstance(parsed, dict):
            logging.error(f"ffprobe JSON-Parsing fehlgeschlagen (stream={stream_type})")
            return {"streams": [], "format": {}}
        return parsed

    @staticmethod
    def _to_int(value, default=None):
        """Convert ``value`` to ``int``; return ``default`` if that fails."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _parse_frame_rate(raw) -> float:
        """Convert a "numerator/denominator" string to frames per second.

        Example: "24000/1001" -> 23.976. Returns 0.0 when the value is not
        of the form "int/int" or the denominator is not positive.
        """
        try:
            numerator, denominator = (int(part) for part in str(raw).split("/"))
        except (TypeError, ValueError):
            # Non-numeric parts or a number of parts other than two.
            return 0.0
        if denominator <= 0:
            return 0.0
        return round(numerator / denominator, 3)

    @staticmethod
    def _parse_video_streams(data: dict) -> "list[VideoStream]":
        """Build ``VideoStream`` objects from the "streams" list of ``data``."""
        streams = []
        for s in data.get("streams", []):
            raw_bit_rate = s.get("bit_rate")
            streams.append(VideoStream(
                index=s.get("index", 0),
                codec_name=s.get("codec_name", "unknown"),
                width=s.get("width", 0),
                height=s.get("height", 0),
                pix_fmt=s.get("pix_fmt", ""),
                frame_rate=ProbeService._parse_frame_rate(
                    s.get("r_frame_rate", "0/1")
                ),
                level=s.get("level"),
                # A missing, empty or unparsable bit rate is reported as None.
                bit_rate=ProbeService._to_int(raw_bit_rate) if raw_bit_rate else None,
            ))
        return streams

    @staticmethod
    def _parse_audio_streams(data: dict) -> "list[AudioStream]":
        """Build ``AudioStream`` objects from the "streams" list of ``data``."""
        streams = []
        for s in data.get("streams", []):
            raw_bit_rate = s.get("bit_rate")
            # "tags" may be absent or null; both yield no language.
            language = (s.get("tags") or {}).get("language")
            streams.append(AudioStream(
                index=s.get("index", 0),
                codec_name=s.get("codec_name", "unknown"),
                channels=s.get("channels", 2),
                # Fall back to 48000 when the sample rate is missing or
                # cannot be parsed.
                sample_rate=ProbeService._to_int(s.get("sample_rate", 48000), 48000),
                language=language,
                bit_rate=ProbeService._to_int(raw_bit_rate) if raw_bit_rate else None,
            ))
        return streams

    @staticmethod
    def _parse_subtitle_streams(data: dict) -> "list[SubtitleStream]":
        """Build ``SubtitleStream`` objects from the "streams" list of ``data``."""
        streams = []
        for s in data.get("streams", []):
            # "tags" may be absent or null; both yield no language.
            language = (s.get("tags") or {}).get("language")
            streams.append(SubtitleStream(
                index=s.get("index", 0),
                codec_name=s.get("codec_name", "unknown"),
                language=language,
            ))
        return streams

    @staticmethod
    def _parse_format(data: dict) -> tuple[int, float, int]:
        """Extract ``(size_bytes, duration_sec, bitrate)`` from ``data["format"]``.

        Missing or unparsable fields yield 0 (0.0 for the duration).
        """
        fmt = data.get("format", {})
        if isinstance(fmt, list):
            fmt = fmt[0] if fmt else {}
        if not isinstance(fmt, dict):
            fmt = {}

        size_bytes = ProbeService._to_int(fmt.get("size", 0), 0)
        bitrate = ProbeService._to_int(fmt.get("bit_rate", 0), 0)

        # The duration is either a colon-separated time string (converted by
        # MediaFile.time_to_seconds) or a plain number of seconds.
        duration_raw = fmt.get("duration", "0")
        try:
            if ":" in str(duration_raw):
                duration_sec = MediaFile.time_to_seconds(str(duration_raw))
            else:
                duration_sec = float(duration_raw)
        except (ValueError, TypeError):
            duration_sec = 0.0
        return size_bytes, duration_sec, bitrate