docker.videokonverter/video-konverter/app/services/queue.py
data 4f151de78c feat: VideoKonverter v4.2 - TV Admin-Center, HLS-Streaming, Startseiten-Rubriken
- TV Admin-Center (/tv-admin): HLS-Settings, Session-Monitoring, User-Verwaltung
- HLS-Streaming: ffmpeg .ts-Segmente, hls.js, GPU VAAPI, SIGSTOP/SIGCONT
- Startseite: Rubriken (Weiterschauen, Neu, Serien, Filme, Schon gesehen)
- User-Settings: Startseiten-Rubriken konfigurierbar, Watch-Threshold
- UI: Amber/Gold Accent-Farbe, Focus-Ring-Fix, Player-Buttons einheitlich
- Cache-Busting: ?v= Timestamp auf allen CSS/JS Includes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 17:57:48 +01:00

671 lines
27 KiB
Python

"""Job-Queue mit Persistierung und paralleler Ausfuehrung"""
import asyncio
import json
import logging
import os
import signal
import time
from collections import OrderedDict
from decimal import Decimal
from typing import Optional, TYPE_CHECKING
import aiomysql
from app.config import Config
from app.models.job import ConversionJob, JobStatus
from app.models.media import MediaFile
from app.services.encoder import EncoderService
from app.services.probe import ProbeService
from app.services.progress import ProgressParser
from app.services.scanner import ScannerService
if TYPE_CHECKING:
from app.routes.ws import WebSocketManager
class QueueService:
    """Manage the conversion queue, including its persistence."""

    def __init__(self, config: Config, ws_manager: 'WebSocketManager'):
        """Store collaborators and reset all runtime state.

        Args:
            config: Application configuration; also handed to the encoder
                and scanner services created here.
            ws_manager: WebSocket manager kept for later broadcasts.
        """
        # --- collaborators ---
        self.config = config
        self.ws_manager = ws_manager
        self.encoder = EncoderService(config)
        self.scanner = ScannerService(config)

        # --- queue contents, keyed by job id (insertion order = FIFO) ---
        self.jobs: OrderedDict[int, ConversionJob] = OrderedDict()

        # --- worker state flags and counters ---
        self._active_count: int = 0
        self._running: bool = False
        self._paused: bool = False
        self._encoding_suspended: bool = False
        self._queue_task: Optional[asyncio.Task] = None

        # --- persistence: JSON file for the queue, pool for statistics ---
        queue_path = config.data_path / "queue.json"
        self._queue_file = str(queue_path)
        self._db_pool: Optional[aiomysql.Pool] = None
async def start(self) -> None:
    """Initialise the database, then start the queue worker.

    Job records persisted in the queue file are re-probed and re-queued by
    a background task so that startup is not blocked by probing.
    """
    await self._init_db()
    pending = self._load_queue()

    self._running = True
    self._queue_task = asyncio.create_task(self._process_loop())
    # Report the persisted records: self.jobs is only filled later by the
    # restore task (via add_job), so its length here would always be 0.
    logging.info(
        f"Queue gestartet ({len(pending)} Jobs geladen, "
        f"max {self.config.max_parallel_jobs} parallel)"
    )

    if pending:
        # Keep a strong reference to the task. The event loop itself only
        # holds weak references, so an unreferenced task may be garbage
        # collected before it has finished restoring the jobs.
        self._restore_task = asyncio.create_task(
            self._restore_jobs(pending)
        )
async def _restore_jobs(self, job_data: list[dict]) -> None:
    """Re-create queue entries from persisted job records.

    Every record's source file is probed again; records for which the
    probe yields nothing are skipped.

    Args:
        job_data: Records with a ``source_path`` key and optional
            ``preset_name`` / ``delete_source`` keys.
    """
    for record in job_data:
        media = await ProbeService.analyze(record["source_path"])
        if not media:
            continue
        await self.add_job(
            media,
            preset_name=record.get("preset_name"),
            delete_source=record.get("delete_source", False),
        )
async def stop(self) -> None:
    """Stop the queue worker and close the database pool, if one exists.

    Only the worker loop task is cancelled here; tasks of individual
    conversion jobs are not touched.
    """
    self._running = False

    worker = self._queue_task
    if worker:
        worker.cancel()
        try:
            await worker
        except asyncio.CancelledError:
            # Expected: the worker was cancelled just above.
            pass

    pool = self._db_pool
    if pool is not None:
        pool.close()
        await pool.wait_closed()

    logging.info("Queue gestoppt")
