Kompletter Video-Konverter mit Web-UI, GPU-Beschleunigung (Intel VAAPI), Video-Bibliothek mit Serien/Film-Erkennung und TVDB-Integration. Features: - AV1/HEVC/H.264 Encoding (GPU + CPU) - Video-Bibliothek mit ffprobe-Analyse und Filtern - TVDB-Integration mit Review-Modal und Sprachkonfiguration - Film-Scanning und TVDB-Zuordnung - Import- und Clean-Service (Grundgeruest) - WebSocket Live-Updates, Queue-Management - Docker mit GPU/CPU-Profilen Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
149 lines
5.2 KiB
Python
149 lines
5.2 KiB
Python
"""Rekursives Ordner-Scanning und Cleanup-Service"""
|
|
import os
|
|
import logging
|
|
import fnmatch
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
from app.config import Config
|
|
|
|
|
|
# Sicherheits-Blacklist: Diese Pfade duerfen NIE bereinigt werden
|
|
_PROTECTED_PATHS = {"/", "/home", "/root", "/mnt", "/media", "/tmp", "/var", "/etc"}
|
|
|
|
|
|
class ScannerService:
|
|
"""Scannt Ordner nach Videodateien und fuehrt optionalen Cleanup durch"""
|
|
|
|
def __init__(self, config: Config):
|
|
self.config = config
|
|
|
|
def scan_path(self, path: str, recursive: Optional[bool] = None) -> list[str]:
|
|
"""
|
|
Scannt einen Pfad nach Videodateien.
|
|
- Einzelne Datei: Gibt [path] zurueck wenn gueltige Extension
|
|
- Ordner: Scannt nach konfigurierten Extensions
|
|
"""
|
|
path = path.strip()
|
|
if not path or not os.path.exists(path):
|
|
logging.warning(f"Pfad nicht gefunden: {path}")
|
|
return []
|
|
|
|
files_cfg = self.config.files_config
|
|
extensions = set(files_cfg.get("scan_extensions", []))
|
|
|
|
# Einzelne Datei
|
|
if os.path.isfile(path):
|
|
ext = os.path.splitext(path)[1].lower()
|
|
if ext in extensions:
|
|
return [path]
|
|
else:
|
|
logging.warning(f"Dateiendung {ext} nicht in scan_extensions: {path}")
|
|
return []
|
|
|
|
# Ordner scannen
|
|
if os.path.isdir(path):
|
|
use_recursive = recursive if recursive is not None else \
|
|
files_cfg.get("recursive_scan", True)
|
|
return self._scan_directory(path, use_recursive, extensions)
|
|
|
|
return []
|
|
|
|
def _scan_directory(self, directory: str, recursive: bool,
|
|
extensions: set) -> list[str]:
|
|
"""Scannt Ordner nach Video-Extensions"""
|
|
results = []
|
|
|
|
if recursive:
|
|
for root, dirs, files in os.walk(directory):
|
|
for f in files:
|
|
if Path(f).suffix.lower() in extensions:
|
|
results.append(os.path.join(root, f))
|
|
else:
|
|
try:
|
|
for f in os.listdir(directory):
|
|
full = os.path.join(directory, f)
|
|
if os.path.isfile(full) and Path(f).suffix.lower() in extensions:
|
|
results.append(full)
|
|
except PermissionError:
|
|
logging.error(f"Keine Leseberechtigung: {directory}")
|
|
|
|
logging.info(f"Scan: {len(results)} Dateien in {directory} "
|
|
f"(rekursiv={recursive})")
|
|
return sorted(results)
|
|
|
|
def cleanup_directory(self, directory: str) -> list[str]:
|
|
"""
|
|
Loescht konfigurierte Datei-Typen aus einem Verzeichnis.
|
|
Ausfuehrliche Sicherheits-Checks!
|
|
"""
|
|
cleanup_cfg = self.config.cleanup_config
|
|
|
|
# Check 1: Cleanup aktiviert?
|
|
if not cleanup_cfg.get("enabled", False):
|
|
return []
|
|
|
|
# Check 2: Gueltiger Pfad?
|
|
if not os.path.isdir(directory):
|
|
return []
|
|
|
|
# Check 3: Geschuetzter Pfad?
|
|
abs_path = os.path.abspath(directory)
|
|
if abs_path in _PROTECTED_PATHS:
|
|
logging.warning(f"Geschuetzter Pfad, Cleanup uebersprungen: {abs_path}")
|
|
return []
|
|
|
|
# Check 4: Ordner muss Videodateien enthalten (oder enthalten haben)
|
|
if not self._has_video_files(directory):
|
|
logging.debug(f"Keine Videodateien im Ordner, Cleanup uebersprungen: "
|
|
f"{directory}")
|
|
return []
|
|
|
|
delete_extensions = set(cleanup_cfg.get("delete_extensions", []))
|
|
deleted = []
|
|
|
|
# NUR im angegebenen Ordner, NICHT rekursiv
|
|
try:
|
|
for f in os.listdir(directory):
|
|
full_path = os.path.join(directory, f)
|
|
if not os.path.isfile(full_path):
|
|
continue
|
|
|
|
ext = os.path.splitext(f)[1].lower()
|
|
if ext not in delete_extensions:
|
|
continue
|
|
|
|
if self._is_excluded(f):
|
|
continue
|
|
|
|
try:
|
|
os.remove(full_path)
|
|
deleted.append(full_path)
|
|
logging.info(f"Cleanup: Geloescht {full_path}")
|
|
except OSError as e:
|
|
logging.error(f"Cleanup: Loeschen fehlgeschlagen {full_path}: {e}")
|
|
|
|
except PermissionError:
|
|
logging.error(f"Keine Berechtigung fuer Cleanup: {directory}")
|
|
|
|
if deleted:
|
|
logging.info(f"Cleanup: {len(deleted)} Dateien in {directory} geloescht")
|
|
|
|
return deleted
|
|
|
|
def _is_excluded(self, filename: str) -> bool:
|
|
"""Prueft ob Dateiname auf exclude_patterns matcht"""
|
|
patterns = self.config.cleanup_config.get("exclude_patterns", [])
|
|
return any(
|
|
fnmatch.fnmatch(filename.lower(), p.lower()) for p in patterns
|
|
)
|
|
|
|
def _has_video_files(self, directory: str) -> bool:
|
|
"""Prueft ob Ordner mindestens eine Videodatei enthaelt"""
|
|
extensions = set(self.config.files_config.get("scan_extensions", []))
|
|
try:
|
|
for f in os.listdir(directory):
|
|
if os.path.splitext(f)[1].lower() in extensions:
|
|
return True
|
|
except PermissionError:
|
|
pass
|
|
return False
|