"""Import-Service: Videos erkennen, TVDB-Match, umbenennen, einsortieren"""
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import aiomysql
|
|
|
|
from app.config import Config
|
|
from app.services.library import (
|
|
LibraryService, VIDEO_EXTENSIONS,
|
|
RE_SXXEXX_MULTI, RE_XXxXX_MULTI
|
|
)
|
|
from app.services.tvdb import TVDBService
|
|
from app.services.probe import ProbeService
|
|
|
|
# Serienname aus Dateiname extrahieren (alles vor SxxExx)
|
|
RE_SERIES_FROM_NAME = re.compile(
|
|
r'^(.+?)[\s._-]+[Ss]\d{1,2}[Ee]\d{1,3}', re.IGNORECASE
|
|
)
|
|
RE_SERIES_FROM_XXx = re.compile(
|
|
r'^(.+?)[\s._-]+\d{1,2}x\d{2,3}', re.IGNORECASE
|
|
)
|
|
# "Serienname - Staffel X" oder "Serienname Season X" in Ordnernamen
|
|
RE_STAFFEL_DIR = re.compile(
|
|
r'^(.+?)[\s._-]+(?:Staffel|Season)\s*(\d{1,2})\s*$', re.IGNORECASE
|
|
)
|
|
|
|
|
|
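
# Illustrative matches (the example names are assumptions, except "24 - Staffel 6",
# which also appears as an example further below):
#   RE_SERIES_FROM_NAME.match("Breaking.Bad.S01E02.720p")  -> group(1) == "Breaking.Bad"
#   RE_SERIES_FROM_XXx.match("Breaking.Bad.1x02.720p")     -> group(1) == "Breaking.Bad"
#   RE_STAFFEL_DIR.match("24 - Staffel 6")                 -> groups ("24", "6")
# The raw group(1) is normalised later by ImporterService._clean_name().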


class ImporterService:
    """Video import: detection, TVDB matching, renaming, copying/moving"""

    def __init__(self, config: Config, library_service: LibraryService,
                 tvdb_service: TVDBService):
        self.config = config
        self.library = library_service
        self.tvdb = tvdb_service
        self._db_pool: Optional[aiomysql.Pool] = None
        self.ws_manager = None

    def set_db_pool(self, pool: aiomysql.Pool) -> None:
        self._db_pool = pool

    def set_ws_manager(self, ws_manager) -> None:
        """WebSocket manager for live updates"""
        self.ws_manager = ws_manager

    async def _broadcast_import(self, job_id: int, status: str,
                                processed: int = 0, total: int = 0,
                                current_file: str = "",
                                bytes_done: int = 0,
                                bytes_total: int = 0) -> None:
        """Send import progress over WebSocket"""
        if not self.ws_manager:
            return
        await self.ws_manager.broadcast({
            "data_import": {
                "job_id": job_id,
                "status": status,
                "processed": processed,
                "total": total,
                "current_file": current_file,
                "bytes_done": bytes_done,
                "bytes_total": bytes_total,
            }
        })
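
    # Shape of the broadcast payload built above (values here are hypothetical):
    #   {"data_import": {"job_id": 7, "status": "importing", "processed": 2,
    #                    "total": 10, "current_file": "episode.mkv",
    #                    "bytes_done": 52428800, "bytes_total": 4294967296}}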

    @property
    def _naming_pattern(self) -> str:
        return self.config.settings.get("library", {}).get(
            "import_naming_pattern",
            "{series} - S{season:02d}E{episode:02d} - {title}.{ext}"
        )

    @property
    def _season_pattern(self) -> str:
        return self.config.settings.get("library", {}).get(
            "import_season_pattern", "Season {season:02d}"
        )
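
    # With the default patterns and hypothetical values
    # (series="Beispielserie", season=1, episode=1, title="Pilot", ext="mkv"),
    # _build_target() below produces "Beispielserie - S01E01 - Pilot.mkv"
    # inside the season folder "Season 01".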

    # === Create the DB tables ===

    async def init_db(self) -> None:
        """Create the import tables"""
        if not self._db_pool:
            return
        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute("""
                        CREATE TABLE IF NOT EXISTS import_jobs (
                            id INT AUTO_INCREMENT PRIMARY KEY,
                            source_path VARCHAR(1024) NOT NULL,
                            target_library_id INT NOT NULL,
                            status ENUM('pending','analyzing','pending_assignment',
                                        'ready','importing','done','error')
                                DEFAULT 'pending',
                            mode ENUM('copy','move') DEFAULT 'copy',
                            naming_pattern VARCHAR(256),
                            season_pattern VARCHAR(256),
                            total_files INT DEFAULT 0,
                            processed_files INT DEFAULT 0,
                            overwrite_all TINYINT(1) DEFAULT 0,
                            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                            FOREIGN KEY (target_library_id)
                                REFERENCES library_paths(id)
                        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                    """)

                    await cur.execute("""
                        CREATE TABLE IF NOT EXISTS import_items (
                            id INT AUTO_INCREMENT PRIMARY KEY,
                            import_job_id INT NOT NULL,
                            source_file VARCHAR(1024) NOT NULL,
                            source_size BIGINT NOT NULL DEFAULT 0,
                            source_duration DOUBLE NULL,
                            detected_series VARCHAR(256),
                            detected_season INT,
                            detected_episode INT,
                            detected_episode_end INT NULL,
                            tvdb_series_id INT NULL,
                            tvdb_series_name VARCHAR(256),
                            tvdb_episode_title VARCHAR(512),
                            target_path VARCHAR(1024),
                            target_filename VARCHAR(512),
                            status ENUM('pending','pending_series','matched',
                                        'conflict','skipped','done','error')
                                DEFAULT 'pending',
                            conflict_reason VARCHAR(512) NULL,
                            existing_file_path VARCHAR(1024) NULL,
                            existing_file_size BIGINT NULL,
                            user_action ENUM('overwrite','skip','rename') NULL,
                            FOREIGN KEY (import_job_id)
                                REFERENCES import_jobs(id) ON DELETE CASCADE,
                            INDEX idx_job (import_job_id)
                        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                    """)
                    logging.info("Import-Tabellen initialisiert")

            # Migration: add the detected_episode_end column
            async with self._db_pool.acquire() as conn:
                async with conn.cursor() as cur:
                    try:
                        await cur.execute(
                            "ALTER TABLE import_items "
                            "ADD COLUMN detected_episode_end INT NULL "
                            "AFTER detected_episode"
                        )
                        logging.info("Import: Spalte detected_episode_end hinzugefuegt")
                    except Exception:
                        pass  # column already exists

        except Exception as e:
            logging.error(f"Import-Tabellen erstellen fehlgeschlagen: {e}")
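
    # Job lifecycle, as implemented by the methods below:
    #   'pending' -> analyze_job() -> 'analyzing' -> 'pending_assignment'
    #   -> assign_series_mapping() per detected series -> 'ready'
    #   -> execute_import() -> 'importing' -> 'done' / 'error'
    # Items move from 'pending' / 'pending_series' to 'matched' or 'conflict'
    # and end up as 'done', 'skipped' or 'error'.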

    # === Job management ===

    async def create_job(self, source_path: str,
                         target_library_id: int,
                         mode: str = 'copy') -> Optional[int]:
        """Create an import job and collect video files from the source folder"""
        if not self._db_pool:
            return None
        if not os.path.isdir(source_path):
            return None
        if mode not in ('copy', 'move'):
            mode = 'copy'

        # Find video files in the source folder
        videos = []
        for root, dirs, files in os.walk(source_path):
            dirs[:] = [d for d in dirs if not d.startswith(".")]
            for f in sorted(files):
                ext = os.path.splitext(f)[1].lower()
                if ext in VIDEO_EXTENSIONS:
                    videos.append(os.path.join(root, f))

        if not videos:
            return None

        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(
                        "INSERT INTO import_jobs "
                        "(source_path, target_library_id, status, mode, "
                        "naming_pattern, season_pattern, total_files) "
                        "VALUES (%s, %s, 'pending', %s, %s, %s, %s)",
                        (source_path, target_library_id, mode,
                         self._naming_pattern, self._season_pattern,
                         len(videos))
                    )
                    job_id = cur.lastrowid

                    # Insert the items
                    for vf in videos:
                        try:
                            size = os.path.getsize(vf)
                        except OSError:
                            size = 0
                        await cur.execute(
                            "INSERT INTO import_items "
                            "(import_job_id, source_file, source_size) "
                            "VALUES (%s, %s, %s)",
                            (job_id, vf, size)
                        )

            logging.info(
                f"Import-Job erstellt: {job_id} "
                f"({len(videos)} Videos aus {source_path})"
            )
            return job_id
        except Exception as e:
            logging.error(f"Import-Job erstellen fehlgeschlagen: {e}")
            return None
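
    # Typical call (path and library id are hypothetical):
    #   job_id = await importer.create_job("/downloads/24.S07.German.1080p",
    #                                      target_library_id=3, mode="copy")
    # Returns the new job id, or None if no video files were found.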

    async def delete_job(self, job_id: int) -> dict:
        """Delete an import job (only if it is not currently importing)"""
        if not self._db_pool:
            return {"error": "Keine DB-Verbindung"}
        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(
                        "SELECT status FROM import_jobs WHERE id = %s",
                        (job_id,)
                    )
                    row = await cur.fetchone()
                    if not row:
                        return {"error": "Job nicht gefunden"}
                    if row[0] == 'importing':
                        return {"error": "Job laeuft gerade, kann nicht geloescht werden"}
                    # Items are removed via ON DELETE CASCADE
                    await cur.execute(
                        "DELETE FROM import_jobs WHERE id = %s", (job_id,)
                    )
            logging.info(f"Import-Job {job_id} geloescht")
            return {"message": f"Import-Job {job_id} geloescht"}
        except Exception as e:
            logging.error(f"Import-Job {job_id} loeschen fehlgeschlagen: {e}")
            return {"error": str(e)}

    async def analyze_job(self, job_id: int) -> dict:
        """Phase 1: extract only series/season/episode from the filenames.

        NO TVDB lookup, NO target paths, NO conflict check.
        That only happens after the user has assigned the series.
        """
        if not self._db_pool:
            return {"error": "Keine DB-Verbindung"}

        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor(aiomysql.DictCursor) as cur:
                    # Load the job
                    await cur.execute(
                        "SELECT * FROM import_jobs WHERE id = %s", (job_id,)
                    )
                    job = await cur.fetchone()
                    if not job:
                        return {"error": "Job nicht gefunden"}

                    # Set status to analyzing
                    await cur.execute(
                        "UPDATE import_jobs SET status = 'analyzing' "
                        "WHERE id = %s", (job_id,)
                    )

                    # Load the items
                    await cur.execute(
                        "SELECT * FROM import_items "
                        "WHERE import_job_id = %s ORDER BY source_file",
                        (job_id,)
                    )
                    items = await cur.fetchall()

            # Analyse every item (filename detection ONLY)
            total = len(items)
            for idx, item in enumerate(items):
                await self._analyze_item_basic(item)
                await self._broadcast_import(
                    job_id, "analyzing",
                    processed=idx + 1, total=total,
                    current_file=os.path.basename(
                        item.get("source_file", "")
                    ),
                )

            # Set status to 'pending_assignment' - waiting for series assignment
            async with self._db_pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(
                        "UPDATE import_jobs SET status = 'pending_assignment' "
                        "WHERE id = %s", (job_id,)
                    )

            return await self.get_job_status(job_id)

        except Exception as e:
            logging.error(f"Import-Analyse fehlgeschlagen: {e}")
            return {"error": str(e)}

    async def _analyze_item_basic(self, item: dict) -> None:
        """Phase 1: extract only series/season/episode from the filename."""
        # Size check: flag files that are too small as sample/trailer
        if item["source_size"] < self.MIN_EPISODE_SIZE:
            try:
                async with self._db_pool.acquire() as conn:
                    async with conn.cursor() as cur:
                        await cur.execute("""
                            UPDATE import_items SET
                                status = 'skipped',
                                conflict_reason = %s
                            WHERE id = %s
                        """, (
                            f"Vermutlich Sample/Trailer "
                            f"({self._fmt_size(item['source_size'])})",
                            item["id"],
                        ))
            except Exception:
                pass
            return

        # Detect series/season/episode (filename + folder name)
        info = self._detect_series_info(item["source_file"])
        series_name = info.get("series", "")
        season = info.get("season")
        episode = info.get("episode")
        episode_end = info.get("episode_end")

        # Status: pending_series if a series was detected, otherwise pending
        if series_name and season and episode:
            status = "pending_series"  # waiting for series assignment
        else:
            status = "pending"  # not recognised

        # Update the DB (detected values only, NO target path yet)
        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute("""
                        UPDATE import_items SET
                            detected_series = %s,
                            detected_season = %s,
                            detected_episode = %s,
                            detected_episode_end = %s,
                            status = %s,
                            conflict_reason = %s
                        WHERE id = %s
                    """, (
                        series_name, season, episode, episode_end,
                        status,
                        None if status == "pending_series"
                        else "Serie/Staffel/Episode nicht erkannt",
                        item["id"],
                    ))
        except Exception as e:
            logging.error(f"Import-Item analysieren fehlgeschlagen: {e}")

    async def get_pending_series(self, job_id: int) -> dict:
        """Return all detected series that still need to be assigned.

        Grouped by detected_series with the number of files.
        """
        if not self._db_pool:
            return {"error": "Keine DB-Verbindung"}

        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor(aiomysql.DictCursor) as cur:
                    await cur.execute("""
                        SELECT detected_series, COUNT(*) as count,
                               MIN(detected_season) as min_season,
                               MAX(detected_season) as max_season
                        FROM import_items
                        WHERE import_job_id = %s
                          AND status = 'pending_series'
                          AND detected_series IS NOT NULL
                        GROUP BY detected_series
                        ORDER BY detected_series
                    """, (job_id,))
                    rows = await cur.fetchall()

            series_list = []
            for row in rows:
                series_list.append({
                    "detected_name": row["detected_series"],
                    "count": row["count"],
                    "seasons": f"{row['min_season']}-{row['max_season']}"
                               if row["min_season"] != row["max_season"]
                               else str(row["min_season"]),
                })

            return {"series": series_list}

        except Exception as e:
            logging.error(f"Pending Series laden fehlgeschlagen: {e}")
            return {"error": str(e)}

    async def assign_series_mapping(self, job_id: int, detected_series: str,
                                    tvdb_id: int, tvdb_name: str) -> dict:
        """Assign a detected series to a TVDB series.

        Computes target paths and checks conflicts for all files of this series.
        """
        if not self._db_pool:
            return {"error": "Keine DB-Verbindung"}

        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor(aiomysql.DictCursor) as cur:
                    # Load the job + library path
                    await cur.execute(
                        "SELECT j.*, lp.path as lib_path "
                        "FROM import_jobs j "
                        "JOIN library_paths lp ON lp.id = j.target_library_id "
                        "WHERE j.id = %s", (job_id,)
                    )
                    job = await cur.fetchone()
                    if not job:
                        return {"error": "Job nicht gefunden"}

                    # All items with this detected_series
                    await cur.execute(
                        "SELECT * FROM import_items "
                        "WHERE import_job_id = %s "
                        "AND LOWER(detected_series) = LOWER(%s) "
                        "AND status = 'pending_series'",
                        (job_id, detected_series)
                    )
                    items = await cur.fetchall()

            if not items:
                return {"error": f"Keine Items fuer '{detected_series}'"}

            # For every item: compute target path, fetch episode title, check conflicts
            updated = 0
            async with self._db_pool.acquire() as conn:
                async with conn.cursor() as cur:
                    for item in items:
                        season = item["detected_season"]
                        episode = item["detected_episode"]
                        episode_end = item.get("detected_episode_end")

                        # Episode title from TVDB
                        tvdb_ep_title = ""
                        if tvdb_id and season and episode and self.tvdb.is_configured:
                            tvdb_ep_title = await self._get_episode_title(
                                int(tvdb_id), season, episode
                            )

                        # Compute the target path
                        ext = os.path.splitext(item["source_file"])[1].lstrip(".")
                        pattern = job.get("naming_pattern") or self._naming_pattern
                        season_pat = job.get("season_pattern") or self._season_pattern
                        target_dir, target_file = self._build_target(
                            tvdb_name, season, episode,
                            tvdb_ep_title, ext,
                            job["lib_path"],
                            pattern, season_pat,
                            episode_end=episode_end
                        )
                        target_path = os.path.join(target_dir, target_file)

                        # Check for conflicts
                        status = "matched"
                        conflict = None
                        existing_path = None
                        existing_size = None

                        if os.path.exists(target_path):
                            existing_path = target_path
                            existing_size = os.path.getsize(target_path)
                            source_size = item["source_size"]

                            if source_size and existing_size:
                                diff_pct = abs(source_size - existing_size) / max(
                                    existing_size, 1
                                ) * 100
                                if diff_pct > 20:
                                    conflict = (
                                        f"Datei existiert "
                                        f"(Quelle: {self._fmt_size(source_size)}, "
                                        f"Ziel: {self._fmt_size(existing_size)}, "
                                        f"Diff: {diff_pct:.0f}%)"
                                    )
                                else:
                                    conflict = "Datei existiert (aehnliche Groesse)"
                            else:
                                conflict = "Datei existiert bereits"
                            status = "conflict"

                        await cur.execute("""
                            UPDATE import_items SET
                                tvdb_series_id = %s,
                                tvdb_series_name = %s,
                                tvdb_episode_title = %s,
                                target_path = %s,
                                target_filename = %s,
                                status = %s,
                                conflict_reason = %s,
                                existing_file_path = %s,
                                existing_file_size = %s
                            WHERE id = %s
                        """, (
                            tvdb_id, tvdb_name, tvdb_ep_title,
                            target_dir, target_file, status,
                            conflict, existing_path, existing_size,
                            item["id"],
                        ))
                        updated += 1

            # Check whether any pending_series items remain
            async with self._db_pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(
                        "SELECT COUNT(*) FROM import_items "
                        "WHERE import_job_id = %s AND status = 'pending_series'",
                        (job_id,)
                    )
                    remaining = (await cur.fetchone())[0]

                    # Once every series is assigned -> set status to ready
                    if remaining == 0:
                        await cur.execute(
                            "UPDATE import_jobs SET status = 'ready' "
                            "WHERE id = %s", (job_id,)
                        )

            logging.info(
                f"Serien-Zuordnung: {updated} Items fuer "
                f"'{detected_series}' -> '{tvdb_name}' (TVDB: {tvdb_id})"
            )
            return {
                "ok": True,
                "updated": updated,
                "tvdb_name": tvdb_name,
                "remaining_series": remaining,
            }

        except Exception as e:
            logging.error(f"Serien-Zuordnung fehlgeschlagen: {e}")
            return {"error": str(e)}

    # Minimum size for "real" episodes (anything below is a sample/trailer)
    MIN_EPISODE_SIZE = 100 * 1024 * 1024  # 100 MiB

    def _detect_series_info(self, file_path: str) -> dict:
        """Extract series name, season and episode.

        Tries the filename first, then the parent folder name as a
        fallback. The folder name is often more reliable with release
        group prefixes (e.g. 'tlr-24.s07e01.mkv' vs. the folder
        '24.S07E01.German.DL.1080p.Bluray.x264-TLR').
        Also detects 'Serienname - Staffel X' folders (common for German media).
        """
        filename = os.path.basename(file_path)
        parent_dir = os.path.basename(os.path.dirname(file_path))
        grandparent_dir = os.path.basename(
            os.path.dirname(os.path.dirname(file_path))
        )

        # Try both sources
        info_file = self._parse_name(filename)
        info_dir = self._parse_name(parent_dir)

        # Strategy: prefer the folder name for scene releases.
        # Scene folder: "24.S07E01.German.DL.1080p-TLR" -> series "24"
        # Scene file:   "tlr-24.s07e01.1080p.mkv"       -> series "tlr-24"
        # The folder has the series name up front, the file often a release tag

        # Folder name has S/E -> prefer it (usually carries the correct series name)
        if info_dir.get("season") and info_dir.get("episode"):
            if info_dir.get("series"):
                return info_dir
            # Folder has S/E but no name -> take the name from the file
            if info_file.get("series"):
                info_dir["series"] = info_file["series"]
            return info_dir

        # "Staffel X" / "Season X" pattern in the folder name
        # e.g. "24 - Staffel 6" -> series "24", season 6
        # The episode then comes from the filename
        staffel_info = self._parse_staffel_dir(parent_dir)
        if not staffel_info:
            staffel_info = self._parse_staffel_dir(grandparent_dir)
        if staffel_info and info_file.get("episode"):
            return {
                "series": staffel_info["series"],
                "season": staffel_info["season"],
                "episode": info_file["episode"],
                "episode_end": info_file.get("episode_end"),
            }

        # Filename has S/E
        if info_file.get("season") and info_file.get("episode"):
            # Use the folder series name as fallback if the file has none
            if not info_file.get("series") and info_dir.get("series"):
                info_file["series"] = info_dir["series"]
            return info_file

        return info_file
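
    # Example from the docstring above: for the (hypothetical) path
    #   /downloads/24.S07E01.German.DL.1080p.Bluray.x264-TLR/tlr-24.s07e01.mkv
    # the folder name carries both the series name and S/E, so the result is
    #   {"series": "24", "season": 7, "episode": 1, "episode_end": None}
    # (assuming the imported RE_SXXEXX_MULTI matches "S07E01" as its name
    # suggests), whereas the filename alone would have yielded series "tlr-24".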

    async def _get_cached_tvdb_match(self, series_name: str) -> Optional[dict]:
        """Check whether this series already exists in library_series with a TVDB id."""
        if not self._db_pool:
            return None
        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor(aiomysql.DictCursor) as cur:
                    await cur.execute(
                        "SELECT tvdb_id, title FROM library_series "
                        "WHERE tvdb_id IS NOT NULL "
                        "AND (LOWER(folder_name) = LOWER(%s) "
                        " OR LOWER(title) = LOWER(%s))",
                        (series_name, series_name)
                    )
                    row = await cur.fetchone()
                    if row:
                        return {
                            "tvdb_id": row["tvdb_id"],
                            "name": row["title"],
                        }
        except Exception:
            pass
        return None

    @staticmethod
    def _parse_staffel_dir(dir_name: str) -> Optional[dict]:
        """Detect the 'Serienname - Staffel X' pattern in folder names"""
        m = RE_STAFFEL_DIR.match(dir_name)
        if m:
            series = m.group(1).replace(".", " ").replace("_", " ").strip()
            series = re.sub(r'\s+', ' ', series).rstrip(" -")
            if series:
                return {"series": series, "season": int(m.group(2))}
        return None

    def _parse_name(self, name: str) -> dict:
        """Extract series name, season and episode from a name.
        Supports double episodes: S09E19E20, S01E01-E02, 1x01-02"""
        result = {"series": "", "season": None, "episode": None,
                  "episode_end": None}
        name_no_ext = os.path.splitext(name)[0]

        # S01E02 / double episode S01E01E02 format
        m = RE_SXXEXX_MULTI.search(name)
        if m:
            result["season"] = int(m.group(1))
            result["episode"] = int(m.group(2))
            if m.group(3):
                result["episode_end"] = int(m.group(3))
            sm = RE_SERIES_FROM_NAME.match(name_no_ext)
            if sm:
                result["series"] = self._clean_name(sm.group(1))
            return result

        # 1x02 / double episode 1x01-02 format
        m = RE_XXxXX_MULTI.search(name)
        if m:
            result["season"] = int(m.group(1))
            result["episode"] = int(m.group(2))
            if m.group(3):
                result["episode_end"] = int(m.group(3))
            sm = RE_SERIES_FROM_XXx.match(name_no_ext)
            if sm:
                result["series"] = self._clean_name(sm.group(1))
            return result

        return result
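
    # Illustrative results (assuming the imported RE_SXXEXX_MULTI / RE_XXxXX_MULTI
    # regexes capture season, episode and an optional second episode, as their
    # use above implies; the example names are made up):
    #   "Show.Name.S09E19E20.mkv" -> {"series": "Show Name", "season": 9,
    #                                  "episode": 19, "episode_end": 20}
    #   "Show Name 1x01.mkv"      -> {"series": "Show Name", "season": 1,
    #                                  "episode": 1, "episode_end": None}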

    @staticmethod
    def _clean_name(name: str) -> str:
        """Clean up a series name: dots/underscores become spaces"""
        name = name.replace(".", " ").replace("_", " ")
        # Collapse repeated whitespace
        name = re.sub(r'\s+', ' ', name).strip()
        # Remove trailing hyphens
        name = name.rstrip(" -")
        return name

    def _build_target(self, series: str, season: Optional[int],
                      episode: Optional[int], title: str, ext: str,
                      lib_path: str, pattern: str,
                      season_pattern: str,
                      episode_end: Optional[int] = None) -> tuple[str, str]:
        """Build the target folder and filename from the pattern.
        Supports double episodes via episode_end."""
        s = season or 1
        e = episode or 0

        # Season folder
        season_dir = season_pattern.format(season=s)

        # Episode part: S01E02, or S01E02E03 for double episodes
        ep_str = f"S{s:02d}E{e:02d}"
        if episode_end and episode_end != e:
            ep_str += f"E{episode_end:02d}"

        # Filename - no title: drop the title part, otherwise include it
        try:
            if title:
                filename = pattern.format(
                    series=series, season=s, episode=e,
                    title=title, ext=ext
                )
                # Double episode: replace the E part in the generated name
                if episode_end and episode_end != e:
                    filename = filename.replace(
                        f"S{s:02d}E{e:02d}", ep_str, 1
                    )
            else:
                filename = f"{series} - {ep_str}.{ext}"
        except (KeyError, ValueError):
            if title:
                filename = f"{series} - {ep_str} - {title}.{ext}"
            else:
                filename = f"{series} - {ep_str}.{ext}"

        # Strip invalid characters
        for ch in ['<', '>', ':', '"', '|', '?', '*']:
            filename = filename.replace(ch, '')
            series = series.replace(ch, '')

        target_dir = os.path.join(lib_path, series, season_dir)
        return target_dir, filename
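
    # Worked example (hypothetical values, default patterns from the properties above):
    #   _build_target("24", 7, 1, "Pilot", "mkv", "/media/serien",
    #                 "{series} - S{season:02d}E{episode:02d} - {title}.{ext}",
    #                 "Season {season:02d}")
    #   -> ("/media/serien/24/Season 07", "24 - S07E01 - Pilot.mkv")
    # With episode_end=2 the episode part becomes "S07E01E02".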

    async def _get_episode_title(self, tvdb_id: int,
                                 season: int, episode: int) -> str:
        """Fetch the episode title from the TVDB cache or the API"""
        if not self._db_pool:
            return ""
        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor() as cur:
                    # Check the cache first
                    await cur.execute(
                        "SELECT episode_name FROM tvdb_episode_cache "
                        "WHERE series_tvdb_id = %s "
                        "AND season_number = %s "
                        "AND episode_number = %s",
                        (tvdb_id, season, episode)
                    )
                    row = await cur.fetchone()
                    if row and row[0]:
                        return row[0]
        except Exception:
            pass

        # Cache empty -> load the episodes from TVDB
        episodes = await self.tvdb.fetch_episodes(tvdb_id)
        for ep in episodes:
            if ep["season_number"] == season and ep["episode_number"] == episode:
                return ep.get("episode_name", "")
        return ""

    async def execute_import(self, job_id: int) -> dict:
        """Run the import (copy/move + TVDB link)"""
        if not self._db_pool:
            return {"error": "Keine DB-Verbindung"}

        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor(aiomysql.DictCursor) as cur:
                    await cur.execute(
                        "SELECT * FROM import_jobs WHERE id = %s", (job_id,)
                    )
                    job = await cur.fetchone()
                    if not job:
                        return {"error": "Job nicht gefunden"}

                    await cur.execute(
                        "UPDATE import_jobs SET status = 'importing' "
                        "WHERE id = %s", (job_id,)
                    )

                    # Only items with status matched, or conflict + overwrite
                    await cur.execute(
                        "SELECT * FROM import_items "
                        "WHERE import_job_id = %s "
                        "AND (status = 'matched' "
                        " OR (status = 'conflict' "
                        " AND user_action = 'overwrite'))",
                        (job_id,)
                    )
                    items = await cur.fetchall()

            done = 0
            errors = 0
            mode = job.get("mode", "copy")

            # Collect TVDB ids for linking afterwards
            tvdb_links = {}  # series_name -> tvdb_id

            total_items = len(items)
            for item in items:
                # WS broadcast BEFORE the copy
                await self._broadcast_import(
                    job_id, "importing",
                    processed=done + errors, total=total_items,
                    current_file=os.path.basename(
                        item.get("source_file", "")
                    ),
                    bytes_total=item.get("source_size", 0),
                )

                ok = await self._process_item(
                    item, mode, job_id, done + errors, total_items
                )
                if ok:
                    done += 1
                    # Remember the TVDB link
                    if item.get("tvdb_series_id") and item.get("tvdb_series_name"):
                        tvdb_links[item["tvdb_series_name"]] = item["tvdb_series_id"]
                else:
                    errors += 1

                # Update the progress
                async with self._db_pool.acquire() as conn:
                    async with conn.cursor() as cur:
                        await cur.execute(
                            "UPDATE import_jobs SET processed_files = %s "
                            "WHERE id = %s", (done + errors, job_id)
                        )

                # WS broadcast AFTER the copy
                await self._broadcast_import(
                    job_id, "importing",
                    processed=done + errors, total=total_items,
                )

            # Finish the job
            final_status = "done" if errors == 0 else "error"
            async with self._db_pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(
                        "UPDATE import_jobs SET status = %s "
                        "WHERE id = %s", (final_status, job_id)
                    )

            # Apply the TVDB assignments to library_series
            linked_series = 0
            if tvdb_links:
                linked_series = await self._link_tvdb_to_series(tvdb_links)

            # Final WS broadcast
            await self._broadcast_import(
                job_id, final_status,
                processed=done + errors, total=total_items,
            )

            # Auto scan: refresh the target library
            target_lib_id = job.get("target_library_id")
            if target_lib_id and self.library:
                logging.info(
                    f"Import abgeschlossen - starte Auto-Scan "
                    f"fuer Bibliothek {target_lib_id}"
                )
                await self.library.scan_single_path(target_lib_id)

            return {
                "done": done,
                "errors": errors,
                "tvdb_linked": linked_series,
            }

        except Exception as e:
            logging.error(f"Import ausfuehren fehlgeschlagen: {e}")
            return {"error": str(e)}

    async def _link_tvdb_to_series(self, tvdb_links: dict) -> int:
        """Link imported series to TVDB in library_series"""
        if not self._db_pool or not self.tvdb:
            return 0

        linked = 0
        for series_name, tvdb_id in tvdb_links.items():
            try:
                async with self._db_pool.acquire() as conn:
                    async with conn.cursor() as cur:
                        # Find the series in library_series (by name)
                        await cur.execute(
                            "SELECT id, tvdb_id FROM library_series "
                            "WHERE (folder_name = %s OR title = %s) "
                            "AND tvdb_id IS NULL "
                            "LIMIT 1",
                            (series_name, series_name)
                        )
                        row = await cur.fetchone()
                        if row:
                            series_id = row[0]
                            # Load the TVDB data and link it
                            result = await self.tvdb.match_and_update_series(
                                series_id, int(tvdb_id), self.library
                            )
                            if not result.get("error"):
                                linked += 1
                                logging.info(
                                    f"Import: TVDB verknuepft - "
                                    f"{series_name} -> {tvdb_id}"
                                )
            except Exception as e:
                logging.warning(
                    f"TVDB-Link fehlgeschlagen fuer {series_name}: {e}"
                )

        return linked

    async def _process_item(self, item: dict, mode: str,
                            job_id: int = 0,
                            item_processed: int = 0,
                            item_total: int = 0) -> bool:
        """Import a single item (copy/move + metadata)"""
        src = item["source_file"]
        target_dir = item["target_path"]
        target_file = item["target_filename"]

        if not target_dir or not target_file:
            await self._update_item_status(item["id"], "error")
            return False

        target = os.path.join(target_dir, target_file)
        src_size = item.get("source_size", 0) or os.path.getsize(src)

        try:
            # Create the target folder
            os.makedirs(target_dir, exist_ok=True)

            # Clean up old files for the same episode
            # (e.g. "S01E03 - Unbekannt.mkv" when "S01E03 - Willkür.mkv" arrives now)
            season = item.get("detected_season")
            episode = item.get("detected_episode")
            if season is not None and episode is not None and os.path.isdir(target_dir):
                ep_pattern = f"S{season:02d}E{episode:02d}"
                for existing in os.listdir(target_dir):
                    existing_path = os.path.join(target_dir, existing)
                    if (existing != target_file
                            and ep_pattern in existing
                            and os.path.isfile(existing_path)):
                        logging.info(
                            f"Import: Alte Episode-Datei entfernt: {existing}"
                        )
                        os.remove(existing_path)
                        # Also remove it from library_videos
                        if self._db_pool:
                            try:
                                async with self._db_pool.acquire() as conn:
                                    async with conn.cursor() as cur:
                                        await cur.execute(
                                            "DELETE FROM library_videos "
                                            "WHERE file_path = %s",
                                            (existing_path,)
                                        )
                            except Exception:
                                pass

            # Initialise progress tracking in the DB
            if job_id and self._db_pool:
                await self._update_file_progress(
                    job_id, target_file, 0, src_size
                )

            if mode == "move":
                shutil.move(src, target)
                # A move is finished immediately
                if job_id and self._db_pool:
                    await self._update_file_progress(
                        job_id, target_file, src_size, src_size
                    )
            else:
                # Copy with progress reporting
                await self._copy_with_progress(
                    src, target, job_id, target_file, src_size,
                    item_processed, item_total
                )

            logging.info(
                f"Import: {os.path.basename(src)} -> {target}"
            )

            # Embed metadata into the file (if TVDB info is available)
            if item.get("tvdb_series_name") or item.get("detected_series"):
                # WS broadcast: show the metadata phase
                await self._broadcast_import(
                    job_id, "embedding",
                    processed=item_processed, total=item_total,
                    current_file=target_file,
                )
                await self._embed_metadata(target, item)

            await self._update_item_status(item["id"], "done")
            return True

        except Exception as e:
            logging.error(f"Import fehlgeschlagen: {src}: {e}")
            await self._update_item_status(item["id"], "error")
            return False

    async def _embed_metadata(self, file_path: str, item: dict) -> bool:
        """Embed metadata into the file using ffmpeg"""
        series_name = item.get("tvdb_series_name") or item.get("detected_series") or ""
        season = item.get("detected_season") or 0
        episode = item.get("detected_episode") or 0
        episode_title = item.get("tvdb_episode_title") or ""

        if not series_name:
            return False

        # Temporary output file
        base, ext = os.path.splitext(file_path)
        temp_file = f"{base}_temp{ext}"

        # ffmpeg metadata command
        cmd = [
            "ffmpeg", "-y", "-i", file_path,
            "-map", "0",
            "-c", "copy",
            "-metadata",
            f"title={episode_title}" if episode_title
            else f"title=S{season:02d}E{episode:02d}",
            "-metadata", f"show={series_name}",
            "-metadata", f"season_number={season}",
            "-metadata", f"episode_sort={episode}",
            "-metadata", f"episode_id=S{season:02d}E{episode:02d}",
        ]

        # Additional tags for MKV
        if file_path.lower().endswith(".mkv"):
            cmd.extend([
                "-metadata:s:v:0", f"title={series_name} - S{season:02d}E{episode:02d}",
            ])

        cmd.append(temp_file)

        try:
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            _, stderr = await asyncio.wait_for(
                process.communicate(), timeout=600  # 10 minutes for large files
            )

            if process.returncode == 0:
                # Move the temporary file over the original
                os.replace(temp_file, file_path)
                logging.info(f"Metadaten eingebettet: {os.path.basename(file_path)}")
                return True
            else:
                logging.warning(
                    f"Metadaten einbetten fehlgeschlagen: "
                    f"{stderr.decode()[:200]}"
                )
                # Remove the temp file if it exists
                if os.path.exists(temp_file):
                    os.remove(temp_file)
                return False

        except asyncio.TimeoutError:
            logging.warning(f"Metadaten einbetten Timeout: {file_path}")
            if os.path.exists(temp_file):
                os.remove(temp_file)
            return False
        except Exception as e:
            logging.warning(f"Metadaten einbetten Fehler: {e}")
            if os.path.exists(temp_file):
                os.remove(temp_file)
            return False
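
    # Resulting command line for a hypothetical MKV episode (values assumed,
    # arguments exactly as assembled above):
    #   ffmpeg -y -i "/media/serien/24/Season 07/24 - S07E01 - Pilot.mkv" \
    #     -map 0 -c copy -metadata "title=Pilot" -metadata "show=24" \
    #     -metadata "season_number=7" -metadata "episode_sort=1" \
    #     -metadata "episode_id=S07E01" \
    #     -metadata:s:v:0 "title=24 - S07E01" \
    #     "/media/serien/24/Season 07/24 - S07E01 - Pilot_temp.mkv"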

    async def _update_item_status(self, item_id: int,
                                  status: str) -> None:
        if not self._db_pool:
            return
        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(
                        "UPDATE import_items SET status = %s "
                        "WHERE id = %s", (status, item_id)
                    )
        except Exception:
            pass

    async def _update_file_progress(self, job_id: int, filename: str,
                                    bytes_done: int, bytes_total: int) -> None:
        """Update the byte progress for the current file"""
        # Note: the current_file_* columns are not part of the CREATE TABLE in
        # init_db(); if they do not exist, the except below silently skips the update.
        if not self._db_pool:
            return
        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(
                        "UPDATE import_jobs SET "
                        "current_file_name = %s, "
                        "current_file_bytes = %s, "
                        "current_file_total = %s "
                        "WHERE id = %s",
                        (filename, bytes_done, bytes_total, job_id)
                    )
        except Exception:
            pass

    async def _copy_with_progress(self, src: str, dst: str,
                                  job_id: int, filename: str,
                                  total_size: int,
                                  item_processed: int = 0,
                                  item_total: int = 0) -> None:
        """Copy a file with progress updates to the DB + WebSocket"""
        chunk_size = 64 * 1024 * 1024  # 64 MB chunks
        bytes_copied = 0
        last_update = 0

        loop = asyncio.get_running_loop()

        with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
            while True:
                # Read a chunk (in a thread so the event loop is not blocked)
                chunk = await loop.run_in_executor(
                    None, fsrc.read, chunk_size
                )
                if not chunk:
                    break

                # Write the chunk
                await loop.run_in_executor(None, fdst.write, chunk)
                bytes_copied += len(chunk)

                # Update progress every 50 MB (DB + WebSocket)
                if bytes_copied - last_update >= 50 * 1024 * 1024:
                    await self._update_file_progress(
                        job_id, filename, bytes_copied, total_size
                    )
                    await self._broadcast_import(
                        job_id, "importing",
                        processed=item_processed,
                        total=item_total,
                        current_file=filename,
                        bytes_done=bytes_copied,
                        bytes_total=total_size,
                    )
                    last_update = bytes_copied

        # Final update
        await self._update_file_progress(
            job_id, filename, total_size, total_size
        )

        # Copy file metadata (timestamps etc.)
        shutil.copystat(src, dst)

    async def resolve_conflict(self, item_id: int,
                               action: str) -> bool:
        """Resolve a conflict: overwrite, skip or rename"""
        if not self._db_pool or action not in ('overwrite', 'skip', 'rename'):
            return False
        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor() as cur:
                    if action == 'skip':
                        await cur.execute(
                            "UPDATE import_items SET status = 'skipped', "
                            "user_action = 'skip' WHERE id = %s",
                            (item_id,)
                        )
                    elif action == 'rename':
                        # Add a suffix to the filename
                        await cur.execute(
                            "SELECT target_filename FROM import_items "
                            "WHERE id = %s", (item_id,)
                        )
                        row = await cur.fetchone()
                        if row and row[0]:
                            name, ext = os.path.splitext(row[0])
                            new_name = f"{name}_neu{ext}"
                            await cur.execute(
                                "UPDATE import_items SET "
                                "target_filename = %s, "
                                "status = 'matched', "
                                "user_action = 'rename' "
                                "WHERE id = %s",
                                (new_name, item_id)
                            )
                    else:  # overwrite
                        await cur.execute(
                            "UPDATE import_items SET user_action = 'overwrite' "
                            "WHERE id = %s", (item_id,)
                        )
            return True
        except Exception as e:
            logging.error(f"Konflikt loesen fehlgeschlagen: {e}")
            return False

    async def resolve_all_conflicts(self, job_id: int, action: str) -> dict:
        """Resolve ALL conflicts of a job at once.

        action: 'overwrite' or 'skip'
        """
        if not self._db_pool or action not in ('overwrite', 'skip'):
            return {"error": "Ungueltige Aktion"}
        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor() as cur:
                    if action == 'skip':
                        await cur.execute(
                            "UPDATE import_items SET status = 'skipped', "
                            "user_action = 'skip' "
                            "WHERE import_job_id = %s AND status = 'conflict' "
                            "AND user_action IS NULL",
                            (job_id,)
                        )
                    else:  # overwrite
                        await cur.execute(
                            "UPDATE import_items SET user_action = 'overwrite' "
                            "WHERE import_job_id = %s AND status = 'conflict' "
                            "AND user_action IS NULL",
                            (job_id,)
                        )
                    updated = cur.rowcount
            logging.info(f"Alle Konflikte geloest: {updated} Items -> {action}")
            return {"ok": True, "updated": updated}
        except Exception as e:
            logging.error(f"Alle Konflikte loesen fehlgeschlagen: {e}")
            return {"error": str(e)}

    async def set_overwrite_mode(self, job_id: int, overwrite: bool) -> dict:
        """Set the overwrite mode for the whole job.

        With overwrite=True, all conflicts are overwritten automatically.
        """
        if not self._db_pool:
            return {"error": "Keine DB-Verbindung"}
        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(
                        "UPDATE import_jobs SET overwrite_all = %s "
                        "WHERE id = %s", (1 if overwrite else 0, job_id)
                    )
                    # If enabled: set all existing conflicts to overwrite
                    if overwrite:
                        await cur.execute(
                            "UPDATE import_items SET user_action = 'overwrite' "
                            "WHERE import_job_id = %s AND status = 'conflict' "
                            "AND user_action IS NULL",
                            (job_id,)
                        )
            return {"ok": True, "overwrite_all": overwrite}
        except Exception as e:
            logging.error(f"Overwrite-Modus setzen fehlgeschlagen: {e}")
            return {"error": str(e)}

    async def update_item(self, item_id: int, **kwargs) -> bool:
        """Manual correction of an item"""
        if not self._db_pool:
            return False
        allowed = {
            'detected_series', 'detected_season', 'detected_episode',
            'detected_episode_end',
            'tvdb_series_id', 'tvdb_series_name', 'tvdb_episode_title',
            'target_path', 'target_filename', 'status'
        }
        updates = []
        params = []
        for k, v in kwargs.items():
            if k in allowed:
                updates.append(f"{k} = %s")
                params.append(v)
        if not updates:
            return False
        params.append(item_id)
        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(
                        f"UPDATE import_items SET {', '.join(updates)} "
                        f"WHERE id = %s", params
                    )
            return True
        except Exception as e:
            logging.error(f"Import-Item aktualisieren fehlgeschlagen: {e}")
            return False

    async def reassign_item(self, item_id: int,
                            series_name: str,
                            season: int, episode: int,
                            tvdb_id: Optional[int] = None) -> dict:
        """Assign a series/season/episode to a pending item.

        Automatically computes the target path and fetches the TVDB episode
        title where possible.
        """
        if not self._db_pool:
            return {"error": "Keine DB-Verbindung"}

        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor(aiomysql.DictCursor) as cur:
                    # Load the item
                    await cur.execute(
                        "SELECT i.*, j.target_library_id, j.naming_pattern, "
                        "j.season_pattern FROM import_items i "
                        "JOIN import_jobs j ON j.id = i.import_job_id "
                        "WHERE i.id = %s", (item_id,)
                    )
                    item = await cur.fetchone()
                    if not item:
                        return {"error": "Item nicht gefunden"}

                    # Load the library path
                    await cur.execute(
                        "SELECT * FROM library_paths WHERE id = %s",
                        (item["target_library_id"],)
                    )
                    lib_path = await cur.fetchone()
                    if not lib_path:
                        return {"error": "Ziel-Library nicht gefunden"}

            # Fetch the TVDB name and episode title
            tvdb_name = series_name
            tvdb_ep_title = ""
            if tvdb_id and self.tvdb.is_configured:
                # Fetch the series info from TVDB
                try:
                    info = await self.tvdb.get_series_info(tvdb_id)
                    if info and info.get("name"):
                        tvdb_name = info["name"]
                except Exception:
                    pass
                # Fetch the episode title
                tvdb_ep_title = await self._get_episode_title(
                    tvdb_id, season, episode
                )

            # Compute the target path
            ext = os.path.splitext(item["source_file"])[1].lstrip(".")
            pattern = item.get("naming_pattern") or self._naming_pattern
            season_pattern = item.get("season_pattern") or self._season_pattern
            target_dir, target_file = self._build_target(
                tvdb_name or series_name,
                season, episode,
                tvdb_ep_title or "",
                ext,
                lib_path["path"],
                pattern, season_pattern
            )

            # Update the DB
            async with self._db_pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute("""
                        UPDATE import_items SET
                            detected_series = %s,
                            detected_season = %s,
                            detected_episode = %s,
                            tvdb_series_id = %s,
                            tvdb_series_name = %s,
                            tvdb_episode_title = %s,
                            target_path = %s,
                            target_filename = %s,
                            status = 'matched'
                        WHERE id = %s
                    """, (
                        series_name, season, episode,
                        tvdb_id, tvdb_name, tvdb_ep_title,
                        target_dir, target_file, item_id,
                    ))

            return {
                "ok": True,
                "target_dir": target_dir,
                "target_file": target_file,
                "tvdb_name": tvdb_name,
                "tvdb_ep_title": tvdb_ep_title,
            }

        except Exception as e:
            logging.error(f"Import-Item zuordnen fehlgeschlagen: {e}")
            return {"error": str(e)}

    async def reassign_series(self, job_id: int, detected_series: str,
                              tvdb_id: Optional[int] = None,
                              series_name: Optional[str] = None) -> dict:
        """Assign all items with the same detected_series to one series.

        Sets the TVDB id + episode titles + target paths for all matching items.
        """
        if not self._db_pool:
            return {"error": "Keine DB-Verbindung"}

        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor(aiomysql.DictCursor) as cur:
                    # Load the job + library path
                    await cur.execute(
                        "SELECT j.*, lp.path as lib_path "
                        "FROM import_jobs j "
                        "JOIN library_paths lp ON lp.id = j.target_library_id "
                        "WHERE j.id = %s", (job_id,)
                    )
                    job = await cur.fetchone()
                    if not job:
                        return {"error": "Job nicht gefunden"}

                    # All items with the same detected_series
                    await cur.execute(
                        "SELECT * FROM import_items "
                        "WHERE import_job_id = %s "
                        "AND LOWER(detected_series) = LOWER(%s) "
                        "AND status IN ('pending', 'matched') "
                        "AND detected_season IS NOT NULL "
                        "AND detected_episode IS NOT NULL",
                        (job_id, detected_series)
                    )
                    items = await cur.fetchall()

            if not items:
                return {"error": f"Keine Items fuer '{detected_series}'"}

            # Fetch the TVDB name
            tvdb_name = series_name or detected_series
            if tvdb_id and self.tvdb.is_configured:
                try:
                    info = await self.tvdb.get_series_info(tvdb_id)
                    if info and info.get("name"):
                        tvdb_name = info["name"]
                except Exception:
                    pass

            # Update every item (one connection for all of them)
            updated = 0
            async with self._db_pool.acquire() as conn:
                async with conn.cursor() as cur:
                    for item in items:
                        season = item["detected_season"]
                        episode = item["detected_episode"]
                        ep_end = item.get("detected_episode_end")

                        # Fetch the episode title
                        tvdb_ep_title = ""
                        if tvdb_id and season and episode:
                            tvdb_ep_title = await self._get_episode_title(
                                int(tvdb_id), season, episode
                            )

                        # Compute the target path
                        ext = os.path.splitext(
                            item["source_file"]
                        )[1].lstrip(".")
                        pattern = (job.get("naming_pattern")
                                   or self._naming_pattern)
                        season_pat = (job.get("season_pattern")
                                      or self._season_pattern)
                        target_dir, target_file = self._build_target(
                            tvdb_name, season, episode,
                            tvdb_ep_title, ext,
                            job["lib_path"],
                            pattern, season_pat,
                            episode_end=ep_end
                        )

                        await cur.execute("""
                            UPDATE import_items SET
                                tvdb_series_id = %s,
                                tvdb_series_name = %s,
                                tvdb_episode_title = %s,
                                target_path = %s,
                                target_filename = %s,
                                status = 'matched',
                                conflict_reason = NULL
                            WHERE id = %s
                        """, (
                            tvdb_id, tvdb_name, tvdb_ep_title,
                            target_dir, target_file, item["id"],
                        ))
                        updated += 1

            logging.info(
                f"Serien-Zuordnung: {updated} Items fuer "
                f"'{detected_series}' -> '{tvdb_name}'"
            )
            return {"ok": True, "updated": updated, "tvdb_name": tvdb_name}

        except Exception as e:
            logging.error(f"Serien-Zuordnung fehlgeschlagen: {e}")
            return {"error": str(e)}

    async def skip_series(self, job_id: int,
                          detected_series: str) -> dict:
        """Skip all items with the same detected_series."""
        if not self._db_pool:
            return {"error": "Keine DB-Verbindung"}
        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(
                        "UPDATE import_items SET status = 'skipped', "
                        "conflict_reason = 'Serie uebersprungen' "
                        "WHERE import_job_id = %s "
                        "AND LOWER(detected_series) = LOWER(%s) "
                        "AND status IN ('pending', 'pending_series', "
                        "'matched')",
                        (job_id, detected_series)
                    )
                    skipped = cur.rowcount
            logging.info(
                f"Serie uebersprungen: {skipped} Items "
                f"fuer '{detected_series}'"
            )
            return {"ok": True, "skipped": skipped}
        except Exception as e:
            logging.error(f"Serie ueberspringen fehlgeschlagen: {e}")
            return {"error": str(e)}

    async def skip_item(self, item_id: int) -> bool:
        """Mark an item as skipped"""
        if not self._db_pool:
            return False
        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(
                        "UPDATE import_items SET status = 'skipped', "
                        "conflict_reason = 'Manuell uebersprungen' "
                        "WHERE id = %s", (item_id,)
                    )
            return True
        except Exception:
            return False

    async def get_all_jobs(self) -> list:
        """List all import jobs (newest first)"""
        if not self._db_pool:
            return []
        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor(aiomysql.DictCursor) as cur:
                    await cur.execute(
                        "SELECT id, source_path, status, total_files, "
                        "processed_files, created_at FROM import_jobs "
                        "ORDER BY id DESC LIMIT 20"
                    )
                    jobs = await cur.fetchall()
                    return [self._serialize(j) for j in jobs]
        except Exception as e:
            logging.error(f"Import-Jobs laden fehlgeschlagen: {e}")
            return []

    async def get_job_status(self, job_id: int) -> dict:
        """Status of an import job including all of its items"""
        if not self._db_pool:
            return {"error": "Keine DB-Verbindung"}
        try:
            async with self._db_pool.acquire() as conn:
                async with conn.cursor(aiomysql.DictCursor) as cur:
                    await cur.execute(
                        "SELECT * FROM import_jobs WHERE id = %s", (job_id,)
                    )
                    job = await cur.fetchone()
                    if not job:
                        return {"error": "Job nicht gefunden"}

                    await cur.execute(
                        "SELECT * FROM import_items "
                        "WHERE import_job_id = %s ORDER BY source_file",
                        (job_id,)
                    )
                    items = await cur.fetchall()

            # For finished jobs: collect the imported series folders
            imported_series = []
            if job.get("status") in ("done", "error"):
                series_folders = set()
                for item in items:
                    if item.get("status") == "done" and item.get("target_path"):
                        series_folders.add(item["target_path"])
                imported_series = list(series_folders)

            return {
                "job": self._serialize(job),
                "items": [self._serialize(i) for i in items],
                "imported_series": imported_series,
            }
        except Exception as e:
            return {"error": str(e)}

    @staticmethod
    def _serialize(row: dict) -> dict:
        """Make a row dict JSON compatible"""
        result = {}
        for k, v in row.items():
            if hasattr(v, "isoformat"):
                result[k] = str(v)
            else:
                result[k] = v
        return result

    @staticmethod
    def _fmt_size(b: int) -> str:
        """Human readable byte size"""
        for u in ("B", "KiB", "MiB", "GiB"):
            if b < 1024:
                return f"{b:.1f} {u}"
            b /= 1024
        return f"{b:.1f} TiB"
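
    # Examples: _fmt_size(512) -> "512.0 B", _fmt_size(157286400) -> "150.0 MiB"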