async def add_job(self, media: MediaFile,
                  preset_name: Optional[str] = None,
                  delete_source: bool = False) -> Optional[ConversionJob]:
    """Append a new conversion job for ``media`` to the queue.

    Args:
        media: Probed source file.
        preset_name: Encoding preset; falls back to the configured
            default preset when empty.
        delete_source: Per-job flag stored on the job.

    Returns:
        The created job, or ``None`` when the source path is already
        queued or active.
    """
    if self._is_duplicate(media.source_path):
        logging.info(f"Duplikat uebersprungen: {media.source_filename}")
        return None

    effective_preset = preset_name or self.config.default_preset_name

    job = ConversionJob(media=media, preset_name=effective_preset)
    job.delete_source = delete_source
    job.build_target_path(self.config)

    self.jobs[job.id] = job
    self._save_queue()

    delete_marker = " [delete_source]" if delete_source else ""
    logging.info(
        f"Job hinzugefuegt: {media.source_filename} "
        f"-> {job.target_filename} (Preset: {effective_preset})"
        f"{delete_marker}"
    )
    await self.ws_manager.broadcast_queue_update()
    return job
async def add_paths(self, paths: list[str],
                    preset_name: Optional[str] = None,
                    recursive: Optional[bool] = None,
                    delete_source: bool = False) -> list[ConversionJob]:
    """Queue jobs for several paths (files and directories).

    Blank entries are ignored. Every remaining path is expanded by the
    scanner, each resulting file is probed and, if the probe succeeds,
    queued via ``add_job``.

    Returns:
        The jobs that were actually created (duplicates are left out).
    """
    found: list[str] = []
    for raw_path in paths:
        cleaned = raw_path.strip()
        if cleaned:
            found.extend(self.scanner.scan_path(cleaned, recursive))

    logging.info(f"{len(found)} Dateien aus {len(paths)} Pfaden gefunden")

    created = []
    for file_path in found:
        media = await ProbeService.analyze(file_path)
        if not media:
            continue
        new_job = await self.add_job(media, preset_name, delete_source)
        if new_job:
            created.append(new_job)
    return created
async def remove_job(self, job_id: int) -> bool:
    """Remove a job from the queue, aborting a running conversion.

    An active job's process is asked to terminate and is killed if it is
    still running half a second later. The job's task, if present and
    unfinished, is cancelled as well.

    Returns:
        ``True`` when the job existed and was removed, else ``False``.
    """
    job = self.jobs.get(job_id)
    if not job:
        return False

    if job.status == JobStatus.ACTIVE and job.process:
        try:
            job.process.terminate()
            # Grace period before escalating to a hard kill.
            await asyncio.sleep(0.5)
            if job.process.returncode is None:
                job.process.kill()
        except ProcessLookupError:
            # The process is already gone.
            pass

    task = job.task
    if task and not task.done():
        task.cancel()

    del self.jobs[job_id]
    self._save_queue()
    await self.ws_manager.broadcast_queue_update()
    logging.info(f"Job entfernt: {job.media.source_filename}")
    return True
async def cancel_job(self, job_id: int) -> bool:
    """Abort a running job but keep it in the queue as CANCELLED.

    Returns:
        ``True`` when an active job was cancelled; ``False`` when the job
        does not exist or is not active.
    """
    job = self.jobs.get(job_id)
    if not job:
        return False
    if job.status != JobStatus.ACTIVE:
        return False

    if job.process:
        try:
            job.process.terminate()
        except ProcessLookupError:
            # The process has already exited.
            pass

    job.status = JobStatus.CANCELLED
    job.finished_at = time.time()
    # Give the slot back immediately; never drop below zero.
    self._active_count = max(0, self._active_count - 1)

    self._save_queue()
    await self.ws_manager.broadcast_queue_update()
    logging.info(f"Job abgebrochen: {job.media.source_filename}")
    return True
async def pause_queue(self) -> bool:
    """Pause the queue: running jobs finish, no new ones are started.

    Returns:
        ``True`` when the queue was paused by this call, ``False`` when it
        was already paused.
    """
    already_paused = self._paused
    if not already_paused:
        self._paused = True
        logging.info("Queue pausiert - keine neuen Jobs werden gestartet")
        await self.ws_manager.broadcast_queue_update()
    return not already_paused
