docker.dateiverwaltung/Source/backend/app/services/scheduler_service.py
data c5ee82e1c2 V 2.0 - Feinsortierung Live-Streaming, Import/Export, PDF-Rotation
Neue Features:
- Feinsortierung mit Live-Streaming (SSE) - zeigt Fortschritt in Echtzeit
- Import/Export für Sortierregeln (JSON)
- Sortierregeln-Liste mit Sortieroptionen (Name A-Z/Z-A, Priorität)
- Checkbox "Auch Dateinamen prüfen" für Keyword-Matching
- Automatische PDF-Seitenrotation bei OCR (90°, 180°, 270°)
- Tab-Persistenz über Page-Reload (localStorage)
- Modals schließen nur noch über X-Button

Bugfixes:
- Keywords nutzen jetzt Wortgrenzen (\b) - "rechnung" matched nicht mehr "Berechnung"
- Keyword-Prüfung standardmäßig nur auf PDF-Text, nicht Dateinamen
- Natürliche Sortierung für Regelnamen (1, 2, 10 statt 1, 10, 2)

Technisch:
- Async SSE-Generator mit asyncio.sleep(0) für sofortiges Streaming
- ocrmypdf mit --rotate-pages Flag
- Timeout für OCR auf 3 Minuten erhöht

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-10 13:42:12 +01:00

