"""Datenbank-Modelle - Getrennte Bereiche: Mail-Abruf und Datei-Sortierung""" from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, Text, JSON from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from datetime import datetime from ..config import DATABASE_URL engine = create_engine(DATABASE_URL, echo=False) SessionLocal = sessionmaker(bind=engine) Base = declarative_base() # ============ BEREICH 1: Mail-Abruf ============ class Postfach(Base): """IMAP-Postfach Konfiguration""" __tablename__ = "postfaecher" id = Column(Integer, primary_key=True) name = Column(String(100), nullable=False) # IMAP imap_server = Column(String(255), nullable=False) imap_port = Column(Integer, default=993) email = Column(String(255), nullable=False) passwort = Column(String(255), nullable=False) ordner = Column(String(100), default="INBOX") alle_ordner = Column(Boolean, default=False) # Alle IMAP-Ordner durchsuchen nur_ungelesen = Column(Boolean, default=False) # Nur ungelesene Mails (False = alle) ab_datum = Column(DateTime, nullable=True) # Nur Mails ab diesem Datum # Ziel ziel_ordner = Column(String(500), nullable=False) # Filter erlaubte_typen = Column(JSON, default=lambda: [".pdf"]) max_groesse_mb = Column(Integer, default=25) # Status aktiv = Column(Boolean, default=True) letzter_abruf = Column(DateTime) letzte_anzahl = Column(Integer, default=0) # ============ BEREICH 2: Datei-Sortierung ============ class QuellOrdner(Base): """Ordner der nach Dateien gescannt wird""" __tablename__ = "quell_ordner" id = Column(Integer, primary_key=True) name = Column(String(100), nullable=False) pfad = Column(String(500), nullable=False) ziel_ordner = Column(String(500), nullable=False) rekursiv = Column(Boolean, default=True) # Unterordner einschließen dateitypen = Column(JSON, default=lambda: [".pdf", ".jpg", ".jpeg", ".png", ".tiff"]) aktiv = Column(Boolean, default=True) class SortierRegel(Base): """Regeln für Datei-Erkennung und Benennung""" __tablename__ = "sortier_regeln" id = Column(Integer, primary_key=True) name = Column(String(100), nullable=False) prioritaet = Column(Integer, default=100) aktiv = Column(Boolean, default=True) # Erkennungsmuster muster = Column(JSON, default=dict) # Extraktion extraktion = Column(JSON, default=dict) # Ausgabe schema = Column(String(500), default="{datum} - Dokument.pdf") unterordner = Column(String(100)) # Optional: Unterordner im Ziel class VerarbeiteteMail(Base): """Tracking welche Mails bereits verarbeitet wurden""" __tablename__ = "verarbeitete_mails" id = Column(Integer, primary_key=True) postfach_id = Column(Integer, nullable=False) message_id = Column(String(500), nullable=False) # Email Message-ID Header ordner = Column(String(200)) # IMAP Ordner betreff = Column(String(500)) absender = Column(String(255)) anzahl_attachments = Column(Integer, default=0) verarbeitet_am = Column(DateTime, default=datetime.utcnow) class VerarbeiteteDatei(Base): """Log verarbeiteter Dateien""" __tablename__ = "verarbeitete_dateien" id = Column(Integer, primary_key=True) original_pfad = Column(String(1000)) original_name = Column(String(500)) neuer_pfad = Column(String(1000)) neuer_name = Column(String(500)) ist_zugferd = Column(Boolean, default=False) ocr_durchgefuehrt = Column(Boolean, default=False) status = Column(String(50)) # sortiert, zugferd, fehler, keine_regel fehler = Column(Text) extrahierte_daten = Column(JSON) verarbeitet_am = Column(DateTime, default=datetime.utcnow) def migrate_db(): """Fügt fehlende Spalten hinzu ohne Daten zu löschen""" from sqlalchemy import inspect, text inspector = inspect(engine) # Migrations-Definitionen: {tabelle: {spalte: sql_typ}} migrations = { "postfaecher": { "alle_ordner": "BOOLEAN DEFAULT 0", "nur_ungelesen": "BOOLEAN DEFAULT 0", "ab_datum": "DATETIME" }, "quell_ordner": { "rekursiv": "BOOLEAN DEFAULT 1", "dateitypen": "JSON" } } with engine.connect() as conn: for table, columns in migrations.items(): if table not in inspector.get_table_names(): continue existing = [col["name"] for col in inspector.get_columns(table)] for col_name, col_type in columns.items(): if col_name not in existing: try: conn.execute(text(f"ALTER TABLE {table} ADD COLUMN {col_name} {col_type}")) conn.commit() print(f"Migration: {table}.{col_name} hinzugefügt") except Exception as e: print(f"Migration übersprungen: {table}.{col_name} - {e}") def init_db(): """Datenbank initialisieren""" Base.metadata.create_all(engine) migrate_db() def get_db(): """Database Session Generator""" db = SessionLocal() try: yield db finally: db.close()