async def resume_queue(self) -> bool:
    """Resume a paused queue.

    Returns:
        ``True`` when the queue was resumed by this call, ``False`` when
        it was not paused.
    """
    was_paused = self._paused
    if was_paused:
        self._paused = False
        logging.info("Queue fortgesetzt")
        await self.ws_manager.broadcast_queue_update()
    return was_paused
@property
def is_paused(self) -> bool:
    """Whether the queue is currently paused."""
    paused = self._paused
    return paused
@property
def encoding_suspended(self) -> bool:
    """Whether active ffmpeg processes are currently frozen via SIGSTOP."""
    suspended = self._encoding_suspended
    return suspended
def suspend_encoding(self) -> int:
    """Freeze all running ffmpeg conversions with SIGSTOP.

    Intended to be called when an HLS stream starts, so the server has
    its full resources available for streaming.

    Returns:
        The number of processes that were stopped. The suspended flag is
        only set when at least one process was stopped.
    """
    frozen = 0
    for job in self.jobs.values():
        if job.status != JobStatus.ACTIVE:
            continue
        if not job.process or job.process.returncode is not None:
            continue  # no process, or it has already exited
        try:
            os.kill(job.process.pid, signal.SIGSTOP)
        except (ProcessLookupError, PermissionError) as e:
            logging.warning(f"SIGSTOP fehlgeschlagen fuer PID "
                            f"{job.process.pid}: {e}")
        else:
            frozen += 1

    if frozen:
        self._encoding_suspended = True
        logging.info(f"Encoding pausiert: {frozen} ffmpeg-Prozess(e) "
                     f"per SIGSTOP eingefroren (HLS-Stream aktiv)")
    return frozen
def resume_encoding(self) -> int:
    """Wake all frozen ffmpeg conversions with SIGCONT.

    Intended to be called when the last HLS stream ends.

    Returns:
        The number of processes that were continued. The suspended flag
        is cleared in every case, even when nothing was continued.
    """
    woken = 0
    for job in self.jobs.values():
        if job.status != JobStatus.ACTIVE:
            continue
        if not job.process or job.process.returncode is not None:
            continue  # no process, or it has already exited
        try:
            os.kill(job.process.pid, signal.SIGCONT)
        except (ProcessLookupError, PermissionError) as e:
            logging.warning(f"SIGCONT fehlgeschlagen fuer PID "
                            f"{job.process.pid}: {e}")
        else:
            woken += 1

    if woken:
        logging.info(f"Encoding fortgesetzt: {woken} ffmpeg-Prozess(e) "
                     f"per SIGCONT aufgeweckt")
    self._encoding_suspended = False
    return woken
async def retry_job(self, job_id: int) -> bool:
    """Put a failed or cancelled job back into the QUEUED state.

    All progress values, timestamps and running statistics of the job are
    reset before it is re-queued.

    Returns:
        ``True`` when the job was reset; ``False`` when it does not exist
        or is neither FAILED nor CANCELLED.
    """
    job = self.jobs.get(job_id)
    if not job or job.status not in (JobStatus.FAILED, JobStatus.CANCELLED):
        return False

    job.status = JobStatus.QUEUED

    # (attribute, initial value) pairs, applied in this order. The list
    # values are created fresh on every call, so jobs never share them.
    initial_values = (
        ("progress_percent", 0.0),
        ("progress_frames", 0),
        ("progress_fps", 0.0),
        ("progress_speed", 0.0),
        ("progress_size_bytes", 0),
        ("progress_time_sec", 0.0),
        ("progress_eta_sec", 0.0),
        ("started_at", None),
        ("finished_at", None),
        ("_stat_fps", [0.0, 0]),
        ("_stat_speed", [0.0, 0]),
        ("_stat_bitrate", [0, 0]),
    )
    for attr_name, initial in initial_values:
        setattr(job, attr_name, initial)

    self._save_queue()
    await self.ws_manager.broadcast_queue_update()
    logging.info(f"Job wiederholt: {job.media.source_filename}")
    return True
def get_queue_state(self) -> dict:
    """Build the queue status payload for the WebSocket.

    Only jobs that are QUEUED, ACTIVE, FAILED or CANCELLED are listed.
    """
    visible_jobs = {
        job_id: job.to_dict_queue()
        for job_id, job in self.jobs.items()
        if job.status in (JobStatus.QUEUED, JobStatus.ACTIVE,
                          JobStatus.FAILED, JobStatus.CANCELLED)
    }
    return {
        "data_queue": visible_jobs,
        "queue_paused": self._paused,
        "encoding_suspended": self._encoding_suspended,
    }
