"""Datenbank-Modelle - Getrennte Bereiche: Mail-Abruf und Datei-Sortierung""" from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, Text, JSON, ForeignKey from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from datetime import datetime from sqlalchemy import event from ..config import DATABASE_URL # Datenbank-Engine erstellen (SQLite oder MariaDB) is_sqlite = DATABASE_URL.startswith("sqlite") if is_sqlite: # SQLite mit WAL-Modus und Timeout für bessere Concurrency engine = create_engine( DATABASE_URL, echo=False, connect_args={"check_same_thread": False, "timeout": 30} ) # WAL-Modus aktivieren für bessere gleichzeitige Zugriffe @event.listens_for(engine, "connect") def set_sqlite_pragma(dbapi_connection, connection_record): cursor = dbapi_connection.cursor() cursor.execute("PRAGMA journal_mode=WAL") cursor.execute("PRAGMA busy_timeout=30000") cursor.close() else: # MariaDB/MySQL mit Retry bei Verbindungsfehlern engine = create_engine( DATABASE_URL, echo=False, pool_pre_ping=True, # Prüft Verbindung vor Nutzung pool_recycle=1800, # Recycled Verbindungen nach 30 Min pool_size=5, # Max 5 Verbindungen im Pool max_overflow=10, # Bis zu 10 zusätzliche bei Bedarf pool_timeout=30, # Timeout beim Warten auf Verbindung connect_args={ "connect_timeout": 10 # Timeout beim Verbindungsaufbau } ) SessionLocal = sessionmaker(bind=engine) def get_db_with_retry(max_retries: int = 5, retry_delay: int = 5): """ Gibt eine DB-Session zurück, mit Retry bei Verbindungsfehlern. Für Scheduler und Background-Tasks. """ import time from sqlalchemy.exc import OperationalError, DisconnectionError from sqlalchemy import text last_error = None for attempt in range(max_retries): try: db = SessionLocal() # Test-Query um Verbindung zu prüfen db.execute(text("SELECT 1")) return db except (OperationalError, DisconnectionError) as e: last_error = e if attempt < max_retries - 1: print(f"[DB] ⚠️ Verbindungsfehler (Versuch {attempt + 1}/{max_retries}): {e}", flush=True) print(f"[DB] Warte {retry_delay} Sekunden vor erneutem Versuch...", flush=True) time.sleep(retry_delay) else: print(f"[DB] ❌ Verbindung fehlgeschlagen nach {max_retries} Versuchen: {e}", flush=True) except Exception as e: print(f"[DB] ❌ Unerwarteter Fehler: {e}", flush=True) raise raise last_error Base = declarative_base() # ============ BEREICH 1: Mail-Abruf ============ class Postfach(Base): """IMAP-Postfach Konfiguration""" __tablename__ = "postfaecher" id = Column(Integer, primary_key=True) name = Column(String(100), nullable=False) # IMAP imap_server = Column(String(255), nullable=False) imap_port = Column(Integer, default=993) email = Column(String(255), nullable=False) passwort = Column(String(255), nullable=False) ordner = Column(String(100), default="INBOX") alle_ordner = Column(Boolean, default=False) # Alle IMAP-Ordner durchsuchen nur_ungelesen = Column(Boolean, default=False) # Nur ungelesene Mails (False = alle) # Ziel ziel_ordner = Column(String(500), nullable=False) # Filter erlaubte_typen = Column(JSON, default=lambda: [".pdf"]) max_groesse_mb = Column(Integer, default=25) min_groesse_kb = Column(Integer, default=10) # Mindestgröße in KB (gegen Icons) ab_datum = Column(DateTime) # Nur Mails ab diesem Datum verarbeiten # Größenfilter pro Dateityp: {".pdf": {"min_kb": 10, "max_mb": 25}, ".jpg": {"min_kb": 50, "max_mb": 10}} groessen_filter = Column(JSON, default=lambda: {}) # Status aktiv = Column(Boolean, default=True) letzter_abruf = Column(DateTime) letzte_anzahl = Column(Integer, default=0) # ============ BEREICH 2: Datei-Sortierung ============ class QuellOrdner(Base): """Ordner der nach Dateien gescannt wird""" __tablename__ = "quell_ordner" id = Column(Integer, primary_key=True) name = Column(String(100), nullable=False) pfad = Column(String(500), nullable=False) ziel_ordner = Column(String(500), nullable=False) rekursiv = Column(Boolean, default=True) # Unterordner einschließen dateitypen = Column(JSON, default=lambda: [".pdf", ".jpg", ".jpeg", ".png", ".tiff"]) # ZUGFeRD-Behandlung: "separieren", "regel", "normal", "ignorieren" zugferd_behandlung = Column(String(20), default="separieren") # Signierte PDFs: "normal", "separieren", "regel", "ignorieren" signiert_behandlung = Column(String(20), default="normal") aktiv = Column(Boolean, default=True) # NEU: Direkt verschieben ohne Regelprüfung direkt_verschieben = Column(Boolean, default=False) # OCR-Optionen ocr_aktivieren = Column(Boolean, default=True) # OCR für gescannte PDFs original_sichern = Column(String(500)) # Ordner für Original-Backup (vor OCR) # Status (wie bei Postfächern) letzte_verarbeitung = Column(DateTime) letzte_anzahl = Column(Integer, default=0) class SortierRegel(Base): """Regeln für Datei-Erkennung und Benennung""" __tablename__ = "sortier_regeln" id = Column(Integer, primary_key=True) name = Column(String(100), nullable=False) prioritaet = Column(Integer, default=100) aktiv = Column(Boolean, default=True) # Erkennungsmuster (JSON mit keywords, text_match, text_regex, etc.) # NEU: Auch negative Muster möglich (keywords_nicht, text_not_match) muster = Column(JSON, default=dict) # Extraktion (JSON mit datum, betrag, nummer, firma, etc.) extraktion = Column(JSON, default=dict) # Ausgabe schema = Column(String(500), default="{datum} - Dokument.pdf") unterordner = Column(String(100)) # Optional: Unterordner im Ziel # NEU: Fallback-Regel (greift wenn keine andere Regel passt) ist_fallback = Column(Boolean, default=False) # Freie Ordner (zusätzlich zu den zugewiesenen Quell-Ordnern) freie_ordner = Column(JSON, default=list) # Ziel-Ordner für diese Regel (optional, überschreibt Quell-Ordner Ziel) ziel_ordner = Column(String(500)) # Nur umbenennen, nicht verschieben (Dateien bleiben im Quellordner) nur_umbenennen = Column(Boolean, default=False) class OrdnerRegel(Base): """Verknüpfung zwischen Quell-Ordnern und Sortier-Regeln""" __tablename__ = "ordner_regeln" id = Column(Integer, primary_key=True) ordner_id = Column(Integer, ForeignKey("quell_ordner.id", ondelete="CASCADE"), nullable=False) regel_id = Column(Integer, ForeignKey("sortier_regeln.id", ondelete="CASCADE"), nullable=False) class Zeitplan(Base): """Scheduler-Konfiguration für automatische Ausführung""" __tablename__ = "zeitplaene" id = Column(Integer, primary_key=True) name = Column(String(100), nullable=False) aktiv = Column(Boolean, default=True) # Was wird ausgeführt typ = Column(String(50), nullable=False) # "mail_abruf", "grobsortierung", "sortierregeln", "db_backup" postfach_id = Column(Integer) # Optional: spezifisches Postfach quell_ordner_id = Column(Integer) # Optional: spezifischer Quellordner regel_id = Column(Integer) # Optional: spezifische Regel (für sortierregeln) datenbank_id = Column(Integer) # Optional: spezifische Datenbank (für db_backup) # Zeitplan-Intervall intervall = Column(String(20), nullable=False) # "stündlich", "täglich", "wöchentlich", "monatlich" stunde = Column(Integer, default=6) # Uhrzeit (0-23) minute = Column(Integer, default=0) # Minute (0-59) wochentag = Column(Integer) # 0=Montag, 6=Sonntag (für wöchentlich) monatstag = Column(Integer) # 1-28 (für monatlich) # Status letzte_ausfuehrung = Column(DateTime) naechste_ausfuehrung = Column(DateTime) letzter_status = Column(String(50)) # "erfolg", "fehler" letzte_meldung = Column(Text) erstellt_am = Column(DateTime, default=datetime.utcnow) class VerarbeiteteMail(Base): """Tracking welche Mails bereits verarbeitet wurden""" __tablename__ = "verarbeitete_mails" id = Column(Integer, primary_key=True) postfach_id = Column(Integer, nullable=False) message_id = Column(String(500), nullable=False) # Email Message-ID Header ordner = Column(String(200)) # IMAP Ordner betreff = Column(String(500)) absender = Column(String(255)) anzahl_attachments = Column(Integer, default=0) verarbeitet_am = Column(DateTime, default=datetime.utcnow) class VerarbeiteteDatei(Base): """Log verarbeiteter Dateien""" __tablename__ = "verarbeitete_dateien" id = Column(Integer, primary_key=True) original_pfad = Column(String(1000)) original_name = Column(String(500)) neuer_pfad = Column(String(1000)) neuer_name = Column(String(500)) ist_zugferd = Column(Boolean, default=False) ocr_durchgefuehrt = Column(Boolean, default=False) status = Column(String(50)) # sortiert, zugferd, fehler, keine_regel fehler = Column(Text) extrahierte_daten = Column(JSON) verarbeitet_am = Column(DateTime, default=datetime.utcnow) # ============ BEREICH 3: Datenbank-Backup ============ class DbServer(Base): """Datenbank-Server Konfiguration""" __tablename__ = "db_server" id = Column(Integer, primary_key=True) name = Column(String(100), nullable=False) typ = Column(String(20), nullable=False) # "mariadb", "mysql", "postgresql" host = Column(String(255), nullable=False) port = Column(Integer, default=3306) user = Column(String(100), nullable=False) passwort = Column(String(255), nullable=False) aktiv = Column(Boolean, default=True) erstellt_am = Column(DateTime, default=datetime.utcnow) class Datenbank(Base): """Datenbank-Konfiguration für Backup""" __tablename__ = "datenbanken" id = Column(Integer, primary_key=True) name = Column(String(100), nullable=False) server_id = Column(Integer, ForeignKey("db_server.id", ondelete="CASCADE"), nullable=False) database = Column(String(100), nullable=False) # Datenbank-Name backup_pfad = Column(String(500), nullable=False) # Zielordner für Backups aufbewahrung = Column(Integer, default=7) # Anzahl Backups die behalten werden format = Column(String(20), default="sql") # "sql", "sql.gz", "zip" aktiv = Column(Boolean, default=True) letztes_backup = Column(DateTime) letzte_groesse_mb = Column(Integer) erstellt_am = Column(DateTime, default=datetime.utcnow) class BackupLog(Base): """Log durchgeführter Backups""" __tablename__ = "backup_log" id = Column(Integer, primary_key=True) datenbank_id = Column(Integer, ForeignKey("datenbanken.id", ondelete="CASCADE"), nullable=False) dateiname = Column(String(500), nullable=False) groesse_bytes = Column(Integer) status = Column(String(50)) # "erfolg", "fehler" fehler = Column(Text) erstellt_am = Column(DateTime, default=datetime.utcnow) def migrate_db(): """Fügt fehlende Spalten hinzu ohne Daten zu löschen""" from sqlalchemy import inspect, text inspector = inspect(engine) # Migrations-Definitionen: {tabelle: {spalte: sql_typ}} migrations = { "postfaecher": { "alle_ordner": "BOOLEAN DEFAULT 0", "nur_ungelesen": "BOOLEAN DEFAULT 0", "min_groesse_kb": "INTEGER DEFAULT 10", "ab_datum": "DATETIME", "groessen_filter": "JSON" }, "quell_ordner": { "rekursiv": "BOOLEAN DEFAULT 1", "dateitypen": "JSON", "zugferd_behandlung": "VARCHAR(20) DEFAULT 'separieren'", "signiert_behandlung": "VARCHAR(20) DEFAULT 'normal'", "direkt_verschieben": "BOOLEAN DEFAULT 0", "ocr_aktivieren": "BOOLEAN DEFAULT 1", "original_sichern": "VARCHAR(500)", "letzte_verarbeitung": "DATETIME", "letzte_anzahl": "INTEGER DEFAULT 0" }, "sortier_regeln": { "ist_fallback": "BOOLEAN DEFAULT 0", "freie_ordner": "JSON", "ziel_ordner": "VARCHAR(500)", "nur_umbenennen": "BOOLEAN DEFAULT 0" }, "zeitplaene": { "regel_id": "INTEGER", "datenbank_id": "INTEGER" } } with engine.connect() as conn: for table, columns in migrations.items(): if table not in inspector.get_table_names(): continue existing = [col["name"] for col in inspector.get_columns(table)] for col_name, col_type in columns.items(): if col_name not in existing: try: conn.execute(text(f"ALTER TABLE {table} ADD COLUMN {col_name} {col_type}")) conn.commit() print(f"Migration: {table}.{col_name} hinzugefügt") except Exception as e: print(f"Migration übersprungen: {table}.{col_name} - {e}") def init_db(): """Datenbank initialisieren""" Base.metadata.create_all(engine) migrate_db() def get_db(): """Database Session Generator""" db = SessionLocal() try: yield db finally: db.close()