docker.videokonverter/video-konverter/app/services/auth.py
data 956b7b9ac8 feat: VideoKonverter v5.6 - Player-Overlay, Immersive Fullscreen, Audio-Normalisierung
Tizen TV: Transparenter iframe-Overlay statt opacity:0 - Player-Controls
(Progress-Bar, Buttons, Popup-Menue) jetzt sichtbar ueber dem AVPlay-Video.
CSS-Klasse "vknative-playing" macht Hintergruende transparent, AVPlay-Video
scheint durch den iframe hindurch.

Android App: Immersive Sticky Fullscreen mit WindowInsetsControllerCompat.
Status- und Navigationsleiste komplett versteckt, per Swipe vom Rand
temporaer einblendbar.

Audio-Normalisierung (3 Stufen):
- Server-seitig: EBU R128 loudnorm (I=-14 LUFS) im HLS-Transcoding
- Server-seitig: dynaudnorm (dynamische Szenen-Anpassung) im HLS-Transcoding
- Client-seitig: DynamicsCompressorNode im Browser-Player
Alle Optionen konfigurierbar: loudnorm/dynaudnorm im TV Admin-Center,
Audio-Kompressor pro Client in den Einstellungen.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 21:07:04 +01:00

1173 lines
50 KiB
Python

"""Authentifizierung und User-Verwaltung fuer die TV-App"""
import json
import logging
import secrets
import time
from typing import Optional
import aiomysql
import bcrypt
class AuthService:
"""Verwaltet TV-User, Sessions und Berechtigungen"""
def __init__(self, db_pool_getter) -> None:
    """Store the callable that provides the database pool.

    The methods of this service call ``await self._get_pool()`` without
    arguments and treat a falsy result as "no database available".
    """
    self._get_pool = db_pool_getter