def get_active_jobs(self) -> dict:
    """Build the payload of currently active jobs for the WebSocket."""
    running = {
        job_id: job.to_dict_active()
        for job_id, job in self.jobs.items()
        if job.status == JobStatus.ACTIVE
    }
    return {"data_convert": running}
def get_all_jobs(self) -> list[dict]:
    """Return every job, regardless of status, as a list for the API.

    Each entry is the job's active-view dict plus ``id`` and
    ``status_name``. Keys from the job's own dict take precedence over
    ``id``; ``status_name`` always reflects the status enum name.
    """
    listing = []
    for job_id, job in self.jobs.items():
        entry = {"id": job_id}
        entry.update(job.to_dict_active())
        entry["status_name"] = job.status.name
        listing.append(entry)
    return listing
# --- Interner Queue-Worker ---
async def _process_loop(self, poll_interval: float = 0.5) -> None:
    """Worker loop: start the next queued job while capacity is free.

    At most one job is started per iteration. Nothing is started while
    the queue is paused, while encoding is suspended, or when the number
    of active jobs has reached ``max_parallel_jobs``.

    Args:
        poll_interval: Seconds to sleep between iterations.
    """
    while self._running:
        try:
            has_capacity = (
                not self._paused
                and not self._encoding_suspended
                and self._active_count < self.config.max_parallel_jobs
            )
            if has_capacity:
                next_job = self._get_next_queued()
                if next_job:
                    # Keep the task on the job: the event loop only holds
                    # weak references to tasks, so an unreferenced task
                    # can be garbage collected mid-conversion. Storing it
                    # also lets remove_job() cancel it via job.task.
                    next_job.task = asyncio.create_task(
                        self._execute_job(next_job)
                    )
        except Exception as e:
            # Keep the worker alive; a single bad iteration must not stop
            # the whole queue.
            logging.error(f"Queue-Worker Fehler: {e}")
        await asyncio.sleep(poll_interval)
async def _execute_job(self, job: ConversionJob) -> None:
    """Run a single conversion job from start to finish.

    Marks the job ACTIVE, launches the encoder command as a subprocess,
    follows its progress and finally records the outcome (FINISHED,
    FAILED or CANCELLED), persists the queue, stores statistics and
    notifies WebSocket clients.

    Every step after the slot has been taken runs inside the ``try`` so
    that a failure while building the command or broadcasting cannot
    leave ``_active_count`` permanently incremented.
    """
    self._active_count += 1
    job.status = JobStatus.ACTIVE
    job.started_at = time.time()

    # cancel_job() marks the job CANCELLED and already decrements
    # _active_count. When that happened, this flag prevents a second
    # decrement in the finally block below.
    slot_released = False
    try:
        await self.ws_manager.broadcast_queue_update()
        command = self.encoder.build_command(job)
        logging.info(
            f"Starte Konvertierung: {job.media.source_filename}\n"
            f" Befehl: {' '.join(command)}"
        )
        job.process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        progress = ProgressParser(self.ws_manager.broadcast_progress)
        await progress.monitor(job)
        await job.process.wait()

        if job.status == JobStatus.CANCELLED:
            # Terminated through cancel_job(): keep CANCELLED instead of
            # reporting the non-zero exit code as a failure.
            slot_released = True
        elif job.process.returncode == 0:
            job.status = JobStatus.FINISHED
            # Read the real output size from disk.
            if os.path.exists(job.target_path):
                job.progress_size_bytes = os.path.getsize(job.target_path)
            logging.info(
                f"Konvertierung abgeschlossen: {job.media.source_filename} "
                f"({MediaFile.format_time(time.time() - job.started_at)})"
            )
            await self._post_conversion_cleanup(job)
        else:
            job.status = JobStatus.FAILED
            error_output = progress.get_error_output()
            logging.error(
                f"Konvertierung fehlgeschlagen (Code {job.process.returncode}): "
                f"{job.media.source_filename}\n"
                f" ffmpeg stderr:\n{error_output}"
            )
    except asyncio.CancelledError:
        # Check before overwriting: CANCELLED at this point can only have
        # been set by cancel_job(), which also released the slot.
        slot_released = job.status == JobStatus.CANCELLED
        job.status = JobStatus.CANCELLED
        logging.info(f"Konvertierung abgebrochen: {job.media.source_filename}")
    except Exception as e:
        if job.status == JobStatus.CANCELLED:
            slot_released = True
        else:
            job.status = JobStatus.FAILED
        logging.error(f"Fehler bei Konvertierung: {e}")
    finally:
        job.finished_at = time.time()
        if not slot_released:
            self._active_count = max(0, self._active_count - 1)
        self._save_queue()
        await self._save_stats(job)
        await self.ws_manager.broadcast_queue_update()
        # Ask the library page to reload so its badges are refreshed.
        if job.status == JobStatus.FINISHED:
            await self.ws_manager.broadcast({
                "data_library_scan": {
                    "status": "idle", "current": "",
                    "total": 0, "done": 0
                }
            })
