461 lines
17 KiB
Python
Executable file
461 lines
17 KiB
Python
Executable file
"""
|
|
Sorter Modul
|
|
Regel-basierte Erkennung und Benennung von Dokumenten
|
|
"""
|
|
import re
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Any
|
|
import logging
|
|
import shutil
|
|
|
|
from .extraktoren import extrahiere_alle_felder, baue_dateiname
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Sorter:
|
|
"""Sortiert und benennt Dokumente basierend auf Regeln"""
|
|
|
|
def __init__(self, regeln: List[Dict]):
|
|
"""
|
|
Args:
|
|
regeln: Liste von Regel-Dicts, sortiert nach Priorität
|
|
"""
|
|
# Nach Priorität sortieren (niedrig = wichtig)
|
|
self.regeln = sorted(regeln, key=lambda r: r.get("prioritaet", 100))
|
|
|
|
def finde_passende_regel(self, dokument_info: Dict) -> Optional[Dict]:
|
|
"""
|
|
Findet die erste passende Regel für ein Dokument
|
|
|
|
Args:
|
|
dokument_info: Dict mit text, original_name, absender, etc.
|
|
|
|
Returns:
|
|
Passende Regel oder None
|
|
"""
|
|
for regel in self.regeln:
|
|
if not regel.get("aktiv", True):
|
|
continue
|
|
|
|
muster = regel.get("muster", {})
|
|
if self._pruefe_muster(muster, dokument_info):
|
|
logger.info(f"Regel '{regel.get('name')}' matched für {dokument_info.get('original_name')}")
|
|
return regel
|
|
|
|
return None
|
|
|
|
def _pruefe_muster(self, muster: Dict, dokument_info: Dict) -> bool:
|
|
"""Prüft ob alle Muster auf das Dokument zutreffen"""
|
|
text = dokument_info.get("text", "").lower()
|
|
original_name = dokument_info.get("original_name", "").lower()
|
|
absender = dokument_info.get("absender", "").lower()
|
|
|
|
# Option: Auch Dateinamen prüfen (Standard: nur Text)
|
|
auch_dateiname = muster.get("auch_dateiname", False)
|
|
|
|
# keywords (einfache Komma-getrennte Liste - für UI)
|
|
# Verwendet Wortgrenzen (\b) um Teilstring-Matches zu vermeiden
|
|
# z.B. "rechnung" matched nicht "Versicherungsnummer" oder "Berechnung"
|
|
if "keywords" in muster:
|
|
keywords = muster["keywords"]
|
|
if isinstance(keywords, str):
|
|
keywords = [k.strip() for k in keywords.split(",")]
|
|
# Alle Keywords müssen vorkommen (als ganzes Wort)
|
|
for keyword in keywords:
|
|
keyword = keyword.lower().strip()
|
|
if keyword:
|
|
# Regex mit Wortgrenzen für exaktes Wort-Matching
|
|
pattern = r'\b' + re.escape(keyword) + r'\b'
|
|
# Standard: nur im Text suchen, mit Option auch im Dateinamen
|
|
if auch_dateiname:
|
|
if not re.search(pattern, text) and not re.search(pattern, original_name):
|
|
return False
|
|
else:
|
|
if not re.search(pattern, text):
|
|
return False
|
|
|
|
# absender_contains
|
|
if "absender_contains" in muster:
|
|
if muster["absender_contains"].lower() not in absender:
|
|
return False
|
|
|
|
# dateiname_match
|
|
if "dateiname_match" in muster:
|
|
pattern = muster["dateiname_match"]
|
|
if isinstance(pattern, str):
|
|
if pattern.lower() not in original_name:
|
|
return False
|
|
elif isinstance(pattern, list):
|
|
if not any(p.lower() in original_name for p in pattern):
|
|
return False
|
|
|
|
# text_match (alle müssen enthalten sein)
|
|
if "text_match" in muster:
|
|
patterns = muster["text_match"]
|
|
if isinstance(patterns, str):
|
|
patterns = [patterns]
|
|
# Nur prüfen wenn Liste nicht leer
|
|
for pattern in patterns:
|
|
if pattern and pattern.lower() not in text:
|
|
return False
|
|
|
|
# text_match_any (mindestens einer muss enthalten sein)
|
|
if "text_match_any" in muster:
|
|
patterns = muster["text_match_any"]
|
|
if isinstance(patterns, str):
|
|
patterns = [patterns]
|
|
# Nur prüfen wenn Liste nicht leer
|
|
if patterns and not any(p.lower() in text for p in patterns if p):
|
|
return False
|
|
|
|
# text_regex
|
|
if "text_regex" in muster:
|
|
pattern = muster["text_regex"]
|
|
if not re.search(pattern, text, re.IGNORECASE):
|
|
return False
|
|
|
|
# ============ NEGATIVE MUSTER (dürfen NICHT vorkommen) ============
|
|
|
|
# keywords_nicht (keines darf vorkommen - als ganzes Wort)
|
|
if "keywords_nicht" in muster:
|
|
keywords = muster["keywords_nicht"]
|
|
if isinstance(keywords, str):
|
|
keywords = [k.strip() for k in keywords.split(",")]
|
|
for keyword in keywords:
|
|
keyword = keyword.lower().strip()
|
|
if keyword:
|
|
# Regex mit Wortgrenzen für exaktes Wort-Matching
|
|
pattern = r'\b' + re.escape(keyword) + r'\b'
|
|
# Standard: nur im Text prüfen, mit Option auch im Dateinamen
|
|
if auch_dateiname:
|
|
if re.search(pattern, text) or re.search(pattern, original_name):
|
|
return False # Verbotenes Keyword gefunden
|
|
else:
|
|
if re.search(pattern, text):
|
|
return False # Verbotenes Keyword gefunden
|
|
|
|
# text_not_match (keines darf enthalten sein)
|
|
if "text_not_match" in muster:
|
|
patterns = muster["text_not_match"]
|
|
if isinstance(patterns, str):
|
|
patterns = [patterns]
|
|
for pattern in patterns:
|
|
if pattern and pattern.lower() in text:
|
|
return False # Verbotenes Pattern gefunden
|
|
|
|
# text_not_regex (Regex darf nicht matchen)
|
|
if "text_not_regex" in muster:
|
|
pattern = muster["text_not_regex"]
|
|
if re.search(pattern, text, re.IGNORECASE):
|
|
return False # Verbotenes Regex gefunden
|
|
|
|
return True
|
|
|
|
def extrahiere_felder(self, regel: Dict, dokument_info: Dict) -> Dict[str, Any]:
|
|
"""
|
|
Extrahiert Felder aus dem Dokument - nutzt globale Extraktoren mit Fallbacks
|
|
|
|
Returns:
|
|
Dict mit extrahierten Werten
|
|
"""
|
|
text = dokument_info.get("text", "")
|
|
regel_extraktion = regel.get("extraktion", {})
|
|
|
|
# Globale Extraktoren nutzen (mit Regel-spezifischen Überschreibungen)
|
|
felder = extrahiere_alle_felder(text, dokument_info, regel_extraktion)
|
|
|
|
# Regel-spezifische statische Werte überschreiben
|
|
for feld_name, feld_config in regel_extraktion.items():
|
|
if isinstance(feld_config, dict):
|
|
if "wert" in feld_config:
|
|
felder[feld_name] = feld_config["wert"]
|
|
elif "regex" in feld_config:
|
|
# Einzelnes Regex aus Regel
|
|
wert = self._extrahiere_mit_regex(feld_config, text)
|
|
if wert:
|
|
felder[feld_name] = wert
|
|
elif isinstance(feld_config, str):
|
|
# Direkter statischer Wert
|
|
felder[feld_name] = feld_config
|
|
|
|
return felder
|
|
|
|
def _extrahiere_mit_regex(self, config: Dict, text: str) -> Optional[str]:
|
|
"""
|
|
Extrahiert ein Feld mit Regex - unterstützt einzelne oder mehrere Alternativen
|
|
|
|
Unterstützt "auswahl" Option für mehrere Treffer:
|
|
- "first": Erster Treffer (Standard)
|
|
- "last": Letzter Treffer
|
|
- "max": Größter numerischer Wert
|
|
- "min": Kleinster numerischer Wert
|
|
"""
|
|
regex_pattern = config.get("regex")
|
|
if not regex_pattern:
|
|
return None
|
|
|
|
# Mehrere Regex-Alternativen (Array)
|
|
patterns = regex_pattern if isinstance(regex_pattern, list) else [regex_pattern]
|
|
|
|
# Auswahl-Modus (max, min, first, last)
|
|
auswahl = config.get("auswahl", "first")
|
|
|
|
for pattern in patterns:
|
|
try:
|
|
# Bei max/min/last: Alle Treffer finden
|
|
if auswahl in ("max", "min", "last"):
|
|
alle_matches = list(re.finditer(pattern, text, re.IGNORECASE | re.MULTILINE))
|
|
if not alle_matches:
|
|
continue
|
|
|
|
# Werte extrahieren
|
|
werte = []
|
|
for match in alle_matches:
|
|
wert = match.group(1) if match.groups() else match.group(0)
|
|
werte.append(wert.strip())
|
|
|
|
if not werte:
|
|
continue
|
|
|
|
# Auswahl treffen
|
|
if auswahl == "last":
|
|
wert = werte[-1]
|
|
elif auswahl in ("max", "min"):
|
|
# Versuche numerische Auswahl
|
|
wert = self._waehle_numerisch(werte, auswahl)
|
|
else:
|
|
wert = werte[0]
|
|
else:
|
|
# Standard: Erster Treffer (first)
|
|
match = re.search(pattern, text, re.IGNORECASE | re.MULTILINE)
|
|
if not match:
|
|
continue
|
|
wert = match.group(1) if match.groups() else match.group(0)
|
|
wert = wert.strip()
|
|
|
|
# Datum formatieren
|
|
if "format" in config:
|
|
try:
|
|
datum_str = wert.strip()
|
|
# Deutsche Monatsnamen zu englischen konvertieren
|
|
de_monate = {
|
|
'Januar': 'January', 'Februar': 'February', 'März': 'March',
|
|
'April': 'April', 'Mai': 'May', 'Juni': 'June',
|
|
'Juli': 'July', 'August': 'August', 'September': 'September',
|
|
'Oktober': 'October', 'November': 'November', 'Dezember': 'December'
|
|
}
|
|
for de, en in de_monate.items():
|
|
datum_str = datum_str.replace(de, en)
|
|
datum = datetime.strptime(datum_str, config["format"])
|
|
return datum.strftime("%Y-%m-%d")
|
|
except:
|
|
pass
|
|
|
|
# Auto-Erkennung: Deutsches Datum DD.MM.YYYY zu YYYY-MM-DD
|
|
datum_match = re.match(r'^(\d{1,2})\.(\d{1,2})\.(\d{4})$', wert.strip())
|
|
if datum_match:
|
|
tag, monat, jahr = datum_match.groups()
|
|
try:
|
|
return f"{jahr}-{int(monat):02d}-{int(tag):02d}"
|
|
except:
|
|
pass
|
|
|
|
# Betrag formatieren
|
|
if config.get("typ") == "betrag":
|
|
wert = self._formatiere_betrag(wert)
|
|
|
|
return wert
|
|
except Exception as e:
|
|
logger.debug(f"Regex-Extraktion fehlgeschlagen für '{pattern}': {e}")
|
|
|
|
return None
|
|
|
|
def _waehle_numerisch(self, werte: List[str], modus: str) -> str:
|
|
"""Wählt max oder min aus einer Liste von Werten (versucht numerisch zu parsen)"""
|
|
# Versuche alle Werte als Zahlen zu parsen
|
|
numerische_werte = []
|
|
for wert in werte:
|
|
try:
|
|
# Deutsches Format: 1.234,56 -> 1234.56
|
|
clean = wert.replace(" ", "").replace(".", "").replace(",", ".")
|
|
zahl = float(clean)
|
|
numerische_werte.append((zahl, wert))
|
|
except ValueError:
|
|
# Wenn nicht parsebar, ignorieren
|
|
pass
|
|
|
|
if not numerische_werte:
|
|
# Fallback: Erster Wert wenn keine Zahlen gefunden
|
|
return werte[0]
|
|
|
|
if modus == "max":
|
|
return max(numerische_werte, key=lambda x: x[0])[1]
|
|
else: # min
|
|
return min(numerische_werte, key=lambda x: x[0])[1]
|
|
|
|
def _formatiere_betrag(self, betrag: str) -> str:
|
|
"""Formatiert Betrag einheitlich"""
|
|
betrag = betrag.replace(" ", "").replace(".", "").replace(",", ".")
|
|
|
|
try:
|
|
wert = float(betrag)
|
|
if wert == int(wert):
|
|
return str(int(wert))
|
|
return f"{wert:.2f}".replace(".", ",")
|
|
except:
|
|
return betrag
|
|
|
|
def generiere_dateinamen(self, regel: Dict, extrahierte_felder: Dict) -> str:
|
|
"""
|
|
Generiert den neuen Dateinamen basierend auf Schema
|
|
Nutzt den intelligenten Schema-Builder der fehlende Felder entfernt
|
|
"""
|
|
schema = regel.get("schema", "{datum} - Dokument.pdf")
|
|
return baue_dateiname(schema, extrahierte_felder, ".pdf")
|
|
|
|
def verschiebe_datei(self, quell_pfad: str, ziel_ordner: str, neuer_name: str) -> str:
|
|
"""
|
|
Verschiebt und benennt Datei um
|
|
|
|
Returns:
|
|
Neuer Pfad der Datei
|
|
"""
|
|
ziel_dir = Path(ziel_ordner)
|
|
ziel_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
ziel_pfad = ziel_dir / neuer_name
|
|
|
|
# Eindeutigen Namen sicherstellen
|
|
counter = 1
|
|
original_name = ziel_pfad.stem
|
|
suffix = ziel_pfad.suffix
|
|
while ziel_pfad.exists():
|
|
ziel_pfad = ziel_dir / f"{original_name} ({counter}){suffix}"
|
|
counter += 1
|
|
|
|
# Verschieben
|
|
shutil.move(quell_pfad, ziel_pfad)
|
|
logger.info(f"Verschoben: {quell_pfad} -> {ziel_pfad}")
|
|
|
|
return str(ziel_pfad)
|
|
|
|
|
|
# ============ STANDARD-DOKUMENTTYPEN ============
|
|
# Diese werden für das einfache UI verwendet
|
|
|
|
DOKUMENTTYPEN = {
|
|
"rechnung": {
|
|
"name": "Rechnung",
|
|
"keywords": ["rechnung", "invoice"],
|
|
"schema": "{datum} - Rechnung - {firma} - {nummer} - {betrag} EUR.pdf",
|
|
"unterordner": "rechnungen"
|
|
},
|
|
"angebot": {
|
|
"name": "Angebot",
|
|
"keywords": ["angebot", "quotation", "offerte"],
|
|
"schema": "{datum} - Angebot - {firma} - {nummer} - {betrag} EUR.pdf",
|
|
"unterordner": "angebote"
|
|
},
|
|
"gutschrift": {
|
|
"name": "Gutschrift",
|
|
"keywords": ["gutschrift", "credit"],
|
|
"schema": "{datum} - Gutschrift - {firma} - {nummer} - {betrag} EUR.pdf",
|
|
"unterordner": "gutschriften"
|
|
},
|
|
"lieferschein": {
|
|
"name": "Lieferschein",
|
|
"keywords": ["lieferschein", "delivery"],
|
|
"schema": "{datum} - Lieferschein - {firma} - {nummer}.pdf",
|
|
"unterordner": "lieferscheine"
|
|
},
|
|
"auftragsbestaetigung": {
|
|
"name": "Auftragsbestätigung",
|
|
"keywords": ["auftragsbestätigung", "bestellbestätigung"],
|
|
"schema": "{datum} - Auftragsbestätigung - {firma} - {nummer}.pdf",
|
|
"unterordner": "auftraege"
|
|
},
|
|
"vertrag": {
|
|
"name": "Vertrag",
|
|
"keywords": ["vertrag", "contract"],
|
|
"schema": "Vertrag - {firma} - {nummer} - {datum}.pdf",
|
|
"unterordner": "vertraege"
|
|
},
|
|
"versicherung": {
|
|
"name": "Versicherung",
|
|
"keywords": ["versicherung", "police", "beitrag"],
|
|
"schema": "Versicherung - {firma} - {nummer} - {datum}.pdf",
|
|
"unterordner": "versicherungen"
|
|
},
|
|
"zeugnis": {
|
|
"name": "Zeugnis",
|
|
"keywords": ["zeugnis", "zertifikat"],
|
|
"schema": "Zeugnis - {firma} - {nummer} - {datum}.pdf",
|
|
"unterordner": "zeugnisse"
|
|
},
|
|
"bescheinigung": {
|
|
"name": "Bescheinigung",
|
|
"keywords": ["bescheinigung", "nachweis", "bestätigung"],
|
|
"schema": "Bescheinigung - {firma} - {nummer} - {datum}.pdf",
|
|
"unterordner": "bescheinigungen"
|
|
},
|
|
"kontoauszug": {
|
|
"name": "Kontoauszug",
|
|
"keywords": ["kontoauszug", "account statement"],
|
|
"schema": "{datum} - Kontoauszug - {firma} - {nummer}.pdf",
|
|
"unterordner": "kontoauszuege"
|
|
},
|
|
"sonstiges": {
|
|
"name": "Sonstiges",
|
|
"keywords": [],
|
|
"schema": "{datum} - {typ} - {firma}.pdf",
|
|
"unterordner": "sonstiges"
|
|
}
|
|
}
|
|
|
|
|
|
def erstelle_einfache_regel(name: str, dokumenttyp: str, keywords: str,
|
|
firma_wert: str = None, unterordner: str = None,
|
|
prioritaet: int = 50) -> Dict:
|
|
"""
|
|
Erstellt eine einfache Regel basierend auf Dokumenttyp
|
|
|
|
Args:
|
|
name: Name der Regel (z.B. "Sonepar Rechnung")
|
|
dokumenttyp: Typ aus DOKUMENTTYPEN
|
|
keywords: Komma-getrennte Keywords zur Erkennung
|
|
firma_wert: Optionaler fester Firmenwert
|
|
unterordner: Optionaler Unterordner (überschreibt Standard)
|
|
prioritaet: Priorität (niedriger = wichtiger)
|
|
|
|
Returns:
|
|
Regel-Dict für die Datenbank
|
|
"""
|
|
typ_config = DOKUMENTTYPEN.get(dokumenttyp, DOKUMENTTYPEN["sonstiges"])
|
|
|
|
regel = {
|
|
"name": name,
|
|
"prioritaet": prioritaet,
|
|
"aktiv": True,
|
|
"muster": {
|
|
"keywords": keywords
|
|
},
|
|
"extraktion": {},
|
|
"schema": typ_config["schema"],
|
|
"unterordner": unterordner or typ_config["unterordner"]
|
|
}
|
|
|
|
# Feste Firma wenn angegeben
|
|
if firma_wert:
|
|
regel["extraktion"]["firma"] = {"wert": firma_wert}
|
|
|
|
return regel
|
|
|
|
|
|
def liste_dokumenttypen() -> List[Dict]:
|
|
"""Gibt Liste aller Dokumenttypen für UI zurück"""
|
|
return [
|
|
{"id": key, "name": config["name"], "schema": config["schema"]}
|
|
for key, config in DOKUMENTTYPEN.items()
|
|
]
|