docker.dateiverwaltung/Source/backend/app/services/scheduler_service.py
data bb84f200d0 Fix: Feinsortierung im Scheduler + DB-Reconnect
- execute_sortierregeln verarbeitet jetzt OrdnerRegel-Zuweisungen
  (wie der Button, nicht nur freie_ordner)
- DB-Verbindung mit Retry bei Verbindungsfehlern (5 Versuche)
- SQLAlchemy 2.0 kompatibel (text() für Raw-SQL)
- Bessere Pool-Einstellungen für MariaDB

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-23 13:52:31 +01:00

1179 lines
45 KiB
Python
Executable file
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Scheduler-Service für automatische Ausführung von Aufgaben
"""
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
import logging
import os
import stat
from pathlib import Path
from typing import Optional, Dict, List
from zoneinfo import ZoneInfo
from ..models.database import SessionLocal, Zeitplan, Postfach, QuellOrdner, OrdnerRegel, SortierRegel
from ..modules.mail_fetcher import MailFetcher
from ..modules.sorter import Sorter
from ..config import INBOX_DIR
logger = logging.getLogger(__name__)
def check_folder_permissions(pfad: str, name: str = "") -> Dict:
"""
Prüft Ordner-Berechtigungen und gibt detaillierte Infos zurück.
Gibt Debug-Infos auf stdout aus für Container-Logs.
"""
result = {
"pfad": pfad,
"name": name,
"existiert": False,
"lesbar": False,
"schreibbar": False,
"ist_ordner": False,
"dateien_anzahl": 0,
"fehler": None
}
prefix = f"[DEBUG] [{name}]" if name else "[DEBUG]"
try:
p = Path(pfad)
result["existiert"] = p.exists()
if not p.exists():
print(f"{prefix} ❌ Pfad existiert NICHT: {pfad}", flush=True)
result["fehler"] = "Pfad existiert nicht"
return result
result["ist_ordner"] = p.is_dir()
if not p.is_dir():
print(f"{prefix} ❌ Pfad ist KEIN Ordner: {pfad}", flush=True)
result["fehler"] = "Pfad ist kein Ordner"
return result
# Berechtigungen prüfen
result["lesbar"] = os.access(pfad, os.R_OK)
result["schreibbar"] = os.access(pfad, os.W_OK)
# Dateien zählen
try:
dateien = list(p.glob("*"))
result["dateien_anzahl"] = len([f for f in dateien if f.is_file()])
except PermissionError:
result["dateien_anzahl"] = -1
# Stat-Infos
try:
st = p.stat()
mode = stat.filemode(st.st_mode)
uid = st.st_uid
gid = st.st_gid
except Exception:
mode = "?"
uid = "?"
gid = "?"
# Status-Symbol
if result["lesbar"] and result["schreibbar"]:
status = ""
elif result["lesbar"]:
status = "⚠️ NUR LESBAR"
else:
status = "❌ KEIN ZUGRIFF"
print(f"{prefix} {status} {pfad}", flush=True)
print(f"{prefix} Rechte: {mode} | UID: {uid} | GID: {gid} | Dateien: {result['dateien_anzahl']}", flush=True)
if not result["schreibbar"]:
result["fehler"] = "Keine Schreibrechte"
except Exception as e:
print(f"{prefix} ❌ FEHLER beim Prüfen: {pfad} - {e}", flush=True)
result["fehler"] = str(e)
return result
def check_all_folders_on_startup():
"""Prüft alle konfigurierten Ordner beim Start"""
print("", flush=True)
print("=" * 60, flush=True)
print("[DEBUG] === ORDNER-BERECHTIGUNGEN PRÜFEN ===", flush=True)
print("=" * 60, flush=True)
# Prozess-Info
print(f"[DEBUG] Prozess läuft als UID: {os.getuid()}, GID: {os.getgid()}", flush=True)
# Umgebungsvariablen für PUID/PGID (Unraid-Style)
puid = os.environ.get("PUID", "nicht gesetzt")
pgid = os.environ.get("PGID", "nicht gesetzt")
print(f"[DEBUG] Umgebung: PUID={puid}, PGID={pgid}", flush=True)
# Aktueller User
try:
import pwd
import grp
user_info = pwd.getpwuid(os.getuid())
group_info = grp.getgrgid(os.getgid())
print(f"[DEBUG] User: {user_info.pw_name}, Gruppe: {group_info.gr_name}", flush=True)
except Exception:
print("[DEBUG] User/Gruppe konnte nicht ermittelt werden", flush=True)
db = SessionLocal()
try:
# Quellordner prüfen
quell_ordner = db.query(QuellOrdner).filter(QuellOrdner.aktiv == True).all()
print(f"[DEBUG] Gefunden: {len(quell_ordner)} aktive Quellordner", flush=True)
print("", flush=True)
for qo in quell_ordner:
print(f"[DEBUG] --- {qo.name} ---", flush=True)
# Quellpfad prüfen
check_folder_permissions(qo.pfad, f"{qo.name}/Quelle")
# Zielpfad prüfen
check_folder_permissions(qo.ziel_ordner, f"{qo.name}/Ziel")
print("", flush=True)
# Postfächer Zielordner prüfen
postfaecher = db.query(Postfach).filter(Postfach.aktiv == True).all()
print(f"[DEBUG] Gefunden: {len(postfaecher)} aktive Postfächer", flush=True)
for pf in postfaecher:
if pf.ziel_ordner:
check_folder_permissions(pf.ziel_ordner, f"Postfach/{pf.name}")
print("=" * 60, flush=True)
print("", flush=True)
except Exception as e:
print(f"[DEBUG] ❌ Fehler bei Ordner-Prüfung: {e}", flush=True)
finally:
db.close()
# Globaler Scheduler
scheduler: Optional[BackgroundScheduler] = None
# Timezone für Scheduler (robust gegen ungültige TZ-Variablen)
def get_timezone():
"""Ermittelt eine gültige Timezone"""
tz_env = os.environ.get("TZ", "Europe/Berlin")
# Prüfen ob gültiger Timezone-String (nicht ${...} oder ähnlich)
if tz_env.startswith("$") or "/" not in tz_env:
tz_env = "Europe/Berlin"
try:
return ZoneInfo(tz_env)
except Exception:
return ZoneInfo("Europe/Berlin")
def get_scheduler() -> BackgroundScheduler:
"""Gibt den globalen Scheduler zurück"""
global scheduler
if scheduler is None:
scheduler = BackgroundScheduler(timezone=get_timezone())
return scheduler
def init_scheduler():
"""Initialisiert den Scheduler beim App-Start"""
global scheduler
scheduler = BackgroundScheduler(timezone=get_timezone())
# Ordner-Berechtigungen beim Start prüfen
check_all_folders_on_startup()
# Zeitpläne aus DB laden und Jobs erstellen
sync_zeitplaene()
scheduler.start()
logger.info("Scheduler gestartet")
# Überfällige Zeitpläne beim Start ausführen (asynchron nach 5 Sekunden)
import threading
def delayed_overdue_check():
import time
time.sleep(5) # Warte bis App vollständig gestartet
execute_overdue_zeitplaene()
thread = threading.Thread(target=delayed_overdue_check, daemon=True)
thread.start()
def execute_overdue_zeitplaene():
"""Führt alle überfälligen Zeitpläne aus"""
from datetime import datetime
print("Prüfe überfällige Zeitpläne...", flush=True)
db = SessionLocal()
try:
zeitplaene = db.query(Zeitplan).filter(Zeitplan.aktiv == True).all()
now = datetime.utcnow()
print(f"Gefunden: {len(zeitplaene)} aktive Zeitpläne, aktuelle Zeit (UTC): {now}", flush=True)
for zp in zeitplaene:
print(f" Zeitplan '{zp.name}': nächste={zp.naechste_ausfuehrung}, letzte={zp.letzte_ausfuehrung}", flush=True)
# Prüfen ob überfällig (naechste_ausfuehrung liegt in der Vergangenheit)
if zp.naechste_ausfuehrung and zp.naechste_ausfuehrung < now:
print(f" -> Führe überfälligen Zeitplan aus: {zp.name}", flush=True)
try:
execute_zeitplan(zp.id)
except Exception as e:
print(f" -> Fehler bei überfälligem Zeitplan {zp.name}: {e}", flush=True)
# Oder: Noch nie ausgeführt
elif zp.letzte_ausfuehrung is None:
print(f" -> Führe noch nie ausgeführten Zeitplan aus: {zp.name}", flush=True)
try:
execute_zeitplan(zp.id)
except Exception as e:
print(f" -> Fehler bei Zeitplan {zp.name}: {e}", flush=True)
else:
print(f" -> Nicht überfällig", flush=True)
except Exception as e:
print(f"Fehler bei Zeitplan-Prüfung: {e}", flush=True)
finally:
db.close()
def shutdown_scheduler():
"""Beendet den Scheduler"""
global scheduler
if scheduler:
scheduler.shutdown()
logger.info("Scheduler beendet")
def sync_zeitplaene():
"""Synchronisiert Zeitpläne aus der DB mit dem Scheduler"""
global scheduler
if not scheduler:
return
# Alle bestehenden Jobs entfernen
scheduler.remove_all_jobs()
db = SessionLocal()
try:
zeitplaene = db.query(Zeitplan).filter(Zeitplan.aktiv == True).all()
for zp in zeitplaene:
add_job_for_zeitplan(zp)
logger.info(f"Job hinzugefügt: {zp.name} ({zp.intervall})")
finally:
db.close()
def add_job_for_zeitplan(zp: Zeitplan):
"""Fügt einen Job für einen Zeitplan hinzu"""
global scheduler
if not scheduler:
return
job_id = f"zeitplan_{zp.id}"
# CronTrigger basierend auf Intervall erstellen
trigger = create_trigger(zp)
if not trigger:
return
# Job hinzufügen
scheduler.add_job(
func=execute_zeitplan,
trigger=trigger,
args=[zp.id],
id=job_id,
name=zp.name,
replace_existing=True
)
# Nächste Ausführungszeit berechnen und speichern
job = scheduler.get_job(job_id)
if job:
try:
# APScheduler 3.x: next_run_time als Attribut
# APScheduler 4.x: get_next_fire_time() oder scheduled_fire_time
next_time = getattr(job, 'next_run_time', None)
if next_time is None and hasattr(job, 'get_next_fire_time'):
next_time = job.get_next_fire_time()
if next_time:
db = SessionLocal()
try:
zeitplan = db.query(Zeitplan).filter(Zeitplan.id == zp.id).first()
if zeitplan:
zeitplan.naechste_ausfuehrung = next_time
db.commit()
finally:
db.close()
except Exception as e:
logger.warning(f"Konnte nächste Ausführungszeit nicht ermitteln: {e}")
def create_trigger(zp: Zeitplan) -> Optional[CronTrigger]:
"""Erstellt einen CronTrigger basierend auf dem Zeitplan"""
try:
tz = get_timezone()
if zp.intervall == "stündlich":
return CronTrigger(minute=zp.minute or 0, timezone=tz)
elif zp.intervall == "täglich":
return CronTrigger(hour=zp.stunde or 6, minute=zp.minute or 0, timezone=tz)
elif zp.intervall == "wöchentlich":
return CronTrigger(
day_of_week=zp.wochentag or 0,
hour=zp.stunde or 6,
minute=zp.minute or 0,
timezone=tz
)
elif zp.intervall == "monatlich":
return CronTrigger(
day=zp.monatstag or 1,
hour=zp.stunde or 6,
minute=zp.minute or 0,
timezone=tz
)
return None
except Exception as e:
logger.error(f"Fehler beim Erstellen des Triggers: {e}")
return None
def execute_zeitplan(zeitplan_id: int):
"""Führt einen Zeitplan aus mit Retry bei DB-Verbindungsfehlern"""
from sqlalchemy.exc import OperationalError, DisconnectionError
from sqlalchemy import text
import time
max_db_retries = 5
retry_delay = 10
# Versuche DB-Verbindung mit Retry
db = None
for attempt in range(max_db_retries):
try:
db = SessionLocal()
# Test-Query um Verbindung zu prüfen
db.execute(text("SELECT 1"))
break
except (OperationalError, DisconnectionError) as e:
print(f"[SCHEDULER] ⚠️ DB-Verbindungsfehler bei Zeitplan {zeitplan_id} (Versuch {attempt + 1}/{max_db_retries}): {e}", flush=True)
if db:
try:
db.close()
except:
pass
if attempt < max_db_retries - 1:
print(f"[SCHEDULER] Warte {retry_delay} Sekunden vor erneutem Versuch...", flush=True)
time.sleep(retry_delay)
else:
print(f"[SCHEDULER] ❌ DB-Verbindung fehlgeschlagen nach {max_db_retries} Versuchen - Zeitplan übersprungen", flush=True)
logger.error(f"DB-Verbindung fehlgeschlagen für Zeitplan {zeitplan_id}")
return
except Exception as e:
print(f"[SCHEDULER] ❌ Unerwarteter DB-Fehler: {e}", flush=True)
logger.error(f"Unerwarteter DB-Fehler für Zeitplan {zeitplan_id}: {e}")
if db:
try:
db.close()
except:
pass
return
try:
zeitplan = db.query(Zeitplan).filter(Zeitplan.id == zeitplan_id).first()
if not zeitplan:
return
logger.info(f"Starte Zeitplan: {zeitplan.name}")
try:
if zeitplan.typ == "mail_abruf":
result = execute_mail_abruf(db, zeitplan)
elif zeitplan.typ == "grobsortierung":
result = execute_grobsortierung(db, zeitplan)
elif zeitplan.typ == "sortierregeln":
result = execute_sortierregeln(db, zeitplan)
elif zeitplan.typ == "sortierung":
# Legacy: alte "sortierung" wird wie "grobsortierung" behandelt
result = execute_grobsortierung(db, zeitplan)
elif zeitplan.typ == "db_backup":
result = execute_db_backup(db, zeitplan)
else:
result = {"erfolg": False, "meldung": f"Unbekannter Typ: {zeitplan.typ}"}
# Status aktualisieren
zeitplan.letzte_ausfuehrung = datetime.utcnow()
zeitplan.letzter_status = "erfolg" if result.get("erfolg") else "fehler"
zeitplan.letzte_meldung = result.get("meldung", "")[:500]
# Nächste Ausführung berechnen
job = scheduler.get_job(f"zeitplan_{zeitplan_id}")
if job:
try:
next_time = getattr(job, 'next_run_time', None)
if next_time is None and hasattr(job, 'get_next_fire_time'):
next_time = job.get_next_fire_time()
if next_time:
zeitplan.naechste_ausfuehrung = next_time
except Exception:
pass
db.commit()
logger.info(f"Zeitplan abgeschlossen: {zeitplan.name} - {zeitplan.letzter_status}")
except (OperationalError, DisconnectionError) as e:
# DB-Verbindung während Ausführung verloren
print(f"[SCHEDULER] ❌ DB-Verbindung verloren während Zeitplan '{zeitplan.name}': {e}", flush=True)
logger.error(f"DB-Verbindung verloren bei Zeitplan {zeitplan.name}: {e}")
# Versuche Status trotzdem zu speichern
try:
db.rollback()
zeitplan.letzte_ausfuehrung = datetime.utcnow()
zeitplan.letzter_status = "fehler"
zeitplan.letzte_meldung = f"DB-Verbindungsfehler: {str(e)[:450]}"
db.commit()
except:
pass
except Exception as e:
zeitplan.letzte_ausfuehrung = datetime.utcnow()
zeitplan.letzter_status = "fehler"
zeitplan.letzte_meldung = str(e)[:500]
try:
db.commit()
except:
pass
logger.error(f"Fehler bei Zeitplan {zeitplan.name}: {e}")
finally:
if db:
try:
db.close()
except:
pass
def execute_mail_abruf(db, zeitplan: Zeitplan) -> Dict:
"""Führt Mail-Abruf aus"""
from ..models.database import VerarbeiteteMail
# Postfächer bestimmen
if zeitplan.postfach_id:
postfaecher = db.query(Postfach).filter(
Postfach.id == zeitplan.postfach_id,
Postfach.aktiv == True
).all()
else:
postfaecher = db.query(Postfach).filter(Postfach.aktiv == True).all()
if not postfaecher:
return {"erfolg": True, "meldung": "Keine aktiven Postfächer gefunden"}
gesamt_dateien = 0
fehler = []
for postfach in postfaecher:
try:
# Bereits verarbeitete Message-IDs laden
bereits_verarbeitet = set(
m.message_id for m in db.query(VerarbeiteteMail)
.filter(VerarbeiteteMail.postfach_id == postfach.id)
.all()
)
config = {
"imap_server": postfach.imap_server,
"imap_port": postfach.imap_port,
"email": postfach.email,
"passwort": postfach.passwort,
"ordner": postfach.ordner,
"erlaubte_typen": postfach.erlaubte_typen or [".pdf"],
"max_groesse_mb": postfach.max_groesse_mb or 25,
"min_groesse_kb": postfach.min_groesse_kb or 10
}
fetcher = MailFetcher(config)
if not fetcher.connect():
fehler.append(f"{postfach.name}: Verbindung fehlgeschlagen")
continue
from pathlib import Path
ziel = Path(postfach.ziel_ordner) if postfach.ziel_ordner else INBOX_DIR
ergebnisse = fetcher.fetch_attachments(
ziel_ordner=ziel,
nur_ungelesen=postfach.nur_ungelesen,
markiere_gelesen=True,
alle_ordner=postfach.alle_ordner,
bereits_verarbeitet=bereits_verarbeitet
)
# Verarbeitete Mails speichern
for ergebnis in ergebnisse:
if ergebnis.get("message_id"):
db.add(VerarbeiteteMail(
postfach_id=postfach.id,
message_id=ergebnis["message_id"],
ordner=ergebnis.get("ordner"),
betreff=ergebnis.get("betreff", "")[:500],
absender=ergebnis.get("absender", "")[:255],
anzahl_attachments=1
))
# Postfach-Status aktualisieren
try:
postfach.letzter_abruf = datetime.utcnow()
postfach.letzte_anzahl = len(ergebnisse)
db.commit()
except Exception as commit_error:
logger.warning(f"Postfach-Status Update fehlgeschlagen: {commit_error}")
db.rollback()
gesamt_dateien += len(ergebnisse)
fetcher.disconnect()
except Exception as e:
db.rollback()
fehler.append(f"{postfach.name}: {str(e)[:100]}")
if fehler:
return {
"erfolg": len(fehler) < len(postfaecher),
"meldung": f"{gesamt_dateien} Dateien geholt. Fehler: {'; '.join(fehler)}"
}
return {"erfolg": True, "meldung": f"{gesamt_dateien} Dateien aus {len(postfaecher)} Postfächern geholt"}
def execute_grobsortierung(db, zeitplan: Zeitplan) -> Dict:
"""Führt Grobsortierung aus (QuellOrdner verarbeiten)"""
from ..models.database import SortierRegel, VerarbeiteteDatei
from ..modules.pdf_processor import PDFProcessor
from pathlib import Path
print("", flush=True)
print("[GROBSORTIERUNG] === START ===", flush=True)
print(f"[GROBSORTIERUNG] Zeitplan: {zeitplan.name} (ID: {zeitplan.id})", flush=True)
# QuellOrdner bestimmen
if zeitplan.quell_ordner_id:
quell_ordner = db.query(QuellOrdner).filter(
QuellOrdner.id == zeitplan.quell_ordner_id,
QuellOrdner.aktiv == True
).all()
else:
quell_ordner = db.query(QuellOrdner).filter(QuellOrdner.aktiv == True).all()
print(f"[GROBSORTIERUNG] Gefunden: {len(quell_ordner)} aktive Quellordner", flush=True)
if not quell_ordner:
print("[GROBSORTIERUNG] ⚠️ Keine aktiven Quellordner - Abbruch", flush=True)
return {"erfolg": True, "meldung": "Keine aktiven Quellordner gefunden"}
# Regeln laden
regeln = db.query(SortierRegel).filter(SortierRegel.aktiv == True).order_by(SortierRegel.prioritaet).all()
print(f"[GROBSORTIERUNG] Gefunden: {len(regeln)} aktive Regeln", flush=True)
# Prüfen ob mindestens ein Ordner direkt_verschieben aktiviert hat
hat_direkt_verschieben = any(getattr(qo, 'direkt_verschieben', False) for qo in quell_ordner)
if not regeln and not hat_direkt_verschieben:
print("[GROBSORTIERUNG] ⚠️ Keine aktiven Regeln und kein direkt_verschieben - Abbruch", flush=True)
return {"erfolg": False, "meldung": "Keine aktiven Regeln definiert"}
if not regeln:
print("[GROBSORTIERUNG] Keine Regeln, aber direkt_verschieben aktiv - fahre fort", flush=True)
# Regeln in Dict-Format
regeln_dicts = [{
"id": r.id,
"name": r.name,
"prioritaet": r.prioritaet,
"muster": r.muster,
"extraktion": r.extraktion,
"schema": r.schema,
"unterordner": r.unterordner
} for r in regeln]
sorter = Sorter(regeln_dicts)
pdf_processor = PDFProcessor()
gesamt_sortiert = 0
gesamt_fehler = 0
gesamt_ohne_regel = 0
fehler_meldungen = []
for qo in quell_ordner:
ordner_sortiert = 0 # Zähler pro Ordner
print("", flush=True)
print(f"[GROBSORTIERUNG] --- Verarbeite: {qo.name} ---", flush=True)
print(f"[GROBSORTIERUNG] Quelle: {qo.pfad}", flush=True)
print(f"[GROBSORTIERUNG] Ziel: {qo.ziel_ordner}", flush=True)
print(f"[GROBSORTIERUNG] Einstellungen: direkt_verschieben={qo.direkt_verschieben}, zugferd={qo.zugferd_behandlung}", flush=True)
try:
pfad = Path(qo.pfad)
# Debug: Ordner-Checks
quelle_check = check_folder_permissions(str(pfad), f"{qo.name}/Quelle")
ziel_check = check_folder_permissions(qo.ziel_ordner, f"{qo.name}/Ziel")
if not pfad.exists():
print(f"[GROBSORTIERUNG] ❌ Quellpfad existiert nicht - überspringe", flush=True)
fehler_meldungen.append(f"{qo.name}: Quellpfad existiert nicht")
continue
if not quelle_check["lesbar"]:
print(f"[GROBSORTIERUNG] ❌ Quellpfad nicht lesbar - überspringe", flush=True)
fehler_meldungen.append(f"{qo.name}: Keine Leserechte auf Quelle")
continue
if not ziel_check["schreibbar"]:
print(f"[GROBSORTIERUNG] ❌ Zielpfad nicht beschreibbar - überspringe", flush=True)
fehler_meldungen.append(f"{qo.name}: Keine Schreibrechte auf Ziel")
continue
ziel_basis = Path(qo.ziel_ordner)
# Dateien sammeln
pattern = "**/*" if qo.rekursiv else "*"
erlaubte = [t.lower() for t in (qo.dateitypen or [".pdf"])]
print(f"[GROBSORTIERUNG] Suche nach: {erlaubte} (rekursiv={qo.rekursiv})", flush=True)
dateien = [f for f in pfad.glob(pattern) if f.is_file() and f.suffix.lower() in erlaubte]
print(f"[GROBSORTIERUNG] ✓ Gefunden: {len(dateien)} Dateien", flush=True)
if len(dateien) == 0:
print(f"[GROBSORTIERUNG] Keine passenden Dateien im Ordner", flush=True)
for datei in dateien:
try:
ist_pdf = datei.suffix.lower() == ".pdf"
text = ""
if ist_pdf:
pdf_result = pdf_processor.verarbeite(str(datei))
if pdf_result.get("fehler"):
raise Exception(pdf_result["fehler"])
text = pdf_result.get("text", "")
# ZUGFeRD-Behandlung basierend auf Einstellung
# Optionen: "separieren", "regel", "normal", "ignorieren"
zugferd_behandlung = getattr(qo, 'zugferd_behandlung', 'normal') or 'normal'
ist_zugferd = pdf_result.get("ist_zugferd", False)
if zugferd_behandlung == "separieren":
# NUR ZUGFeRD-PDFs verarbeiten
if not ist_zugferd:
# Keine ZUGFeRD-PDF -> überspringen
continue
# ZUGFeRD-PDF direkt in Zielordner verschieben
ziel_basis.mkdir(parents=True, exist_ok=True)
neuer_pfad = ziel_basis / datei.name
counter = 1
while neuer_pfad.exists():
neuer_pfad = ziel_basis / f"{datei.stem}_{counter}{datei.suffix}"
counter += 1
datei.rename(neuer_pfad)
db.add(VerarbeiteteDatei(
original_pfad=str(datei),
original_name=datei.name,
neuer_pfad=str(neuer_pfad),
neuer_name=neuer_pfad.name,
ist_zugferd=True,
status="zugferd"
))
gesamt_sortiert += 1
ordner_sortiert += 1
continue
elif zugferd_behandlung == "ignorieren":
# ZUGFeRD-PDFs überspringen, nur normale verarbeiten
if ist_zugferd:
continue
# Weiter mit Regelprüfung für normale PDFs
# Bei "regel" oder "normal": Alle PDFs durch Regeln prüfen
# Direkt verschieben (ohne Regelprüfung)?
direkt_verschieben = getattr(qo, 'direkt_verschieben', False)
if direkt_verschieben:
# Datei direkt in Zielordner verschieben
print(f"[GROBSORTIERUNG] → Verschiebe direkt: {datei.name}", flush=True)
try:
ziel_basis.mkdir(parents=True, exist_ok=True)
except PermissionError as pe:
print(f"[GROBSORTIERUNG] ❌ Kann Zielordner nicht erstellen: {pe}", flush=True)
raise
neuer_pfad = ziel_basis / datei.name
counter = 1
while neuer_pfad.exists():
neuer_pfad = ziel_basis / f"{datei.stem}_{counter}{datei.suffix}"
counter += 1
try:
datei.rename(neuer_pfad)
print(f"[GROBSORTIERUNG] ✓ Verschoben nach: {neuer_pfad}", flush=True)
except PermissionError as pe:
print(f"[GROBSORTIERUNG] ❌ Keine Berechtigung zum Verschieben: {pe}", flush=True)
raise
except Exception as me:
print(f"[GROBSORTIERUNG] ❌ Fehler beim Verschieben: {me}", flush=True)
raise
db.add(VerarbeiteteDatei(
original_pfad=str(datei),
original_name=datei.name,
neuer_pfad=str(neuer_pfad),
neuer_name=neuer_pfad.name,
status="direkt"
))
gesamt_sortiert += 1
ordner_sortiert += 1
continue
# Regel finden
doc_info = {"text": text, "original_name": datei.name, "absender": "", "dateityp": datei.suffix.lower()}
regel = sorter.finde_passende_regel(doc_info)
if not regel:
gesamt_ohne_regel += 1
db.add(VerarbeiteteDatei(
original_pfad=str(datei),
original_name=datei.name,
status="keine_regel",
fehler="Keine passende Regel gefunden"
))
continue
# Felder extrahieren und verschieben
extrahiert = sorter.extrahiere_felder(regel, doc_info)
schema = regel.get("schema", "{datum} - Dokument.pdf")
if schema.endswith(".pdf"):
schema = schema[:-4] + datei.suffix
neuer_name = sorter.generiere_dateinamen({"schema": schema, **regel}, extrahiert)
ziel = ziel_basis
if regel.get("unterordner"):
ziel = ziel / regel["unterordner"]
ziel.mkdir(parents=True, exist_ok=True)
sorter.verschiebe_datei(str(datei), str(ziel), neuer_name)
gesamt_sortiert += 1
ordner_sortiert += 1
except Exception as e:
gesamt_fehler += 1
print(f"[GROBSORTIERUNG] ❌ FEHLER bei {datei.name}: {e}", flush=True)
logger.error(f"Fehler bei Datei {datei}: {e}")
db.add(VerarbeiteteDatei(
original_pfad=str(datei),
original_name=datei.name,
status="fehler",
fehler=str(e)[:500]
))
# Ordner-Status aktualisieren (wie bei Postfächern)
qo.letzte_verarbeitung = datetime.utcnow()
qo.letzte_anzahl = ordner_sortiert
print(f"[GROBSORTIERUNG] ✓ {qo.name} abgeschlossen: {ordner_sortiert} Dateien verschoben", flush=True)
except Exception as e:
print(f"[GROBSORTIERUNG] ❌ FEHLER bei Ordner {qo.name}: {e}", flush=True)
fehler_meldungen.append(f"{qo.name}: {str(e)[:100]}")
db.commit()
meldung = f"{gesamt_sortiert} Dateien sortiert"
if gesamt_ohne_regel > 0:
meldung += f", {gesamt_ohne_regel} ohne passende Regel"
if gesamt_fehler > 0:
meldung += f", {gesamt_fehler} Fehler"
if fehler_meldungen:
meldung += f" ({'; '.join(fehler_meldungen)})"
print("", flush=True)
print(f"[GROBSORTIERUNG] === ENDE === {meldung}", flush=True)
print("", flush=True)
# Erfolg wenn keine echten Fehler (ohne_regel zählt nicht als Fehler)
return {"erfolg": gesamt_fehler == 0 and not fehler_meldungen, "meldung": meldung}
def execute_sortierregeln(db, zeitplan: Zeitplan) -> Dict:
"""Führt Feinsortierung aus (Ordner-Zuweisungen + freie Ordner von Regeln)"""
from ..models.database import SortierRegel, VerarbeiteteDatei, OrdnerRegel
from ..modules.pdf_processor import PDFProcessor
from pathlib import Path
print("", flush=True)
print("[FEINSORTIERUNG] === START ===", flush=True)
print(f"[FEINSORTIERUNG] Zeitplan: {zeitplan.name} (ID: {zeitplan.id})", flush=True)
pdf_processor = PDFProcessor()
gesamt_sortiert = 0
gesamt_fehler = 0
gesamt_ohne_regel = 0
fehler_meldungen = []
# Fallback-Regeln laden (gelten für alle Ordner wenn keine andere Regel passt)
fallback_regeln = db.query(SortierRegel).filter(
SortierRegel.aktiv == True,
SortierRegel.ist_fallback == True
).order_by(SortierRegel.prioritaet).all()
fallback_dicts = [{
"id": r.id, "name": r.name, "prioritaet": r.prioritaet,
"muster": r.muster, "extraktion": r.extraktion,
"schema": r.schema, "unterordner": r.unterordner,
"ziel_ordner": getattr(r, 'ziel_ordner', None),
"nur_umbenennen": getattr(r, 'nur_umbenennen', False)
} for r in fallback_regeln]
fallback_sorter = Sorter(fallback_dicts) if fallback_dicts else None
print(f"[FEINSORTIERUNG] Fallback-Regeln: {len(fallback_dicts)}", flush=True)
# ============ TEIL 1: Ordner-Zuweisungen verarbeiten ============
# (wie der Button /api/sortierung/starten)
quell_ordner_liste = db.query(QuellOrdner).filter(QuellOrdner.aktiv == True).all()
print(f"[FEINSORTIERUNG] Aktive Quellordner: {len(quell_ordner_liste)}", flush=True)
for quell_ordner in quell_ordner_liste:
# Feinsortierung arbeitet im ZIEL-Ordner (wo Dateien nach Grobsortierung liegen)
pfad = Path(quell_ordner.ziel_ordner)
print(f"[FEINSORTIERUNG] --- Ordner: {quell_ordner.name} ---", flush=True)
print(f"[FEINSORTIERUNG] Ziel-Pfad: {pfad}", flush=True)
if not pfad.exists():
print(f"[FEINSORTIERUNG] ⚠️ Pfad existiert nicht - überspringe", flush=True)
continue
# Dateien sammeln
dateien = [f for f in pfad.glob("**/*") if f.is_file() and f.suffix.lower() == ".pdf"]
print(f"[FEINSORTIERUNG] Gefunden: {len(dateien)} PDF-Dateien", flush=True)
# Lade nur Regeln die diesem Ordner zugewiesen sind
zuweisungen = db.query(OrdnerRegel).filter(OrdnerRegel.ordner_id == quell_ordner.id).all()
zugewiesene_regel_ids = [z.regel_id for z in zuweisungen]
if zugewiesene_regel_ids:
regeln = db.query(SortierRegel).filter(
SortierRegel.id.in_(zugewiesene_regel_ids),
SortierRegel.aktiv == True,
SortierRegel.ist_fallback == False
).order_by(SortierRegel.prioritaet).all()
else:
regeln = []
# Regeln in Dict-Format
regeln_dicts = [{
"id": r.id, "name": r.name, "prioritaet": r.prioritaet,
"muster": r.muster, "extraktion": r.extraktion,
"schema": r.schema, "unterordner": r.unterordner,
"ziel_ordner": getattr(r, 'ziel_ordner', None),
"nur_umbenennen": getattr(r, 'nur_umbenennen', False)
} for r in regeln]
sorter = Sorter(regeln_dicts) if regeln_dicts else None
print(f"[FEINSORTIERUNG] Zugewiesene Regeln: {len(regeln_dicts)}", flush=True)
for datei in dateien:
try:
text = ""
ist_zugferd = False
ocr_gemacht = False
# PDF verarbeiten
ocr_erlaubt = getattr(quell_ordner, 'ocr_aktivieren', True)
original_backup = getattr(quell_ordner, 'original_sichern', None)
pdf_result = pdf_processor.verarbeite(
str(datei),
ocr_erlaubt=ocr_erlaubt,
original_backup_pfad=original_backup
)
if pdf_result.get("fehler"):
raise Exception(pdf_result["fehler"])
text = pdf_result.get("text", "")
ist_zugferd = pdf_result.get("ist_zugferd", False)
ocr_gemacht = pdf_result.get("ocr_durchgefuehrt", False)
# Dokument-Info für Regel-Matching
doc_info = {
"text": text,
"original_name": datei.name,
"absender": "",
"dateityp": datei.suffix.lower()
}
# Passende Regel finden (erst zugewiesene, dann Fallback)
regel = None
if sorter:
regel = sorter.finde_passende_regel(doc_info)
# Fallback-Regel wenn keine zugewiesene passt
ist_fallback = False
if not regel and fallback_sorter:
regel = fallback_sorter.finde_passende_regel(doc_info)
if regel:
ist_fallback = True
if not regel:
print(f"[FEINSORTIERUNG] ⚠️ Keine Regel für: {datei.name}", flush=True)
gesamt_ohne_regel += 1
continue
print(f"[FEINSORTIERUNG] ✓ Regel '{regel.get('name')}' für: {datei.name}", flush=True)
# Felder extrahieren
aktiver_sorter = sorter if not ist_fallback else fallback_sorter
extrahiert = aktiver_sorter.extrahiere_felder(regel, doc_info)
# Dateiendung beibehalten
schema = regel.get("schema", "{datum} - Dokument.pdf")
if schema.endswith(".pdf"):
schema = schema[:-4] + datei.suffix
neuer_name = aktiver_sorter.generiere_dateinamen(
{"schema": schema, **regel}, extrahiert
)
# Zielordner bestimmen
if regel.get("nur_umbenennen"):
ziel = datei.parent
elif regel.get("ziel_ordner"):
ziel = Path(regel["ziel_ordner"])
if regel.get("unterordner"):
ziel = ziel / regel["unterordner"]
else:
ziel = pfad
if regel.get("unterordner"):
ziel = ziel / regel["unterordner"]
ziel.mkdir(parents=True, exist_ok=True)
# Verschieben
print(f"[FEINSORTIERUNG] → Verschiebe: {datei.name} -> {ziel}/{neuer_name}", flush=True)
neuer_pfad = aktiver_sorter.verschiebe_datei(str(datei), str(ziel), neuer_name)
gesamt_sortiert += 1
db.add(VerarbeiteteDatei(
original_pfad=str(datei),
original_name=datei.name,
neuer_pfad=neuer_pfad,
neuer_name=neuer_name,
ist_zugferd=ist_zugferd,
ocr_durchgefuehrt=ocr_gemacht,
status="sortiert",
extrahierte_daten=extrahiert
))
except Exception as e:
gesamt_fehler += 1
print(f"[FEINSORTIERUNG] ❌ Fehler bei {datei.name}: {e}", flush=True)
logger.error(f"Fehler bei Datei {datei}: {e}")
# ============ TEIL 2: Freie Ordner von Regeln verarbeiten ============
print("", flush=True)
print("[FEINSORTIERUNG] --- Freie Ordner ---", flush=True)
alle_regeln = db.query(SortierRegel).filter(SortierRegel.aktiv == True).all()
for regel in alle_regeln:
freie_ordner = regel.freie_ordner if regel.freie_ordner else []
if not freie_ordner:
continue
print(f"[FEINSORTIERUNG] Regel '{regel.name}' hat {len(freie_ordner)} freie Ordner", flush=True)
regel_dict = {
"id": regel.id,
"name": regel.name,
"prioritaet": regel.prioritaet,
"muster": regel.muster,
"extraktion": regel.extraktion,
"schema": regel.schema,
"unterordner": regel.unterordner,
"ziel_ordner": getattr(regel, 'ziel_ordner', None),
"nur_umbenennen": getattr(regel, 'nur_umbenennen', False)
}
regel_sorter = Sorter([regel_dict])
for freier_ordner_pfad in freie_ordner:
freier_pfad = Path(freier_ordner_pfad)
if not freier_pfad.exists() or not freier_pfad.is_dir():
print(f"[FEINSORTIERUNG] ⚠️ Freier Ordner existiert nicht: {freier_ordner_pfad}", flush=True)
continue
dateien = [f for f in freier_pfad.glob("**/*") if f.is_file() and f.suffix.lower() == ".pdf"]
print(f"[FEINSORTIERUNG] Freier Ordner {freier_ordner_pfad}: {len(dateien)} Dateien", flush=True)
for datei in dateien:
try:
text = ""
ist_zugferd = False
pdf_result = pdf_processor.verarbeite(str(datei))
if pdf_result.get("fehler"):
raise Exception(pdf_result["fehler"])
text = pdf_result.get("text", "")
ist_zugferd = pdf_result.get("ist_zugferd", False)
doc_info = {
"text": text,
"original_name": datei.name,
"absender": "",
"dateityp": datei.suffix.lower()
}
passend = regel_sorter.finde_passende_regel(doc_info)
if not passend:
continue
extrahiert = regel_sorter.extrahiere_felder(passend, doc_info)
schema = passend.get("schema", "{datum} - Dokument.pdf")
if schema.endswith(".pdf"):
schema = schema[:-4] + datei.suffix
neuer_name = regel_sorter.generiere_dateinamen(
{"schema": schema, **passend}, extrahiert
)
if passend.get("nur_umbenennen"):
ziel = datei.parent
elif passend.get("ziel_ordner"):
ziel = Path(passend["ziel_ordner"])
if passend.get("unterordner"):
ziel = ziel / passend["unterordner"]
else:
ziel = freier_pfad
if passend.get("unterordner"):
ziel = ziel / passend["unterordner"]
ziel.mkdir(parents=True, exist_ok=True)
regel_sorter.verschiebe_datei(str(datei), str(ziel), neuer_name)
gesamt_sortiert += 1
db.add(VerarbeiteteDatei(
original_pfad=str(datei),
original_name=datei.name,
neuer_pfad=str(ziel / neuer_name),
neuer_name=neuer_name,
ist_zugferd=ist_zugferd,
status="sortiert",
extrahierte_daten=extrahiert
))
except Exception as e:
gesamt_fehler += 1
logger.error(f"Fehler bei Datei {datei}: {e}")
db.commit()
meldung = f"{gesamt_sortiert} Dateien sortiert"
if gesamt_ohne_regel > 0:
meldung += f", {gesamt_ohne_regel} ohne passende Regel"
if gesamt_fehler > 0:
meldung += f", {gesamt_fehler} Fehler"
if fehler_meldungen:
meldung += f" ({'; '.join(fehler_meldungen)})"
print("", flush=True)
print(f"[FEINSORTIERUNG] === ENDE === {meldung}", flush=True)
print("", flush=True)
return {"erfolg": gesamt_fehler == 0, "meldung": meldung}
def execute_db_backup(db, zeitplan: Zeitplan) -> Dict:
"""Führt Datenbank-Backup aus"""
from ..models.database import Datenbank
from .backup_service import erstelle_db_backup
print(f"[SCHEDULER] Starte DB-Backup: {zeitplan.name}", flush=True)
# Datenbanken laden (optional spezifische Datenbank)
if zeitplan.datenbank_id:
datenbanken = db.query(Datenbank).filter(
Datenbank.id == zeitplan.datenbank_id,
Datenbank.aktiv == True
).all()
else:
datenbanken = db.query(Datenbank).filter(Datenbank.aktiv == True).all()
if not datenbanken:
return {"erfolg": True, "meldung": "Keine aktiven Datenbanken gefunden"}
gesamt_erfolg = 0
gesamt_fehler = 0
fehler_meldungen = []
for datenbank in datenbanken:
try:
print(f"[SCHEDULER] Backup für: {datenbank.name}", flush=True)
result = erstelle_db_backup(datenbank.id, db)
gesamt_erfolg += 1
except Exception as e:
gesamt_fehler += 1
fehler_meldungen.append(f"{datenbank.name}: {str(e)}")
logger.error(f"Backup-Fehler für {datenbank.name}: {e}")
meldung = f"{gesamt_erfolg} Backups erstellt"
if gesamt_fehler > 0:
meldung += f", {gesamt_fehler} Fehler"
if fehler_meldungen:
meldung += f" ({'; '.join(fehler_meldungen[:3])})"
return {"erfolg": gesamt_fehler == 0, "meldung": meldung}
def get_scheduler_status() -> Dict:
"""Gibt den Status aller Zeitpläne zurück"""
global scheduler
db = SessionLocal()
try:
zeitplaene = db.query(Zeitplan).all()
result = []
for zp in zeitplaene:
job = scheduler.get_job(f"zeitplan_{zp.id}") if scheduler else None
result.append({
"id": zp.id,
"name": zp.name,
"typ": zp.typ,
"intervall": zp.intervall,
"aktiv": zp.aktiv,
"letzte_ausfuehrung": zp.letzte_ausfuehrung.isoformat() if zp.letzte_ausfuehrung else None,
"naechste_ausfuehrung": zp.naechste_ausfuehrung.isoformat() if zp.naechste_ausfuehrung else None,
"letzter_status": zp.letzter_status,
"letzte_meldung": zp.letzte_meldung,
"job_aktiv": job is not None
})
return {
"scheduler_laeuft": scheduler is not None and scheduler.running if scheduler else False,
"zeitplaene": result
}
finally:
db.close()
def trigger_zeitplan_manuell(zeitplan_id: int) -> Dict:
"""Löst einen Zeitplan manuell aus"""
db = SessionLocal()
try:
zeitplan = db.query(Zeitplan).filter(Zeitplan.id == zeitplan_id).first()
if not zeitplan:
return {"erfolg": False, "meldung": "Zeitplan nicht gefunden"}
# Synchron ausführen
execute_zeitplan(zeitplan_id)
return {"erfolg": True, "meldung": f"Zeitplan '{zeitplan.name}' wurde ausgeführt"}
finally:
db.close()