async def _post_conversion_cleanup(self, job: ConversionJob) -> None:
    """Clean up after a successful conversion.

    IMPORTANT: only this job's own source file is deleted, never other
    files in the folder that may still be waiting in the queue.

    The source is removed when either the global ``delete_source``
    setting or the job's own flag is set, and only if the target file
    exists and is not empty. Directory cleanup (when enabled) is skipped
    while other queued or active jobs share the same source directory.
    """
    files_cfg = self.config.files_config

    # Delete the source: enabled globally via config OR per job.
    if files_cfg.get("delete_source", False) or job.delete_source:
        target_ok = False
        if os.path.exists(job.target_path):
            target_ok = os.path.getsize(job.target_path) > 0

        if not target_ok:
            logging.warning(
                f"Quelldatei NICHT geloescht "
                f"(Zieldatei fehlt/leer): "
                f"{job.media.source_path}")
        else:
            try:
                os.remove(job.media.source_path)
                logging.info(
                    f"Quelldatei geloescht: {job.media.source_path}")
            except OSError as e:
                logging.error(
                    f"Quelldatei loeschen fehlgeschlagen: {e}")

    # SAFETY: clean the directory only when NO other job from the same
    # directory is still waiting or running.
    cleanup_cfg = self.config.cleanup_config
    if not cleanup_cfg.get("enabled", False):
        return

    source_dir = job.media.source_dir
    siblings = []
    for other in self.jobs.values():
        if (other.media.source_dir == source_dir
                and other.status in (JobStatus.QUEUED, JobStatus.ACTIVE)
                and other.id != job.id):
            siblings.append(other)

    if siblings:
        logging.info(
            f"Ordner-Cleanup uebersprungen "
            f"({len(siblings)} Jobs wartend): {source_dir}")
        return

    deleted = self.scanner.cleanup_directory(source_dir)
    if deleted:
        logging.info(
            f"{len(deleted)} Dateien bereinigt "
            f"in {source_dir}"
        )
def _get_next_queued(self) -> Optional[ConversionJob]:
    """Return the first job in insertion order with status QUEUED (FIFO).

    Returns ``None`` when no job is waiting.
    """
    waiting = (
        job for job in self.jobs.values()
        if job.status == JobStatus.QUEUED
    )
    return next(waiting, None)
def _is_duplicate(self, source_path: str) -> bool:
    """Check whether ``source_path`` is already queued or being converted.

    Jobs in any other state (finished, failed, cancelled) do not count.
    """
    return any(
        job.media.source_path == source_path
        and job.status in (JobStatus.QUEUED, JobStatus.ACTIVE)
        for job in self.jobs.values()
    )
def _save_queue(self) -> None:
    """Persist all QUEUED and FAILED jobs to the queue file.

    The data is written to a temporary sibling file first and then moved
    over the real file with ``os.replace``. A failure while serialising
    or writing therefore never leaves a truncated or half-written queue
    file behind; the previous content stays intact. Write errors are
    logged, not raised.
    """
    queue_data = [
        job.to_json()
        for job in self.jobs.values()
        if job.status in (JobStatus.QUEUED, JobStatus.FAILED)
    ]
    tmp_path = f"{self._queue_file}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(queue_data, f, indent=2)
        # Atomic swap: readers see either the old or the new file.
        os.replace(tmp_path, self._queue_file)
    except Exception as e:
        logging.error(f"Queue speichern fehlgeschlagen: {e}")
