"""Import-Service: Videos erkennen, TVDB-Match, umbenennen, einsortieren""" import asyncio import json import logging import os import re import shutil from pathlib import Path from typing import Optional import aiomysql from app.config import Config from app.services.library import ( LibraryService, VIDEO_EXTENSIONS, RE_SXXEXX, RE_XXxXX ) from app.services.tvdb import TVDBService from app.services.probe import ProbeService # Serienname aus Dateiname extrahieren (alles vor SxxExx) RE_SERIES_FROM_NAME = re.compile( r'^(.+?)[\s._-]+[Ss]\d{1,2}[Ee]\d{1,3}', re.IGNORECASE ) RE_SERIES_FROM_XXx = re.compile( r'^(.+?)[\s._-]+\d{1,2}x\d{2,3}', re.IGNORECASE ) # "Serienname - Staffel X" oder "Serienname Season X" in Ordnernamen RE_STAFFEL_DIR = re.compile( r'^(.+?)[\s._-]+(?:Staffel|Season)\s*(\d{1,2})\s*$', re.IGNORECASE ) class ImporterService: """Video-Import: Erkennung, TVDB-Matching, Umbenennung, Kopieren/Verschieben""" def __init__(self, config: Config, library_service: LibraryService, tvdb_service: TVDBService): self.config = config self.library = library_service self.tvdb = tvdb_service self._db_pool: Optional[aiomysql.Pool] = None self.ws_manager = None def set_db_pool(self, pool: aiomysql.Pool) -> None: self._db_pool = pool def set_ws_manager(self, ws_manager) -> None: """WebSocket-Manager fuer Live-Updates""" self.ws_manager = ws_manager async def _broadcast_import(self, job_id: int, status: str, processed: int = 0, total: int = 0, current_file: str = "", bytes_done: int = 0, bytes_total: int = 0) -> None: """Sendet Import-Fortschritt per WebSocket""" if not self.ws_manager: return await self.ws_manager.broadcast({ "data_import": { "job_id": job_id, "status": status, "processed": processed, "total": total, "current_file": current_file, "bytes_done": bytes_done, "bytes_total": bytes_total, } }) @property def _naming_pattern(self) -> str: return self.config.settings.get("library", {}).get( "import_naming_pattern", "{series} - S{season:02d}E{episode:02d} - {title}.{ext}" ) @property def _season_pattern(self) -> str: return self.config.settings.get("library", {}).get( "import_season_pattern", "Season {season:02d}" ) # === DB-Tabellen erstellen === async def init_db(self) -> None: """Import-Tabellen erstellen""" if not self._db_pool: return try: async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(""" CREATE TABLE IF NOT EXISTS import_jobs ( id INT AUTO_INCREMENT PRIMARY KEY, source_path VARCHAR(1024) NOT NULL, target_library_id INT NOT NULL, status ENUM('pending','analyzing','pending_assignment', 'ready','importing','done','error') DEFAULT 'pending', mode ENUM('copy','move') DEFAULT 'copy', naming_pattern VARCHAR(256), season_pattern VARCHAR(256), total_files INT DEFAULT 0, processed_files INT DEFAULT 0, overwrite_all TINYINT(1) DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (target_library_id) REFERENCES library_paths(id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """) await cur.execute(""" CREATE TABLE IF NOT EXISTS import_items ( id INT AUTO_INCREMENT PRIMARY KEY, import_job_id INT NOT NULL, source_file VARCHAR(1024) NOT NULL, source_size BIGINT NOT NULL DEFAULT 0, source_duration DOUBLE NULL, detected_series VARCHAR(256), detected_season INT, detected_episode INT, tvdb_series_id INT NULL, tvdb_series_name VARCHAR(256), tvdb_episode_title VARCHAR(512), target_path VARCHAR(1024), target_filename VARCHAR(512), status ENUM('pending','pending_series','matched', 'conflict','skipped','done','error') DEFAULT 'pending', conflict_reason VARCHAR(512) NULL, existing_file_path VARCHAR(1024) NULL, existing_file_size BIGINT NULL, user_action ENUM('overwrite','skip','rename') NULL, FOREIGN KEY (import_job_id) REFERENCES import_jobs(id) ON DELETE CASCADE, INDEX idx_job (import_job_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """) logging.info("Import-Tabellen initialisiert") except Exception as e: logging.error(f"Import-Tabellen erstellen fehlgeschlagen: {e}") # === Job-Verwaltung === async def create_job(self, source_path: str, target_library_id: int, mode: str = 'copy') -> Optional[int]: """Erstellt einen Import-Job und sucht Video-Dateien im Quellordner""" if not self._db_pool: return None if not os.path.isdir(source_path): return None if mode not in ('copy', 'move'): mode = 'copy' # Video-Dateien im Quellordner finden videos = [] for root, dirs, files in os.walk(source_path): dirs[:] = [d for d in dirs if not d.startswith(".")] for f in sorted(files): ext = os.path.splitext(f)[1].lower() if ext in VIDEO_EXTENSIONS: videos.append(os.path.join(root, f)) if not videos: return None try: async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute( "INSERT INTO import_jobs " "(source_path, target_library_id, status, mode, " "naming_pattern, season_pattern, total_files) " "VALUES (%s, %s, 'pending', %s, %s, %s, %s)", (source_path, target_library_id, mode, self._naming_pattern, self._season_pattern, len(videos)) ) job_id = cur.lastrowid # Items einfuegen for vf in videos: try: size = os.path.getsize(vf) except OSError: size = 0 await cur.execute( "INSERT INTO import_items " "(import_job_id, source_file, source_size) " "VALUES (%s, %s, %s)", (job_id, vf, size) ) logging.info( f"Import-Job erstellt: {job_id} " f"({len(videos)} Videos aus {source_path})" ) return job_id except Exception as e: logging.error(f"Import-Job erstellen fehlgeschlagen: {e}") return None async def delete_job(self, job_id: int) -> dict: """Loescht einen Import-Job (nur wenn nicht gerade importiert wird)""" if not self._db_pool: return {"error": "Keine DB-Verbindung"} try: async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute( "SELECT status FROM import_jobs WHERE id = %s", (job_id,) ) row = await cur.fetchone() if not row: return {"error": "Job nicht gefunden"} if row[0] == 'importing': return {"error": "Job laeuft gerade, kann nicht geloescht werden"} # Items werden per ON DELETE CASCADE mitgeloescht await cur.execute( "DELETE FROM import_jobs WHERE id = %s", (job_id,) ) logging.info(f"Import-Job {job_id} geloescht") return {"message": f"Import-Job {job_id} geloescht"} except Exception as e: logging.error(f"Import-Job {job_id} loeschen fehlgeschlagen: {e}") return {"error": str(e)} async def analyze_job(self, job_id: int) -> dict: """Phase 1: Nur Serie/Staffel/Episode aus Dateinamen extrahieren. KEIN TVDB-Lookup, KEINE Zielpfade, KEINE Konflikt-Pruefung. Das passiert erst nach der Serien-Zuordnung durch den User. """ if not self._db_pool: return {"error": "Keine DB-Verbindung"} try: async with self._db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: # Job laden await cur.execute( "SELECT * FROM import_jobs WHERE id = %s", (job_id,) ) job = await cur.fetchone() if not job: return {"error": "Job nicht gefunden"} # Status auf analyzing await cur.execute( "UPDATE import_jobs SET status = 'analyzing' " "WHERE id = %s", (job_id,) ) # Items laden await cur.execute( "SELECT * FROM import_items " "WHERE import_job_id = %s ORDER BY source_file", (job_id,) ) items = await cur.fetchall() # Jedes Item analysieren (NUR Dateiname-Erkennung) total = len(items) for idx, item in enumerate(items): await self._analyze_item_basic(item) await self._broadcast_import( job_id, "analyzing", processed=idx + 1, total=total, current_file=os.path.basename( item.get("source_file", "") ), ) # Status auf 'pending_assignment' - wartet auf Serien-Zuordnung async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute( "UPDATE import_jobs SET status = 'pending_assignment' " "WHERE id = %s", (job_id,) ) return await self.get_job_status(job_id) except Exception as e: logging.error(f"Import-Analyse fehlgeschlagen: {e}") return {"error": str(e)} async def _analyze_item_basic(self, item: dict) -> None: """Phase 1: Nur Serie/Staffel/Episode aus Dateinamen extrahieren.""" filename = os.path.basename(item["source_file"]) # Groessen-Check: Zu kleine Dateien als Sample/Trailer markieren if item["source_size"] < self.MIN_EPISODE_SIZE: try: async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(""" UPDATE import_items SET status = 'skipped', conflict_reason = %s WHERE id = %s """, ( f"Vermutlich Sample/Trailer " f"({self._fmt_size(item['source_size'])})", item["id"], )) except Exception: pass return # Serie/Staffel/Episode erkennen (Dateiname + Ordnername) info = self._detect_series_info(item["source_file"]) series_name = info.get("series", "") season = info.get("season") episode = info.get("episode") # Status: pending_series wenn Serie erkannt, sonst pending if series_name and season and episode: status = "pending_series" # Wartet auf Serien-Zuordnung else: status = "pending" # Nicht erkannt # In DB aktualisieren (NUR erkannte Werte, KEIN Zielpfad) try: async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(""" UPDATE import_items SET detected_series = %s, detected_season = %s, detected_episode = %s, status = %s, conflict_reason = %s WHERE id = %s """, ( series_name, season, episode, status, None if status == "pending_series" else "Serie/Staffel/Episode nicht erkannt", item["id"], )) except Exception as e: logging.error(f"Import-Item analysieren fehlgeschlagen: {e}") async def get_pending_series(self, job_id: int) -> dict: """Gibt alle erkannten Serien zurueck, die noch zugeordnet werden muessen. Gruppiert nach detected_series mit Anzahl der Dateien. """ if not self._db_pool: return {"error": "Keine DB-Verbindung"} try: async with self._db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: await cur.execute(""" SELECT detected_series, COUNT(*) as count, MIN(detected_season) as min_season, MAX(detected_season) as max_season FROM import_items WHERE import_job_id = %s AND status = 'pending_series' AND detected_series IS NOT NULL GROUP BY detected_series ORDER BY detected_series """, (job_id,)) rows = await cur.fetchall() series_list = [] for row in rows: series_list.append({ "detected_name": row["detected_series"], "count": row["count"], "seasons": f"{row['min_season']}-{row['max_season']}" if row["min_season"] != row["max_season"] else str(row["min_season"]), }) return {"series": series_list} except Exception as e: logging.error(f"Pending Series laden fehlgeschlagen: {e}") return {"error": str(e)} async def assign_series_mapping(self, job_id: int, detected_series: str, tvdb_id: int, tvdb_name: str) -> dict: """Ordnet eine erkannte Serie einer TVDB-Serie zu. Berechnet Zielpfade und prueft Konflikte fuer alle Dateien dieser Serie. """ if not self._db_pool: return {"error": "Keine DB-Verbindung"} try: async with self._db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: # Job + Library-Pfad laden await cur.execute( "SELECT j.*, lp.path as lib_path " "FROM import_jobs j " "JOIN library_paths lp ON lp.id = j.target_library_id " "WHERE j.id = %s", (job_id,) ) job = await cur.fetchone() if not job: return {"error": "Job nicht gefunden"} # Alle Items mit diesem detected_series await cur.execute( "SELECT * FROM import_items " "WHERE import_job_id = %s " "AND LOWER(detected_series) = LOWER(%s) " "AND status = 'pending_series'", (job_id, detected_series) ) items = await cur.fetchall() if not items: return {"error": f"Keine Items fuer '{detected_series}'"} # Jedes Item: Zielpfad berechnen, Episodentitel holen, Konflikte pruefen updated = 0 async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: for item in items: season = item["detected_season"] episode = item["detected_episode"] # Episodentitel von TVDB tvdb_ep_title = "" if tvdb_id and season and episode and self.tvdb.is_configured: tvdb_ep_title = await self._get_episode_title( int(tvdb_id), season, episode ) # Zielpfad berechnen ext = os.path.splitext(item["source_file"])[1].lstrip(".") pattern = job.get("naming_pattern") or self._naming_pattern season_pat = job.get("season_pattern") or self._season_pattern target_dir, target_file = self._build_target( tvdb_name, season, episode, tvdb_ep_title, ext, job["lib_path"], pattern, season_pat ) target_path = os.path.join(target_dir, target_file) # Konflikt pruefen status = "matched" conflict = None existing_path = None existing_size = None if os.path.exists(target_path): existing_path = target_path existing_size = os.path.getsize(target_path) source_size = item["source_size"] if source_size and existing_size: diff_pct = abs(source_size - existing_size) / max( existing_size, 1 ) * 100 if diff_pct > 20: conflict = ( f"Datei existiert " f"(Quelle: {self._fmt_size(source_size)}, " f"Ziel: {self._fmt_size(existing_size)}, " f"Diff: {diff_pct:.0f}%)" ) else: conflict = "Datei existiert (aehnliche Groesse)" else: conflict = "Datei existiert bereits" status = "conflict" await cur.execute(""" UPDATE import_items SET tvdb_series_id = %s, tvdb_series_name = %s, tvdb_episode_title = %s, target_path = %s, target_filename = %s, status = %s, conflict_reason = %s, existing_file_path = %s, existing_file_size = %s WHERE id = %s """, ( tvdb_id, tvdb_name, tvdb_ep_title, target_dir, target_file, status, conflict, existing_path, existing_size, item["id"], )) updated += 1 # Pruefen ob noch pending_series Items uebrig sind async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute( "SELECT COUNT(*) FROM import_items " "WHERE import_job_id = %s AND status = 'pending_series'", (job_id,) ) remaining = (await cur.fetchone())[0] # Wenn alle Serien zugeordnet -> Status auf ready if remaining == 0: await cur.execute( "UPDATE import_jobs SET status = 'ready' " "WHERE id = %s", (job_id,) ) logging.info( f"Serien-Zuordnung: {updated} Items fuer " f"'{detected_series}' -> '{tvdb_name}' (TVDB: {tvdb_id})" ) return { "ok": True, "updated": updated, "tvdb_name": tvdb_name, "remaining_series": remaining, } except Exception as e: logging.error(f"Serien-Zuordnung fehlgeschlagen: {e}") return {"error": str(e)} # Mindestgroesse fuer "echte" Episoden (darunter = Sample/Trailer) MIN_EPISODE_SIZE = 100 * 1024 * 1024 # 100 MiB def _detect_series_info(self, file_path: str) -> dict: """Extrahiert Serienname, Staffel, Episode. Versucht zuerst den Dateinamen, dann den uebergeordneten Ordnernamen als Fallback. Der Ordnername ist oft zuverlaessiger bei Release-Gruppen-Prefixes (z.B. 'tlr-24.s07e01.mkv' vs Ordner '24.S07E01.German.DL.1080p.Bluray.x264-TLR'). Erkennt auch 'Serienname - Staffel X'-Ordner (haeufig bei DE-Medien). """ filename = os.path.basename(file_path) parent_dir = os.path.basename(os.path.dirname(file_path)) grandparent_dir = os.path.basename( os.path.dirname(os.path.dirname(file_path)) ) # Beide Quellen versuchen info_file = self._parse_name(filename) info_dir = self._parse_name(parent_dir) # Strategie: Ordnername bevorzugen bei Scene-Releases. # Scene-Ordner: "24.S07E01.German.DL.1080p-TLR" -> Serie="24" # Scene-Datei: "tlr-24.s07e01.1080p.mkv" -> Serie="tlr-24" # Ordnername hat Serienname vorne, Dateiname oft Release-Tag vorne # Ordnername hat S/E -> bevorzugen (hat meist korrekten Seriennamen) if info_dir.get("season") and info_dir.get("episode"): if info_dir.get("series"): return info_dir # Ordner hat S/E aber keinen Namen -> Dateiname nehmen if info_file.get("series"): info_dir["series"] = info_file["series"] return info_dir # "Staffel X" / "Season X" Pattern im Ordnernamen # z.B. "24 - Staffel 6" -> Serie="24", Staffel=6 # Episode kommt dann aus dem Dateinamen staffel_info = self._parse_staffel_dir(parent_dir) if not staffel_info: staffel_info = self._parse_staffel_dir(grandparent_dir) if staffel_info and info_file.get("episode"): return { "series": staffel_info["series"], "season": staffel_info["season"], "episode": info_file["episode"], } # Dateiname hat S/E if info_file.get("season") and info_file.get("episode"): # Ordner-Serienname als Fallback wenn Datei keinen hat if not info_file.get("series") and info_dir.get("series"): info_file["series"] = info_dir["series"] return info_file return info_file async def _get_cached_tvdb_match(self, series_name: str) -> Optional[dict]: """Prueft ob diese Serie schon in library_series mit TVDB-ID existiert.""" if not self._db_pool: return None try: async with self._db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: await cur.execute( "SELECT tvdb_id, title FROM library_series " "WHERE tvdb_id IS NOT NULL " "AND (LOWER(folder_name) = LOWER(%s) " " OR LOWER(title) = LOWER(%s))", (series_name, series_name) ) row = await cur.fetchone() if row: return { "tvdb_id": row["tvdb_id"], "name": row["title"], } except Exception: pass return None @staticmethod def _parse_staffel_dir(dir_name: str) -> Optional[dict]: """Erkennt 'Serienname - Staffel X' Pattern in Ordnernamen""" m = RE_STAFFEL_DIR.match(dir_name) if m: series = m.group(1).replace(".", " ").replace("_", " ").strip() series = re.sub(r'\s+', ' ', series).rstrip(" -") if series: return {"series": series, "season": int(m.group(2))} return None def _parse_name(self, name: str) -> dict: """Extrahiert Serienname, Staffel, Episode aus einem Namen""" result = {"series": "", "season": None, "episode": None} name_no_ext = os.path.splitext(name)[0] # S01E02 Format m = RE_SXXEXX.search(name) if m: result["season"] = int(m.group(1)) result["episode"] = int(m.group(2)) sm = RE_SERIES_FROM_NAME.match(name_no_ext) if sm: result["series"] = self._clean_name(sm.group(1)) return result # 1x02 Format m = RE_XXxXX.search(name) if m: result["season"] = int(m.group(1)) result["episode"] = int(m.group(2)) sm = RE_SERIES_FROM_XXx.match(name_no_ext) if sm: result["series"] = self._clean_name(sm.group(1)) return result return result @staticmethod def _clean_name(name: str) -> str: """Bereinigt Seriennamen: Punkte/Underscores durch Leerzeichen""" name = name.replace(".", " ").replace("_", " ") # Mehrfach-Leerzeichen reduzieren name = re.sub(r'\s+', ' ', name).strip() # Trailing Bindestriche entfernen name = name.rstrip(" -") return name def _build_target(self, series: str, season: Optional[int], episode: Optional[int], title: str, ext: str, lib_path: str, pattern: str, season_pattern: str) -> tuple[str, str]: """Baut Ziel-Ordner und Dateiname nach Pattern""" s = season or 1 e = episode or 0 # Season-Ordner season_dir = season_pattern.format(season=s) # Dateiname - kein Titel: ohne Titel-Teil, sonst mit try: if title: filename = pattern.format( series=series, season=s, episode=e, title=title, ext=ext ) else: # Ohne Titel: "Serie - S01E03.ext" filename = f"{series} - S{s:02d}E{e:02d}.{ext}" except (KeyError, ValueError): if title: filename = f"{series} - S{s:02d}E{e:02d} - {title}.{ext}" else: filename = f"{series} - S{s:02d}E{e:02d}.{ext}" # Ungueltige Zeichen entfernen for ch in ['<', '>', ':', '"', '|', '?', '*']: filename = filename.replace(ch, '') series = series.replace(ch, '') target_dir = os.path.join(lib_path, series, season_dir) return target_dir, filename async def _get_episode_title(self, tvdb_id: int, season: int, episode: int) -> str: """Episodentitel aus TVDB-Cache oder API holen""" if not self._db_pool: return "" try: async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: # Zuerst Cache pruefen await cur.execute( "SELECT episode_name FROM tvdb_episode_cache " "WHERE series_tvdb_id = %s " "AND season_number = %s " "AND episode_number = %s", (tvdb_id, season, episode) ) row = await cur.fetchone() if row and row[0]: return row[0] except Exception: pass # Cache leer -> Episoden von TVDB laden episodes = await self.tvdb.fetch_episodes(tvdb_id) for ep in episodes: if ep["season_number"] == season and ep["episode_number"] == episode: return ep.get("episode_name", "") return "" async def execute_import(self, job_id: int) -> dict: """Fuehrt den Import aus (Kopieren/Verschieben + TVDB-Link)""" if not self._db_pool: return {"error": "Keine DB-Verbindung"} try: async with self._db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: await cur.execute( "SELECT * FROM import_jobs WHERE id = %s", (job_id,) ) job = await cur.fetchone() if not job: return {"error": "Job nicht gefunden"} await cur.execute( "UPDATE import_jobs SET status = 'importing' " "WHERE id = %s", (job_id,) ) # Nur Items mit status matched oder conflict+overwrite await cur.execute( "SELECT * FROM import_items " "WHERE import_job_id = %s " "AND (status = 'matched' " " OR (status = 'conflict' " " AND user_action = 'overwrite'))", (job_id,) ) items = await cur.fetchall() done = 0 errors = 0 mode = job.get("mode", "copy") # TVDB-IDs sammeln fuer spaetere Verknuepfung tvdb_links = {} # series_name -> tvdb_id total_items = len(items) for item in items: # WS-Broadcast VOR dem Kopieren await self._broadcast_import( job_id, "importing", processed=done + errors, total=total_items, current_file=os.path.basename( item.get("source_file", "") ), bytes_total=item.get("source_size", 0), ) ok = await self._process_item( item, mode, job_id, done + errors, total_items ) if ok: done += 1 # TVDB-Link merken if item.get("tvdb_series_id") and item.get("tvdb_series_name"): tvdb_links[item["tvdb_series_name"]] = item["tvdb_series_id"] else: errors += 1 # Progress updaten async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute( "UPDATE import_jobs SET processed_files = %s " "WHERE id = %s", (done + errors, job_id) ) # WS-Broadcast NACH dem Kopieren await self._broadcast_import( job_id, "importing", processed=done + errors, total=total_items, ) # Job abschliessen final_status = "done" if errors == 0 else "error" async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute( "UPDATE import_jobs SET status = %s " "WHERE id = %s", (final_status, job_id) ) # TVDB-Zuordnungen in library_series uebernehmen linked_series = 0 if tvdb_links: linked_series = await self._link_tvdb_to_series(tvdb_links) # Finaler WS-Broadcast await self._broadcast_import( job_id, final_status, processed=done + errors, total=total_items, ) # Auto-Scan: Ziel-Bibliothek aktualisieren target_lib_id = job.get("target_library_id") if target_lib_id and self.library: logging.info( f"Import abgeschlossen - starte Auto-Scan " f"fuer Bibliothek {target_lib_id}" ) await self.library.scan_single_path(target_lib_id) return { "done": done, "errors": errors, "tvdb_linked": linked_series, } except Exception as e: logging.error(f"Import ausfuehren fehlgeschlagen: {e}") return {"error": str(e)} async def _link_tvdb_to_series(self, tvdb_links: dict) -> int: """Verknuepft importierte Serien mit TVDB in library_series""" if not self._db_pool or not self.tvdb: return 0 linked = 0 for series_name, tvdb_id in tvdb_links.items(): try: async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: # Serie in library_series finden (nach Namen) await cur.execute( "SELECT id, tvdb_id FROM library_series " "WHERE (folder_name = %s OR title = %s) " "AND tvdb_id IS NULL " "LIMIT 1", (series_name, series_name) ) row = await cur.fetchone() if row: series_id = row[0] # TVDB-Daten laden und verknuepfen result = await self.tvdb.match_and_update_series( series_id, int(tvdb_id), self.library ) if not result.get("error"): linked += 1 logging.info( f"Import: TVDB verknuepft - " f"{series_name} -> {tvdb_id}" ) except Exception as e: logging.warning( f"TVDB-Link fehlgeschlagen fuer {series_name}: {e}" ) return linked async def _process_item(self, item: dict, mode: str, job_id: int = 0, item_processed: int = 0, item_total: int = 0) -> bool: """Einzelnes Item importieren (kopieren/verschieben + Metadaten)""" src = item["source_file"] target_dir = item["target_path"] target_file = item["target_filename"] if not target_dir or not target_file: await self._update_item_status(item["id"], "error") return False target = os.path.join(target_dir, target_file) src_size = item.get("source_size", 0) or os.path.getsize(src) try: # Zielordner erstellen os.makedirs(target_dir, exist_ok=True) # Alte Dateien fuer dieselbe Episode aufraeumen # (z.B. "S01E03 - Unbekannt.mkv" wenn jetzt "S01E03 - Willkür.mkv" kommt) season = item.get("detected_season") episode = item.get("detected_episode") if season is not None and episode is not None and os.path.isdir(target_dir): ep_pattern = f"S{season:02d}E{episode:02d}" for existing in os.listdir(target_dir): existing_path = os.path.join(target_dir, existing) if (existing != target_file and ep_pattern in existing and os.path.isfile(existing_path)): logging.info( f"Import: Alte Episode-Datei entfernt: {existing}" ) os.remove(existing_path) # Auch aus library_videos loeschen if self._db_pool: try: async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute( "DELETE FROM library_videos " "WHERE file_path = %s", (existing_path,) ) except Exception: pass # Fortschritt-Tracking in DB setzen if job_id and self._db_pool: await self._update_file_progress( job_id, target_file, 0, src_size ) if mode == "move": shutil.move(src, target) # Bei Move sofort fertig if job_id and self._db_pool: await self._update_file_progress( job_id, target_file, src_size, src_size ) else: # Kopieren mit Fortschritt await self._copy_with_progress( src, target, job_id, target_file, src_size, item_processed, item_total ) logging.info( f"Import: {os.path.basename(src)} -> {target}" ) # Metadaten in Datei einbetten (falls TVDB-Infos vorhanden) if item.get("tvdb_series_name") or item.get("detected_series"): # WS-Broadcast: Metadaten-Phase anzeigen await self._broadcast_import( job_id, "embedding", processed=item_processed, total=item_total, current_file=target_file, ) await self._embed_metadata(target, item) await self._update_item_status(item["id"], "done") return True except Exception as e: logging.error(f"Import fehlgeschlagen: {src}: {e}") await self._update_item_status(item["id"], "error") return False async def _embed_metadata(self, file_path: str, item: dict) -> bool: """Bettet Metadaten mit ffmpeg in die Datei ein""" import asyncio import tempfile series_name = item.get("tvdb_series_name") or item.get("detected_series") or "" season = item.get("detected_season") or 0 episode = item.get("detected_episode") or 0 episode_title = item.get("tvdb_episode_title") or "" if not series_name: return False # Temporaere Ausgabedatei base, ext = os.path.splitext(file_path) temp_file = f"{base}_temp{ext}" # ffmpeg Metadaten-Befehl cmd = [ "ffmpeg", "-y", "-i", file_path, "-map", "0", "-c", "copy", "-metadata", f"title={episode_title}" if episode_title else f"S{season:02d}E{episode:02d}", "-metadata", f"show={series_name}", "-metadata", f"season_number={season}", "-metadata", f"episode_sort={episode}", "-metadata", f"episode_id=S{season:02d}E{episode:02d}", ] # Fuer MKV zusaetzliche Tags if file_path.lower().endswith(".mkv"): cmd.extend([ "-metadata:s:v:0", f"title={series_name} - S{season:02d}E{episode:02d}", ]) cmd.append(temp_file) try: process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) _, stderr = await asyncio.wait_for( process.communicate(), timeout=600 # 10 Min fuer grosse Dateien ) if process.returncode == 0: # Temporaere Datei ueber Original verschieben os.replace(temp_file, file_path) logging.info(f"Metadaten eingebettet: {os.path.basename(file_path)}") return True else: logging.warning( f"Metadaten einbetten fehlgeschlagen: " f"{stderr.decode()[:200]}" ) # Temp-Datei loeschen falls vorhanden if os.path.exists(temp_file): os.remove(temp_file) return False except asyncio.TimeoutError: logging.warning(f"Metadaten einbetten Timeout: {file_path}") if os.path.exists(temp_file): os.remove(temp_file) return False except Exception as e: logging.warning(f"Metadaten einbetten Fehler: {e}") if os.path.exists(temp_file): os.remove(temp_file) return False async def _update_item_status(self, item_id: int, status: str) -> None: if not self._db_pool: return try: async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute( "UPDATE import_items SET status = %s " "WHERE id = %s", (status, item_id) ) except Exception: pass async def _update_file_progress(self, job_id: int, filename: str, bytes_done: int, bytes_total: int) -> None: """Aktualisiert Byte-Fortschritt fuer aktuelle Datei""" if not self._db_pool: return try: async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute( "UPDATE import_jobs SET " "current_file_name = %s, " "current_file_bytes = %s, " "current_file_total = %s " "WHERE id = %s", (filename, bytes_done, bytes_total, job_id) ) except Exception: pass async def _copy_with_progress(self, src: str, dst: str, job_id: int, filename: str, total_size: int, item_processed: int = 0, item_total: int = 0) -> None: """Kopiert Datei mit Fortschritts-Updates in DB + WebSocket""" chunk_size = 64 * 1024 * 1024 # 64 MB Chunks bytes_copied = 0 last_update = 0 loop = asyncio.get_event_loop() with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst: while True: # Chunk lesen (in Thread um nicht zu blockieren) chunk = await loop.run_in_executor( None, fsrc.read, chunk_size ) if not chunk: break # Chunk schreiben await loop.run_in_executor(None, fdst.write, chunk) bytes_copied += len(chunk) # Progress alle 50 MB updaten (DB + WebSocket) if bytes_copied - last_update >= 50 * 1024 * 1024: await self._update_file_progress( job_id, filename, bytes_copied, total_size ) await self._broadcast_import( job_id, "importing", processed=item_processed, total=item_total, current_file=filename, bytes_done=bytes_copied, bytes_total=total_size, ) last_update = bytes_copied # Finales Update await self._update_file_progress( job_id, filename, total_size, total_size ) # Metadaten kopieren (Zeitstempel etc.) shutil.copystat(src, dst) async def resolve_conflict(self, item_id: int, action: str) -> bool: """Konflikt loesen: overwrite, skip, rename""" if not self._db_pool or action not in ('overwrite', 'skip', 'rename'): return False try: async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: if action == 'skip': await cur.execute( "UPDATE import_items SET status = 'skipped', " "user_action = 'skip' WHERE id = %s", (item_id,) ) elif action == 'rename': # Dateiname mit Suffix versehen await cur.execute( "SELECT target_filename FROM import_items " "WHERE id = %s", (item_id,) ) row = await cur.fetchone() if row and row[0]: name, ext = os.path.splitext(row[0]) new_name = f"{name}_neu{ext}" await cur.execute( "UPDATE import_items SET " "target_filename = %s, " "status = 'matched', " "user_action = 'rename' " "WHERE id = %s", (new_name, item_id) ) else: # overwrite await cur.execute( "UPDATE import_items SET user_action = 'overwrite' " "WHERE id = %s", (item_id,) ) return True except Exception as e: logging.error(f"Konflikt loesen fehlgeschlagen: {e}") return False async def resolve_all_conflicts(self, job_id: int, action: str) -> dict: """Loest ALLE Konflikte eines Jobs auf einmal. action: 'overwrite' oder 'skip' """ if not self._db_pool or action not in ('overwrite', 'skip'): return {"error": "Ungueltige Aktion"} try: async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: if action == 'skip': await cur.execute( "UPDATE import_items SET status = 'skipped', " "user_action = 'skip' " "WHERE import_job_id = %s AND status = 'conflict' " "AND user_action IS NULL", (job_id,) ) else: # overwrite await cur.execute( "UPDATE import_items SET user_action = 'overwrite' " "WHERE import_job_id = %s AND status = 'conflict' " "AND user_action IS NULL", (job_id,) ) updated = cur.rowcount logging.info(f"Alle Konflikte geloest: {updated} Items -> {action}") return {"ok": True, "updated": updated} except Exception as e: logging.error(f"Alle Konflikte loesen fehlgeschlagen: {e}") return {"error": str(e)} async def set_overwrite_mode(self, job_id: int, overwrite: bool) -> dict: """Setzt den Ueberschreiben-Modus fuer den ganzen Job. Wenn overwrite=True, werden alle Konflikte automatisch ueberschrieben. """ if not self._db_pool: return {"error": "Keine DB-Verbindung"} try: async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute( "UPDATE import_jobs SET overwrite_all = %s " "WHERE id = %s", (1 if overwrite else 0, job_id) ) # Wenn aktiviert: Alle bestehenden Konflikte auf overwrite setzen if overwrite: await cur.execute( "UPDATE import_items SET user_action = 'overwrite' " "WHERE import_job_id = %s AND status = 'conflict' " "AND user_action IS NULL", (job_id,) ) return {"ok": True, "overwrite_all": overwrite} except Exception as e: logging.error(f"Overwrite-Modus setzen fehlgeschlagen: {e}") return {"error": str(e)} async def update_item(self, item_id: int, **kwargs) -> bool: """Manuelle Korrektur eines Items""" if not self._db_pool: return False allowed = { 'detected_series', 'detected_season', 'detected_episode', 'tvdb_series_id', 'tvdb_series_name', 'tvdb_episode_title', 'target_path', 'target_filename', 'status' } updates = [] params = [] for k, v in kwargs.items(): if k in allowed: updates.append(f"{k} = %s") params.append(v) if not updates: return False params.append(item_id) try: async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute( f"UPDATE import_items SET {', '.join(updates)} " f"WHERE id = %s", params ) return True except Exception as e: logging.error(f"Import-Item aktualisieren fehlgeschlagen: {e}") return False async def reassign_item(self, item_id: int, series_name: str, season: int, episode: int, tvdb_id: int = None) -> dict: """Weist einem pending-Item eine Serie/Staffel/Episode zu. Berechnet automatisch den Zielpfad und holt ggf. TVDB-Episodentitel. """ if not self._db_pool: return {"error": "Keine DB-Verbindung"} try: async with self._db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: # Item laden await cur.execute( "SELECT i.*, j.target_library_id, j.naming_pattern, " "j.season_pattern FROM import_items i " "JOIN import_jobs j ON j.id = i.import_job_id " "WHERE i.id = %s", (item_id,) ) item = await cur.fetchone() if not item: return {"error": "Item nicht gefunden"} # Library-Pfad laden await cur.execute( "SELECT * FROM library_paths WHERE id = %s", (item["target_library_id"],) ) lib_path = await cur.fetchone() if not lib_path: return {"error": "Ziel-Library nicht gefunden"} # TVDB-Name und Episodentitel holen tvdb_name = series_name tvdb_ep_title = "" if tvdb_id and self.tvdb.is_configured: # Serien-Info von TVDB holen try: info = await self.tvdb.get_series_info(tvdb_id) if info and info.get("name"): tvdb_name = info["name"] except Exception: pass # Episodentitel holen tvdb_ep_title = await self._get_episode_title( tvdb_id, season, episode ) # Zielpfad berechnen ext = os.path.splitext(item["source_file"])[1].lstrip(".") pattern = item.get("naming_pattern") or self._naming_pattern season_pattern = item.get("season_pattern") or self._season_pattern target_dir, target_file = self._build_target( tvdb_name or series_name, season, episode, tvdb_ep_title or "", ext, lib_path["path"], pattern, season_pattern ) # In DB aktualisieren async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(""" UPDATE import_items SET detected_series = %s, detected_season = %s, detected_episode = %s, tvdb_series_id = %s, tvdb_series_name = %s, tvdb_episode_title = %s, target_path = %s, target_filename = %s, status = 'matched' WHERE id = %s """, ( series_name, season, episode, tvdb_id, tvdb_name, tvdb_ep_title, target_dir, target_file, item_id, )) return { "ok": True, "target_dir": target_dir, "target_file": target_file, "tvdb_name": tvdb_name, "tvdb_ep_title": tvdb_ep_title, } except Exception as e: logging.error(f"Import-Item zuordnen fehlgeschlagen: {e}") return {"error": str(e)} async def reassign_series(self, job_id: int, detected_series: str, tvdb_id: int = None, series_name: str = None) -> dict: """Ordnet alle Items mit gleichem detected_series einer Serie zu. Setzt TVDB-ID + Episodentitel + Zielpfade fuer alle passenden Items. """ if not self._db_pool: return {"error": "Keine DB-Verbindung"} try: async with self._db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: # Job + Library-Pfad laden await cur.execute( "SELECT j.*, lp.path as lib_path " "FROM import_jobs j " "JOIN library_paths lp ON lp.id = j.target_library_id " "WHERE j.id = %s", (job_id,) ) job = await cur.fetchone() if not job: return {"error": "Job nicht gefunden"} # Alle Items mit gleichem detected_series await cur.execute( "SELECT * FROM import_items " "WHERE import_job_id = %s " "AND LOWER(detected_series) = LOWER(%s) " "AND status IN ('pending', 'matched') " "AND detected_season IS NOT NULL " "AND detected_episode IS NOT NULL", (job_id, detected_series) ) items = await cur.fetchall() if not items: return {"error": f"Keine Items fuer '{detected_series}'"} # TVDB-Name holen tvdb_name = series_name or detected_series if tvdb_id and self.tvdb.is_configured: try: info = await self.tvdb.get_series_info(tvdb_id) if info and info.get("name"): tvdb_name = info["name"] except Exception: pass # Jedes Item aktualisieren (eine Connection fuer alle) updated = 0 async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: for item in items: season = item["detected_season"] episode = item["detected_episode"] # Episodentitel holen tvdb_ep_title = "" if tvdb_id and season and episode: tvdb_ep_title = await self._get_episode_title( int(tvdb_id), season, episode ) # Zielpfad berechnen ext = os.path.splitext( item["source_file"] )[1].lstrip(".") pattern = (job.get("naming_pattern") or self._naming_pattern) season_pat = (job.get("season_pattern") or self._season_pattern) target_dir, target_file = self._build_target( tvdb_name, season, episode, tvdb_ep_title, ext, job["lib_path"], pattern, season_pat ) await cur.execute(""" UPDATE import_items SET tvdb_series_id = %s, tvdb_series_name = %s, tvdb_episode_title = %s, target_path = %s, target_filename = %s, status = 'matched', conflict_reason = NULL WHERE id = %s """, ( tvdb_id, tvdb_name, tvdb_ep_title, target_dir, target_file, item["id"], )) updated += 1 logging.info( f"Serien-Zuordnung: {updated} Items fuer " f"'{detected_series}' -> '{tvdb_name}'" ) return {"ok": True, "updated": updated, "tvdb_name": tvdb_name} except Exception as e: logging.error(f"Serien-Zuordnung fehlgeschlagen: {e}") return {"error": str(e)} async def skip_series(self, job_id: int, detected_series: str) -> dict: """Ueberspringt alle Items mit gleichem detected_series.""" if not self._db_pool: return {"error": "Keine DB-Verbindung"} try: async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute( "UPDATE import_items SET status = 'skipped', " "conflict_reason = 'Serie uebersprungen' " "WHERE import_job_id = %s " "AND LOWER(detected_series) = LOWER(%s) " "AND status IN ('pending', 'matched')", (job_id, detected_series) ) skipped = cur.rowcount logging.info( f"Serie uebersprungen: {skipped} Items " f"fuer '{detected_series}'" ) return {"ok": True, "skipped": skipped} except Exception as e: logging.error(f"Serie ueberspringen fehlgeschlagen: {e}") return {"error": str(e)} async def skip_item(self, item_id: int) -> bool: """Markiert ein Item als uebersprungen""" if not self._db_pool: return False try: async with self._db_pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute( "UPDATE import_items SET status = 'skipped', " "conflict_reason = 'Manuell uebersprungen' " "WHERE id = %s", (item_id,) ) return True except Exception: return False async def get_all_jobs(self) -> list: """Liste aller Import-Jobs (neueste zuerst)""" if not self._db_pool: return [] try: async with self._db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: await cur.execute( "SELECT id, source_path, status, total_files, " "processed_files, created_at FROM import_jobs " "ORDER BY id DESC LIMIT 20" ) jobs = await cur.fetchall() return [self._serialize(j) for j in jobs] except Exception as e: logging.error(f"Import-Jobs laden fehlgeschlagen: {e}") return [] async def get_job_status(self, job_id: int) -> dict: """Status eines Import-Jobs mit allen Items""" if not self._db_pool: return {"error": "Keine DB-Verbindung"} try: async with self._db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: await cur.execute( "SELECT * FROM import_jobs WHERE id = %s", (job_id,) ) job = await cur.fetchone() if not job: return {"error": "Job nicht gefunden"} await cur.execute( "SELECT * FROM import_items " "WHERE import_job_id = %s ORDER BY source_file", (job_id,) ) items = await cur.fetchall() # Bei abgeschlossenen Jobs: Importierte Serien-Ordner sammeln imported_series = [] if job.get("status") in ("done", "error"): series_folders = set() for item in items: if item.get("status") == "done" and item.get("target_path"): series_folders.add(item["target_path"]) imported_series = list(series_folders) return { "job": self._serialize(job), "items": [self._serialize(i) for i in items], "imported_series": imported_series, } except Exception as e: return {"error": str(e)} @staticmethod def _serialize(row: dict) -> dict: """Dict JSON-kompatibel machen""" result = {} for k, v in row.items(): if hasattr(v, "isoformat"): result[k] = str(v) else: result[k] = v return result @staticmethod def _fmt_size(b: int) -> str: """Bytes menschenlesbar""" for u in ("B", "KiB", "MiB", "GiB"): if b < 1024: return f"{b:.1f} {u}" b /= 1024 return f"{b:.1f} TiB"