docker.dateiverwaltung/backend/app/models/database.py

161 lines
5.1 KiB
Python

"""Datenbank-Modelle - Getrennte Bereiche: Mail-Abruf und Datei-Sortierung"""
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, Text, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
from ..config import DATABASE_URL
engine = create_engine(DATABASE_URL, echo=False)
SessionLocal = sessionmaker(bind=engine)
Base = declarative_base()
# ============ BEREICH 1: Mail-Abruf ============
class Postfach(Base):
"""IMAP-Postfach Konfiguration"""
__tablename__ = "postfaecher"
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
# IMAP
imap_server = Column(String(255), nullable=False)
imap_port = Column(Integer, default=993)
email = Column(String(255), nullable=False)
passwort = Column(String(255), nullable=False)
ordner = Column(String(100), default="INBOX")
alle_ordner = Column(Boolean, default=False) # Alle IMAP-Ordner durchsuchen
nur_ungelesen = Column(Boolean, default=False) # Nur ungelesene Mails (False = alle)
# Ziel
ziel_ordner = Column(String(500), nullable=False)
# Filter
erlaubte_typen = Column(JSON, default=lambda: [".pdf"])
max_groesse_mb = Column(Integer, default=25)
# Status
aktiv = Column(Boolean, default=True)
letzter_abruf = Column(DateTime)
letzte_anzahl = Column(Integer, default=0)
# ============ BEREICH 2: Datei-Sortierung ============
class QuellOrdner(Base):
"""Ordner der nach Dateien gescannt wird"""
__tablename__ = "quell_ordner"
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
pfad = Column(String(500), nullable=False)
ziel_ordner = Column(String(500), nullable=False)
rekursiv = Column(Boolean, default=True) # Unterordner einschließen
dateitypen = Column(JSON, default=lambda: [".pdf", ".jpg", ".jpeg", ".png", ".tiff"])
aktiv = Column(Boolean, default=True)
class SortierRegel(Base):
"""Regeln für Datei-Erkennung und Benennung"""
__tablename__ = "sortier_regeln"
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
prioritaet = Column(Integer, default=100)
aktiv = Column(Boolean, default=True)
# Erkennungsmuster
muster = Column(JSON, default=dict)
# Extraktion
extraktion = Column(JSON, default=dict)
# Ausgabe
schema = Column(String(500), default="{datum} - Dokument.pdf")
unterordner = Column(String(100)) # Optional: Unterordner im Ziel
class VerarbeiteteMail(Base):
"""Tracking welche Mails bereits verarbeitet wurden"""
__tablename__ = "verarbeitete_mails"
id = Column(Integer, primary_key=True)
postfach_id = Column(Integer, nullable=False)
message_id = Column(String(500), nullable=False) # Email Message-ID Header
ordner = Column(String(200)) # IMAP Ordner
betreff = Column(String(500))
absender = Column(String(255))
anzahl_attachments = Column(Integer, default=0)
verarbeitet_am = Column(DateTime, default=datetime.utcnow)
class VerarbeiteteDatei(Base):
"""Log verarbeiteter Dateien"""
__tablename__ = "verarbeitete_dateien"
id = Column(Integer, primary_key=True)
original_pfad = Column(String(1000))
original_name = Column(String(500))
neuer_pfad = Column(String(1000))
neuer_name = Column(String(500))
ist_zugferd = Column(Boolean, default=False)
ocr_durchgefuehrt = Column(Boolean, default=False)
status = Column(String(50)) # sortiert, zugferd, fehler, keine_regel
fehler = Column(Text)
extrahierte_daten = Column(JSON)
verarbeitet_am = Column(DateTime, default=datetime.utcnow)
def migrate_db():
"""Fügt fehlende Spalten hinzu ohne Daten zu löschen"""
from sqlalchemy import inspect, text
inspector = inspect(engine)
# Migrations-Definitionen: {tabelle: {spalte: sql_typ}}
migrations = {
"postfaecher": {
"alle_ordner": "BOOLEAN DEFAULT 0",
"nur_ungelesen": "BOOLEAN DEFAULT 0"
},
"quell_ordner": {
"rekursiv": "BOOLEAN DEFAULT 1",
"dateitypen": "JSON"
}
}
with engine.connect() as conn:
for table, columns in migrations.items():
if table not in inspector.get_table_names():
continue
existing = [col["name"] for col in inspector.get_columns(table)]
for col_name, col_type in columns.items():
if col_name not in existing:
try:
conn.execute(text(f"ALTER TABLE {table} ADD COLUMN {col_name} {col_type}"))
conn.commit()
print(f"Migration: {table}.{col_name} hinzugefügt")
except Exception as e:
print(f"Migration übersprungen: {table}.{col_name} - {e}")
def init_db():
"""Datenbank initialisieren"""
Base.metadata.create_all(engine)
migrate_db()
def get_db():
"""Database Session Generator"""
db = SessionLocal()
try:
yield db
finally:
db.close()