def _load_queue(self) -> list[dict]:
    """Load the queue file and return the records of QUEUED jobs.

    Each returned record holds ``source_path``, ``preset_name`` and
    ``delete_source``. A missing, unreadable or malformed file yields an
    empty list; read errors are logged.
    """
    if not os.path.exists(self._queue_file):
        return []

    try:
        with open(self._queue_file, "r", encoding="utf-8") as f:
            stored = json.load(f)

        pending = []
        for item in stored:
            if item.get("status", 0) != JobStatus.QUEUED:
                continue
            pending.append({
                "source_path": item["source_path"],
                "preset_name": item.get("preset_name"),
                "delete_source": item.get("delete_source", False),
            })

        if pending:
            logging.info(f"{len(pending)} Jobs aus Queue geladen")
        return pending
    except Exception as e:
        logging.error(f"Queue laden fehlgeschlagen: {e}")
        return []
# --- MariaDB Statistik-Datenbank ---
def _get_db_config(self) -> dict:
    """Return the database connection settings from the ``database``
    section of the application settings, with built-in fallbacks.

    The result uses the keys ``host``, ``port``, ``user``, ``password``
    and ``db`` (the latter read from the settings key ``database``).
    """
    db_cfg = self.config.settings.get("database", {})
    # NOTE(review): host, user and password fall back to hard-coded
    # values when the settings omit them - confirm this is intended.
    # (result key, settings key, fallback)
    fields = (
        ("host", "host", "192.168.155.11"),
        ("port", "port", 3306),
        ("user", "user", "video"),
        ("password", "password", "8715"),
        ("db", "database", "video_converter"),
    )
    return {
        result_key: db_cfg.get(settings_key, fallback)
        for result_key, settings_key, fallback in fields
    }
async def _get_pool(self) -> Optional[aiomysql.Pool]:
    """Return the connection pool, creating it on first use.

    Errors raised while creating the pool propagate to the caller.
    """
    if self._db_pool is None:
        db_cfg = self._get_db_config()
        self._db_pool = await aiomysql.create_pool(
            host=db_cfg["host"],
            port=db_cfg["port"],
            user=db_cfg["user"],
            password=db_cfg["password"],
            db=db_cfg["db"],
            charset="utf8mb4",
            autocommit=True,
            minsize=1,
            maxsize=5,
            connect_timeout=10,
        )
    return self._db_pool
async def _init_db(self) -> None:
    """Create the ``conversions`` table in MariaDB if it does not exist.

    On any failure the service continues without a database:
    ``_db_pool`` is reset to ``None``. A pool that was already created
    before the failure (for example when the connection works but
    ``CREATE TABLE`` is rejected) is closed first, so that its
    connections are not leaked.
    """
    db_cfg = self._get_db_config()
    try:
        pool = await self._get_pool()
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("""
                    CREATE TABLE IF NOT EXISTS conversions (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    source_path VARCHAR(1024) NOT NULL,
                    source_filename VARCHAR(512) NOT NULL,
                    source_size_bytes BIGINT,
                    source_duration_sec DOUBLE,
                    source_frame_rate DOUBLE,
                    source_frames_total INT,
                    target_path VARCHAR(1024),
                    target_filename VARCHAR(512),
                    target_size_bytes BIGINT,
                    target_container VARCHAR(10),
                    preset_name VARCHAR(64),
                    status INT,
                    started_at DOUBLE,
                    finished_at DOUBLE,
                    duration_sec DOUBLE,
                    avg_fps DOUBLE,
                    avg_speed DOUBLE,
                    avg_bitrate INT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    INDEX idx_created_at (created_at),
                    INDEX idx_status (status)
                    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """)
        logging.info(
            f"MariaDB verbunden: {db_cfg['host']}:{db_cfg['port']}/"
            f"{db_cfg['db']}"
        )
    except Exception as e:
        logging.error(
            f"MariaDB Initialisierung fehlgeschlagen "
            f"({db_cfg['host']}:{db_cfg['port']}): {e}"
        )
        logging.warning("Statistiken werden ohne Datenbank ausgefuehrt")
        # Detach the pool first so no other caller picks it up, then
        # release its connections on a best-effort basis.
        stale_pool = self._db_pool
        self._db_pool = None
        if stale_pool is not None:
            try:
                stale_pool.close()
                await stale_pool.wait_closed()
            except Exception as close_err:
                logging.warning(
                    f"MariaDB-Pool schliessen fehlgeschlagen: {close_err}"
                )
