""" Sorter Modul Regel-basierte Erkennung und Benennung von Dokumenten """ import re from pathlib import Path from datetime import datetime from typing import Dict, List, Optional, Any import logging import shutil from .extraktoren import extrahiere_alle_felder, baue_dateiname logger = logging.getLogger(__name__) class Sorter: """Sortiert und benennt Dokumente basierend auf Regeln""" def __init__(self, regeln: List[Dict]): """ Args: regeln: Liste von Regel-Dicts, sortiert nach Priorität """ # Nach Priorität sortieren (niedrig = wichtig) self.regeln = sorted(regeln, key=lambda r: r.get("prioritaet", 100)) def finde_passende_regel(self, dokument_info: Dict) -> Optional[Dict]: """ Findet die erste passende Regel für ein Dokument Args: dokument_info: Dict mit text, original_name, absender, etc. Returns: Passende Regel oder None """ for regel in self.regeln: if not regel.get("aktiv", True): continue muster = regel.get("muster", {}) if self._pruefe_muster(muster, dokument_info): logger.info(f"Regel '{regel.get('name')}' matched für {dokument_info.get('original_name')}") return regel return None def _pruefe_muster(self, muster: Dict, dokument_info: Dict) -> bool: """Prüft ob alle Muster auf das Dokument zutreffen""" text = dokument_info.get("text", "").lower() original_name = dokument_info.get("original_name", "").lower() absender = dokument_info.get("absender", "").lower() # Option: Auch Dateinamen prüfen (Standard: nur Text) auch_dateiname = muster.get("auch_dateiname", False) # keywords (einfache Komma-getrennte Liste - für UI) # Verwendet Wortgrenzen (\b) um Teilstring-Matches zu vermeiden # z.B. "rechnung" matched nicht "Versicherungsnummer" oder "Berechnung" if "keywords" in muster: keywords = muster["keywords"] if isinstance(keywords, str): keywords = [k.strip() for k in keywords.split(",")] # Alle Keywords müssen vorkommen (als ganzes Wort) for keyword in keywords: keyword = keyword.lower().strip() if keyword: # Regex mit Wortgrenzen für exaktes Wort-Matching pattern = r'\b' + re.escape(keyword) + r'\b' # Standard: nur im Text suchen, mit Option auch im Dateinamen if auch_dateiname: if not re.search(pattern, text) and not re.search(pattern, original_name): return False else: if not re.search(pattern, text): return False # absender_contains if "absender_contains" in muster: if muster["absender_contains"].lower() not in absender: return False # dateiname_match if "dateiname_match" in muster: pattern = muster["dateiname_match"] if isinstance(pattern, str): if pattern.lower() not in original_name: return False elif isinstance(pattern, list): if not any(p.lower() in original_name for p in pattern): return False # text_match (alle müssen enthalten sein) if "text_match" in muster: patterns = muster["text_match"] if isinstance(patterns, str): patterns = [patterns] # Nur prüfen wenn Liste nicht leer for pattern in patterns: if pattern and pattern.lower() not in text: return False # text_match_any (mindestens einer muss enthalten sein) if "text_match_any" in muster: patterns = muster["text_match_any"] if isinstance(patterns, str): patterns = [patterns] # Nur prüfen wenn Liste nicht leer if patterns and not any(p.lower() in text for p in patterns if p): return False # text_regex if "text_regex" in muster: pattern = muster["text_regex"] if not re.search(pattern, text, re.IGNORECASE): return False # ============ NEGATIVE MUSTER (dürfen NICHT vorkommen) ============ # keywords_nicht (keines darf vorkommen - als ganzes Wort) if "keywords_nicht" in muster: keywords = muster["keywords_nicht"] if isinstance(keywords, str): keywords = [k.strip() for k in keywords.split(",")] for keyword in keywords: keyword = keyword.lower().strip() if keyword: # Regex mit Wortgrenzen für exaktes Wort-Matching pattern = r'\b' + re.escape(keyword) + r'\b' # Standard: nur im Text prüfen, mit Option auch im Dateinamen if auch_dateiname: if re.search(pattern, text) or re.search(pattern, original_name): return False # Verbotenes Keyword gefunden else: if re.search(pattern, text): return False # Verbotenes Keyword gefunden # text_not_match (keines darf enthalten sein) if "text_not_match" in muster: patterns = muster["text_not_match"] if isinstance(patterns, str): patterns = [patterns] for pattern in patterns: if pattern and pattern.lower() in text: return False # Verbotenes Pattern gefunden # text_not_regex (Regex darf nicht matchen) if "text_not_regex" in muster: pattern = muster["text_not_regex"] if re.search(pattern, text, re.IGNORECASE): return False # Verbotenes Regex gefunden return True def extrahiere_felder(self, regel: Dict, dokument_info: Dict) -> Dict[str, Any]: """ Extrahiert Felder aus dem Dokument - nutzt globale Extraktoren mit Fallbacks Returns: Dict mit extrahierten Werten """ text = dokument_info.get("text", "") regel_extraktion = regel.get("extraktion", {}) # Globale Extraktoren nutzen (mit Regel-spezifischen Überschreibungen) felder = extrahiere_alle_felder(text, dokument_info, regel_extraktion) # Regel-spezifische statische Werte überschreiben for feld_name, feld_config in regel_extraktion.items(): if isinstance(feld_config, dict): if "wert" in feld_config: felder[feld_name] = feld_config["wert"] elif "regex" in feld_config: # Einzelnes Regex aus Regel wert = self._extrahiere_mit_regex(feld_config, text) if wert: felder[feld_name] = wert elif isinstance(feld_config, str): # Direkter statischer Wert felder[feld_name] = feld_config return felder def _extrahiere_mit_regex(self, config: Dict, text: str) -> Optional[str]: """ Extrahiert ein Feld mit Regex - unterstützt einzelne oder mehrere Alternativen Unterstützt "auswahl" Option für mehrere Treffer: - "first": Erster Treffer (Standard) - "last": Letzter Treffer - "max": Größter numerischer Wert - "min": Kleinster numerischer Wert """ regex_pattern = config.get("regex") if not regex_pattern: return None # Mehrere Regex-Alternativen (Array) patterns = regex_pattern if isinstance(regex_pattern, list) else [regex_pattern] # Auswahl-Modus (max, min, first, last) auswahl = config.get("auswahl", "first") for pattern in patterns: try: # Bei max/min/last: Alle Treffer finden if auswahl in ("max", "min", "last"): alle_matches = list(re.finditer(pattern, text, re.IGNORECASE | re.MULTILINE)) if not alle_matches: continue # Werte extrahieren werte = [] for match in alle_matches: wert = match.group(1) if match.groups() else match.group(0) werte.append(wert.strip()) if not werte: continue # Auswahl treffen if auswahl == "last": wert = werte[-1] elif auswahl in ("max", "min"): # Versuche numerische Auswahl wert = self._waehle_numerisch(werte, auswahl) else: wert = werte[0] else: # Standard: Erster Treffer (first) match = re.search(pattern, text, re.IGNORECASE | re.MULTILINE) if not match: continue wert = match.group(1) if match.groups() else match.group(0) wert = wert.strip() # Datum formatieren if "format" in config: try: datum_str = wert.strip() # Deutsche Monatsnamen zu englischen konvertieren de_monate = { 'Januar': 'January', 'Februar': 'February', 'März': 'March', 'April': 'April', 'Mai': 'May', 'Juni': 'June', 'Juli': 'July', 'August': 'August', 'September': 'September', 'Oktober': 'October', 'November': 'November', 'Dezember': 'December' } for de, en in de_monate.items(): datum_str = datum_str.replace(de, en) datum = datetime.strptime(datum_str, config["format"]) return datum.strftime("%Y-%m-%d") except: pass # Betrag formatieren if config.get("typ") == "betrag": wert = self._formatiere_betrag(wert) return wert except Exception as e: logger.debug(f"Regex-Extraktion fehlgeschlagen für '{pattern}': {e}") return None def _waehle_numerisch(self, werte: List[str], modus: str) -> str: """Wählt max oder min aus einer Liste von Werten (versucht numerisch zu parsen)""" # Versuche alle Werte als Zahlen zu parsen numerische_werte = [] for wert in werte: try: # Deutsches Format: 1.234,56 -> 1234.56 clean = wert.replace(" ", "").replace(".", "").replace(",", ".") zahl = float(clean) numerische_werte.append((zahl, wert)) except ValueError: # Wenn nicht parsebar, ignorieren pass if not numerische_werte: # Fallback: Erster Wert wenn keine Zahlen gefunden return werte[0] if modus == "max": return max(numerische_werte, key=lambda x: x[0])[1] else: # min return min(numerische_werte, key=lambda x: x[0])[1] def _formatiere_betrag(self, betrag: str) -> str: """Formatiert Betrag einheitlich""" betrag = betrag.replace(" ", "").replace(".", "").replace(",", ".") try: wert = float(betrag) if wert == int(wert): return str(int(wert)) return f"{wert:.2f}".replace(".", ",") except: return betrag def generiere_dateinamen(self, regel: Dict, extrahierte_felder: Dict) -> str: """ Generiert den neuen Dateinamen basierend auf Schema Nutzt den intelligenten Schema-Builder der fehlende Felder entfernt """ schema = regel.get("schema", "{datum} - Dokument.pdf") return baue_dateiname(schema, extrahierte_felder, ".pdf") def verschiebe_datei(self, quell_pfad: str, ziel_ordner: str, neuer_name: str) -> str: """ Verschiebt und benennt Datei um Returns: Neuer Pfad der Datei """ ziel_dir = Path(ziel_ordner) ziel_dir.mkdir(parents=True, exist_ok=True) ziel_pfad = ziel_dir / neuer_name # Eindeutigen Namen sicherstellen counter = 1 original_name = ziel_pfad.stem suffix = ziel_pfad.suffix while ziel_pfad.exists(): ziel_pfad = ziel_dir / f"{original_name} ({counter}){suffix}" counter += 1 # Verschieben shutil.move(quell_pfad, ziel_pfad) logger.info(f"Verschoben: {quell_pfad} -> {ziel_pfad}") return str(ziel_pfad) # ============ STANDARD-DOKUMENTTYPEN ============ # Diese werden für das einfache UI verwendet DOKUMENTTYPEN = { "rechnung": { "name": "Rechnung", "keywords": ["rechnung", "invoice"], "schema": "{datum} - Rechnung - {firma} - {nummer} - {betrag} EUR.pdf", "unterordner": "rechnungen" }, "angebot": { "name": "Angebot", "keywords": ["angebot", "quotation", "offerte"], "schema": "{datum} - Angebot - {firma} - {nummer} - {betrag} EUR.pdf", "unterordner": "angebote" }, "gutschrift": { "name": "Gutschrift", "keywords": ["gutschrift", "credit"], "schema": "{datum} - Gutschrift - {firma} - {nummer} - {betrag} EUR.pdf", "unterordner": "gutschriften" }, "lieferschein": { "name": "Lieferschein", "keywords": ["lieferschein", "delivery"], "schema": "{datum} - Lieferschein - {firma} - {nummer}.pdf", "unterordner": "lieferscheine" }, "auftragsbestaetigung": { "name": "Auftragsbestätigung", "keywords": ["auftragsbestätigung", "bestellbestätigung"], "schema": "{datum} - Auftragsbestätigung - {firma} - {nummer}.pdf", "unterordner": "auftraege" }, "vertrag": { "name": "Vertrag", "keywords": ["vertrag", "contract"], "schema": "Vertrag - {firma} - {nummer} - {datum}.pdf", "unterordner": "vertraege" }, "versicherung": { "name": "Versicherung", "keywords": ["versicherung", "police", "beitrag"], "schema": "Versicherung - {firma} - {nummer} - {datum}.pdf", "unterordner": "versicherungen" }, "zeugnis": { "name": "Zeugnis", "keywords": ["zeugnis", "zertifikat"], "schema": "Zeugnis - {firma} - {nummer} - {datum}.pdf", "unterordner": "zeugnisse" }, "bescheinigung": { "name": "Bescheinigung", "keywords": ["bescheinigung", "nachweis", "bestätigung"], "schema": "Bescheinigung - {firma} - {nummer} - {datum}.pdf", "unterordner": "bescheinigungen" }, "kontoauszug": { "name": "Kontoauszug", "keywords": ["kontoauszug", "account statement"], "schema": "{datum} - Kontoauszug - {firma} - {nummer}.pdf", "unterordner": "kontoauszuege" }, "sonstiges": { "name": "Sonstiges", "keywords": [], "schema": "{datum} - {typ} - {firma}.pdf", "unterordner": "sonstiges" } } def erstelle_einfache_regel(name: str, dokumenttyp: str, keywords: str, firma_wert: str = None, unterordner: str = None, prioritaet: int = 50) -> Dict: """ Erstellt eine einfache Regel basierend auf Dokumenttyp Args: name: Name der Regel (z.B. "Sonepar Rechnung") dokumenttyp: Typ aus DOKUMENTTYPEN keywords: Komma-getrennte Keywords zur Erkennung firma_wert: Optionaler fester Firmenwert unterordner: Optionaler Unterordner (überschreibt Standard) prioritaet: Priorität (niedriger = wichtiger) Returns: Regel-Dict für die Datenbank """ typ_config = DOKUMENTTYPEN.get(dokumenttyp, DOKUMENTTYPEN["sonstiges"]) regel = { "name": name, "prioritaet": prioritaet, "aktiv": True, "muster": { "keywords": keywords }, "extraktion": {}, "schema": typ_config["schema"], "unterordner": unterordner or typ_config["unterordner"] } # Feste Firma wenn angegeben if firma_wert: regel["extraktion"]["firma"] = {"wert": firma_wert} return regel def liste_dokumenttypen() -> List[Dict]: """Gibt Liste aller Dokumenttypen für UI zurück""" return [ {"id": key, "name": config["name"], "schema": config["schema"]} for key, config in DOKUMENTTYPEN.items() ]