959 lines
36 KiB
Python
Executable file
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Scheduler-Service für automatische Ausführung von Aufgaben
"""
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
import logging
import os
import stat
from pathlib import Path
from typing import Optional, Dict, List
from zoneinfo import ZoneInfo
from ..models.database import SessionLocal, Zeitplan, Postfach, QuellOrdner
from ..modules.mail_fetcher import MailFetcher
from ..modules.sorter import Sorter
from ..config import INBOX_DIR
logger = logging.getLogger(__name__)
def check_folder_permissions(pfad: str, name: str = "") -> Dict:
"""
Prüft Ordner-Berechtigungen und gibt detaillierte Infos zurück.
Gibt Debug-Infos auf stdout aus für Container-Logs.
"""
result = {
"pfad": pfad,
"name": name,
"existiert": False,
"lesbar": False,
"schreibbar": False,
"ist_ordner": False,
"dateien_anzahl": 0,
"fehler": None
}
prefix = f"[DEBUG] [{name}]" if name else "[DEBUG]"
try:
p = Path(pfad)
result["existiert"] = p.exists()
if not p.exists():
print(f"{prefix} ❌ Pfad existiert NICHT: {pfad}", flush=True)
result["fehler"] = "Pfad existiert nicht"
return result
result["ist_ordner"] = p.is_dir()
if not p.is_dir():
print(f"{prefix} ❌ Pfad ist KEIN Ordner: {pfad}", flush=True)
result["fehler"] = "Pfad ist kein Ordner"
return result
# Berechtigungen prüfen
result["lesbar"] = os.access(pfad, os.R_OK)
result["schreibbar"] = os.access(pfad, os.W_OK)
# Dateien zählen
try:
dateien = list(p.glob("*"))
result["dateien_anzahl"] = len([f for f in dateien if f.is_file()])
except PermissionError:
result["dateien_anzahl"] = -1
# Stat-Infos
try:
st = p.stat()
mode = stat.filemode(st.st_mode)
uid = st.st_uid
gid = st.st_gid
except Exception:
mode = "?"
uid = "?"
gid = "?"
# Status-Symbol
if result["lesbar"] and result["schreibbar"]:
status = ""
elif result["lesbar"]:
status = "⚠️ NUR LESBAR"
else:
status = "❌ KEIN ZUGRIFF"
print(f"{prefix} {status} {pfad}", flush=True)
print(f"{prefix} Rechte: {mode} | UID: {uid} | GID: {gid} | Dateien: {result['dateien_anzahl']}", flush=True)
if not result["schreibbar"]:
result["fehler"] = "Keine Schreibrechte"
except Exception as e:
print(f"{prefix} ❌ FEHLER beim Prüfen: {pfad} - {e}", flush=True)
result["fehler"] = str(e)
return result
def check_all_folders_on_startup():
"""Prüft alle konfigurierten Ordner beim Start"""
print("", flush=True)
print("=" * 60, flush=True)
print("[DEBUG] === ORDNER-BERECHTIGUNGEN PRÜFEN ===", flush=True)
print("=" * 60, flush=True)
# Prozess-Info
print(f"[DEBUG] Prozess läuft als UID: {os.getuid()}, GID: {os.getgid()}", flush=True)
# Umgebungsvariablen für PUID/PGID (Unraid-Style)
puid = os.environ.get("PUID", "nicht gesetzt")
pgid = os.environ.get("PGID", "nicht gesetzt")
print(f"[DEBUG] Umgebung: PUID={puid}, PGID={pgid}", flush=True)
# Aktueller User
try:
import pwd
import grp
user_info = pwd.getpwuid(os.getuid())
group_info = grp.getgrgid(os.getgid())
print(f"[DEBUG] User: {user_info.pw_name}, Gruppe: {group_info.gr_name}", flush=True)
except Exception:
print("[DEBUG] User/Gruppe konnte nicht ermittelt werden", flush=True)
db = SessionLocal()
try:
# Quellordner prüfen
quell_ordner = db.query(QuellOrdner).filter(QuellOrdner.aktiv == True).all()
print(f"[DEBUG] Gefunden: {len(quell_ordner)} aktive Quellordner", flush=True)
print("", flush=True)
for qo in quell_ordner:
print(f"[DEBUG] --- {qo.name} ---", flush=True)
# Quellpfad prüfen
check_folder_permissions(qo.pfad, f"{qo.name}/Quelle")
# Zielpfad prüfen
check_folder_permissions(qo.ziel_ordner, f"{qo.name}/Ziel")
print("", flush=True)
# Postfächer Zielordner prüfen
postfaecher = db.query(Postfach).filter(Postfach.aktiv == True).all()
print(f"[DEBUG] Gefunden: {len(postfaecher)} aktive Postfächer", flush=True)
for pf in postfaecher:
if pf.ziel_ordner:
check_folder_permissions(pf.ziel_ordner, f"Postfach/{pf.name}")
print("=" * 60, flush=True)
print("", flush=True)
except Exception as e:
print(f"[DEBUG] ❌ Fehler bei Ordner-Prüfung: {e}", flush=True)
finally:
db.close()
# Globaler Scheduler
scheduler: Optional[BackgroundScheduler] = None
# Timezone für Scheduler (robust gegen ungültige TZ-Variablen)
def get_timezone():
"""Ermittelt eine gültige Timezone"""
tz_env = os.environ.get("TZ", "Europe/Berlin")
# Prüfen ob gültiger Timezone-String (nicht ${...} oder ähnlich)
if tz_env.startswith("$") or "/" not in tz_env:
tz_env = "Europe/Berlin"
try:
return ZoneInfo(tz_env)
except Exception:
return ZoneInfo("Europe/Berlin")
def get_scheduler() -> BackgroundScheduler:
"""Gibt den globalen Scheduler zurück"""
global scheduler
if scheduler is None:
scheduler = BackgroundScheduler(timezone=get_timezone())
return scheduler
def init_scheduler():
"""Initialisiert den Scheduler beim App-Start"""
global scheduler
scheduler = BackgroundScheduler(timezone=get_timezone())
# Ordner-Berechtigungen beim Start prüfen
check_all_folders_on_startup()
# Zeitpläne aus DB laden und Jobs erstellen
sync_zeitplaene()
scheduler.start()
logger.info("Scheduler gestartet")
# Überfällige Zeitpläne beim Start ausführen (asynchron nach 5 Sekunden)
import threading
def delayed_overdue_check():
import time
time.sleep(5) # Warte bis App vollständig gestartet
execute_overdue_zeitplaene()
thread = threading.Thread(target=delayed_overdue_check, daemon=True)
thread.start()
def execute_overdue_zeitplaene():
"""Führt alle überfälligen Zeitpläne aus"""
from datetime import datetime
print("Prüfe überfällige Zeitpläne...", flush=True)
db = SessionLocal()
try:
zeitplaene = db.query(Zeitplan).filter(Zeitplan.aktiv == True).all()
now = datetime.utcnow()
print(f"Gefunden: {len(zeitplaene)} aktive Zeitpläne, aktuelle Zeit (UTC): {now}", flush=True)
for zp in zeitplaene:
print(f" Zeitplan '{zp.name}': nächste={zp.naechste_ausfuehrung}, letzte={zp.letzte_ausfuehrung}", flush=True)
# Prüfen ob überfällig (naechste_ausfuehrung liegt in der Vergangenheit)
if zp.naechste_ausfuehrung and zp.naechste_ausfuehrung < now:
print(f" -> Führe überfälligen Zeitplan aus: {zp.name}", flush=True)
try:
execute_zeitplan(zp.id)
except Exception as e:
print(f" -> Fehler bei überfälligem Zeitplan {zp.name}: {e}", flush=True)
# Oder: Noch nie ausgeführt
elif zp.letzte_ausfuehrung is None:
print(f" -> Führe noch nie ausgeführten Zeitplan aus: {zp.name}", flush=True)
try:
execute_zeitplan(zp.id)
except Exception as e:
print(f" -> Fehler bei Zeitplan {zp.name}: {e}", flush=True)
else:
print(f" -> Nicht überfällig", flush=True)
except Exception as e:
print(f"Fehler bei Zeitplan-Prüfung: {e}", flush=True)
finally:
db.close()
def shutdown_scheduler():
"""Beendet den Scheduler"""
global scheduler
if scheduler:
scheduler.shutdown()
logger.info("Scheduler beendet")
def sync_zeitplaene():
"""Synchronisiert Zeitpläne aus der DB mit dem Scheduler"""
global scheduler
if not scheduler:
return
# Alle bestehenden Jobs entfernen
scheduler.remove_all_jobs()
db = SessionLocal()
try:
zeitplaene = db.query(Zeitplan).filter(Zeitplan.aktiv == True).all()
for zp in zeitplaene:
add_job_for_zeitplan(zp)
logger.info(f"Job hinzugefügt: {zp.name} ({zp.intervall})")
finally:
db.close()
def add_job_for_zeitplan(zp: Zeitplan):
"""Fügt einen Job für einen Zeitplan hinzu"""
global scheduler
if not scheduler:
return
job_id = f"zeitplan_{zp.id}"
# CronTrigger basierend auf Intervall erstellen
trigger = create_trigger(zp)
if not trigger:
return
# Job hinzufügen
scheduler.add_job(
func=execute_zeitplan,
trigger=trigger,
args=[zp.id],
id=job_id,
name=zp.name,
replace_existing=True
)
# Nächste Ausführungszeit berechnen und speichern
job = scheduler.get_job(job_id)
if job:
try:
# APScheduler 3.x: next_run_time als Attribut
# APScheduler 4.x: get_next_fire_time() oder scheduled_fire_time
next_time = getattr(job, 'next_run_time', None)
if next_time is None and hasattr(job, 'get_next_fire_time'):
next_time = job.get_next_fire_time()
if next_time:
db = SessionLocal()
try:
zeitplan = db.query(Zeitplan).filter(Zeitplan.id == zp.id).first()
if zeitplan:
zeitplan.naechste_ausfuehrung = next_time
db.commit()
finally:
db.close()
except Exception as e:
logger.warning(f"Konnte nächste Ausführungszeit nicht ermitteln: {e}")
def create_trigger(zp: Zeitplan) -> Optional[CronTrigger]:
"""Erstellt einen CronTrigger basierend auf dem Zeitplan"""
try:
tz = get_timezone()
if zp.intervall == "stündlich":
return CronTrigger(minute=zp.minute or 0, timezone=tz)
elif zp.intervall == "täglich":
return CronTrigger(hour=zp.stunde or 6, minute=zp.minute or 0, timezone=tz)
elif zp.intervall == "wöchentlich":
return CronTrigger(
day_of_week=zp.wochentag or 0,
hour=zp.stunde or 6,
minute=zp.minute or 0,
timezone=tz
)
elif zp.intervall == "monatlich":
return CronTrigger(
day=zp.monatstag or 1,
hour=zp.stunde or 6,
minute=zp.minute or 0,
timezone=tz
)
return None
except Exception as e:
logger.error(f"Fehler beim Erstellen des Triggers: {e}")
return None
def execute_zeitplan(zeitplan_id: int):
"""Führt einen Zeitplan aus"""
db = SessionLocal()
try:
zeitplan = db.query(Zeitplan).filter(Zeitplan.id == zeitplan_id).first()
if not zeitplan:
return
logger.info(f"Starte Zeitplan: {zeitplan.name}")
try:
if zeitplan.typ == "mail_abruf":
result = execute_mail_abruf(db, zeitplan)
elif zeitplan.typ == "grobsortierung":
result = execute_grobsortierung(db, zeitplan)
elif zeitplan.typ == "sortierregeln":
result = execute_sortierregeln(db, zeitplan)
elif zeitplan.typ == "sortierung":
# Legacy: alte "sortierung" wird wie "grobsortierung" behandelt
result = execute_grobsortierung(db, zeitplan)
elif zeitplan.typ == "db_backup":
result = execute_db_backup(db, zeitplan)
else:
result = {"erfolg": False, "meldung": f"Unbekannter Typ: {zeitplan.typ}"}
# Status aktualisieren
zeitplan.letzte_ausfuehrung = datetime.utcnow()
zeitplan.letzter_status = "erfolg" if result.get("erfolg") else "fehler"
zeitplan.letzte_meldung = result.get("meldung", "")[:500]
# Nächste Ausführung berechnen
job = scheduler.get_job(f"zeitplan_{zeitplan_id}")
if job:
try:
next_time = getattr(job, 'next_run_time', None)
if next_time is None and hasattr(job, 'get_next_fire_time'):
next_time = job.get_next_fire_time()
if next_time:
zeitplan.naechste_ausfuehrung = next_time
except Exception:
pass
db.commit()
logger.info(f"Zeitplan abgeschlossen: {zeitplan.name} - {zeitplan.letzter_status}")
except Exception as e:
zeitplan.letzte_ausfuehrung = datetime.utcnow()
zeitplan.letzter_status = "fehler"
zeitplan.letzte_meldung = str(e)[:500]
db.commit()
logger.error(f"Fehler bei Zeitplan {zeitplan.name}: {e}")
finally:
db.close()
def execute_mail_abruf(db, zeitplan: Zeitplan) -> Dict:
"""Führt Mail-Abruf aus"""
from ..models.database import VerarbeiteteMail
# Postfächer bestimmen
if zeitplan.postfach_id:
postfaecher = db.query(Postfach).filter(
Postfach.id == zeitplan.postfach_id,
Postfach.aktiv == True
).all()
else:
postfaecher = db.query(Postfach).filter(Postfach.aktiv == True).all()
if not postfaecher:
return {"erfolg": True, "meldung": "Keine aktiven Postfächer gefunden"}
gesamt_dateien = 0
fehler = []
for postfach in postfaecher:
try:
# Bereits verarbeitete Message-IDs laden
bereits_verarbeitet = set(
m.message_id for m in db.query(VerarbeiteteMail)
.filter(VerarbeiteteMail.postfach_id == postfach.id)
.all()
)
config = {
"imap_server": postfach.imap_server,
"imap_port": postfach.imap_port,
"email": postfach.email,
"passwort": postfach.passwort,
"ordner": postfach.ordner,
"erlaubte_typen": postfach.erlaubte_typen or [".pdf"],
"max_groesse_mb": postfach.max_groesse_mb or 25,
"min_groesse_kb": postfach.min_groesse_kb or 10
}
fetcher = MailFetcher(config)
if not fetcher.connect():
fehler.append(f"{postfach.name}: Verbindung fehlgeschlagen")
continue
from pathlib import Path
ziel = Path(postfach.ziel_ordner) if postfach.ziel_ordner else INBOX_DIR
ergebnisse = fetcher.fetch_attachments(
ziel_ordner=ziel,
nur_ungelesen=postfach.nur_ungelesen,
markiere_gelesen=True,
alle_ordner=postfach.alle_ordner,
bereits_verarbeitet=bereits_verarbeitet
)
# Verarbeitete Mails speichern
for ergebnis in ergebnisse:
if ergebnis.get("message_id"):
db.add(VerarbeiteteMail(
postfach_id=postfach.id,
message_id=ergebnis["message_id"],
ordner=ergebnis.get("ordner"),
betreff=ergebnis.get("betreff", "")[:500],
absender=ergebnis.get("absender", "")[:255],
anzahl_attachments=1
))
# Postfach-Status aktualisieren
postfach.letzter_abruf = datetime.utcnow()
postfach.letzte_anzahl = len(ergebnisse)
db.commit()
gesamt_dateien += len(ergebnisse)
fetcher.disconnect()
except Exception as e:
fehler.append(f"{postfach.name}: {str(e)[:100]}")
if fehler:
return {
"erfolg": len(fehler) < len(postfaecher),
"meldung": f"{gesamt_dateien} Dateien geholt. Fehler: {'; '.join(fehler)}"
}
return {"erfolg": True, "meldung": f"{gesamt_dateien} Dateien aus {len(postfaecher)} Postfächern geholt"}
def execute_grobsortierung(db, zeitplan: Zeitplan) -> Dict:
"""Führt Grobsortierung aus (QuellOrdner verarbeiten)"""
from ..models.database import SortierRegel, VerarbeiteteDatei
from ..modules.pdf_processor import PDFProcessor
from pathlib import Path
print("", flush=True)
print("[GROBSORTIERUNG] === START ===", flush=True)
print(f"[GROBSORTIERUNG] Zeitplan: {zeitplan.name} (ID: {zeitplan.id})", flush=True)
# QuellOrdner bestimmen
if zeitplan.quell_ordner_id:
quell_ordner = db.query(QuellOrdner).filter(
QuellOrdner.id == zeitplan.quell_ordner_id,
QuellOrdner.aktiv == True
).all()
else:
quell_ordner = db.query(QuellOrdner).filter(QuellOrdner.aktiv == True).all()
print(f"[GROBSORTIERUNG] Gefunden: {len(quell_ordner)} aktive Quellordner", flush=True)
if not quell_ordner:
print("[GROBSORTIERUNG] ⚠️ Keine aktiven Quellordner - Abbruch", flush=True)
return {"erfolg": True, "meldung": "Keine aktiven Quellordner gefunden"}
# Regeln laden
regeln = db.query(SortierRegel).filter(SortierRegel.aktiv == True).order_by(SortierRegel.prioritaet).all()
print(f"[GROBSORTIERUNG] Gefunden: {len(regeln)} aktive Regeln", flush=True)
# Prüfen ob mindestens ein Ordner direkt_verschieben aktiviert hat
hat_direkt_verschieben = any(getattr(qo, 'direkt_verschieben', False) for qo in quell_ordner)
if not regeln and not hat_direkt_verschieben:
print("[GROBSORTIERUNG] ⚠️ Keine aktiven Regeln und kein direkt_verschieben - Abbruch", flush=True)
return {"erfolg": False, "meldung": "Keine aktiven Regeln definiert"}
if not regeln:
print("[GROBSORTIERUNG] Keine Regeln, aber direkt_verschieben aktiv - fahre fort", flush=True)
# Regeln in Dict-Format
regeln_dicts = [{
"id": r.id,
"name": r.name,
"prioritaet": r.prioritaet,
"muster": r.muster,
"extraktion": r.extraktion,
"schema": r.schema,
"unterordner": r.unterordner
} for r in regeln]
sorter = Sorter(regeln_dicts)
pdf_processor = PDFProcessor()
gesamt_sortiert = 0
gesamt_fehler = 0
gesamt_ohne_regel = 0
fehler_meldungen = []
for qo in quell_ordner:
ordner_sortiert = 0 # Zähler pro Ordner
print("", flush=True)
print(f"[GROBSORTIERUNG] --- Verarbeite: {qo.name} ---", flush=True)
print(f"[GROBSORTIERUNG] Quelle: {qo.pfad}", flush=True)
print(f"[GROBSORTIERUNG] Ziel: {qo.ziel_ordner}", flush=True)
print(f"[GROBSORTIERUNG] Einstellungen: direkt_verschieben={qo.direkt_verschieben}, zugferd={qo.zugferd_behandlung}", flush=True)
try:
pfad = Path(qo.pfad)
# Debug: Ordner-Checks
quelle_check = check_folder_permissions(str(pfad), f"{qo.name}/Quelle")
ziel_check = check_folder_permissions(qo.ziel_ordner, f"{qo.name}/Ziel")
if not pfad.exists():
print(f"[GROBSORTIERUNG] ❌ Quellpfad existiert nicht - überspringe", flush=True)
fehler_meldungen.append(f"{qo.name}: Quellpfad existiert nicht")
continue
if not quelle_check["lesbar"]:
print(f"[GROBSORTIERUNG] ❌ Quellpfad nicht lesbar - überspringe", flush=True)
fehler_meldungen.append(f"{qo.name}: Keine Leserechte auf Quelle")
continue
if not ziel_check["schreibbar"]:
print(f"[GROBSORTIERUNG] ❌ Zielpfad nicht beschreibbar - überspringe", flush=True)
fehler_meldungen.append(f"{qo.name}: Keine Schreibrechte auf Ziel")
continue
ziel_basis = Path(qo.ziel_ordner)
# Dateien sammeln
pattern = "**/*" if qo.rekursiv else "*"
erlaubte = [t.lower() for t in (qo.dateitypen or [".pdf"])]
print(f"[GROBSORTIERUNG] Suche nach: {erlaubte} (rekursiv={qo.rekursiv})", flush=True)
dateien = [f for f in pfad.glob(pattern) if f.is_file() and f.suffix.lower() in erlaubte]
print(f"[GROBSORTIERUNG] ✓ Gefunden: {len(dateien)} Dateien", flush=True)
if len(dateien) == 0:
print(f"[GROBSORTIERUNG] Keine passenden Dateien im Ordner", flush=True)
for datei in dateien:
try:
ist_pdf = datei.suffix.lower() == ".pdf"
text = ""
if ist_pdf:
pdf_result = pdf_processor.verarbeite(str(datei))
if pdf_result.get("fehler"):
raise Exception(pdf_result["fehler"])
text = pdf_result.get("text", "")
# ZUGFeRD-Behandlung basierend auf Einstellung
# Optionen: "separieren", "regel", "normal", "ignorieren"
zugferd_behandlung = getattr(qo, 'zugferd_behandlung', 'normal') or 'normal'
ist_zugferd = pdf_result.get("ist_zugferd", False)
if zugferd_behandlung == "separieren":
# NUR ZUGFeRD-PDFs verarbeiten
if not ist_zugferd:
# Keine ZUGFeRD-PDF -> überspringen
continue
# ZUGFeRD-PDF in separaten Ordner verschieben
zugferd_ziel = ziel_basis / "zugferd"
zugferd_ziel.mkdir(parents=True, exist_ok=True)
neuer_pfad = zugferd_ziel / datei.name
counter = 1
while neuer_pfad.exists():
neuer_pfad = zugferd_ziel / f"{datei.stem}_{counter}{datei.suffix}"
counter += 1
datei.rename(neuer_pfad)
db.add(VerarbeiteteDatei(
original_pfad=str(datei),
original_name=datei.name,
neuer_pfad=str(neuer_pfad),
neuer_name=neuer_pfad.name,
ist_zugferd=True,
status="zugferd"
))
gesamt_sortiert += 1
ordner_sortiert += 1
continue
elif zugferd_behandlung == "ignorieren":
# ZUGFeRD-PDFs überspringen, nur normale verarbeiten
if ist_zugferd:
continue
# Weiter mit Regelprüfung für normale PDFs
# Bei "regel" oder "normal": Alle PDFs durch Regeln prüfen
# Direkt verschieben (ohne Regelprüfung)?
direkt_verschieben = getattr(qo, 'direkt_verschieben', False)
if direkt_verschieben:
# Datei direkt in Zielordner verschieben
print(f"[GROBSORTIERUNG] → Verschiebe direkt: {datei.name}", flush=True)
try:
ziel_basis.mkdir(parents=True, exist_ok=True)
except PermissionError as pe:
print(f"[GROBSORTIERUNG] ❌ Kann Zielordner nicht erstellen: {pe}", flush=True)
raise
neuer_pfad = ziel_basis / datei.name
counter = 1
while neuer_pfad.exists():
neuer_pfad = ziel_basis / f"{datei.stem}_{counter}{datei.suffix}"
counter += 1
try:
datei.rename(neuer_pfad)
print(f"[GROBSORTIERUNG] ✓ Verschoben nach: {neuer_pfad}", flush=True)
except PermissionError as pe:
print(f"[GROBSORTIERUNG] ❌ Keine Berechtigung zum Verschieben: {pe}", flush=True)
raise
except Exception as me:
print(f"[GROBSORTIERUNG] ❌ Fehler beim Verschieben: {me}", flush=True)
raise
db.add(VerarbeiteteDatei(
original_pfad=str(datei),
original_name=datei.name,
neuer_pfad=str(neuer_pfad),
neuer_name=neuer_pfad.name,
status="direkt"
))
gesamt_sortiert += 1
ordner_sortiert += 1
continue
# Regel finden
doc_info = {"text": text, "original_name": datei.name, "absender": "", "dateityp": datei.suffix.lower()}
regel = sorter.finde_passende_regel(doc_info)
if not regel:
gesamt_ohne_regel += 1
db.add(VerarbeiteteDatei(
original_pfad=str(datei),
original_name=datei.name,
status="keine_regel",
fehler="Keine passende Regel gefunden"
))
continue
# Felder extrahieren und verschieben
extrahiert = sorter.extrahiere_felder(regel, doc_info)
schema = regel.get("schema", "{datum} - Dokument.pdf")
if schema.endswith(".pdf"):
schema = schema[:-4] + datei.suffix
neuer_name = sorter.generiere_dateinamen({"schema": schema, **regel}, extrahiert)
ziel = ziel_basis
if regel.get("unterordner"):
ziel = ziel / regel["unterordner"]
ziel.mkdir(parents=True, exist_ok=True)
sorter.verschiebe_datei(str(datei), str(ziel), neuer_name)
gesamt_sortiert += 1
ordner_sortiert += 1
except Exception as e:
gesamt_fehler += 1
print(f"[GROBSORTIERUNG] ❌ FEHLER bei {datei.name}: {e}", flush=True)
logger.error(f"Fehler bei Datei {datei}: {e}")
db.add(VerarbeiteteDatei(
original_pfad=str(datei),
original_name=datei.name,
status="fehler",
fehler=str(e)[:500]
))
# Ordner-Status aktualisieren (wie bei Postfächern)
qo.letzte_verarbeitung = datetime.utcnow()
qo.letzte_anzahl = ordner_sortiert
print(f"[GROBSORTIERUNG] ✓ {qo.name} abgeschlossen: {ordner_sortiert} Dateien verschoben", flush=True)
except Exception as e:
print(f"[GROBSORTIERUNG] ❌ FEHLER bei Ordner {qo.name}: {e}", flush=True)
fehler_meldungen.append(f"{qo.name}: {str(e)[:100]}")
db.commit()
meldung = f"{gesamt_sortiert} Dateien sortiert"
if gesamt_ohne_regel > 0:
meldung += f", {gesamt_ohne_regel} ohne passende Regel"
if gesamt_fehler > 0:
meldung += f", {gesamt_fehler} Fehler"
if fehler_meldungen:
meldung += f" ({'; '.join(fehler_meldungen)})"
print("", flush=True)
print(f"[GROBSORTIERUNG] === ENDE === {meldung}", flush=True)
print("", flush=True)
# Erfolg wenn keine echten Fehler (ohne_regel zählt nicht als Fehler)
return {"erfolg": gesamt_fehler == 0 and not fehler_meldungen, "meldung": meldung}
def execute_sortierregeln(db, zeitplan: Zeitplan) -> Dict:
"""Führt nur Sortierregeln aus (freie Ordner von Regeln)"""
from ..models.database import SortierRegel, VerarbeiteteDatei
from ..modules.pdf_processor import PDFProcessor
from pathlib import Path
# Regeln laden (optional spezifische Regel)
if zeitplan.regel_id:
regeln = db.query(SortierRegel).filter(
SortierRegel.id == zeitplan.regel_id,
SortierRegel.aktiv == True
).all()
else:
regeln = db.query(SortierRegel).filter(SortierRegel.aktiv == True).all()
if not regeln:
return {"erfolg": True, "meldung": "Keine aktiven Regeln gefunden"}
pdf_processor = PDFProcessor()
gesamt_sortiert = 0
gesamt_fehler = 0
fehler_meldungen = []
for regel in regeln:
freie_ordner = regel.freie_ordner if regel.freie_ordner else []
if not freie_ordner:
continue
regel_dict = {
"id": regel.id,
"name": regel.name,
"prioritaet": regel.prioritaet,
"muster": regel.muster,
"extraktion": regel.extraktion,
"schema": regel.schema,
"unterordner": regel.unterordner,
"ziel_ordner": getattr(regel, 'ziel_ordner', None),
"nur_umbenennen": getattr(regel, 'nur_umbenennen', False)
}
regel_sorter = Sorter([regel_dict])
for freier_ordner_pfad in freie_ordner:
freier_pfad = Path(freier_ordner_pfad)
if not freier_pfad.exists() or not freier_pfad.is_dir():
continue
# Dateien sammeln
dateien = [f for f in freier_pfad.glob("**/*") if f.is_file() and f.suffix.lower() == ".pdf"]
for datei in dateien:
try:
ist_pdf = datei.suffix.lower() == ".pdf"
text = ""
ist_zugferd = False
if ist_pdf:
pdf_result = pdf_processor.verarbeite(str(datei))
if pdf_result.get("fehler"):
raise Exception(pdf_result["fehler"])
text = pdf_result.get("text", "")
ist_zugferd = pdf_result.get("ist_zugferd", False)
doc_info = {
"text": text,
"original_name": datei.name,
"absender": "",
"dateityp": datei.suffix.lower()
}
# Prüfe ob Regel passt
passend = regel_sorter.finde_passende_regel(doc_info)
if not passend:
continue
# Felder extrahieren
extrahiert = regel_sorter.extrahiere_felder(passend, doc_info)
# Dateiname generieren
schema = passend.get("schema", "{datum} - Dokument.pdf")
if schema.endswith(".pdf"):
schema = schema[:-4] + datei.suffix
neuer_name = regel_sorter.generiere_dateinamen(
{"schema": schema, **passend}, extrahiert
)
# Zielordner bestimmen
if passend.get("nur_umbenennen"):
# Nur umbenennen - Datei bleibt im aktuellen Ordner
ziel = datei.parent
elif passend.get("ziel_ordner"):
# Regel hat eigenen Zielordner
ziel = Path(passend["ziel_ordner"])
if passend.get("unterordner"):
ziel = ziel / passend["unterordner"]
else:
# Kein Zielordner - bleibt im freien Ordner
ziel = freier_pfad
if passend.get("unterordner"):
ziel = ziel / passend["unterordner"]
ziel.mkdir(parents=True, exist_ok=True)
# Verschieben/Umbenennen
regel_sorter.verschiebe_datei(str(datei), str(ziel), neuer_name)
gesamt_sortiert += 1
db.add(VerarbeiteteDatei(
original_pfad=str(datei),
original_name=datei.name,
neuer_pfad=str(ziel / neuer_name),
neuer_name=neuer_name,
ist_zugferd=ist_zugferd,
status="sortiert",
extrahierte_daten=extrahiert
))
except Exception as e:
gesamt_fehler += 1
logger.error(f"Fehler bei Datei {datei}: {e}")
db.commit()
meldung = f"{gesamt_sortiert} Dateien mit Regeln sortiert"
if gesamt_fehler > 0:
meldung += f", {gesamt_fehler} Fehler"
if fehler_meldungen:
meldung += f" ({'; '.join(fehler_meldungen)})"
return {"erfolg": gesamt_fehler == 0, "meldung": meldung}
def execute_db_backup(db, zeitplan: Zeitplan) -> Dict:
"""Führt Datenbank-Backup aus"""
from ..models.database import Datenbank
from .backup_service import erstelle_db_backup
print(f"[SCHEDULER] Starte DB-Backup: {zeitplan.name}", flush=True)
# Datenbanken laden (optional spezifische Datenbank)
if zeitplan.datenbank_id:
datenbanken = db.query(Datenbank).filter(
Datenbank.id == zeitplan.datenbank_id,
Datenbank.aktiv == True
).all()
else:
datenbanken = db.query(Datenbank).filter(Datenbank.aktiv == True).all()
if not datenbanken:
return {"erfolg": True, "meldung": "Keine aktiven Datenbanken gefunden"}
gesamt_erfolg = 0
gesamt_fehler = 0
fehler_meldungen = []
for datenbank in datenbanken:
try:
print(f"[SCHEDULER] Backup für: {datenbank.name}", flush=True)
result = erstelle_db_backup(datenbank.id, db)
gesamt_erfolg += 1
except Exception as e:
gesamt_fehler += 1
fehler_meldungen.append(f"{datenbank.name}: {str(e)}")
logger.error(f"Backup-Fehler für {datenbank.name}: {e}")
meldung = f"{gesamt_erfolg} Backups erstellt"
if gesamt_fehler > 0:
meldung += f", {gesamt_fehler} Fehler"
if fehler_meldungen:
meldung += f" ({'; '.join(fehler_meldungen[:3])})"
return {"erfolg": gesamt_fehler == 0, "meldung": meldung}
def get_scheduler_status() -> Dict:
"""Gibt den Status aller Zeitpläne zurück"""
global scheduler
db = SessionLocal()
try:
zeitplaene = db.query(Zeitplan).all()
result = []
for zp in zeitplaene:
job = scheduler.get_job(f"zeitplan_{zp.id}") if scheduler else None
result.append({
"id": zp.id,
"name": zp.name,
"typ": zp.typ,
"intervall": zp.intervall,
"aktiv": zp.aktiv,
"letzte_ausfuehrung": zp.letzte_ausfuehrung.isoformat() if zp.letzte_ausfuehrung else None,
"naechste_ausfuehrung": zp.naechste_ausfuehrung.isoformat() if zp.naechste_ausfuehrung else None,
"letzter_status": zp.letzter_status,
"letzte_meldung": zp.letzte_meldung,
"job_aktiv": job is not None
})
return {
"scheduler_laeuft": scheduler is not None and scheduler.running if scheduler else False,
"zeitplaene": result
}
finally:
db.close()
def trigger_zeitplan_manuell(zeitplan_id: int) -> Dict:
"""Löst einen Zeitplan manuell aus"""
db = SessionLocal()
try:
zeitplan = db.query(Zeitplan).filter(Zeitplan.id == zeitplan_id).first()
if not zeitplan:
return {"erfolg": False, "meldung": "Zeitplan nicht gefunden"}
# Synchron ausführen
execute_zeitplan(zeitplan_id)
return {"erfolg": True, "meldung": f"Zeitplan '{zeitplan.name}' wurde ausgeführt"}
finally:
db.close()