async def init_db(self) -> None:
"""Erstellt DB-Tabellen fuer TV-Auth und migriert bestehende"""
pool = await self._get_pool()
if not pool:
logging.error("Auth: Kein DB-Pool verfuegbar")
return
async with pool.acquire() as conn:
async with conn.cursor() as cur:
# === Bestehende Tabellen ===
await cur.execute("""
CREATE TABLE IF NOT EXISTS tv_users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(64) NOT NULL UNIQUE,
password_hash VARCHAR(255) NOT NULL,
display_name VARCHAR(128),
is_admin TINYINT DEFAULT 0,
can_view_series TINYINT DEFAULT 1,
can_view_movies TINYINT DEFAULT 1,
allowed_paths JSON DEFAULT NULL,
last_login TIMESTAMP NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_username (username)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
await cur.execute("""
CREATE TABLE IF NOT EXISTS tv_sessions (
id VARCHAR(64) PRIMARY KEY,
user_id INT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_active TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
user_agent VARCHAR(512),
FOREIGN KEY (user_id) REFERENCES tv_users(id) ON DELETE CASCADE,
INDEX idx_user (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
await cur.execute("""
CREATE TABLE IF NOT EXISTS tv_watch_progress (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL,
video_id INT NOT NULL,
position_sec DOUBLE DEFAULT 0,
duration_sec DOUBLE DEFAULT 0,
completed TINYINT DEFAULT 0,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
ON UPDATE CURRENT_TIMESTAMP,
UNIQUE INDEX idx_user_video (user_id, video_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
# === Neue Tabellen (v4.0) ===
# Client-Einstellungen (pro Geraet/Browser)
await cur.execute("""
CREATE TABLE IF NOT EXISTS tv_clients (
id VARCHAR(64) PRIMARY KEY,
name VARCHAR(128) DEFAULT NULL,
sound_mode ENUM('stereo','surround','original')
DEFAULT 'stereo',
stream_quality ENUM('uhd','hd','sd','low')
DEFAULT 'hd',
audio_compressor BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_active TIMESTAMP DEFAULT CURRENT_TIMESTAMP
ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
# Spalte audio_compressor hinzufuegen (Migration fuer bestehende DBs)
await cur.execute("""
ALTER TABLE tv_clients
ADD COLUMN IF NOT EXISTS audio_compressor BOOLEAN DEFAULT FALSE
""")
# Merkliste (Watchlist)
await cur.execute("""
CREATE TABLE IF NOT EXISTS tv_watchlist (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL,
series_id INT NULL,
movie_id INT NULL,
added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE INDEX idx_user_series (user_id, series_id),
UNIQUE INDEX idx_user_movie (user_id, movie_id),
FOREIGN KEY (user_id) REFERENCES tv_users(id)
ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
# Manueller Watch-Status
await cur.execute("""
CREATE TABLE IF NOT EXISTS tv_watch_status (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL,
video_id INT NULL,
series_id INT NULL,
season_key VARCHAR(64) NULL,
status ENUM('unwatched','watching','watched')
DEFAULT 'unwatched',
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_user (user_id),
UNIQUE INDEX idx_user_video (user_id, video_id),
UNIQUE INDEX idx_user_series (user_id, series_id),
UNIQUE INDEX idx_user_season (user_id, season_key),
FOREIGN KEY (user_id) REFERENCES tv_users(id)
ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
# Episoden-Thumbnails Cache
await cur.execute("""
CREATE TABLE IF NOT EXISTS tv_episode_thumbnails (
video_id INT PRIMARY KEY,
thumbnail_path VARCHAR(1024) NOT NULL,
source ENUM('tvdb','ffmpeg') DEFAULT 'ffmpeg',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
# Such-History (pro User)
await cur.execute("""
CREATE TABLE IF NOT EXISTS tv_search_history (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL,
query VARCHAR(256) NOT NULL,
searched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_user (user_id),
INDEX idx_query (query(64)),
FOREIGN KEY (user_id) REFERENCES tv_users(id)
ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
# Bewertungen (pro User, fuer Serien und Filme)
await cur.execute("""
CREATE TABLE IF NOT EXISTS tv_ratings (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL,
series_id INT NULL,
movie_id INT NULL,
rating TINYINT NOT NULL DEFAULT 0,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
ON UPDATE CURRENT_TIMESTAMP,
UNIQUE INDEX idx_user_series (user_id, series_id),
UNIQUE INDEX idx_user_movie (user_id, movie_id),
FOREIGN KEY (user_id) REFERENCES tv_users(id)
ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
# === Migration: Neue Spalten zu bestehenden Tabellen ===
await self._migrate_columns(cur)
# Standard-Admin erstellen falls keine User existieren
await self._ensure_default_admin()
logging.info("TV-Auth: DB-Tabellen initialisiert")
async def _migrate_columns(self, cur) -> None:
"""Fuegt neue Spalten zu bestehenden Tabellen hinzu (idempotent)"""
# Hilfsfunktion: Spalte hinzufuegen falls nicht vorhanden
async def add_column(table: str, column: str, definition: str):
await cur.execute(
"SELECT COUNT(*) FROM information_schema.COLUMNS "
"WHERE TABLE_SCHEMA = DATABASE() "
"AND TABLE_NAME = %s AND COLUMN_NAME = %s",
(table, column)
)
row = await cur.fetchone()
if row[0] == 0:
await cur.execute(
f"ALTER TABLE {table} ADD COLUMN {column} {definition}"
)
logging.info(f"TV-Auth: Spalte {table}.{column} hinzugefuegt")
# tv_users: User-Einstellungen
await add_column("tv_users", "preferred_audio_lang",
"VARCHAR(8) DEFAULT 'deu'")
await add_column("tv_users", "preferred_subtitle_lang",
"VARCHAR(8) DEFAULT NULL")
await add_column("tv_users", "subtitles_enabled",
"TINYINT DEFAULT 0")
await add_column("tv_users", "ui_lang",
"VARCHAR(8) DEFAULT 'de'")
await add_column("tv_users", "series_view",
"VARCHAR(16) DEFAULT 'grid'")
await add_column("tv_users", "movies_view",
"VARCHAR(16) DEFAULT 'grid'")
await add_column("tv_users", "avatar_color",
"VARCHAR(7) DEFAULT '#64b5f6'")
# Auto-Play Einstellungen
await add_column("tv_users", "autoplay_enabled",
"TINYINT DEFAULT 1")
await add_column("tv_users", "autoplay_countdown_sec",
"INT DEFAULT 10")
await add_column("tv_users", "autoplay_max_episodes",
"INT DEFAULT 0")
# tv_sessions: Client-Referenz und permanente Sessions
await add_column("tv_sessions", "client_id",
"VARCHAR(64) DEFAULT NULL")
await add_column("tv_sessions", "expires_at",
"TIMESTAMP NULL DEFAULT NULL")
# tvdb_episode_cache: Beschreibung und Bild-URL
await add_column("tvdb_episode_cache", "overview",
"TEXT DEFAULT NULL")
await add_column("tvdb_episode_cache", "image_url",
"VARCHAR(1024) DEFAULT NULL")
# tv_users: Theme
await add_column("tv_users", "theme",
"VARCHAR(16) DEFAULT 'dark'")
# tv_users: Technische Metadaten in Serien-Detail anzeigen
await add_column("tv_users", "show_tech_info",
"TINYINT DEFAULT 0")
# tv_users: Startseiten-Rubriken konfigurierbar
await add_column("tv_users", "home_show_continue",
"TINYINT DEFAULT 1")
await add_column("tv_users", "home_show_new",
"TINYINT DEFAULT 1")
await add_column("tv_users", "home_hide_watched",
"TINYINT DEFAULT 1")
await add_column("tv_users", "home_show_watched",
"TINYINT DEFAULT 1")
# library_series: TVDB-Score (externe Bewertung 0-100)
await add_column("library_series", "tvdb_score",
"FLOAT DEFAULT NULL")
# library_movies: TVDB-Score (externe Bewertung 0-100)
await add_column("library_movies", "tvdb_score",
"FLOAT DEFAULT NULL")
async def _ensure_default_admin(self) -> None:
"""Erstellt admin/admin falls keine User existieren"""
pool = await self._get_pool()
if not pool:
return
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT COUNT(*) FROM tv_users")
row = await cur.fetchone()
if row[0] == 0:
await self.create_user(
"admin", "admin",
display_name="Administrator",
is_admin=True
)
logging.info("TV-Auth: Standard-Admin erstellt (admin/admin)")
# --- User-CRUD ---
async def create_user(self, username: str, password: str,
                      display_name: str = None, is_admin: bool = False,
                      can_view_series: bool = True,
                      can_view_movies: bool = True,
                      show_tech_info: bool = False,
                      allowed_paths: list = None) -> Optional[int]:
    """Insert a new TV user and return its row id.

    The password is stored as a bcrypt hash; ``allowed_paths`` is stored
    as JSON (NULL when empty or not given). Returns None when no DB pool
    is available or when the INSERT raises (the error is logged).
    """
    salted_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
    stored_hash = salted_hash.decode("utf-8")
    stored_paths = None
    if allowed_paths:
        stored_paths = json.dumps(allowed_paths)

    pool = await self._get_pool()
    if not pool:
        return None

    # Boolean flags are stored as 0/1 integers.
    row_values = (
        username, stored_hash, display_name, int(is_admin),
        int(can_view_series), int(can_view_movies),
        int(show_tech_info), stored_paths,
    )
    try:
        async with pool.acquire() as conn, conn.cursor() as cur:
            await cur.execute("""
                INSERT INTO tv_users
                    (username, password_hash, display_name, is_admin,
                     can_view_series, can_view_movies, show_tech_info,
                     allowed_paths)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """, row_values)
            return cur.lastrowid
    except Exception as exc:
        logging.error(f"TV-Auth: User erstellen fehlgeschlagen: {exc}")
        return None
async def update_user(self, user_id: int, **kwargs) -> bool:
"""Aktualisiert User-Felder (password, display_name, Rechte)"""
pool = await self._get_pool()
if not pool:
return False
updates = []
values = []
if "password" in kwargs and kwargs["password"]:
pw_hash = bcrypt.hashpw(
kwargs["password"].encode("utf-8"), bcrypt.gensalt()
).decode("utf-8")
updates.append("password_hash = %s")
values.append(pw_hash)
for field in ("display_name", "is_admin",
"can_view_series", "can_view_movies",
"show_tech_info"):
if field in kwargs:
updates.append(f"{field} = %s")
val = kwargs[field]
if isinstance(val, bool):
val = int(val)
values.append(val)
if "allowed_paths" in kwargs:
updates.append("allowed_paths = %s")
ap = kwargs["allowed_paths"]
values.append(json.dumps(ap) if ap else None)
if not updates:
return False
values.append(user_id)
try:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
f"UPDATE tv_users SET {', '.join(updates)} WHERE id = %s",
tuple(values)
)
return True
except Exception as e:
logging.error(f"TV-Auth: User aktualisieren fehlgeschlagen: {e}")
return False
async def delete_user(self, user_id: int) -> bool:
"""Loescht User und alle Sessions"""
pool = await self._get_pool()
if not pool:
return False
try:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"DELETE FROM tv_users WHERE id = %s", (user_id,)
)
return cur.rowcount > 0
except Exception as e:
logging.error(f"TV-Auth: User loeschen fehlgeschlagen: {e}")
return False
async def list_users(self) -> list[dict]:
    """Return all users ordered by id, without the password hash.

    ``allowed_paths`` is decoded from JSON when the driver hands it back
    as a string; ``last_login`` and ``created_at`` are converted to
    strings. Returns an empty list when no DB pool is available.
    """
    pool = await self._get_pool()
    if not pool:
        return []

    async with pool.acquire() as conn, \
            conn.cursor(aiomysql.DictCursor) as cur:
        await cur.execute("""
            SELECT id, username, display_name, is_admin,
                   can_view_series, can_view_movies, show_tech_info,
                   allowed_paths, last_login, created_at
            FROM tv_users ORDER BY id
        """)
        users = await cur.fetchall()

    for user in users:
        # Decode the JSON column
        paths = user.get("allowed_paths")
        if paths and isinstance(paths, str):
            user["allowed_paths"] = json.loads(paths)
        # Timestamps as strings
        for column in ("last_login", "created_at"):
            stamp = user.get(column)
            if stamp and hasattr(stamp, "isoformat"):
                user[column] = str(stamp)
    return users
async def get_user(self, user_id: int) -> Optional[dict]:
    """Load a single user by id, without the password hash.

    Returns the row as a dict, with ``allowed_paths`` decoded from JSON
    when the driver hands it back as a string. Returns None when the user
    does not exist or no DB pool is available. Timestamps are returned
    exactly as the driver delivers them.

    ``show_tech_info`` is part of the result so that this method exposes
    the same permission/flag columns as ``list_users``.
    """
    pool = await self._get_pool()
    if not pool:
        return None
    async with pool.acquire() as conn:
        async with conn.cursor(aiomysql.DictCursor) as cur:
            await cur.execute("""
                SELECT id, username, display_name, is_admin,
                       can_view_series, can_view_movies, show_tech_info,
                       allowed_paths, last_login, created_at
                FROM tv_users WHERE id = %s
            """, (user_id,))
            row = await cur.fetchone()
    if row and row.get("allowed_paths") and isinstance(
            row["allowed_paths"], str):
        row["allowed_paths"] = json.loads(row["allowed_paths"])
    return row
# --- Login / Sessions ---
async def verify_login(self, username: str, password: str) -> Optional[dict]:
    """Check credentials and return the user row, or None.

    None is returned when no DB pool is available, the username is
    unknown or the password does not match the stored bcrypt hash. On
    success ``last_login`` is set to NOW() and the returned dict no
    longer contains ``password_hash``.
    """
    pool = await self._get_pool()
    if not pool:
        return None

    async with pool.acquire() as conn, \
            conn.cursor(aiomysql.DictCursor) as cur:
        await cur.execute(
            "SELECT * FROM tv_users WHERE username = %s",
            (username,)
        )
        account = await cur.fetchone()
        if not account:
            return None

        supplied = password.encode("utf-8")
        stored = account["password_hash"].encode("utf-8")
        if not bcrypt.checkpw(supplied, stored):
            return None

        # Record the successful login
        await cur.execute(
            "UPDATE tv_users SET last_login = NOW() WHERE id = %s",
            (account["id"],)
        )
        # Never hand the hash to callers
        account.pop("password_hash")
        return account
async def create_session(self, user_id: int,
user_agent: str = "",
client_id: str = "",
persistent: bool = False) -> str:
"""Erstellt Session, gibt Token zurueck.
persistent=True -> Session laeuft nie ab (expires_at=NULL)"""
session_id = secrets.token_urlsafe(48)
pool = await self._get_pool()
if not pool:
return ""
# Nicht-persistente Sessions laufen nach 30 Tagen ab
expires = None if persistent else "DATE_ADD(NOW(), INTERVAL 30 DAY)"
async with pool.acquire() as conn:
async with conn.cursor() as cur:
if persistent:
await cur.execute("""
INSERT INTO tv_sessions
(id, user_id, user_agent, client_id, expires_at)
VALUES (%s, %s, %s, %s, NULL)
""", (session_id, user_id,
user_agent[:512] if user_agent else "",
client_id or None))
else:
await cur.execute("""
INSERT INTO tv_sessions
(id, user_id, user_agent, client_id, expires_at)
VALUES (%s, %s, %s, %s,
DATE_ADD(NOW(), INTERVAL 30 DAY))
""", (session_id, user_id,
user_agent[:512] if user_agent else "",
client_id or None))
return session_id
async def validate_session(self, session_id: str) -> Optional[dict]:
    """Resolve a session token to its user (including settings).

    Only sessions whose ``expires_at`` is NULL (permanent) or still in
    the future are accepted. For a valid session ``last_active`` is
    refreshed and ``allowed_paths`` is decoded from JSON when it arrives
    as a string. Returns None for an empty token, an unknown/expired
    session or when no DB pool is available.
    """
    if not session_id:
        return None
    pool = await self._get_pool()
    if not pool:
        return None

    async with pool.acquire() as conn, \
            conn.cursor(aiomysql.DictCursor) as cur:
        await cur.execute("""
            SELECT u.id, u.username, u.display_name, u.is_admin,
                   u.can_view_series, u.can_view_movies,
                   u.allowed_paths,
                   u.preferred_audio_lang, u.preferred_subtitle_lang,
                   u.subtitles_enabled, u.ui_lang,
                   u.series_view, u.movies_view, u.avatar_color,
                   u.autoplay_enabled, u.autoplay_countdown_sec,
                   u.autoplay_max_episodes, u.theme,
                   u.home_show_continue, u.home_show_new,
                   u.home_hide_watched, u.home_show_watched,
                   s.client_id
            FROM tv_sessions s
            JOIN tv_users u ON s.user_id = u.id
            WHERE s.id = %s
              AND (s.expires_at IS NULL OR s.expires_at > NOW())
        """, (session_id,))
        account = await cur.fetchone()
        if account:
            # Refresh the session activity timestamp
            await cur.execute(
                "UPDATE tv_sessions SET last_active = NOW() "
                "WHERE id = %s", (session_id,)
            )
            paths = account.get("allowed_paths")
            if paths and isinstance(paths, str):
                account["allowed_paths"] = json.loads(paths)
        return account
async def delete_session(self, session_id: str) -> None:
"""Logout: Session loeschen"""
pool = await self._get_pool()
if not pool:
return
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"DELETE FROM tv_sessions WHERE id = %s", (session_id,)
)
async def cleanup_old_sessions(self) -> int:
"""Loescht abgelaufene Sessions (expires_at abgelaufen).
Persistente Sessions (expires_at IS NULL) werden nie geloescht."""
pool = await self._get_pool()
if not pool:
return 0
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"DELETE FROM tv_sessions "
"WHERE expires_at IS NOT NULL AND expires_at < NOW()"
)
return cur.rowcount
# --- Client-Verwaltung (pro Geraet) ---
async def get_or_create_client(self, client_id: str = None) -> str:
"""Gibt bestehende oder neue Client-ID zurueck"""
pool = await self._get_pool()
if not pool:
return ""
async with pool.acquire() as conn:
async with conn.cursor() as cur:
if client_id:
await cur.execute(
"SELECT id FROM tv_clients WHERE id = %s",
(client_id,))
if await cur.fetchone():
await cur.execute(
"UPDATE tv_clients SET last_active = NOW() "
"WHERE id = %s", (client_id,))
return client_id
# Neuen Client erstellen
new_id = secrets.token_urlsafe(32)
await cur.execute(
"INSERT INTO tv_clients (id) VALUES (%s)",
(new_id,))
return new_id
async def get_client_settings(self, client_id: str) -> Optional[dict]:
    """Return the complete tv_clients row for ``client_id`` as a dict.

    Returns None when the client is unknown or no DB pool is available.
    """
    pool = await self._get_pool()
    if not pool:
        return None
    async with pool.acquire() as conn, \
            conn.cursor(aiomysql.DictCursor) as cur:
        await cur.execute(
            "SELECT * FROM tv_clients WHERE id = %s",
            (client_id,))
        settings = await cur.fetchone()
        return settings
async def update_client_settings(self, client_id: str,
**kwargs) -> bool:
"""Aktualisiert Client-Einstellungen (name, sound_mode, stream_quality)"""
pool = await self._get_pool()
if not pool:
return False
allowed = {"name", "sound_mode", "stream_quality", "audio_compressor"}
updates = []
values = []
for key, val in kwargs.items():
if key in allowed and val is not None:
updates.append(f"{key} = %s")
values.append(val)
if not updates:
return False
values.append(client_id)
try:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
f"UPDATE tv_clients SET {', '.join(updates)} "
"WHERE id = %s", tuple(values))
return True
except Exception as e:
logging.error(f"TV-Auth: Client-Settings fehlgeschlagen: {e}")
return False
# --- Multi-User: Profile auf dem selben Geraet ---
async def get_client_profiles(self, client_id: str) -> list[dict]:
    """List the users with a live session on this client (quick switch).

    Sessions count as live when ``expires_at`` is NULL or in the future;
    the result is ordered by most recent session activity first. Returns
    an empty list when no DB pool is available.
    """
    pool = await self._get_pool()
    if not pool:
        return []
    async with pool.acquire() as conn, \
            conn.cursor(aiomysql.DictCursor) as cur:
        await cur.execute("""
            SELECT s.id as session_id, u.id as user_id,
                   u.username, u.display_name, u.avatar_color
            FROM tv_sessions s
            JOIN tv_users u ON s.user_id = u.id
            WHERE s.client_id = %s
              AND (s.expires_at IS NULL OR s.expires_at > NOW())
            ORDER BY s.last_active DESC
        """, (client_id,))
        profiles = await cur.fetchall()
        return profiles
async def get_all_users(self) -> list[dict]:
    """Return id, username, display name and avatar colour of all users.

    Used for the profile picker; ordered by id. Returns an empty list
    when no DB pool is available.
    """
    pool = await self._get_pool()
    if not pool:
        return []
    async with pool.acquire() as conn, \
            conn.cursor(aiomysql.DictCursor) as cur:
        await cur.execute("""
            SELECT id, username, display_name, avatar_color
            FROM tv_users
            ORDER BY id
        """)
        profiles = await cur.fetchall()
        return profiles
# --- User-Einstellungen ---
async def update_user_settings(self, user_id: int,
**kwargs) -> bool:
"""Aktualisiert User-Einstellungen (Sprache, Ansichten, Auto-Play)"""
pool = await self._get_pool()
if not pool:
return False
allowed = {
"preferred_audio_lang", "preferred_subtitle_lang",
"subtitles_enabled", "ui_lang",
"series_view", "movies_view", "avatar_color",
"autoplay_enabled", "autoplay_countdown_sec",
"autoplay_max_episodes", "display_name", "theme",
"home_show_continue", "home_show_new",
"home_hide_watched", "home_show_watched",
}
updates = []
values = []
for key, val in kwargs.items():
if key in allowed:
updates.append(f"{key} = %s")
if isinstance(val, bool):
val = int(val)
values.append(val)
if not updates:
return False
values.append(user_id)
try:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
f"UPDATE tv_users SET {', '.join(updates)} "
"WHERE id = %s", tuple(values))
return True
except Exception as e:
logging.error(f"TV-Auth: Einstellungen fehlgeschlagen: {e}")
return False
# --- Watchlist (Merkliste) ---
async def add_to_watchlist(self, user_id: int,
series_id: int = None,
movie_id: int = None) -> bool:
"""Fuegt Serie oder Film zur Merkliste hinzu"""
pool = await self._get_pool()
if not pool:
return False
try:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("""
INSERT IGNORE INTO tv_watchlist
(user_id, series_id, movie_id)
VALUES (%s, %s, %s)
""", (user_id, series_id, movie_id))
return cur.rowcount > 0
except Exception as e:
logging.error(f"TV-Auth: Watchlist hinzufuegen fehlgeschlagen: {e}")
return False
async def remove_from_watchlist(self, user_id: int,
series_id: int = None,
movie_id: int = None) -> bool:
"""Entfernt Serie oder Film von der Merkliste"""
pool = await self._get_pool()
if not pool:
return False
try:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
if series_id:
await cur.execute(
"DELETE FROM tv_watchlist "
"WHERE user_id = %s AND series_id = %s",
(user_id, series_id))
elif movie_id:
await cur.execute(
"DELETE FROM tv_watchlist "
"WHERE user_id = %s AND movie_id = %s",
(user_id, movie_id))
return cur.rowcount > 0
except Exception as e:
logging.error(f"TV-Auth: Watchlist entfernen fehlgeschlagen: {e}")
return False
async def toggle_watchlist(self, user_id: int,
series_id: int = None,
movie_id: int = None) -> bool:
"""Toggle: Hinzufuegen wenn nicht vorhanden, entfernen wenn schon drin.
Gibt True zurueck wenn jetzt in der Liste, False wenn entfernt."""
pool = await self._get_pool()
if not pool:
return False
async with pool.acquire() as conn:
async with conn.cursor() as cur:
# Pruefen ob schon in Liste
if series_id:
await cur.execute(
"SELECT id FROM tv_watchlist "
"WHERE user_id = %s AND series_id = %s",
(user_id, series_id))
else:
await cur.execute(
"SELECT id FROM tv_watchlist "
"WHERE user_id = %s AND movie_id = %s",
(user_id, movie_id))
exists = await cur.fetchone()
if exists:
await cur.execute(
"DELETE FROM tv_watchlist WHERE id = %s",
(exists[0],))
return False # Entfernt
else:
await cur.execute(
"INSERT INTO tv_watchlist "
"(user_id, series_id, movie_id) VALUES (%s, %s, %s)",
(user_id, series_id, movie_id))
return True # Hinzugefuegt
async def get_watchlist(self, user_id: int) -> dict:
    """Return the user's watchlist as ``{"series": [...], "movies": [...]}``.

    Each list holds the joined library rows, newest entry first. Both
    lists are empty when no DB pool is available.
    """
    pool = await self._get_pool()
    if not pool:
        return {"series": [], "movies": []}

    async with pool.acquire() as conn, \
            conn.cursor(aiomysql.DictCursor) as cur:
        # Series entries
        await cur.execute("""
            SELECT w.id as watchlist_id, w.added_at,
                   s.id, s.title, s.folder_name, s.poster_url,
                   s.genres, s.overview
            FROM tv_watchlist w
            JOIN library_series s ON w.series_id = s.id
            WHERE w.user_id = %s AND w.series_id IS NOT NULL
            ORDER BY w.added_at DESC
        """, (user_id,))
        series_rows = await cur.fetchall()

        # Movie entries
        await cur.execute("""
            SELECT w.id as watchlist_id, w.added_at,
                   m.id, m.title, m.folder_name, m.poster_url,
                   m.year, m.genres, m.overview
            FROM tv_watchlist w
            JOIN library_movies m ON w.movie_id = m.id
            WHERE w.user_id = %s AND w.movie_id IS NOT NULL
            ORDER BY w.added_at DESC
        """, (user_id,))
        movie_rows = await cur.fetchall()

    return {"series": series_rows, "movies": movie_rows}
async def is_in_watchlist(self, user_id: int,
series_id: int = None,
movie_id: int = None) -> bool:
"""Prueft ob Serie/Film in der Merkliste ist"""
pool = await self._get_pool()
if not pool:
return False
async with pool.acquire() as conn:
async with conn.cursor() as cur:
if series_id:
await cur.execute(
"SELECT 1 FROM tv_watchlist "
"WHERE user_id = %s AND series_id = %s",
(user_id, series_id))
else:
await cur.execute(
"SELECT 1 FROM tv_watchlist "
"WHERE user_id = %s AND movie_id = %s",
(user_id, movie_id))
return await cur.fetchone() is not None
# --- Watch-Status (manuell gesehen/nicht gesehen) ---
async def set_watch_status(self, user_id: int, status: str,
video_id: int = None,
series_id: int = None,
season_key: str = None) -> bool:
"""Setzt manuellen Watch-Status (unwatched/watching/watched)"""
if status not in ("unwatched", "watching", "watched"):
return False
pool = await self._get_pool()
if not pool:
return False
try:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("""
INSERT INTO tv_watch_status
(user_id, video_id, series_id, season_key, status)
VALUES (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE status = VALUES(status)
""", (user_id, video_id, series_id, season_key, status))
# Bei Staffel/Serie auch Einzel-Episoden aktualisieren
if series_id and not video_id and not season_key:
# Ganze Serie markieren
await cur.execute("""
INSERT INTO tv_watch_status
(user_id, video_id, status)
SELECT %s, v.id, %s
FROM library_videos v
WHERE v.series_id = %s
ON DUPLICATE KEY UPDATE
status = VALUES(status)
""", (user_id, status, series_id))
elif season_key:
# Ganze Staffel markieren (format: "series_id:season")
parts = season_key.split(":")
if len(parts) == 2:
sid, sn = int(parts[0]), int(parts[1])
await cur.execute("""
INSERT INTO tv_watch_status
(user_id, video_id, status)
SELECT %s, v.id, %s
FROM library_videos v
WHERE v.series_id = %s
AND v.season_number = %s
ON DUPLICATE KEY UPDATE
status = VALUES(status)
""", (user_id, status, sid, sn))
return True
except Exception as e:
logging.error(f"TV-Auth: Watch-Status fehlgeschlagen: {e}")
return False
async def get_watch_status(self, user_id: int,
video_id: int = None,
series_id: int = None) -> Optional[str]:
"""Gibt Watch-Status zurueck"""
pool = await self._get_pool()
if not pool:
return None
async with pool.acquire() as conn:
async with conn.cursor() as cur:
if video_id:
await cur.execute(
"SELECT status FROM tv_watch_status "
"WHERE user_id = %s AND video_id = %s",
(user_id, video_id))
elif series_id:
await cur.execute(
"SELECT status FROM tv_watch_status "
"WHERE user_id = %s AND series_id = %s",
(user_id, series_id))
else:
return None
row = await cur.fetchone()
return row[0] if row else None
# --- Such-History ---
async def save_search(self, user_id: int, query: str) -> None:
"""Speichert Suchanfrage in der History"""
if not query or len(query) < 2:
return
pool = await self._get_pool()
if not pool:
return
async with pool.acquire() as conn:
async with conn.cursor() as cur:
# Duplikate vermeiden: gleiche Query aktualisieren
await cur.execute(
"DELETE FROM tv_search_history "
"WHERE user_id = %s AND query = %s",
(user_id, query))
await cur.execute(
"INSERT INTO tv_search_history (user_id, query) "
"VALUES (%s, %s)", (user_id, query))
# Max. 50 Eintraege behalten
await cur.execute("""
DELETE FROM tv_search_history
WHERE user_id = %s AND id NOT IN (
SELECT id FROM (
SELECT id FROM tv_search_history
WHERE user_id = %s
ORDER BY searched_at DESC LIMIT 50
) t
)
""", (user_id, user_id))
async def get_search_history(self, user_id: int,
limit: int = 20) -> list[str]:
"""Gibt letzte Suchanfragen zurueck"""
pool = await self._get_pool()
if not pool:
return []
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"SELECT query FROM tv_search_history "
"WHERE user_id = %s ORDER BY searched_at DESC LIMIT %s",
(user_id, limit))
rows = await cur.fetchall()
return [r[0] for r in rows]
async def get_search_suggestions(self, user_id: int,
prefix: str,
limit: int = 8) -> list[str]:
"""Autocomplete: Vorschlaege aus History + Serien/Film-Titel"""
if not prefix or len(prefix) < 1:
return []
pool = await self._get_pool()
if not pool:
return []
suggestions = []
search = f"{prefix}%"
async with pool.acquire() as conn:
async with conn.cursor() as cur:
# Aus Such-History
await cur.execute(
"SELECT DISTINCT query FROM tv_search_history "
"WHERE user_id = %s AND query LIKE %s "
"ORDER BY searched_at DESC LIMIT %s",
(user_id, search, limit))
rows = await cur.fetchall()
suggestions.extend(r[0] for r in rows)
# Aus Serien-Titeln
remaining = limit - len(suggestions)
if remaining > 0:
await cur.execute(
"SELECT title FROM library_series "
"WHERE title LIKE %s ORDER BY title LIMIT %s",
(search, remaining))
rows = await cur.fetchall()
for r in rows:
if r[0] not in suggestions:
suggestions.append(r[0])
# Aus Film-Titeln
remaining = limit - len(suggestions)
if remaining > 0:
await cur.execute(
"SELECT title FROM library_movies "
"WHERE title LIKE %s ORDER BY title LIMIT %s",
(search, remaining))
rows = await cur.fetchall()
for r in rows:
if r[0] not in suggestions:
suggestions.append(r[0])
return suggestions[:limit]
async def clear_search_history(self, user_id: int) -> bool:
"""Loescht alle Suchanfragen eines Users"""
pool = await self._get_pool()
if not pool:
return False
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"DELETE FROM tv_search_history WHERE user_id = %s",
(user_id,))
return True
# --- Fortschritt zuruecksetzen ---
async def reset_all_progress(self, user_id: int) -> bool:
"""Setzt ALLE Fortschritte und Status eines Users zurueck"""
pool = await self._get_pool()
if not pool:
return False
try:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"DELETE FROM tv_watch_progress WHERE user_id = %s",
(user_id,))
await cur.execute(
"DELETE FROM tv_watch_status WHERE user_id = %s",
(user_id,))
return True
except Exception as e:
logging.error(f"TV-Auth: Reset fehlgeschlagen: {e}")
return False
# --- Watch-Progress ---
async def save_progress(self, user_id: int, video_id: int,
position_sec: float,
duration_sec: float = 0) -> None:
"""Speichert Wiedergabe-Position"""
if duration_sec > 0 and position_sec / duration_sec > 0.9:
completed = 1
elif duration_sec > 0 and position_sec >= duration_sec:
completed = 1
else:
completed = 0
pool = await self._get_pool()
if not pool:
return
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("""
INSERT INTO tv_watch_progress
(user_id, video_id, position_sec, duration_sec, completed)
VALUES (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
position_sec = VALUES(position_sec),
duration_sec = VALUES(duration_sec),
completed = VALUES(completed)
""", (user_id, video_id, position_sec, duration_sec, completed))
async def get_progress(self, user_id: int,
                       video_id: int) -> Optional[dict]:
    """Read the stored playback position of one video for a user.

    Returns a dict with ``position_sec``, ``duration_sec`` and
    ``completed``, or None when nothing is stored or no DB pool is
    available.
    """
    pool = await self._get_pool()
    if not pool:
        return None
    async with pool.acquire() as conn, \
            conn.cursor(aiomysql.DictCursor) as cur:
        await cur.execute("""
            SELECT position_sec, duration_sec, completed
            FROM tv_watch_progress
            WHERE user_id = %s AND video_id = %s
        """, (user_id, video_id))
        progress = await cur.fetchone()
        return progress
async def get_continue_watching(self, user_id: int,
                                limit: int = 20) -> list[dict]:
    """Return the "continue watching" list of a user.

    Contains videos that are not completed and were played for more than
    10 seconds, most recently updated first, at most ``limit`` entries.
    ``updated_at`` is converted to a string. Returns an empty list when
    no DB pool is available.
    """
    pool = await self._get_pool()
    if not pool:
        return []

    async with pool.acquire() as conn, \
            conn.cursor(aiomysql.DictCursor) as cur:
        await cur.execute("""
            SELECT wp.video_id, wp.position_sec, wp.duration_sec,
                   wp.updated_at,
                   v.file_name, v.file_path,
                   v.duration_sec as video_duration,
                   v.width, v.height, v.video_codec,
                   s.id as series_id, s.title as series_title,
                   s.poster_url as series_poster
            FROM tv_watch_progress wp
            JOIN library_videos v ON wp.video_id = v.id
            LEFT JOIN library_series s ON v.series_id = s.id
            WHERE wp.user_id = %s AND wp.completed = 0
              AND wp.position_sec > 10
            ORDER BY wp.updated_at DESC
            LIMIT %s
        """, (user_id, limit))
        entries = await cur.fetchall()

    for entry in entries:
        stamp = entry.get("updated_at")
        if stamp and hasattr(stamp, "isoformat"):
            entry["updated_at"] = str(stamp)
    return entries
# --- Bewertungen (Ratings) ---
async def set_rating(self, user_id: int, rating: int,
series_id: int = None,
movie_id: int = None) -> bool:
"""Setzt User-Bewertung (1-5 Sterne). 0 = Bewertung loeschen."""
if rating < 0 or rating > 5:
return False
pool = await self._get_pool()
if not pool:
return False
try:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
if rating == 0:
# Bewertung loeschen
if series_id:
await cur.execute(
"DELETE FROM tv_ratings "
"WHERE user_id = %s AND series_id = %s",
(user_id, series_id))
elif movie_id:
await cur.execute(
"DELETE FROM tv_ratings "
"WHERE user_id = %s AND movie_id = %s",
(user_id, movie_id))
else:
await cur.execute("""
INSERT INTO tv_ratings
(user_id, series_id, movie_id, rating)
VALUES (%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE rating = VALUES(rating)
""", (user_id, series_id, movie_id, rating))
return True
except Exception as e:
logging.error(f"TV-Auth: Rating fehlgeschlagen: {e}")
return False
async def get_rating(self, user_id: int,
series_id: int = None,
movie_id: int = None) -> int:
"""Gibt User-Rating zurueck (0 = keine Bewertung)"""
pool = await self._get_pool()
if not pool:
return 0
async with pool.acquire() as conn:
async with conn.cursor() as cur:
if series_id:
await cur.execute(
"SELECT rating FROM tv_ratings "
"WHERE user_id = %s AND series_id = %s",
(user_id, series_id))
elif movie_id:
await cur.execute(
"SELECT rating FROM tv_ratings "
"WHERE user_id = %s AND movie_id = %s",
(user_id, movie_id))
else:
return 0
row = await cur.fetchone()
return row[0] if row else 0
async def get_avg_rating(self, series_id: int = None,
movie_id: int = None) -> dict:
"""Gibt Durchschnittsbewertung + Anzahl zurueck"""
pool = await self._get_pool()
if not pool:
return {"avg": 0, "count": 0}
async with pool.acquire() as conn:
async with conn.cursor() as cur:
if series_id:
await cur.execute(
"SELECT AVG(rating) as avg_r, COUNT(*) as cnt "
"FROM tv_ratings WHERE series_id = %s AND rating > 0",
(series_id,))
elif movie_id:
await cur.execute(
"SELECT AVG(rating) as avg_r, COUNT(*) as cnt "
"FROM tv_ratings WHERE movie_id = %s AND rating > 0",
(movie_id,))
else:
return {"avg": 0, "count": 0}
row = await cur.fetchone()
return {
"avg": round(float(row[0] or 0), 1),
"count": int(row[1] or 0),
}