373 lines
13 KiB
Python
373 lines
13 KiB
Python
"""
|
|
Globale Feld-Extraktoren mit Kaskaden-Regex
|
|
Werden automatisch als Fallback verwendet wenn regel-spezifische Muster nicht greifen
|
|
"""
|
|
import re
|
|
from datetime import datetime
|
|
from typing import Optional, List, Dict, Any
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ============ DATUM ============
|
|
DATUM_MUSTER = [
|
|
# Mit Kontext (zuverlässiger)
|
|
{"regex": r"Rechnungsdatum[:\s]*(\d{2})[./](\d{2})[./](\d{4})", "order": "dmy"},
|
|
{"regex": r"Belegdatum[:\s]*(\d{2})[./](\d{2})[./](\d{4})", "order": "dmy"},
|
|
{"regex": r"Datum[:\s]*(\d{2})[./](\d{2})[./](\d{4})", "order": "dmy"},
|
|
{"regex": r"Date[:\s]*(\d{2})[./](\d{2})[./](\d{4})", "order": "dmy"},
|
|
{"regex": r"vom[:\s]*(\d{2})[./](\d{2})[./](\d{4})", "order": "dmy"},
|
|
|
|
# ISO Format
|
|
{"regex": r"(\d{4})-(\d{2})-(\d{2})", "order": "ymd"},
|
|
|
|
# Deutsches Format ohne Kontext
|
|
{"regex": r"(\d{2})\.(\d{2})\.(\d{4})", "order": "dmy"},
|
|
{"regex": r"(\d{2})/(\d{2})/(\d{4})", "order": "dmy"},
|
|
|
|
# Amerikanisches Format
|
|
{"regex": r"(\d{2})/(\d{2})/(\d{4})", "order": "mdy"},
|
|
|
|
# Ausgeschriebene Monate
|
|
{"regex": r"(\d{1,2})\.\s*(Januar|Februar|März|April|Mai|Juni|Juli|August|September|Oktober|November|Dezember)\s*(\d{4})", "order": "dMy"},
|
|
{"regex": r"(\d{1,2})\s+(Jan|Feb|Mär|Apr|Mai|Jun|Jul|Aug|Sep|Okt|Nov|Dez)[a-z]*\.?\s+(\d{4})", "order": "dMy"},
|
|
]
|
|
|
|
MONATE_DE = {
|
|
"januar": 1, "februar": 2, "märz": 3, "april": 4, "mai": 5, "juni": 6,
|
|
"juli": 7, "august": 8, "september": 9, "oktober": 10, "november": 11, "dezember": 12,
|
|
"jan": 1, "feb": 2, "mär": 3, "apr": 4, "jun": 6, "jul": 7, "aug": 8, "sep": 9, "okt": 10, "nov": 11, "dez": 12
|
|
}
|
|
|
|
|
|
def extrahiere_datum(text: str, spezifische_muster: List[Dict] = None) -> Optional[str]:
|
|
"""
|
|
Extrahiert Datum aus Text mit Kaskaden-Ansatz
|
|
Returns: ISO Format YYYY-MM-DD oder None
|
|
"""
|
|
muster_liste = (spezifische_muster or []) + DATUM_MUSTER
|
|
|
|
for muster in muster_liste:
|
|
try:
|
|
match = re.search(muster["regex"], text, re.IGNORECASE)
|
|
if match:
|
|
groups = match.groups()
|
|
order = muster.get("order", "dmy")
|
|
|
|
if order == "dmy":
|
|
tag, monat, jahr = int(groups[0]), int(groups[1]), int(groups[2])
|
|
elif order == "ymd":
|
|
jahr, monat, tag = int(groups[0]), int(groups[1]), int(groups[2])
|
|
elif order == "mdy":
|
|
monat, tag, jahr = int(groups[0]), int(groups[1]), int(groups[2])
|
|
elif order == "dMy":
|
|
tag = int(groups[0])
|
|
monat = MONATE_DE.get(groups[1].lower(), 1)
|
|
jahr = int(groups[2])
|
|
else:
|
|
continue
|
|
|
|
# Validierung
|
|
if 1 <= tag <= 31 and 1 <= monat <= 12 and 1900 <= jahr <= 2100:
|
|
return f"{jahr:04d}-{monat:02d}-{tag:02d}"
|
|
except Exception as e:
|
|
logger.debug(f"Datum-Extraktion fehlgeschlagen: {e}")
|
|
continue
|
|
|
|
return None
|
|
|
|
|
|
# ============ BETRAG ============
|
|
BETRAG_MUSTER = [
|
|
# Mit Kontext (zuverlässiger)
|
|
{"regex": r"Gesamtbetrag[:\s]*([\d.,]+)\s*(?:EUR|€)?", "context": True},
|
|
{"regex": r"Rechnungsbetrag[:\s]*([\d.,]+)\s*(?:EUR|€)?", "context": True},
|
|
{"regex": r"Endbetrag[:\s]*([\d.,]+)\s*(?:EUR|€)?", "context": True},
|
|
{"regex": r"Summe[:\s]*([\d.,]+)\s*(?:EUR|€)?", "context": True},
|
|
{"regex": r"Total[:\s]*([\d.,]+)\s*(?:EUR|€)?", "context": True},
|
|
{"regex": r"Brutto[:\s]*([\d.,]+)\s*(?:EUR|€)?", "context": True},
|
|
{"regex": r"zu zahlen[:\s]*([\d.,]+)\s*(?:EUR|€)?", "context": True},
|
|
{"regex": r"Zahlbetrag[:\s]*([\d.,]+)\s*(?:EUR|€)?", "context": True},
|
|
|
|
# Mit Währung (weniger zuverlässig)
|
|
{"regex": r"([\d.,]+)\s*(?:EUR|€)", "context": False},
|
|
{"regex": r"€\s*([\d.,]+)", "context": False},
|
|
]
|
|
|
|
|
|
def extrahiere_betrag(text: str, spezifische_muster: List[Dict] = None) -> Optional[str]:
|
|
"""
|
|
Extrahiert Betrag aus Text mit Kaskaden-Ansatz
|
|
Returns: Formatierter Betrag (z.B. "1234,56") oder None
|
|
"""
|
|
muster_liste = (spezifische_muster or []) + BETRAG_MUSTER
|
|
|
|
for muster in muster_liste:
|
|
try:
|
|
match = re.search(muster["regex"], text, re.IGNORECASE)
|
|
if match:
|
|
betrag_str = match.group(1)
|
|
betrag = _parse_betrag(betrag_str)
|
|
if betrag is not None and betrag > 0:
|
|
# Formatierung: Ganzzahl wenn möglich, sonst 2 Dezimalstellen
|
|
if betrag == int(betrag):
|
|
return str(int(betrag))
|
|
return f"{betrag:.2f}".replace(".", ",")
|
|
except Exception as e:
|
|
logger.debug(f"Betrag-Extraktion fehlgeschlagen: {e}")
|
|
continue
|
|
|
|
return None
|
|
|
|
|
|
def _parse_betrag(betrag_str: str) -> Optional[float]:
|
|
"""Parst Betrag-String zu Float"""
|
|
betrag_str = betrag_str.strip()
|
|
|
|
# Leerzeichen entfernen
|
|
betrag_str = betrag_str.replace(" ", "")
|
|
|
|
# Deutsches Format: 1.234,56 -> 1234.56
|
|
if "," in betrag_str and "." in betrag_str:
|
|
if betrag_str.rfind(",") > betrag_str.rfind("."):
|
|
# Deutsches Format
|
|
betrag_str = betrag_str.replace(".", "").replace(",", ".")
|
|
else:
|
|
# Englisches Format
|
|
betrag_str = betrag_str.replace(",", "")
|
|
elif "," in betrag_str:
|
|
# Nur Komma: deutsches Dezimaltrennzeichen
|
|
betrag_str = betrag_str.replace(",", ".")
|
|
|
|
try:
|
|
return float(betrag_str)
|
|
except:
|
|
return None
|
|
|
|
|
|
# ============ RECHNUNGSNUMMER ============
|
|
NUMMER_MUSTER = [
|
|
# Mit Kontext
|
|
{"regex": r"Rechnungsnummer[:\s#]*([A-Z0-9][\w\-/]+)", "context": True},
|
|
{"regex": r"Rechnung\s*Nr\.?[:\s#]*([A-Z0-9][\w\-/]+)", "context": True},
|
|
{"regex": r"Rechnungs-Nr\.?[:\s#]*([A-Z0-9][\w\-/]+)", "context": True},
|
|
{"regex": r"Invoice\s*(?:No\.?|Number)?[:\s#]*([A-Z0-9][\w\-/]+)", "context": True},
|
|
{"regex": r"Beleg-?Nr\.?[:\s#]*([A-Z0-9][\w\-/]+)", "context": True},
|
|
{"regex": r"Dokumentnummer[:\s#]*([A-Z0-9][\w\-/]+)", "context": True},
|
|
{"regex": r"Bestell-?Nr\.?[:\s#]*([A-Z0-9][\w\-/]+)", "context": True},
|
|
{"regex": r"Auftrags-?Nr\.?[:\s#]*([A-Z0-9][\w\-/]+)", "context": True},
|
|
|
|
# Typische Formate ohne Kontext
|
|
{"regex": r"RE-?(\d{4,})", "context": False},
|
|
{"regex": r"INV-?(\d{4,})", "context": False},
|
|
]
|
|
|
|
|
|
def extrahiere_nummer(text: str, spezifische_muster: List[Dict] = None) -> Optional[str]:
|
|
"""
|
|
Extrahiert Rechnungs-/Belegnummer aus Text
|
|
"""
|
|
muster_liste = (spezifische_muster or []) + NUMMER_MUSTER
|
|
|
|
for muster in muster_liste:
|
|
try:
|
|
match = re.search(muster["regex"], text, re.IGNORECASE)
|
|
if match:
|
|
nummer = match.group(1).strip()
|
|
if len(nummer) >= 3: # Mindestens 3 Zeichen
|
|
return nummer
|
|
except Exception as e:
|
|
logger.debug(f"Nummer-Extraktion fehlgeschlagen: {e}")
|
|
continue
|
|
|
|
return None
|
|
|
|
|
|
# ============ FIRMA/ABSENDER ============
|
|
FIRMA_MUSTER = [
|
|
# Absender-Zeile
|
|
{"regex": r"^([A-ZÄÖÜ][A-Za-zäöüÄÖÜß\s&\-\.]+(?:GmbH|AG|KG|e\.K\.|Inc|Ltd|SE|UG))", "context": True},
|
|
{"regex": r"Absender[:\s]*([A-Za-zäöüÄÖÜß\s&\-\.]+)", "context": True},
|
|
{"regex": r"Von[:\s]*([A-Za-zäöüÄÖÜß\s&\-\.]+)", "context": True},
|
|
]
|
|
|
|
# Bekannte Firmen (werden im Text gesucht)
|
|
BEKANNTE_FIRMEN = [
|
|
"Sonepar", "Amazon", "Ebay", "MediaMarkt", "Saturn", "Conrad", "Reichelt",
|
|
"Hornbach", "Bauhaus", "OBI", "Hagebau", "Toom", "Hellweg",
|
|
"Telekom", "Vodafone", "O2", "1&1",
|
|
"Allianz", "HUK", "Provinzial", "DEVK", "Gothaer",
|
|
"IKEA", "Poco", "XXXLutz", "Roller",
|
|
"Alternate", "Mindfactory", "Caseking", "Notebooksbilliger",
|
|
"DHL", "DPD", "Hermes", "UPS", "GLS",
|
|
]
|
|
|
|
|
|
def extrahiere_firma(text: str, absender_email: str = "", spezifische_muster: List[Dict] = None) -> Optional[str]:
|
|
"""
|
|
Extrahiert Firmennamen aus Text oder E-Mail-Absender
|
|
"""
|
|
text_lower = text.lower()
|
|
|
|
# 1. Bekannte Firmen im Text suchen
|
|
for firma in BEKANNTE_FIRMEN:
|
|
if firma.lower() in text_lower:
|
|
return firma
|
|
|
|
# 2. Aus E-Mail-Domain extrahieren
|
|
if absender_email:
|
|
match = re.search(r"@([\w\-]+)\.", absender_email)
|
|
if match:
|
|
domain = match.group(1)
|
|
# Bekannte Domain-Namen kapitalisieren
|
|
for firma in BEKANNTE_FIRMEN:
|
|
if firma.lower() == domain.lower():
|
|
return firma
|
|
return domain.capitalize()
|
|
|
|
# 3. Regex-Muster
|
|
muster_liste = (spezifische_muster or []) + FIRMA_MUSTER
|
|
for muster in muster_liste:
|
|
try:
|
|
match = re.search(muster["regex"], text, re.MULTILINE)
|
|
if match:
|
|
firma = match.group(1).strip()
|
|
if len(firma) >= 2:
|
|
return firma
|
|
except:
|
|
continue
|
|
|
|
return None
|
|
|
|
|
|
# ============ DOKUMENTTYP ============
|
|
DOKUMENTTYP_KEYWORDS = {
|
|
"Rechnung": ["rechnung", "invoice", "faktura", "bill"],
|
|
"Angebot": ["angebot", "quotation", "quote", "offerte"],
|
|
"Gutschrift": ["gutschrift", "credit note", "erstattung"],
|
|
"Mahnung": ["mahnung", "zahlungserinnerung", "payment reminder"],
|
|
"Lieferschein": ["lieferschein", "delivery note", "packing slip"],
|
|
"Auftragsbestätigung": ["auftragsbestätigung", "order confirmation", "bestellbestätigung"],
|
|
"Vertrag": ["vertrag", "contract", "vereinbarung"],
|
|
"Versicherungsschein": ["versicherungsschein", "police", "versicherungspolice"],
|
|
"Zeugnis": ["zeugnis", "certificate", "zertifikat"],
|
|
"Bescheinigung": ["bescheinigung", "nachweis", "bestätigung"],
|
|
"Kontoauszug": ["kontoauszug", "account statement", "bankbeleg"],
|
|
"Beitragsrechnung": ["beitragsrechnung", "beitragsberechnung", "mitgliedsbeitrag"],
|
|
}
|
|
|
|
|
|
def extrahiere_dokumenttyp(text: str, dateiname: str = "") -> Optional[str]:
|
|
"""
|
|
Erkennt den Dokumenttyp anhand von Keywords
|
|
"""
|
|
text_lower = text.lower() + " " + dateiname.lower()
|
|
|
|
for typ, keywords in DOKUMENTTYP_KEYWORDS.items():
|
|
for keyword in keywords:
|
|
if keyword in text_lower:
|
|
return typ
|
|
|
|
return None
|
|
|
|
|
|
# ============ HAUPTFUNKTION ============
|
|
def extrahiere_alle_felder(text: str, dokument_info: Dict = None,
|
|
regel_extraktion: Dict = None) -> Dict[str, Any]:
|
|
"""
|
|
Extrahiert alle verfügbaren Felder aus einem Dokument
|
|
|
|
Args:
|
|
text: Der extrahierte Text aus dem PDF
|
|
dokument_info: Zusätzliche Infos (absender, original_name, etc.)
|
|
regel_extraktion: Spezifische Extraktionsregeln aus der Regel
|
|
|
|
Returns:
|
|
Dict mit allen extrahierten Feldern
|
|
"""
|
|
dokument_info = dokument_info or {}
|
|
regel_extraktion = regel_extraktion or {}
|
|
|
|
felder = {}
|
|
|
|
# Datum
|
|
datum_muster = regel_extraktion.get("datum", {}).get("muster", [])
|
|
datum = extrahiere_datum(text, datum_muster if isinstance(datum_muster, list) else None)
|
|
if datum:
|
|
felder["datum"] = datum
|
|
|
|
# Betrag
|
|
betrag_muster = regel_extraktion.get("betrag", {}).get("muster", [])
|
|
betrag = extrahiere_betrag(text, betrag_muster if isinstance(betrag_muster, list) else None)
|
|
if betrag:
|
|
felder["betrag"] = betrag
|
|
|
|
# Nummer
|
|
nummer_muster = regel_extraktion.get("nummer", {}).get("muster", [])
|
|
nummer = extrahiere_nummer(text, nummer_muster if isinstance(nummer_muster, list) else None)
|
|
if nummer:
|
|
felder["nummer"] = nummer
|
|
|
|
# Firma
|
|
absender = dokument_info.get("absender", "")
|
|
firma = extrahiere_firma(text, absender)
|
|
if firma:
|
|
felder["firma"] = firma
|
|
|
|
# Dokumenttyp
|
|
dateiname = dokument_info.get("original_name", "")
|
|
typ = extrahiere_dokumenttyp(text, dateiname)
|
|
if typ:
|
|
felder["typ"] = typ
|
|
|
|
# Statische Werte aus Regel übernehmen
|
|
for feld_name, feld_config in regel_extraktion.items():
|
|
if isinstance(feld_config, dict) and "wert" in feld_config:
|
|
felder[feld_name] = feld_config["wert"]
|
|
|
|
return felder
|
|
|
|
|
|
# ============ SCHEMA-BUILDER ============
|
|
def baue_dateiname(schema: str, felder: Dict[str, Any], endung: str = ".pdf") -> str:
|
|
"""
|
|
Baut Dateinamen aus Schema und Feldern.
|
|
Entfernt automatisch Platzhalter und deren Trennzeichen wenn Feld fehlt.
|
|
|
|
Schema-Beispiel: "{datum} - {typ} - {firma} - {nummer} - {betrag} EUR"
|
|
Mit felder = {datum: "2026-10-01", typ: "Rechnung", firma: "Sonepar"}
|
|
Ergebnis: "2026-10-01 - Rechnung - Sonepar.pdf"
|
|
"""
|
|
# Schema ohne Endung verarbeiten
|
|
if schema.lower().endswith(".pdf"):
|
|
schema = schema[:-4]
|
|
|
|
# Platzhalter ersetzen
|
|
result = schema
|
|
for key, value in felder.items():
|
|
placeholder = "{" + key + "}"
|
|
if placeholder in result and value:
|
|
result = result.replace(placeholder, str(value))
|
|
|
|
# Nicht ersetzte Platzhalter und ihre Trennzeichen entfernen
|
|
# Muster: " - {feld}" oder "{feld} - " oder "{feld}"
|
|
result = re.sub(r'\s*-\s*\{[^}]+\}', '', result)
|
|
result = re.sub(r'\{[^}]+\}\s*-\s*', '', result)
|
|
result = re.sub(r'\{[^}]+\}', '', result)
|
|
|
|
# Aufräumen: Doppelte Trennzeichen, Leerzeichen
|
|
result = re.sub(r'\s*-\s*-\s*', ' - ', result)
|
|
result = re.sub(r'\s+', ' ', result)
|
|
result = result.strip(' -')
|
|
|
|
# Ungültige Zeichen entfernen
|
|
invalid_chars = '<>:"/\\|?*'
|
|
for char in invalid_chars:
|
|
result = result.replace(char, "_")
|
|
|
|
# Endung anhängen
|
|
if not result:
|
|
result = "Dokument"
|
|
|
|
return result + endung
|