docker.dateiverwaltung/backend/app/modules/extraktoren.py

373 lines
13 KiB
Python

"""
Globale Feld-Extraktoren mit Kaskaden-Regex
Werden automatisch als Fallback verwendet wenn regel-spezifische Muster nicht greifen
"""
import re
from datetime import datetime
from typing import Optional, List, Dict, Any
import logging
logger = logging.getLogger(__name__)
# ============ DATUM ============
DATUM_MUSTER = [
# Mit Kontext (zuverlässiger)
{"regex": r"Rechnungsdatum[:\s]*(\d{2})[./](\d{2})[./](\d{4})", "order": "dmy"},
{"regex": r"Belegdatum[:\s]*(\d{2})[./](\d{2})[./](\d{4})", "order": "dmy"},
{"regex": r"Datum[:\s]*(\d{2})[./](\d{2})[./](\d{4})", "order": "dmy"},
{"regex": r"Date[:\s]*(\d{2})[./](\d{2})[./](\d{4})", "order": "dmy"},
{"regex": r"vom[:\s]*(\d{2})[./](\d{2})[./](\d{4})", "order": "dmy"},
# ISO Format
{"regex": r"(\d{4})-(\d{2})-(\d{2})", "order": "ymd"},
# Deutsches Format ohne Kontext
{"regex": r"(\d{2})\.(\d{2})\.(\d{4})", "order": "dmy"},
{"regex": r"(\d{2})/(\d{2})/(\d{4})", "order": "dmy"},
# Amerikanisches Format
{"regex": r"(\d{2})/(\d{2})/(\d{4})", "order": "mdy"},
# Ausgeschriebene Monate
{"regex": r"(\d{1,2})\.\s*(Januar|Februar|März|April|Mai|Juni|Juli|August|September|Oktober|November|Dezember)\s*(\d{4})", "order": "dMy"},
{"regex": r"(\d{1,2})\s+(Jan|Feb|Mär|Apr|Mai|Jun|Jul|Aug|Sep|Okt|Nov|Dez)[a-z]*\.?\s+(\d{4})", "order": "dMy"},
]
MONATE_DE = {
"januar": 1, "februar": 2, "märz": 3, "april": 4, "mai": 5, "juni": 6,
"juli": 7, "august": 8, "september": 9, "oktober": 10, "november": 11, "dezember": 12,
"jan": 1, "feb": 2, "mär": 3, "apr": 4, "jun": 6, "jul": 7, "aug": 8, "sep": 9, "okt": 10, "nov": 11, "dez": 12
}
def extrahiere_datum(text: str, spezifische_muster: List[Dict] = None) -> Optional[str]:
"""
Extrahiert Datum aus Text mit Kaskaden-Ansatz
Returns: ISO Format YYYY-MM-DD oder None
"""
muster_liste = (spezifische_muster or []) + DATUM_MUSTER
for muster in muster_liste:
try:
match = re.search(muster["regex"], text, re.IGNORECASE)
if match:
groups = match.groups()
order = muster.get("order", "dmy")
if order == "dmy":
tag, monat, jahr = int(groups[0]), int(groups[1]), int(groups[2])
elif order == "ymd":
jahr, monat, tag = int(groups[0]), int(groups[1]), int(groups[2])
elif order == "mdy":
monat, tag, jahr = int(groups[0]), int(groups[1]), int(groups[2])
elif order == "dMy":
tag = int(groups[0])
monat = MONATE_DE.get(groups[1].lower(), 1)
jahr = int(groups[2])
else:
continue
# Validierung
if 1 <= tag <= 31 and 1 <= monat <= 12 and 1900 <= jahr <= 2100:
return f"{jahr:04d}-{monat:02d}-{tag:02d}"
except Exception as e:
logger.debug(f"Datum-Extraktion fehlgeschlagen: {e}")
continue
return None
# ============ BETRAG ============
BETRAG_MUSTER = [
# Mit Kontext (zuverlässiger)
{"regex": r"Gesamtbetrag[:\s]*([\d.,]+)\s*(?:EUR|€)?", "context": True},
{"regex": r"Rechnungsbetrag[:\s]*([\d.,]+)\s*(?:EUR|€)?", "context": True},
{"regex": r"Endbetrag[:\s]*([\d.,]+)\s*(?:EUR|€)?", "context": True},
{"regex": r"Summe[:\s]*([\d.,]+)\s*(?:EUR|€)?", "context": True},
{"regex": r"Total[:\s]*([\d.,]+)\s*(?:EUR|€)?", "context": True},
{"regex": r"Brutto[:\s]*([\d.,]+)\s*(?:EUR|€)?", "context": True},
{"regex": r"zu zahlen[:\s]*([\d.,]+)\s*(?:EUR|€)?", "context": True},
{"regex": r"Zahlbetrag[:\s]*([\d.,]+)\s*(?:EUR|€)?", "context": True},
# Mit Währung (weniger zuverlässig)
{"regex": r"([\d.,]+)\s*(?:EUR|€)", "context": False},
{"regex": r"\s*([\d.,]+)", "context": False},
]
def extrahiere_betrag(text: str, spezifische_muster: List[Dict] = None) -> Optional[str]:
"""
Extrahiert Betrag aus Text mit Kaskaden-Ansatz
Returns: Formatierter Betrag (z.B. "1234,56") oder None
"""
muster_liste = (spezifische_muster or []) + BETRAG_MUSTER
for muster in muster_liste:
try:
match = re.search(muster["regex"], text, re.IGNORECASE)
if match:
betrag_str = match.group(1)
betrag = _parse_betrag(betrag_str)
if betrag is not None and betrag > 0:
# Formatierung: Ganzzahl wenn möglich, sonst 2 Dezimalstellen
if betrag == int(betrag):
return str(int(betrag))
return f"{betrag:.2f}".replace(".", ",")
except Exception as e:
logger.debug(f"Betrag-Extraktion fehlgeschlagen: {e}")
continue
return None
def _parse_betrag(betrag_str: str) -> Optional[float]:
"""Parst Betrag-String zu Float"""
betrag_str = betrag_str.strip()
# Leerzeichen entfernen
betrag_str = betrag_str.replace(" ", "")
# Deutsches Format: 1.234,56 -> 1234.56
if "," in betrag_str and "." in betrag_str:
if betrag_str.rfind(",") > betrag_str.rfind("."):
# Deutsches Format
betrag_str = betrag_str.replace(".", "").replace(",", ".")
else:
# Englisches Format
betrag_str = betrag_str.replace(",", "")
elif "," in betrag_str:
# Nur Komma: deutsches Dezimaltrennzeichen
betrag_str = betrag_str.replace(",", ".")
try:
return float(betrag_str)
except:
return None
# ============ RECHNUNGSNUMMER ============
NUMMER_MUSTER = [
# Mit Kontext
{"regex": r"Rechnungsnummer[:\s#]*([A-Z0-9][\w\-/]+)", "context": True},
{"regex": r"Rechnung\s*Nr\.?[:\s#]*([A-Z0-9][\w\-/]+)", "context": True},
{"regex": r"Rechnungs-Nr\.?[:\s#]*([A-Z0-9][\w\-/]+)", "context": True},
{"regex": r"Invoice\s*(?:No\.?|Number)?[:\s#]*([A-Z0-9][\w\-/]+)", "context": True},
{"regex": r"Beleg-?Nr\.?[:\s#]*([A-Z0-9][\w\-/]+)", "context": True},
{"regex": r"Dokumentnummer[:\s#]*([A-Z0-9][\w\-/]+)", "context": True},
{"regex": r"Bestell-?Nr\.?[:\s#]*([A-Z0-9][\w\-/]+)", "context": True},
{"regex": r"Auftrags-?Nr\.?[:\s#]*([A-Z0-9][\w\-/]+)", "context": True},
# Typische Formate ohne Kontext
{"regex": r"RE-?(\d{4,})", "context": False},
{"regex": r"INV-?(\d{4,})", "context": False},
]
def extrahiere_nummer(text: str, spezifische_muster: List[Dict] = None) -> Optional[str]:
"""
Extrahiert Rechnungs-/Belegnummer aus Text
"""
muster_liste = (spezifische_muster or []) + NUMMER_MUSTER
for muster in muster_liste:
try:
match = re.search(muster["regex"], text, re.IGNORECASE)
if match:
nummer = match.group(1).strip()
if len(nummer) >= 3: # Mindestens 3 Zeichen
return nummer
except Exception as e:
logger.debug(f"Nummer-Extraktion fehlgeschlagen: {e}")
continue
return None
# ============ FIRMA/ABSENDER ============
FIRMA_MUSTER = [
# Absender-Zeile
{"regex": r"^([A-ZÄÖÜ][A-Za-zäöüÄÖÜß\s&\-\.]+(?:GmbH|AG|KG|e\.K\.|Inc|Ltd|SE|UG))", "context": True},
{"regex": r"Absender[:\s]*([A-Za-zäöüÄÖÜß\s&\-\.]+)", "context": True},
{"regex": r"Von[:\s]*([A-Za-zäöüÄÖÜß\s&\-\.]+)", "context": True},
]
# Bekannte Firmen (werden im Text gesucht)
BEKANNTE_FIRMEN = [
"Sonepar", "Amazon", "Ebay", "MediaMarkt", "Saturn", "Conrad", "Reichelt",
"Hornbach", "Bauhaus", "OBI", "Hagebau", "Toom", "Hellweg",
"Telekom", "Vodafone", "O2", "1&1",
"Allianz", "HUK", "Provinzial", "DEVK", "Gothaer",
"IKEA", "Poco", "XXXLutz", "Roller",
"Alternate", "Mindfactory", "Caseking", "Notebooksbilliger",
"DHL", "DPD", "Hermes", "UPS", "GLS",
]
def extrahiere_firma(text: str, absender_email: str = "", spezifische_muster: List[Dict] = None) -> Optional[str]:
"""
Extrahiert Firmennamen aus Text oder E-Mail-Absender
"""
text_lower = text.lower()
# 1. Bekannte Firmen im Text suchen
for firma in BEKANNTE_FIRMEN:
if firma.lower() in text_lower:
return firma
# 2. Aus E-Mail-Domain extrahieren
if absender_email:
match = re.search(r"@([\w\-]+)\.", absender_email)
if match:
domain = match.group(1)
# Bekannte Domain-Namen kapitalisieren
for firma in BEKANNTE_FIRMEN:
if firma.lower() == domain.lower():
return firma
return domain.capitalize()
# 3. Regex-Muster
muster_liste = (spezifische_muster or []) + FIRMA_MUSTER
for muster in muster_liste:
try:
match = re.search(muster["regex"], text, re.MULTILINE)
if match:
firma = match.group(1).strip()
if len(firma) >= 2:
return firma
except:
continue
return None
# ============ DOKUMENTTYP ============
DOKUMENTTYP_KEYWORDS = {
"Rechnung": ["rechnung", "invoice", "faktura", "bill"],
"Angebot": ["angebot", "quotation", "quote", "offerte"],
"Gutschrift": ["gutschrift", "credit note", "erstattung"],
"Mahnung": ["mahnung", "zahlungserinnerung", "payment reminder"],
"Lieferschein": ["lieferschein", "delivery note", "packing slip"],
"Auftragsbestätigung": ["auftragsbestätigung", "order confirmation", "bestellbestätigung"],
"Vertrag": ["vertrag", "contract", "vereinbarung"],
"Versicherungsschein": ["versicherungsschein", "police", "versicherungspolice"],
"Zeugnis": ["zeugnis", "certificate", "zertifikat"],
"Bescheinigung": ["bescheinigung", "nachweis", "bestätigung"],
"Kontoauszug": ["kontoauszug", "account statement", "bankbeleg"],
"Beitragsrechnung": ["beitragsrechnung", "beitragsberechnung", "mitgliedsbeitrag"],
}
def extrahiere_dokumenttyp(text: str, dateiname: str = "") -> Optional[str]:
"""
Erkennt den Dokumenttyp anhand von Keywords
"""
text_lower = text.lower() + " " + dateiname.lower()
for typ, keywords in DOKUMENTTYP_KEYWORDS.items():
for keyword in keywords:
if keyword in text_lower:
return typ
return None
# ============ HAUPTFUNKTION ============
def extrahiere_alle_felder(text: str, dokument_info: Dict = None,
regel_extraktion: Dict = None) -> Dict[str, Any]:
"""
Extrahiert alle verfügbaren Felder aus einem Dokument
Args:
text: Der extrahierte Text aus dem PDF
dokument_info: Zusätzliche Infos (absender, original_name, etc.)
regel_extraktion: Spezifische Extraktionsregeln aus der Regel
Returns:
Dict mit allen extrahierten Feldern
"""
dokument_info = dokument_info or {}
regel_extraktion = regel_extraktion or {}
felder = {}
# Datum
datum_muster = regel_extraktion.get("datum", {}).get("muster", [])
datum = extrahiere_datum(text, datum_muster if isinstance(datum_muster, list) else None)
if datum:
felder["datum"] = datum
# Betrag
betrag_muster = regel_extraktion.get("betrag", {}).get("muster", [])
betrag = extrahiere_betrag(text, betrag_muster if isinstance(betrag_muster, list) else None)
if betrag:
felder["betrag"] = betrag
# Nummer
nummer_muster = regel_extraktion.get("nummer", {}).get("muster", [])
nummer = extrahiere_nummer(text, nummer_muster if isinstance(nummer_muster, list) else None)
if nummer:
felder["nummer"] = nummer
# Firma
absender = dokument_info.get("absender", "")
firma = extrahiere_firma(text, absender)
if firma:
felder["firma"] = firma
# Dokumenttyp
dateiname = dokument_info.get("original_name", "")
typ = extrahiere_dokumenttyp(text, dateiname)
if typ:
felder["typ"] = typ
# Statische Werte aus Regel übernehmen
for feld_name, feld_config in regel_extraktion.items():
if isinstance(feld_config, dict) and "wert" in feld_config:
felder[feld_name] = feld_config["wert"]
return felder
# ============ SCHEMA-BUILDER ============
def baue_dateiname(schema: str, felder: Dict[str, Any], endung: str = ".pdf") -> str:
"""
Baut Dateinamen aus Schema und Feldern.
Entfernt automatisch Platzhalter und deren Trennzeichen wenn Feld fehlt.
Schema-Beispiel: "{datum} - {typ} - {firma} - {nummer} - {betrag} EUR"
Mit felder = {datum: "2026-10-01", typ: "Rechnung", firma: "Sonepar"}
Ergebnis: "2026-10-01 - Rechnung - Sonepar.pdf"
"""
# Schema ohne Endung verarbeiten
if schema.lower().endswith(".pdf"):
schema = schema[:-4]
# Platzhalter ersetzen
result = schema
for key, value in felder.items():
placeholder = "{" + key + "}"
if placeholder in result and value:
result = result.replace(placeholder, str(value))
# Nicht ersetzte Platzhalter und ihre Trennzeichen entfernen
# Muster: " - {feld}" oder "{feld} - " oder "{feld}"
result = re.sub(r'\s*-\s*\{[^}]+\}', '', result)
result = re.sub(r'\{[^}]+\}\s*-\s*', '', result)
result = re.sub(r'\{[^}]+\}', '', result)
# Aufräumen: Doppelte Trennzeichen, Leerzeichen
result = re.sub(r'\s*-\s*-\s*', ' - ', result)
result = re.sub(r'\s+', ' ', result)
result = result.strip(' -')
# Ungültige Zeichen entfernen
invalid_chars = '<>:"/\\|?*'
for char in invalid_chars:
result = result.replace(char, "_")
# Endung anhängen
if not result:
result = "Dokument"
return result + endung