""" Sorter Modul Regel-basierte Erkennung und Benennung von Dokumenten """ import re from pathlib import Path from datetime import datetime from typing import Dict, List, Optional, Any import logging import shutil from .extraktoren import extrahiere_alle_felder, baue_dateiname logger = logging.getLogger(__name__) class Sorter: """Sortiert und benennt Dokumente basierend auf Regeln""" def __init__(self, regeln: List[Dict]): """ Args: regeln: Liste von Regel-Dicts, sortiert nach Priorität """ # Nach Priorität sortieren (niedrig = wichtig) self.regeln = sorted(regeln, key=lambda r: r.get("prioritaet", 100)) def finde_passende_regel(self, dokument_info: Dict) -> Optional[Dict]: """ Findet die erste passende Regel für ein Dokument Args: dokument_info: Dict mit text, original_name, absender, etc. Returns: Passende Regel oder None """ for regel in self.regeln: if not regel.get("aktiv", True): continue muster = regel.get("muster", {}) if self._pruefe_muster(muster, dokument_info): logger.info(f"Regel '{regel.get('name')}' matched für {dokument_info.get('original_name')}") return regel return None def _pruefe_muster(self, muster: Dict, dokument_info: Dict) -> bool: """Prüft ob alle Muster auf das Dokument zutreffen""" text = dokument_info.get("text", "").lower() original_name = dokument_info.get("original_name", "").lower() absender = dokument_info.get("absender", "").lower() dateityp = dokument_info.get("dateityp", "").lower() # z.B. ".pdf", ".jpg" # ========== TYP-BASIERTE REGELN (Stufe 1: Grob-Sortierung) ========== # dateityp_ist - Nur bestimmte Dateitypen (z.B. [".pdf", ".PDF"]) if "dateityp_ist" in muster: erlaubte = muster["dateityp_ist"] if isinstance(erlaubte, str): erlaubte = [erlaubte] erlaubte_lower = [t.lower() for t in erlaubte] if dateityp not in erlaubte_lower: return False # dateityp_nicht - Ausschluss bestimmter Dateitypen if "dateityp_nicht" in muster: verbotene = muster["dateityp_nicht"] if isinstance(verbotene, str): verbotene = [verbotene] verbotene_lower = [t.lower() for t in verbotene] if dateityp in verbotene_lower: return False # ist_zugferd - Nur ZUGFeRD/E-Rechnungen if "ist_zugferd" in muster: ist_zugferd = dokument_info.get("ist_zugferd", False) if muster["ist_zugferd"] and not ist_zugferd: return False if not muster["ist_zugferd"] and ist_zugferd: return False # ist_signiert - Nur signierte PDFs if "ist_signiert" in muster: ist_signiert = dokument_info.get("ist_signiert", False) if muster["ist_signiert"] and not ist_signiert: return False if not muster["ist_signiert"] and ist_signiert: return False # hat_text - Nur PDFs mit/ohne Text if "hat_text" in muster: hat_text = dokument_info.get("hat_text", False) if muster["hat_text"] and not hat_text: return False if not muster["hat_text"] and hat_text: return False # ist_bild - Prüft ob Datei ein Bild ist if "ist_bild" in muster: bild_typen = [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".tif", ".webp"] ist_bild = dateityp in bild_typen if muster["ist_bild"] and not ist_bild: return False if not muster["ist_bild"] and ist_bild: return False # ist_pdf - Prüft ob Datei ein PDF ist if "ist_pdf" in muster: ist_pdf = dateityp == ".pdf" if muster["ist_pdf"] and not ist_pdf: return False if not muster["ist_pdf"] and ist_pdf: return False # ========== INHALT-BASIERTE REGELN (Stufe 2: Fein-Sortierung) ========== # keywords (einfache Komma-getrennte Liste - für UI) if "keywords" in muster: keywords = muster["keywords"] if isinstance(keywords, str): keywords = [k.strip() for k in keywords.split(",")] # Alle Keywords müssen vorkommen for keyword in keywords: keyword = keyword.lower().strip() if keyword and keyword not in text and keyword not in original_name: return False # absender_contains if "absender_contains" in muster: if muster["absender_contains"].lower() not in absender: return False # dateiname_match if "dateiname_match" in muster: pattern = muster["dateiname_match"] if isinstance(pattern, str): if pattern.lower() not in original_name: return False elif isinstance(pattern, list): if not any(p.lower() in original_name for p in pattern): return False # text_match (alle müssen enthalten sein) if "text_match" in muster: patterns = muster["text_match"] if isinstance(patterns, str): patterns = [patterns] for pattern in patterns: if pattern.lower() not in text: return False # text_match_any (mindestens einer muss enthalten sein) if "text_match_any" in muster: patterns = muster["text_match_any"] if isinstance(patterns, str): patterns = [patterns] if not any(p.lower() in text for p in patterns): return False # text_regex if "text_regex" in muster: pattern = muster["text_regex"] if not re.search(pattern, text, re.IGNORECASE): return False return True def extrahiere_felder(self, regel: Dict, dokument_info: Dict) -> Dict[str, Any]: """ Extrahiert Felder aus dem Dokument - nutzt globale Extraktoren mit Fallbacks Returns: Dict mit extrahierten Werten """ text = dokument_info.get("text", "") regel_extraktion = regel.get("extraktion", {}) # Globale Extraktoren nutzen (mit Regel-spezifischen Überschreibungen) felder = extrahiere_alle_felder(text, dokument_info, regel_extraktion) # Regel-spezifische statische Werte überschreiben for feld_name, feld_config in regel_extraktion.items(): if isinstance(feld_config, dict): if "wert" in feld_config: felder[feld_name] = feld_config["wert"] elif "regex" in feld_config: # Einzelnes Regex aus Regel wert = self._extrahiere_mit_regex(feld_config, text) if wert: felder[feld_name] = wert elif isinstance(feld_config, str): # Direkter statischer Wert felder[feld_name] = feld_config return felder def _extrahiere_mit_regex(self, config: Dict, text: str) -> Optional[str]: """Extrahiert ein Feld mit einem einzelnen Regex""" try: match = re.search(config["regex"], text, re.IGNORECASE | re.MULTILINE) if match: wert = match.group(1) if match.groups() else match.group(0) # Datum formatieren if "format" in config: try: datum = datetime.strptime(wert.strip(), config["format"]) return datum.strftime("%Y-%m-%d") except: pass # Betrag formatieren if config.get("typ") == "betrag": wert = self._formatiere_betrag(wert) return wert.strip() except Exception as e: logger.debug(f"Regex-Extraktion fehlgeschlagen: {e}") return None def _formatiere_betrag(self, betrag: str) -> str: """Formatiert Betrag einheitlich""" betrag = betrag.replace(" ", "").replace(".", "").replace(",", ".") try: wert = float(betrag) if wert == int(wert): return str(int(wert)) return f"{wert:.2f}".replace(".", ",") except: return betrag def generiere_dateinamen(self, regel: Dict, extrahierte_felder: Dict) -> str: """ Generiert den neuen Dateinamen basierend auf Schema Nutzt den intelligenten Schema-Builder der fehlende Felder entfernt """ schema = regel.get("schema", "{datum} - Dokument.pdf") return baue_dateiname(schema, extrahierte_felder, ".pdf") def verschiebe_datei(self, quell_pfad: str, ziel_ordner: str, neuer_name: str) -> str: """ Verschiebt und benennt Datei um Returns: Neuer Pfad der Datei """ ziel_dir = Path(ziel_ordner) ziel_dir.mkdir(parents=True, exist_ok=True) ziel_pfad = ziel_dir / neuer_name # Eindeutigen Namen sicherstellen counter = 1 original_name = ziel_pfad.stem suffix = ziel_pfad.suffix while ziel_pfad.exists(): ziel_pfad = ziel_dir / f"{original_name} ({counter}){suffix}" counter += 1 # Verschieben shutil.move(quell_pfad, ziel_pfad) logger.info(f"Verschoben: {quell_pfad} -> {ziel_pfad}") return str(ziel_pfad) # ============ STANDARD-DOKUMENTTYPEN ============ # Diese werden für das einfache UI verwendet DOKUMENTTYPEN = { "rechnung": { "name": "Rechnung", "keywords": ["rechnung", "invoice"], "schema": "{datum} - Rechnung - {firma} - {nummer} - {betrag} EUR.pdf", "unterordner": "rechnungen" }, "angebot": { "name": "Angebot", "keywords": ["angebot", "quotation", "offerte"], "schema": "{datum} - Angebot - {firma} - {nummer} - {betrag} EUR.pdf", "unterordner": "angebote" }, "gutschrift": { "name": "Gutschrift", "keywords": ["gutschrift", "credit"], "schema": "{datum} - Gutschrift - {firma} - {nummer} - {betrag} EUR.pdf", "unterordner": "gutschriften" }, "lieferschein": { "name": "Lieferschein", "keywords": ["lieferschein", "delivery"], "schema": "{datum} - Lieferschein - {firma} - {nummer}.pdf", "unterordner": "lieferscheine" }, "auftragsbestaetigung": { "name": "Auftragsbestätigung", "keywords": ["auftragsbestätigung", "bestellbestätigung"], "schema": "{datum} - Auftragsbestätigung - {firma} - {nummer}.pdf", "unterordner": "auftraege" }, "vertrag": { "name": "Vertrag", "keywords": ["vertrag", "contract"], "schema": "Vertrag - {firma} - {nummer} - {datum}.pdf", "unterordner": "vertraege" }, "versicherung": { "name": "Versicherung", "keywords": ["versicherung", "police", "beitrag"], "schema": "Versicherung - {firma} - {nummer} - {datum}.pdf", "unterordner": "versicherungen" }, "zeugnis": { "name": "Zeugnis", "keywords": ["zeugnis", "zertifikat"], "schema": "Zeugnis - {firma} - {nummer} - {datum}.pdf", "unterordner": "zeugnisse" }, "bescheinigung": { "name": "Bescheinigung", "keywords": ["bescheinigung", "nachweis", "bestätigung"], "schema": "Bescheinigung - {firma} - {nummer} - {datum}.pdf", "unterordner": "bescheinigungen" }, "kontoauszug": { "name": "Kontoauszug", "keywords": ["kontoauszug", "account statement"], "schema": "{datum} - Kontoauszug - {firma} - {nummer}.pdf", "unterordner": "kontoauszuege" }, "sonstiges": { "name": "Sonstiges", "keywords": [], "schema": "{datum} - {typ} - {firma}.pdf", "unterordner": "sonstiges" } } def erstelle_einfache_regel(name: str, dokumenttyp: str, keywords: str, firma_wert: str = None, unterordner: str = None, prioritaet: int = 50) -> Dict: """ Erstellt eine einfache Regel basierend auf Dokumenttyp Args: name: Name der Regel (z.B. "Sonepar Rechnung") dokumenttyp: Typ aus DOKUMENTTYPEN keywords: Komma-getrennte Keywords zur Erkennung firma_wert: Optionaler fester Firmenwert unterordner: Optionaler Unterordner (überschreibt Standard) prioritaet: Priorität (niedriger = wichtiger) Returns: Regel-Dict für die Datenbank """ typ_config = DOKUMENTTYPEN.get(dokumenttyp, DOKUMENTTYPEN["sonstiges"]) regel = { "name": name, "prioritaet": prioritaet, "aktiv": True, "muster": { "keywords": keywords }, "extraktion": {}, "schema": typ_config["schema"], "unterordner": unterordner or typ_config["unterordner"] } # Feste Firma wenn angegeben if firma_wert: regel["extraktion"]["firma"] = {"wert": firma_wert} return regel def liste_dokumenttypen() -> List[Dict]: """Gibt Liste aller Dokumenttypen für UI zurück""" return [ {"id": key, "name": config["name"], "schema": config["schema"]} for key, config in DOKUMENTTYPEN.items() ] # ============ TYP-BASIERTE STANDARD-REGELN ============ # Diese Regeln sortieren nach Dateityp/Eigenschaften (Stufe 1: Grob-Sortierung) TYP_REGELN = { "zugferd": { "name": "E-Rechnungen (ZUGFeRD/XRechnung)", "beschreibung": "Elektronische Rechnungen mit maschinenlesbaren XML-Daten", "prioritaet": 5, # Sehr hohe Priorität - vor anderen PDF-Regeln "muster": { "ist_pdf": True, "ist_zugferd": True }, "schema": "{original}", # Originalname behalten "unterordner": "e-rechnungen", "ist_fallback": False }, "signierte_pdfs": { "name": "Signierte PDFs", "beschreibung": "Digital signierte PDF-Dokumente (Verträge, Bescheide)", "prioritaet": 10, "muster": { "ist_pdf": True, "ist_signiert": True }, "schema": "{original}", "unterordner": "signierte_dokumente", "ist_fallback": False }, "bilder": { "name": "Bilder", "beschreibung": "Alle Bilddateien (JPG, PNG, TIFF, etc.)", "prioritaet": 20, "muster": { "ist_bild": True }, "schema": "{original}", "unterordner": "bilder", "ist_fallback": False }, "pdfs_ohne_text": { "name": "Gescannte PDFs (ohne OCR)", "beschreibung": "PDFs ohne durchsuchbaren Text (Scans)", "prioritaet": 30, "muster": { "ist_pdf": True, "hat_text": False }, "schema": "{original}", "unterordner": "scans", "ist_fallback": False }, "alle_pdfs": { "name": "Alle PDFs (Fallback)", "beschreibung": "Alle PDF-Dokumente die keiner anderen Regel entsprechen", "prioritaet": 900, # Sehr niedrige Priorität - als Fallback "muster": { "ist_pdf": True }, "schema": "{original}", "unterordner": "dokumente", "ist_fallback": True }, "alle_bilder_fallback": { "name": "Alle Bilder (Fallback)", "beschreibung": "Alle Bilddateien die keiner anderen Regel entsprechen", "prioritaet": 910, "muster": { "ist_bild": True }, "schema": "{original}", "unterordner": "bilder", "ist_fallback": True }, "alle_dateien_fallback": { "name": "Alle anderen Dateien (Fallback)", "beschreibung": "Alle Dateien die keiner Regel entsprechen - letzte Auffang-Regel", "prioritaet": 999, # Absolut letzte Regel "muster": {}, # Leeres Muster = passt auf alles "schema": "{original}", "unterordner": "sonstiges", "ist_fallback": True } } def liste_typ_regeln(nur_fallback: bool = None) -> List[Dict]: """ Gibt Liste aller Typ-basierten Regeln für UI zurück Args: nur_fallback: None = alle, True = nur Fallbacks, False = keine Fallbacks """ result = [] for key, config in TYP_REGELN.items(): ist_fallback = config.get("ist_fallback", False) # Filtern nach Fallback-Status if nur_fallback is True and not ist_fallback: continue if nur_fallback is False and ist_fallback: continue result.append({ "id": key, "name": config["name"], "beschreibung": config["beschreibung"], "prioritaet": config["prioritaet"], "muster": config["muster"], "unterordner": config["unterordner"], "ist_fallback": ist_fallback }) # Nach Priorität sortieren return sorted(result, key=lambda x: x["prioritaet"]) def erstelle_typ_regel(typ_id: str, unterordner: str = None, prioritaet: int = None) -> Dict: """ Erstellt eine Typ-basierte Regel Args: typ_id: ID aus TYP_REGELN (z.B. "zugferd", "bilder") unterordner: Optionaler Unterordner (überschreibt Standard) prioritaet: Optionale Priorität (überschreibt Standard) Returns: Regel-Dict für die Datenbank """ if typ_id not in TYP_REGELN: raise ValueError(f"Unbekannter Typ: {typ_id}") typ_config = TYP_REGELN[typ_id] return { "name": typ_config["name"], "prioritaet": prioritaet or typ_config["prioritaet"], "aktiv": True, "muster": typ_config["muster"].copy(), "extraktion": {}, "schema": typ_config["schema"], "unterordner": unterordner or typ_config["unterordner"] }