async def _save_stats(self, job: ConversionJob) -> None:
    """Store the outcome of a conversion in MariaDB.

    Does nothing when no database pool is available. After inserting the
    row, all rows except the newest ``statistics.max_entries`` (default
    5000) are deleted. Database errors are logged, not raised.
    """
    if self._db_pool is None:
        return

    stats = job.to_dict_stats()
    # Must stay in the same order as the column list of the INSERT.
    columns = (
        "source_path", "source_filename", "source_size_bytes",
        "source_duration_sec", "source_frame_rate", "source_frames_total",
        "target_path", "target_filename", "target_size_bytes",
        "target_container", "preset_name", "status",
        "started_at", "finished_at", "duration_sec",
        "avg_fps", "avg_speed", "avg_bitrate",
    )
    try:
        pool = await self._get_pool()
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                row_values = tuple(stats[name] for name in columns)
                await cur.execute("""
                    INSERT INTO conversions (
                    source_path, source_filename, source_size_bytes,
                    source_duration_sec, source_frame_rate, source_frames_total,
                    target_path, target_filename, target_size_bytes,
                    target_container, preset_name, status,
                    started_at, finished_at, duration_sec,
                    avg_fps, avg_speed, avg_bitrate
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s,
                    %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """, row_values)

                # Trim the table down to the configured maximum.
                statistics_cfg = self.config.settings.get("statistics", {})
                max_entries = statistics_cfg.get("max_entries", 5000)
                await cur.execute(
                    "DELETE FROM conversions WHERE id NOT IN ("
                    "SELECT id FROM (SELECT id FROM conversions "
                    "ORDER BY created_at DESC LIMIT %s) AS tmp)",
                    (max_entries,)
                )
    except Exception as e:
        logging.error(f"Statistik speichern fehlgeschlagen: {e}")
async def get_statistics(self, limit: int = 50,
                         offset: int = 0) -> list[dict]:
    """Read conversion statistics from MariaDB, newest first.

    Args:
        limit: Maximum number of rows to return.
        offset: Number of rows to skip.

    Returns:
        One dict per row with JSON-compatible values; an empty list when
        no database is available or the query fails.
    """
    if self._db_pool is None:
        return []

    def _json_safe(value):
        # Make MariaDB types JSON-compatible: Decimal -> float, anything
        # offering isoformat() (date/time values) -> str.
        if isinstance(value, Decimal):
            return float(value)
        if hasattr(value, "isoformat"):
            return str(value)
        return value

    try:
        pool = await self._get_pool()
        async with pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(
                    "SELECT * FROM conversions ORDER BY created_at DESC "
                    "LIMIT %s OFFSET %s",
                    (limit, offset),
                )
                rows = await cur.fetchall()
                return [
                    {key: _json_safe(value) for key, value in row.items()}
                    for row in rows
                ]
    except Exception as e:
        logging.error(f"Statistik lesen fehlgeschlagen: {e}")
        return []
async def get_statistics_summary(self) -> dict:
    """Return aggregate figures over all stored conversions.

    The result holds row counts (total, status 2 as ``finished``, status
    3 as ``failed``), summed source/target sizes and their difference,
    the summed duration and the average fps/speed. An empty dict is
    returned when no database is available, the query returns no row, or
    it fails.
    """
    if self._db_pool is None:
        return {}

    try:
        pool = await self._get_pool()
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("""
                    SELECT
                    COUNT(*) as total,
                    SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END),
                    SUM(CASE WHEN status = 3 THEN 1 ELSE 0 END),
                    SUM(source_size_bytes),
                    SUM(target_size_bytes),
                    SUM(duration_sec),
                    AVG(avg_fps),
                    AVG(avg_speed)
                    FROM conversions
                """)
                row = await cur.fetchone()
                if not row:
                    return {}

                # NULL aggregates (empty table) count as 0; Decimal is
                # converted to int/float so the result is JSON-ready.
                source_total = int(row[3] or 0)
                target_total = int(row[4] or 0)
                return {
                    "total": int(row[0] or 0),
                    "finished": int(row[1] or 0),
                    "failed": int(row[2] or 0),
                    "total_source_size": source_total,
                    "total_target_size": target_total,
                    "space_saved": source_total - target_total,
                    "total_duration": float(row[5] or 0),
                    "avg_fps": round(float(row[6] or 0), 1),
                    "avg_speed": round(float(row[7] or 0), 2),
                }
    except Exception as e:
        logging.error(f"Statistik-Zusammenfassung fehlgeschlagen: {e}")
        return {}