323 lines
11 KiB
Python
323 lines
11 KiB
Python
"""
|
|
Sorter Modul
|
|
Regel-basierte Erkennung und Benennung von Dokumenten
|
|
"""
|
|
import re
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Any
|
|
import logging
|
|
import shutil
|
|
|
|
from .extraktoren import extrahiere_alle_felder, baue_dateiname
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Sorter:
|
|
"""Sortiert und benennt Dokumente basierend auf Regeln"""
|
|
|
|
def __init__(self, regeln: List[Dict]):
|
|
"""
|
|
Args:
|
|
regeln: Liste von Regel-Dicts, sortiert nach Priorität
|
|
"""
|
|
# Nach Priorität sortieren (niedrig = wichtig)
|
|
self.regeln = sorted(regeln, key=lambda r: r.get("prioritaet", 100))
|
|
|
|
def finde_passende_regel(self, dokument_info: Dict) -> Optional[Dict]:
|
|
"""
|
|
Findet die erste passende Regel für ein Dokument
|
|
|
|
Args:
|
|
dokument_info: Dict mit text, original_name, absender, etc.
|
|
|
|
Returns:
|
|
Passende Regel oder None
|
|
"""
|
|
for regel in self.regeln:
|
|
if not regel.get("aktiv", True):
|
|
continue
|
|
|
|
muster = regel.get("muster", {})
|
|
if self._pruefe_muster(muster, dokument_info):
|
|
logger.info(f"Regel '{regel.get('name')}' matched für {dokument_info.get('original_name')}")
|
|
return regel
|
|
|
|
return None
|
|
|
|
def _pruefe_muster(self, muster: Dict, dokument_info: Dict) -> bool:
|
|
"""Prüft ob alle Muster auf das Dokument zutreffen"""
|
|
text = dokument_info.get("text", "").lower()
|
|
original_name = dokument_info.get("original_name", "").lower()
|
|
absender = dokument_info.get("absender", "").lower()
|
|
|
|
# keywords (einfache Komma-getrennte Liste - für UI)
|
|
if "keywords" in muster:
|
|
keywords = muster["keywords"]
|
|
if isinstance(keywords, str):
|
|
keywords = [k.strip() for k in keywords.split(",")]
|
|
# Alle Keywords müssen vorkommen
|
|
for keyword in keywords:
|
|
keyword = keyword.lower().strip()
|
|
if keyword and keyword not in text and keyword not in original_name:
|
|
return False
|
|
|
|
# absender_contains
|
|
if "absender_contains" in muster:
|
|
if muster["absender_contains"].lower() not in absender:
|
|
return False
|
|
|
|
# dateiname_match
|
|
if "dateiname_match" in muster:
|
|
pattern = muster["dateiname_match"]
|
|
if isinstance(pattern, str):
|
|
if pattern.lower() not in original_name:
|
|
return False
|
|
elif isinstance(pattern, list):
|
|
if not any(p.lower() in original_name for p in pattern):
|
|
return False
|
|
|
|
# text_match (alle müssen enthalten sein)
|
|
if "text_match" in muster:
|
|
patterns = muster["text_match"]
|
|
if isinstance(patterns, str):
|
|
patterns = [patterns]
|
|
for pattern in patterns:
|
|
if pattern.lower() not in text:
|
|
return False
|
|
|
|
# text_match_any (mindestens einer muss enthalten sein)
|
|
if "text_match_any" in muster:
|
|
patterns = muster["text_match_any"]
|
|
if isinstance(patterns, str):
|
|
patterns = [patterns]
|
|
if not any(p.lower() in text for p in patterns):
|
|
return False
|
|
|
|
# text_regex
|
|
if "text_regex" in muster:
|
|
pattern = muster["text_regex"]
|
|
if not re.search(pattern, text, re.IGNORECASE):
|
|
return False
|
|
|
|
return True
|
|
|
|
def extrahiere_felder(self, regel: Dict, dokument_info: Dict) -> Dict[str, Any]:
|
|
"""
|
|
Extrahiert Felder aus dem Dokument - nutzt globale Extraktoren mit Fallbacks
|
|
|
|
Returns:
|
|
Dict mit extrahierten Werten
|
|
"""
|
|
text = dokument_info.get("text", "")
|
|
regel_extraktion = regel.get("extraktion", {})
|
|
|
|
# Globale Extraktoren nutzen (mit Regel-spezifischen Überschreibungen)
|
|
felder = extrahiere_alle_felder(text, dokument_info, regel_extraktion)
|
|
|
|
# Regel-spezifische statische Werte überschreiben
|
|
for feld_name, feld_config in regel_extraktion.items():
|
|
if isinstance(feld_config, dict):
|
|
if "wert" in feld_config:
|
|
felder[feld_name] = feld_config["wert"]
|
|
elif "regex" in feld_config:
|
|
# Einzelnes Regex aus Regel
|
|
wert = self._extrahiere_mit_regex(feld_config, text)
|
|
if wert:
|
|
felder[feld_name] = wert
|
|
elif isinstance(feld_config, str):
|
|
# Direkter statischer Wert
|
|
felder[feld_name] = feld_config
|
|
|
|
return felder
|
|
|
|
def _extrahiere_mit_regex(self, config: Dict, text: str) -> Optional[str]:
|
|
"""Extrahiert ein Feld mit einem einzelnen Regex"""
|
|
try:
|
|
match = re.search(config["regex"], text, re.IGNORECASE | re.MULTILINE)
|
|
if match:
|
|
wert = match.group(1) if match.groups() else match.group(0)
|
|
|
|
# Datum formatieren
|
|
if "format" in config:
|
|
try:
|
|
datum = datetime.strptime(wert.strip(), config["format"])
|
|
return datum.strftime("%Y-%m-%d")
|
|
except:
|
|
pass
|
|
|
|
# Betrag formatieren
|
|
if config.get("typ") == "betrag":
|
|
wert = self._formatiere_betrag(wert)
|
|
|
|
return wert.strip()
|
|
except Exception as e:
|
|
logger.debug(f"Regex-Extraktion fehlgeschlagen: {e}")
|
|
|
|
return None
|
|
|
|
def _formatiere_betrag(self, betrag: str) -> str:
|
|
"""Formatiert Betrag einheitlich"""
|
|
betrag = betrag.replace(" ", "").replace(".", "").replace(",", ".")
|
|
|
|
try:
|
|
wert = float(betrag)
|
|
if wert == int(wert):
|
|
return str(int(wert))
|
|
return f"{wert:.2f}".replace(".", ",")
|
|
except:
|
|
return betrag
|
|
|
|
def generiere_dateinamen(self, regel: Dict, extrahierte_felder: Dict) -> str:
|
|
"""
|
|
Generiert den neuen Dateinamen basierend auf Schema
|
|
Nutzt den intelligenten Schema-Builder der fehlende Felder entfernt
|
|
"""
|
|
schema = regel.get("schema", "{datum} - Dokument.pdf")
|
|
return baue_dateiname(schema, extrahierte_felder, ".pdf")
|
|
|
|
def verschiebe_datei(self, quell_pfad: str, ziel_ordner: str, neuer_name: str) -> str:
|
|
"""
|
|
Verschiebt und benennt Datei um
|
|
|
|
Returns:
|
|
Neuer Pfad der Datei
|
|
"""
|
|
ziel_dir = Path(ziel_ordner)
|
|
ziel_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
ziel_pfad = ziel_dir / neuer_name
|
|
|
|
# Eindeutigen Namen sicherstellen
|
|
counter = 1
|
|
original_name = ziel_pfad.stem
|
|
suffix = ziel_pfad.suffix
|
|
while ziel_pfad.exists():
|
|
ziel_pfad = ziel_dir / f"{original_name} ({counter}){suffix}"
|
|
counter += 1
|
|
|
|
# Verschieben
|
|
shutil.move(quell_pfad, ziel_pfad)
|
|
logger.info(f"Verschoben: {quell_pfad} -> {ziel_pfad}")
|
|
|
|
return str(ziel_pfad)
|
|
|
|
|
|
# ============ STANDARD-DOKUMENTTYPEN ============
|
|
# Diese werden für das einfache UI verwendet
|
|
|
|
DOKUMENTTYPEN = {
|
|
"rechnung": {
|
|
"name": "Rechnung",
|
|
"keywords": ["rechnung", "invoice"],
|
|
"schema": "{datum} - Rechnung - {firma} - {nummer} - {betrag} EUR.pdf",
|
|
"unterordner": "rechnungen"
|
|
},
|
|
"angebot": {
|
|
"name": "Angebot",
|
|
"keywords": ["angebot", "quotation", "offerte"],
|
|
"schema": "{datum} - Angebot - {firma} - {nummer} - {betrag} EUR.pdf",
|
|
"unterordner": "angebote"
|
|
},
|
|
"gutschrift": {
|
|
"name": "Gutschrift",
|
|
"keywords": ["gutschrift", "credit"],
|
|
"schema": "{datum} - Gutschrift - {firma} - {nummer} - {betrag} EUR.pdf",
|
|
"unterordner": "gutschriften"
|
|
},
|
|
"lieferschein": {
|
|
"name": "Lieferschein",
|
|
"keywords": ["lieferschein", "delivery"],
|
|
"schema": "{datum} - Lieferschein - {firma} - {nummer}.pdf",
|
|
"unterordner": "lieferscheine"
|
|
},
|
|
"auftragsbestaetigung": {
|
|
"name": "Auftragsbestätigung",
|
|
"keywords": ["auftragsbestätigung", "bestellbestätigung"],
|
|
"schema": "{datum} - Auftragsbestätigung - {firma} - {nummer}.pdf",
|
|
"unterordner": "auftraege"
|
|
},
|
|
"vertrag": {
|
|
"name": "Vertrag",
|
|
"keywords": ["vertrag", "contract"],
|
|
"schema": "Vertrag - {firma} - {nummer} - {datum}.pdf",
|
|
"unterordner": "vertraege"
|
|
},
|
|
"versicherung": {
|
|
"name": "Versicherung",
|
|
"keywords": ["versicherung", "police", "beitrag"],
|
|
"schema": "Versicherung - {firma} - {nummer} - {datum}.pdf",
|
|
"unterordner": "versicherungen"
|
|
},
|
|
"zeugnis": {
|
|
"name": "Zeugnis",
|
|
"keywords": ["zeugnis", "zertifikat"],
|
|
"schema": "Zeugnis - {firma} - {nummer} - {datum}.pdf",
|
|
"unterordner": "zeugnisse"
|
|
},
|
|
"bescheinigung": {
|
|
"name": "Bescheinigung",
|
|
"keywords": ["bescheinigung", "nachweis", "bestätigung"],
|
|
"schema": "Bescheinigung - {firma} - {nummer} - {datum}.pdf",
|
|
"unterordner": "bescheinigungen"
|
|
},
|
|
"kontoauszug": {
|
|
"name": "Kontoauszug",
|
|
"keywords": ["kontoauszug", "account statement"],
|
|
"schema": "{datum} - Kontoauszug - {firma} - {nummer}.pdf",
|
|
"unterordner": "kontoauszuege"
|
|
},
|
|
"sonstiges": {
|
|
"name": "Sonstiges",
|
|
"keywords": [],
|
|
"schema": "{datum} - {typ} - {firma}.pdf",
|
|
"unterordner": "sonstiges"
|
|
}
|
|
}
|
|
|
|
|
|
def erstelle_einfache_regel(name: str, dokumenttyp: str, keywords: str,
|
|
firma_wert: str = None, unterordner: str = None,
|
|
prioritaet: int = 50) -> Dict:
|
|
"""
|
|
Erstellt eine einfache Regel basierend auf Dokumenttyp
|
|
|
|
Args:
|
|
name: Name der Regel (z.B. "Sonepar Rechnung")
|
|
dokumenttyp: Typ aus DOKUMENTTYPEN
|
|
keywords: Komma-getrennte Keywords zur Erkennung
|
|
firma_wert: Optionaler fester Firmenwert
|
|
unterordner: Optionaler Unterordner (überschreibt Standard)
|
|
prioritaet: Priorität (niedriger = wichtiger)
|
|
|
|
Returns:
|
|
Regel-Dict für die Datenbank
|
|
"""
|
|
typ_config = DOKUMENTTYPEN.get(dokumenttyp, DOKUMENTTYPEN["sonstiges"])
|
|
|
|
regel = {
|
|
"name": name,
|
|
"prioritaet": prioritaet,
|
|
"aktiv": True,
|
|
"muster": {
|
|
"keywords": keywords
|
|
},
|
|
"extraktion": {},
|
|
"schema": typ_config["schema"],
|
|
"unterordner": unterordner or typ_config["unterordner"]
|
|
}
|
|
|
|
# Feste Firma wenn angegeben
|
|
if firma_wert:
|
|
regel["extraktion"]["firma"] = {"wert": firma_wert}
|
|
|
|
return regel
|
|
|
|
|
|
def liste_dokumenttypen() -> List[Dict]:
|
|
"""Gibt Liste aller Dokumenttypen für UI zurück"""
|
|
return [
|
|
{"id": key, "name": config["name"], "schema": config["schema"]}
|
|
for key, config in DOKUMENTTYPEN.items()
|
|
]
|