360 lines
12 KiB
Python
360 lines
12 KiB
Python
"""
|
|
Pipeline Service
|
|
Orchestriert die gesamte Dokumentenverarbeitung
|
|
"""
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional
|
|
import logging
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from ..models import Pipeline, MailConfig, SortierRegel, Dokument, VerarbeitungsLog
|
|
from ..modules.mail_fetcher import MailFetcher
|
|
from ..modules.pdf_processor import PDFProcessor
|
|
from ..modules.sorter import Sorter
|
|
from ..config import INBOX_DIR, PROCESSED_DIR, ZUGFERD_DIR
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class PipelineService:
|
|
"""Führt die komplette Pipeline-Verarbeitung durch"""
|
|
|
|
def __init__(self, db: Session):
|
|
self.db = db
|
|
self.pdf_processor = PDFProcessor()
|
|
|
|
def verarbeite_pipeline(self, pipeline_id: int) -> Dict:
|
|
"""
|
|
Führt alle Schritte einer Pipeline aus
|
|
|
|
Returns:
|
|
Dict mit Statistiken und Ergebnissen
|
|
"""
|
|
pipeline = self.db.query(Pipeline).filter(Pipeline.id == pipeline_id).first()
|
|
if not pipeline:
|
|
return {"fehler": f"Pipeline {pipeline_id} nicht gefunden"}
|
|
|
|
if not pipeline.aktiv:
|
|
return {"fehler": f"Pipeline {pipeline.name} ist deaktiviert"}
|
|
|
|
ergebnis = {
|
|
"pipeline": pipeline.name,
|
|
"gestartet": datetime.now().isoformat(),
|
|
"mails_abgerufen": 0,
|
|
"attachments": 0,
|
|
"verarbeitet": 0,
|
|
"zugferd": 0,
|
|
"ocr": 0,
|
|
"sortiert": 0,
|
|
"fehler": []
|
|
}
|
|
|
|
# 1. Mails abrufen
|
|
inbox_pfad = INBOX_DIR / f"pipeline_{pipeline_id}"
|
|
inbox_pfad.mkdir(parents=True, exist_ok=True)
|
|
|
|
for mail_config in pipeline.mail_configs:
|
|
if not mail_config.aktiv:
|
|
continue
|
|
|
|
try:
|
|
attachments = self._rufe_mails_ab(mail_config, inbox_pfad)
|
|
ergebnis["attachments"] += len(attachments)
|
|
|
|
# Dokumente in DB anlegen
|
|
for att in attachments:
|
|
dokument = Dokument(
|
|
pipeline_id=pipeline_id,
|
|
original_name=att["original_name"],
|
|
original_pfad=att["pfad"],
|
|
status="neu",
|
|
extrahierte_daten={
|
|
"absender": att.get("absender"),
|
|
"betreff": att.get("betreff"),
|
|
"mail_datum": att.get("datum")
|
|
}
|
|
)
|
|
self.db.add(dokument)
|
|
self._log(dokument.id, "mail_abruf", "erfolg", att)
|
|
|
|
# Letzten Abruf aktualisieren
|
|
mail_config.letzter_abruf = datetime.utcnow()
|
|
|
|
except Exception as e:
|
|
ergebnis["fehler"].append(f"Mail-Abruf {mail_config.name}: {e}")
|
|
logger.error(f"Mail-Abruf Fehler: {e}")
|
|
|
|
self.db.commit()
|
|
|
|
# 2. PDFs verarbeiten
|
|
neue_dokumente = self.db.query(Dokument).filter(
|
|
Dokument.pipeline_id == pipeline_id,
|
|
Dokument.status == "neu"
|
|
).all()
|
|
|
|
for dokument in neue_dokumente:
|
|
try:
|
|
self._verarbeite_dokument(dokument, pipeline)
|
|
ergebnis["verarbeitet"] += 1
|
|
|
|
if dokument.ist_zugferd:
|
|
ergebnis["zugferd"] += 1
|
|
if dokument.ocr_durchgefuehrt:
|
|
ergebnis["ocr"] += 1
|
|
if dokument.status == "sortiert":
|
|
ergebnis["sortiert"] += 1
|
|
|
|
except Exception as e:
|
|
dokument.status = "fehler"
|
|
dokument.fehler_meldung = str(e)
|
|
ergebnis["fehler"].append(f"Verarbeitung {dokument.original_name}: {e}")
|
|
logger.error(f"Verarbeitungs-Fehler: {e}")
|
|
|
|
self.db.commit()
|
|
|
|
ergebnis["beendet"] = datetime.now().isoformat()
|
|
return ergebnis
|
|
|
|
def _rufe_mails_ab(self, mail_config: MailConfig, ziel_ordner: Path) -> List[Dict]:
|
|
"""Ruft Mails von einem Postfach ab"""
|
|
config = {
|
|
"imap_server": mail_config.imap_server,
|
|
"imap_port": mail_config.imap_port,
|
|
"email": mail_config.email,
|
|
"passwort": mail_config.passwort,
|
|
"ordner": mail_config.ordner,
|
|
"erlaubte_typen": mail_config.erlaubte_typen,
|
|
"max_groesse_mb": mail_config.max_groesse_mb
|
|
}
|
|
|
|
fetcher = MailFetcher(config)
|
|
try:
|
|
attachments = fetcher.fetch_attachments(ziel_ordner)
|
|
return attachments
|
|
finally:
|
|
fetcher.disconnect()
|
|
|
|
def _verarbeite_dokument(self, dokument: Dokument, pipeline: Pipeline):
|
|
"""Verarbeitet ein einzelnes Dokument"""
|
|
pfad = Path(dokument.original_pfad)
|
|
|
|
if not pfad.exists():
|
|
raise FileNotFoundError(f"Datei nicht gefunden: {pfad}")
|
|
|
|
# Nur PDFs verarbeiten
|
|
if pfad.suffix.lower() != ".pdf":
|
|
dokument.status = "uebersprungen"
|
|
self._log(dokument.id, "verarbeitung", "uebersprungen", {"grund": "Kein PDF"})
|
|
return
|
|
|
|
# PDF verarbeiten
|
|
pdf_ergebnis = self.pdf_processor.verarbeite(str(pfad))
|
|
|
|
if "fehler" in pdf_ergebnis:
|
|
raise Exception(pdf_ergebnis["fehler"])
|
|
|
|
dokument.ist_zugferd = pdf_ergebnis["ist_zugferd"]
|
|
dokument.hat_text = pdf_ergebnis["hat_text"]
|
|
dokument.ocr_durchgefuehrt = pdf_ergebnis["ocr_durchgefuehrt"]
|
|
|
|
# ZUGFeRD separat behandeln - NICHT umbenennen!
|
|
if dokument.ist_zugferd:
|
|
self._behandle_zugferd(dokument, pdf_ergebnis)
|
|
return
|
|
|
|
# Sortieren
|
|
self._sortiere_dokument(dokument, pdf_ergebnis, pipeline)
|
|
|
|
def _behandle_zugferd(self, dokument: Dokument, pdf_ergebnis: Dict):
|
|
"""Behandelt ZUGFeRD-Rechnungen (werden nicht verändert)"""
|
|
# In separaten Ordner verschieben
|
|
ziel_dir = ZUGFERD_DIR / datetime.now().strftime("%Y-%m")
|
|
ziel_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
quell_pfad = Path(dokument.original_pfad)
|
|
ziel_pfad = ziel_dir / quell_pfad.name
|
|
|
|
# Eindeutigen Namen sicherstellen
|
|
counter = 1
|
|
while ziel_pfad.exists():
|
|
ziel_pfad = ziel_dir / f"{quell_pfad.stem}_{counter}{quell_pfad.suffix}"
|
|
counter += 1
|
|
|
|
# Verschieben (nicht umbenennen!)
|
|
import shutil
|
|
shutil.move(str(quell_pfad), str(ziel_pfad))
|
|
|
|
dokument.neuer_pfad = str(ziel_pfad)
|
|
dokument.neuer_name = ziel_pfad.name
|
|
dokument.status = "zugferd"
|
|
dokument.extrahierte_daten = {
|
|
**(dokument.extrahierte_daten or {}),
|
|
"zugferd_xml": pdf_ergebnis.get("zugferd_xml")
|
|
}
|
|
|
|
self._log(dokument.id, "zugferd", "erfolg", {
|
|
"ziel": str(ziel_pfad),
|
|
"hinweis": "ZUGFeRD wird nicht umbenannt"
|
|
})
|
|
|
|
def _sortiere_dokument(self, dokument: Dokument, pdf_ergebnis: Dict, pipeline: Pipeline):
|
|
"""Sortiert Dokument nach Regeln"""
|
|
# Regeln laden
|
|
regeln = self.db.query(SortierRegel).filter(
|
|
SortierRegel.pipeline_id == pipeline.id,
|
|
SortierRegel.aktiv == True
|
|
).order_by(SortierRegel.prioritaet).all()
|
|
|
|
# In Sorter-Format konvertieren
|
|
regeln_dicts = []
|
|
for r in regeln:
|
|
regeln_dicts.append({
|
|
"id": r.id,
|
|
"name": r.name,
|
|
"prioritaet": r.prioritaet,
|
|
"aktiv": r.aktiv,
|
|
"muster": r.muster,
|
|
"extraktion": r.extraktion,
|
|
"schema": r.schema,
|
|
"ziel_ordner": r.ziel_ordner
|
|
})
|
|
|
|
sorter = Sorter(regeln_dicts)
|
|
|
|
# Dokument-Info zusammenstellen
|
|
dokument_info = {
|
|
"text": pdf_ergebnis.get("text", ""),
|
|
"original_name": dokument.original_name,
|
|
"absender": (dokument.extrahierte_daten or {}).get("absender", ""),
|
|
"betreff": (dokument.extrahierte_daten or {}).get("betreff", "")
|
|
}
|
|
|
|
# Passende Regel finden
|
|
regel = sorter.finde_passende_regel(dokument_info)
|
|
|
|
if not regel:
|
|
dokument.status = "keine_regel"
|
|
self._log(dokument.id, "sortierung", "keine_regel", {})
|
|
return
|
|
|
|
# Felder extrahieren
|
|
extrahiert = sorter.extrahiere_felder(regel, dokument_info)
|
|
|
|
# Dateinamen generieren
|
|
neuer_name = sorter.generiere_dateinamen(regel, extrahiert)
|
|
|
|
# Zielordner bestimmen
|
|
ziel_ordner = regel.get("ziel_ordner") or str(PROCESSED_DIR / pipeline.name)
|
|
|
|
# Verschieben
|
|
neuer_pfad = sorter.verschiebe_datei(
|
|
dokument.original_pfad,
|
|
ziel_ordner,
|
|
neuer_name
|
|
)
|
|
|
|
# Dokument aktualisieren
|
|
dokument.neuer_name = neuer_name
|
|
dokument.neuer_pfad = neuer_pfad
|
|
dokument.extrahierte_daten = {
|
|
**(dokument.extrahierte_daten or {}),
|
|
"text_auszug": pdf_ergebnis.get("text", "")[:500],
|
|
**extrahiert
|
|
}
|
|
dokument.regel_id = regel.get("id")
|
|
dokument.status = "sortiert"
|
|
dokument.verarbeitet = datetime.utcnow()
|
|
|
|
self._log(dokument.id, "sortierung", "erfolg", {
|
|
"regel": regel.get("name"),
|
|
"neuer_name": neuer_name,
|
|
"ziel": neuer_pfad
|
|
})
|
|
|
|
def _log(self, dokument_id: int, schritt: str, status: str, details: Dict):
|
|
"""Erstellt Log-Eintrag"""
|
|
log = VerarbeitungsLog(
|
|
dokument_id=dokument_id,
|
|
schritt=schritt,
|
|
status=status,
|
|
details=details
|
|
)
|
|
self.db.add(log)
|
|
|
|
|
|
class PipelineManager:
|
|
"""Verwaltet Pipelines (CRUD-Operationen)"""
|
|
|
|
def __init__(self, db: Session):
|
|
self.db = db
|
|
|
|
def erstelle_pipeline(self, name: str, beschreibung: str = "") -> Pipeline:
|
|
"""Erstellt neue Pipeline"""
|
|
pipeline = Pipeline(name=name, beschreibung=beschreibung)
|
|
self.db.add(pipeline)
|
|
self.db.commit()
|
|
self.db.refresh(pipeline)
|
|
return pipeline
|
|
|
|
def hole_alle_pipelines(self) -> List[Pipeline]:
|
|
"""Gibt alle Pipelines zurück"""
|
|
return self.db.query(Pipeline).all()
|
|
|
|
def hole_pipeline(self, pipeline_id: int) -> Optional[Pipeline]:
|
|
"""Gibt eine Pipeline zurück"""
|
|
return self.db.query(Pipeline).filter(Pipeline.id == pipeline_id).first()
|
|
|
|
def fuege_mail_config_hinzu(self, pipeline_id: int, config: Dict) -> MailConfig:
|
|
"""Fügt Mail-Konfiguration zu Pipeline hinzu"""
|
|
mail_config = MailConfig(
|
|
pipeline_id=pipeline_id,
|
|
name=config.get("name", "Unbenannt"),
|
|
imap_server=config["imap_server"],
|
|
imap_port=config.get("imap_port", 993),
|
|
email=config["email"],
|
|
passwort=config["passwort"],
|
|
ordner=config.get("ordner", "INBOX"),
|
|
erlaubte_typen=config.get("erlaubte_typen", [".pdf"]),
|
|
max_groesse_mb=config.get("max_groesse_mb", 25)
|
|
)
|
|
self.db.add(mail_config)
|
|
self.db.commit()
|
|
self.db.refresh(mail_config)
|
|
return mail_config
|
|
|
|
def fuege_regel_hinzu(self, pipeline_id: int, regel: Dict) -> SortierRegel:
|
|
"""Fügt Sortier-Regel zu Pipeline hinzu"""
|
|
sortier_regel = SortierRegel(
|
|
pipeline_id=pipeline_id,
|
|
name=regel["name"],
|
|
prioritaet=regel.get("prioritaet", 100),
|
|
muster=regel.get("muster", {}),
|
|
extraktion=regel.get("extraktion", {}),
|
|
schema=regel.get("schema", "{datum} - Dokument.pdf"),
|
|
ziel_ordner=regel.get("ziel_ordner")
|
|
)
|
|
self.db.add(sortier_regel)
|
|
self.db.commit()
|
|
self.db.refresh(sortier_regel)
|
|
return sortier_regel
|
|
|
|
def teste_regel(self, regel: Dict, text: str) -> Dict:
|
|
"""Testet eine Regel gegen einen Text"""
|
|
sorter = Sorter([regel])
|
|
dokument_info = {"text": text, "original_name": "test.pdf", "absender": ""}
|
|
|
|
passend = sorter.finde_passende_regel(dokument_info) is not None
|
|
|
|
extrahiert = {}
|
|
dateiname = ""
|
|
if passend:
|
|
extrahiert = sorter.extrahiere_felder(regel, dokument_info)
|
|
dateiname = sorter.generiere_dateinamen(regel, extrahiert)
|
|
|
|
return {
|
|
"regel_passt": passend,
|
|
"extrahierte_felder": extrahiert,
|
|
"vorgeschlagener_name": dateiname
|
|
}
|