docker.videokonverter/video-konverter/app/services/scanner.py
data 37dff4de69 feat: VideoKonverter v2.9 - Projekt-Reset aus Docker-Image
Projekt aus Docker-Image videoconverter:2.9 extrahiert.
Enthält zweiphasigen Import-Workflow mit Serien-Zuordnung.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-27 11:41:48 +01:00

149 lines
5.2 KiB
Python

"""Rekursives Ordner-Scanning und Cleanup-Service"""
import os
import logging
import fnmatch
from pathlib import Path
from typing import Optional
from app.config import Config
# Sicherheits-Blacklist: Diese Pfade duerfen NIE bereinigt werden
_PROTECTED_PATHS = {"/", "/home", "/root", "/mnt", "/media", "/tmp", "/var", "/etc"}
class ScannerService:
"""Scannt Ordner nach Videodateien und fuehrt optionalen Cleanup durch"""
def __init__(self, config: Config):
self.config = config
def scan_path(self, path: str, recursive: Optional[bool] = None) -> list[str]:
"""
Scannt einen Pfad nach Videodateien.
- Einzelne Datei: Gibt [path] zurueck wenn gueltige Extension
- Ordner: Scannt nach konfigurierten Extensions
"""
path = path.strip()
if not path or not os.path.exists(path):
logging.warning(f"Pfad nicht gefunden: {path}")
return []
files_cfg = self.config.files_config
extensions = set(files_cfg.get("scan_extensions", []))
# Einzelne Datei
if os.path.isfile(path):
ext = os.path.splitext(path)[1].lower()
if ext in extensions:
return [path]
else:
logging.warning(f"Dateiendung {ext} nicht in scan_extensions: {path}")
return []
# Ordner scannen
if os.path.isdir(path):
use_recursive = recursive if recursive is not None else \
files_cfg.get("recursive_scan", True)
return self._scan_directory(path, use_recursive, extensions)
return []
def _scan_directory(self, directory: str, recursive: bool,
extensions: set) -> list[str]:
"""Scannt Ordner nach Video-Extensions"""
results = []
if recursive:
for root, dirs, files in os.walk(directory):
for f in files:
if Path(f).suffix.lower() in extensions:
results.append(os.path.join(root, f))
else:
try:
for f in os.listdir(directory):
full = os.path.join(directory, f)
if os.path.isfile(full) and Path(f).suffix.lower() in extensions:
results.append(full)
except PermissionError:
logging.error(f"Keine Leseberechtigung: {directory}")
logging.info(f"Scan: {len(results)} Dateien in {directory} "
f"(rekursiv={recursive})")
return sorted(results)
def cleanup_directory(self, directory: str) -> list[str]:
"""
Loescht konfigurierte Datei-Typen aus einem Verzeichnis.
Ausfuehrliche Sicherheits-Checks!
"""
cleanup_cfg = self.config.cleanup_config
# Check 1: Cleanup aktiviert?
if not cleanup_cfg.get("enabled", False):
return []
# Check 2: Gueltiger Pfad?
if not os.path.isdir(directory):
return []
# Check 3: Geschuetzter Pfad?
abs_path = os.path.abspath(directory)
if abs_path in _PROTECTED_PATHS:
logging.warning(f"Geschuetzter Pfad, Cleanup uebersprungen: {abs_path}")
return []
# Check 4: Ordner muss Videodateien enthalten (oder enthalten haben)
if not self._has_video_files(directory):
logging.debug(f"Keine Videodateien im Ordner, Cleanup uebersprungen: "
f"{directory}")
return []
delete_extensions = set(cleanup_cfg.get("delete_extensions", []))
deleted = []
# NUR im angegebenen Ordner, NICHT rekursiv
try:
for f in os.listdir(directory):
full_path = os.path.join(directory, f)
if not os.path.isfile(full_path):
continue
ext = os.path.splitext(f)[1].lower()
if ext not in delete_extensions:
continue
if self._is_excluded(f):
continue
try:
os.remove(full_path)
deleted.append(full_path)
logging.info(f"Cleanup: Geloescht {full_path}")
except OSError as e:
logging.error(f"Cleanup: Loeschen fehlgeschlagen {full_path}: {e}")
except PermissionError:
logging.error(f"Keine Berechtigung fuer Cleanup: {directory}")
if deleted:
logging.info(f"Cleanup: {len(deleted)} Dateien in {directory} geloescht")
return deleted
def _is_excluded(self, filename: str) -> bool:
"""Prueft ob Dateiname auf exclude_patterns matcht"""
patterns = self.config.cleanup_config.get("exclude_patterns", [])
return any(
fnmatch.fnmatch(filename.lower(), p.lower()) for p in patterns
)
def _has_video_files(self, directory: str) -> bool:
"""Prueft ob Ordner mindestens eine Videodatei enthaelt"""
extensions = set(self.config.files_config.get("scan_extensions", []))
try:
for f in os.listdir(directory):
if os.path.splitext(f)[1].lower() in extensions:
return True
except PermissionError:
pass
return False