"""Rekursives Ordner-Scanning und Cleanup-Service""" import os import logging import fnmatch from pathlib import Path from typing import Optional from app.config import Config # Sicherheits-Blacklist: Diese Pfade duerfen NIE bereinigt werden _PROTECTED_PATHS = {"/", "/home", "/root", "/mnt", "/media", "/tmp", "/var", "/etc"} class ScannerService: """Scannt Ordner nach Videodateien und fuehrt optionalen Cleanup durch""" def __init__(self, config: Config): self.config = config def scan_path(self, path: str, recursive: Optional[bool] = None) -> list[str]: """ Scannt einen Pfad nach Videodateien. - Einzelne Datei: Gibt [path] zurueck wenn gueltige Extension - Ordner: Scannt nach konfigurierten Extensions """ path = path.strip() if not path or not os.path.exists(path): logging.warning(f"Pfad nicht gefunden: {path}") return [] files_cfg = self.config.files_config extensions = set(files_cfg.get("scan_extensions", [])) # Einzelne Datei if os.path.isfile(path): ext = os.path.splitext(path)[1].lower() if ext in extensions: return [path] else: logging.warning(f"Dateiendung {ext} nicht in scan_extensions: {path}") return [] # Ordner scannen if os.path.isdir(path): use_recursive = recursive if recursive is not None else \ files_cfg.get("recursive_scan", True) return self._scan_directory(path, use_recursive, extensions) return [] def _scan_directory(self, directory: str, recursive: bool, extensions: set) -> list[str]: """Scannt Ordner nach Video-Extensions""" results = [] if recursive: for root, dirs, files in os.walk(directory): for f in files: if Path(f).suffix.lower() in extensions: results.append(os.path.join(root, f)) else: try: for f in os.listdir(directory): full = os.path.join(directory, f) if os.path.isfile(full) and Path(f).suffix.lower() in extensions: results.append(full) except PermissionError: logging.error(f"Keine Leseberechtigung: {directory}") logging.info(f"Scan: {len(results)} Dateien in {directory} " f"(rekursiv={recursive})") return sorted(results) def cleanup_directory(self, directory: str) -> list[str]: """ Loescht konfigurierte Datei-Typen aus einem Verzeichnis. Ausfuehrliche Sicherheits-Checks! """ cleanup_cfg = self.config.cleanup_config # Check 1: Cleanup aktiviert? if not cleanup_cfg.get("enabled", False): return [] # Check 2: Gueltiger Pfad? if not os.path.isdir(directory): return [] # Check 3: Geschuetzter Pfad? abs_path = os.path.abspath(directory) if abs_path in _PROTECTED_PATHS: logging.warning(f"Geschuetzter Pfad, Cleanup uebersprungen: {abs_path}") return [] # Check 4: Ordner muss Videodateien enthalten (oder enthalten haben) if not self._has_video_files(directory): logging.debug(f"Keine Videodateien im Ordner, Cleanup uebersprungen: " f"{directory}") return [] delete_extensions = set(cleanup_cfg.get("delete_extensions", [])) deleted = [] # NUR im angegebenen Ordner, NICHT rekursiv try: for f in os.listdir(directory): full_path = os.path.join(directory, f) if not os.path.isfile(full_path): continue ext = os.path.splitext(f)[1].lower() if ext not in delete_extensions: continue if self._is_excluded(f): continue try: os.remove(full_path) deleted.append(full_path) logging.info(f"Cleanup: Geloescht {full_path}") except OSError as e: logging.error(f"Cleanup: Loeschen fehlgeschlagen {full_path}: {e}") except PermissionError: logging.error(f"Keine Berechtigung fuer Cleanup: {directory}") if deleted: logging.info(f"Cleanup: {len(deleted)} Dateien in {directory} geloescht") return deleted def _is_excluded(self, filename: str) -> bool: """Prueft ob Dateiname auf exclude_patterns matcht""" patterns = self.config.cleanup_config.get("exclude_patterns", []) return any( fnmatch.fnmatch(filename.lower(), p.lower()) for p in patterns ) def _has_video_files(self, directory: str) -> bool: """Prueft ob Ordner mindestens eine Videodatei enthaelt""" extensions = set(self.config.files_config.get("scan_extensions", [])) try: for f in os.listdir(directory): if os.path.splitext(f)[1].lower() in extensions: return True except PermissionError: pass return False