Projekt aus Docker-Image videoconverter:2.9 extrahiert. Enthält zweiphasigen Import-Workflow mit Serien-Zuordnung. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
155 lines
5.2 KiB
Python
155 lines
5.2 KiB
Python
"""Clean-Service: Findet und entfernt Nicht-Video-Dateien aus der Bibliothek"""
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from app.config import Config
|
|
from app.services.library import LibraryService, VIDEO_EXTENSIONS
|
|
|
|
|
|
class CleanerService:
|
|
"""Scannt Library-Ordner nach Nicht-Video-Dateien und bietet Cleanup an"""
|
|
|
|
def __init__(self, config: Config, library_service: LibraryService):
|
|
self.config = config
|
|
self.library = library_service
|
|
|
|
@property
|
|
def _cleanup_config(self) -> dict:
|
|
return self.config.settings.get("cleanup", {})
|
|
|
|
@property
|
|
def _keep_extensions(self) -> set:
|
|
"""Extensions die behalten werden sollen"""
|
|
exts = self._cleanup_config.get("keep_extensions", [])
|
|
return {e.lower() for e in exts}
|
|
|
|
async def scan_for_junk(self, library_path_id: int = None) -> dict:
|
|
"""Scannt Library-Ordner nach Nicht-Video-Dateien.
|
|
Gibt zurueck: files, total_size, total_count"""
|
|
paths = await self.library.get_paths()
|
|
if library_path_id:
|
|
paths = [p for p in paths if p["id"] == library_path_id]
|
|
|
|
keep_exts = self._keep_extensions
|
|
junk_files = []
|
|
total_size = 0
|
|
|
|
for lib_path in paths:
|
|
if not lib_path.get("enabled"):
|
|
continue
|
|
base = lib_path["path"]
|
|
if not os.path.isdir(base):
|
|
continue
|
|
|
|
for root, dirs, files in os.walk(base):
|
|
# Versteckte Ordner ueberspringen
|
|
dirs[:] = [d for d in dirs if not d.startswith(".")]
|
|
for f in files:
|
|
ext = os.path.splitext(f)[1].lower()
|
|
# Video-Dateien und Keep-Extensions ueberspringen
|
|
if ext in VIDEO_EXTENSIONS:
|
|
continue
|
|
if ext in keep_exts:
|
|
continue
|
|
|
|
fp = os.path.join(root, f)
|
|
try:
|
|
size = os.path.getsize(fp)
|
|
except OSError:
|
|
size = 0
|
|
|
|
# Relativen Pfad berechnen
|
|
rel = os.path.relpath(fp, base)
|
|
# Serien-Ordner aus erstem Pfadteil
|
|
parts = rel.replace("\\", "/").split("/")
|
|
parent_series = parts[0] if len(parts) > 1 else ""
|
|
|
|
junk_files.append({
|
|
"path": fp,
|
|
"name": f,
|
|
"size": size,
|
|
"extension": ext,
|
|
"parent_series": parent_series,
|
|
"library_name": lib_path["name"],
|
|
})
|
|
total_size += size
|
|
|
|
return {
|
|
"files": junk_files,
|
|
"total_size": total_size,
|
|
"total_count": len(junk_files),
|
|
}
|
|
|
|
async def delete_files(self, file_paths: list[str]) -> dict:
|
|
"""Loescht die angegebenen Dateien"""
|
|
deleted = 0
|
|
failed = 0
|
|
freed = 0
|
|
errors = []
|
|
|
|
# Sicherheitscheck: Nur Dateien in Library-Pfaden loeschen
|
|
paths = await self.library.get_paths()
|
|
allowed_prefixes = [p["path"] for p in paths]
|
|
|
|
for fp in file_paths:
|
|
# Pruefen ob Datei in erlaubtem Pfad liegt
|
|
is_allowed = any(
|
|
fp.startswith(prefix + "/") or fp == prefix
|
|
for prefix in allowed_prefixes
|
|
)
|
|
if not is_allowed:
|
|
errors.append(f"Nicht erlaubt: {fp}")
|
|
failed += 1
|
|
continue
|
|
|
|
try:
|
|
size = os.path.getsize(fp)
|
|
os.remove(fp)
|
|
deleted += 1
|
|
freed += size
|
|
logging.info(f"Clean: Geloescht: {fp}")
|
|
except OSError as e:
|
|
errors.append(f"{fp}: {e}")
|
|
failed += 1
|
|
|
|
return {
|
|
"deleted": deleted,
|
|
"failed": failed,
|
|
"freed_bytes": freed,
|
|
"errors": errors,
|
|
}
|
|
|
|
async def delete_empty_dirs(self,
|
|
library_path_id: int = None) -> int:
|
|
"""Leere Unterordner loeschen (bottom-up)"""
|
|
paths = await self.library.get_paths()
|
|
if library_path_id:
|
|
paths = [p for p in paths if p["id"] == library_path_id]
|
|
|
|
removed = 0
|
|
for lib_path in paths:
|
|
if not lib_path.get("enabled"):
|
|
continue
|
|
base = lib_path["path"]
|
|
if not os.path.isdir(base):
|
|
continue
|
|
|
|
# Bottom-up: Tiefste Ordner zuerst
|
|
for root, dirs, files in os.walk(base, topdown=False):
|
|
# Nicht den Basis-Ordner selbst loeschen
|
|
if root == base:
|
|
continue
|
|
# Versteckte Ordner ueberspringen
|
|
if os.path.basename(root).startswith("."):
|
|
continue
|
|
try:
|
|
if not os.listdir(root):
|
|
os.rmdir(root)
|
|
removed += 1
|
|
logging.info(f"Clean: Leerer Ordner entfernt: {root}")
|
|
except OSError:
|
|
pass
|
|
|
|
return removed
|