Neue Features: - Feinsortierung mit Live-Streaming (SSE) - zeigt Fortschritt in Echtzeit - Import/Export für Sortierregeln (JSON) - Sortierregeln-Liste mit Sortieroptionen (Name A-Z/Z-A, Priorität) - Checkbox "Auch Dateinamen prüfen" für Keyword-Matching - Automatische PDF-Seitenrotation bei OCR (90°, 180°, 270°) - Tab-Persistenz über Page-Reload (localStorage) - Modals schließen nur noch über X-Button Bugfixes: - Keywords nutzen jetzt Wortgrenzen (\b) - "rechnung" matched nicht mehr "Berechnung" - Keyword-Prüfung standardmäßig nur auf PDF-Text, nicht Dateinamen - Natürliche Sortierung für Regelnamen (1, 2, 10 statt 1, 10, 2) Technisch: - Async SSE-Generator mit asyncio.sleep(0) für sofortiges Streaming - ocrmypdf mit --rotate-pages Flag - Timeout für OCR auf 3 Minuten erhöht Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
510 lines
18 KiB
Python
Executable file
510 lines
18 KiB
Python
Executable file
"""
|
|
PDF-Processor Modul
|
|
Text-Extraktion, OCR und ZUGFeRD-Erkennung
|
|
"""
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Dict, Optional, Tuple
|
|
import logging
|
|
import re
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Versuche Libraries zu importieren
|
|
try:
|
|
import pdfplumber
|
|
PDFPLUMBER_AVAILABLE = True
|
|
except ImportError:
|
|
PDFPLUMBER_AVAILABLE = False
|
|
logger.warning("pdfplumber nicht installiert")
|
|
|
|
try:
|
|
from pypdf import PdfReader
|
|
PYPDF_AVAILABLE = True
|
|
except ImportError:
|
|
PYPDF_AVAILABLE = False
|
|
logger.warning("pypdf nicht installiert")
|
|
|
|
|
|
class PDFProcessor:
|
|
"""Verarbeitet PDFs: Text-Extraktion, OCR, ZUGFeRD-Erkennung"""
|
|
|
|
def __init__(self, ocr_language: str = "deu", ocr_dpi: int = 300):
|
|
self.ocr_language = ocr_language
|
|
self.ocr_dpi = ocr_dpi
|
|
|
|
def verarbeite(self, pdf_pfad: str, ocr_erlaubt: bool = True, original_backup_pfad: str = None) -> Dict:
|
|
"""
|
|
Vollständige PDF-Verarbeitung
|
|
|
|
Args:
|
|
pdf_pfad: Pfad zur PDF
|
|
ocr_erlaubt: OCR durchführen wenn nötig
|
|
original_backup_pfad: Ordner für Original-Backup vor OCR
|
|
|
|
Returns:
|
|
Dict mit: text, ist_zugferd, zugferd_xml, hat_text, ocr_durchgefuehrt
|
|
"""
|
|
pfad = Path(pdf_pfad)
|
|
if not pfad.exists():
|
|
return {"fehler": f"Datei nicht gefunden: {pdf_pfad}"}
|
|
|
|
ergebnis = {
|
|
"pfad": str(pfad),
|
|
"text": "",
|
|
"ist_zugferd": False,
|
|
"zugferd_xml": None,
|
|
"ist_signiert": False,
|
|
"hat_text": False,
|
|
"ocr_durchgefuehrt": False,
|
|
"original_gesichert": None,
|
|
"seiten": 0
|
|
}
|
|
|
|
# 1. ZUGFeRD prüfen
|
|
zugferd_result = self.pruefe_zugferd(pdf_pfad)
|
|
ergebnis["ist_zugferd"] = zugferd_result["ist_zugferd"]
|
|
ergebnis["zugferd_xml"] = zugferd_result.get("xml")
|
|
|
|
# 2. Signatur prüfen
|
|
ergebnis["ist_signiert"] = self.pruefe_signatur(pdf_pfad)
|
|
|
|
# 3. Text extrahieren
|
|
text, seiten = self.extrahiere_text(pdf_pfad)
|
|
ergebnis["text"] = text
|
|
ergebnis["seiten"] = seiten
|
|
ergebnis["hat_text"] = bool(text and len(text.strip()) > 50)
|
|
|
|
# 4. OCR falls kein Text (aber NICHT bei ZUGFeRD oder signierten PDFs!)
|
|
if ocr_erlaubt and not ergebnis["hat_text"] and not ergebnis["ist_zugferd"] and not ergebnis["ist_signiert"]:
|
|
# Zusätzliche Sicherheitsprüfung: Attachments auf ZUGFeRD prüfen
|
|
# (falls die normale ZUGFeRD-Erkennung fehlgeschlagen ist)
|
|
hat_zugferd_attachment = self._hat_zugferd_attachment(pdf_pfad)
|
|
if hat_zugferd_attachment:
|
|
ergebnis["ist_zugferd"] = True
|
|
logger.info(f"ZUGFeRD-Attachment gefunden, überspringe OCR: {pfad.name}")
|
|
else:
|
|
logger.info(f"Kein Text gefunden, starte OCR für {pfad.name}")
|
|
|
|
# Original sichern falls gewünscht
|
|
if original_backup_pfad:
|
|
backup_pfad = self.sichere_original(pdf_pfad, original_backup_pfad)
|
|
if backup_pfad:
|
|
ergebnis["original_gesichert"] = backup_pfad
|
|
logger.info(f"Original gesichert: {backup_pfad}")
|
|
|
|
ocr_text, ocr_erfolg = self.fuehre_ocr_aus(pdf_pfad)
|
|
if ocr_erfolg:
|
|
ergebnis["text"] = ocr_text
|
|
ergebnis["hat_text"] = bool(ocr_text and len(ocr_text.strip()) > 50)
|
|
ergebnis["ocr_durchgefuehrt"] = True
|
|
|
|
return ergebnis
|
|
|
|
def sichere_original(self, pdf_pfad: str, backup_ordner: str) -> Optional[str]:
|
|
"""Sichert das Original-PDF vor OCR"""
|
|
try:
|
|
import shutil
|
|
pfad = Path(pdf_pfad)
|
|
backup_dir = Path(backup_ordner)
|
|
backup_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Eindeutigen Namen generieren
|
|
backup_pfad = backup_dir / pfad.name
|
|
counter = 1
|
|
while backup_pfad.exists():
|
|
backup_pfad = backup_dir / f"{pfad.stem}_{counter}{pfad.suffix}"
|
|
counter += 1
|
|
|
|
shutil.copy2(str(pfad), str(backup_pfad))
|
|
return str(backup_pfad)
|
|
except Exception as e:
|
|
logger.error(f"Original-Sicherung fehlgeschlagen: {e}")
|
|
return None
|
|
|
|
def extrahiere_text(self, pdf_pfad: str) -> Tuple[str, int]:
|
|
"""
|
|
Extrahiert Text aus PDF
|
|
|
|
Returns:
|
|
Tuple von (text, seitenanzahl)
|
|
"""
|
|
text_parts = []
|
|
seiten = 0
|
|
|
|
# Methode 1: pdfplumber (besser für Tabellen)
|
|
if PDFPLUMBER_AVAILABLE:
|
|
try:
|
|
with pdfplumber.open(pdf_pfad) as pdf:
|
|
seiten = len(pdf.pages)
|
|
for page in pdf.pages:
|
|
page_text = page.extract_text()
|
|
if page_text:
|
|
text_parts.append(page_text)
|
|
if text_parts:
|
|
return "\n\n".join(text_parts), seiten
|
|
except Exception as e:
|
|
logger.debug(f"pdfplumber Fehler: {e}")
|
|
|
|
# Methode 2: pypdf (Fallback)
|
|
if PYPDF_AVAILABLE:
|
|
try:
|
|
reader = PdfReader(pdf_pfad)
|
|
seiten = len(reader.pages)
|
|
for page in reader.pages:
|
|
page_text = page.extract_text()
|
|
if page_text:
|
|
text_parts.append(page_text)
|
|
if text_parts:
|
|
return "\n\n".join(text_parts), seiten
|
|
except Exception as e:
|
|
logger.debug(f"pypdf Fehler: {e}")
|
|
|
|
# Methode 3: pdftotext CLI (Fallback)
|
|
try:
|
|
result = subprocess.run(
|
|
["pdftotext", "-layout", pdf_pfad, "-"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=30
|
|
)
|
|
if result.returncode == 0 and result.stdout.strip():
|
|
return result.stdout, seiten
|
|
except Exception as e:
|
|
logger.debug(f"pdftotext Fehler: {e}")
|
|
|
|
return "", seiten
|
|
|
|
def pruefe_zugferd(self, pdf_pfad: str) -> Dict:
|
|
"""
|
|
Prüft ob PDF eine ZUGFeRD/Factur-X Rechnung ist
|
|
|
|
Returns:
|
|
Dict mit: ist_zugferd, xml (falls vorhanden)
|
|
"""
|
|
ergebnis = {"ist_zugferd": False, "xml": None}
|
|
|
|
# Methode 1: factur-x Library
|
|
try:
|
|
from facturx import get_facturx_xml_from_pdf
|
|
xml_bytes = get_facturx_xml_from_pdf(pdf_pfad)
|
|
if xml_bytes:
|
|
ergebnis["ist_zugferd"] = True
|
|
ergebnis["xml"] = xml_bytes.decode("utf-8") if isinstance(xml_bytes, bytes) else xml_bytes
|
|
logger.info(f"ZUGFeRD erkannt: {Path(pdf_pfad).name}")
|
|
return ergebnis
|
|
except ImportError:
|
|
logger.debug("factur-x nicht installiert")
|
|
except Exception as e:
|
|
logger.debug(f"factur-x Fehler: {e}")
|
|
|
|
# Methode 2: Manuell nach eingebettetem ZUGFeRD-XML suchen
|
|
if PYPDF_AVAILABLE:
|
|
try:
|
|
reader = PdfReader(pdf_pfad)
|
|
|
|
# ZUGFeRD/Factur-X XML-Dateinamen
|
|
zugferd_dateinamen = [
|
|
"factur-x.xml",
|
|
"zugferd-invoice.xml",
|
|
"xrechnung.xml",
|
|
"ZUGFeRD-invoice.xml",
|
|
]
|
|
|
|
# Eingebettete Dateien aus dem Catalog extrahieren
|
|
if reader.trailer and "/Root" in reader.trailer:
|
|
root = reader.trailer["/Root"]
|
|
if hasattr(root, "get_object"):
|
|
root = root.get_object()
|
|
|
|
if "/Names" in root:
|
|
names = root["/Names"]
|
|
if hasattr(names, "get_object"):
|
|
names = names.get_object()
|
|
|
|
if "/EmbeddedFiles" in names:
|
|
embedded = names["/EmbeddedFiles"]
|
|
if hasattr(embedded, "get_object"):
|
|
embedded = embedded.get_object()
|
|
|
|
# Namen-Array durchsuchen
|
|
if "/Names" in embedded:
|
|
names_array = embedded["/Names"]
|
|
# Format: [name1, ref1, name2, ref2, ...]
|
|
for i in range(0, len(names_array), 2):
|
|
if i < len(names_array):
|
|
dateiname = str(names_array[i]).lower()
|
|
if any(zf.lower() in dateiname for zf in zugferd_dateinamen):
|
|
ergebnis["ist_zugferd"] = True
|
|
logger.info(f"ZUGFeRD-XML gefunden: {Path(pdf_pfad).name}")
|
|
return ergebnis
|
|
except Exception as e:
|
|
logger.debug(f"ZUGFeRD-Prüfung Fehler: {e}")
|
|
|
|
return ergebnis
|
|
|
|
def pruefe_signatur(self, pdf_pfad: str) -> bool:
|
|
"""
|
|
Prüft ob PDF digital signiert ist
|
|
|
|
Returns:
|
|
True wenn signiert, False sonst
|
|
"""
|
|
if not PYPDF_AVAILABLE:
|
|
return False
|
|
|
|
try:
|
|
reader = PdfReader(pdf_pfad)
|
|
|
|
# Methode 1: AcroForm mit Sig-Feldern prüfen
|
|
if "/AcroForm" in reader.trailer.get("/Root", {}):
|
|
root = reader.trailer["/Root"]
|
|
if "/AcroForm" in root:
|
|
acro_form = root["/AcroForm"]
|
|
if "/SigFlags" in acro_form:
|
|
sig_flags = acro_form["/SigFlags"]
|
|
if sig_flags and int(sig_flags) > 0:
|
|
logger.info(f"Signatur erkannt (SigFlags): {Path(pdf_pfad).name}")
|
|
return True
|
|
|
|
# Methode 2: Nach /Sig Objekten suchen
|
|
for page in reader.pages:
|
|
if "/Annots" in page:
|
|
annots = page["/Annots"]
|
|
if annots:
|
|
for annot in annots:
|
|
try:
|
|
annot_obj = annot.get_object() if hasattr(annot, 'get_object') else annot
|
|
if annot_obj.get("/Subtype") == "/Widget":
|
|
ft = annot_obj.get("/FT")
|
|
if ft == "/Sig":
|
|
logger.info(f"Signatur erkannt (Annot): {Path(pdf_pfad).name}")
|
|
return True
|
|
except:
|
|
pass
|
|
|
|
except Exception as e:
|
|
logger.debug(f"Signatur-Prüfung Fehler: {e}")
|
|
|
|
return False
|
|
|
|
def _hat_zugferd_attachment(self, pdf_pfad: str) -> bool:
|
|
"""
|
|
Prüft ob die PDF ein ZUGFeRD/Factur-X XML-Attachment enthält.
|
|
Zusätzliche Sicherheitsprüfung vor OCR.
|
|
|
|
Returns:
|
|
True wenn ZUGFeRD-Attachment gefunden
|
|
"""
|
|
zugferd_dateinamen = [
|
|
"factur-x.xml",
|
|
"zugferd-invoice.xml",
|
|
"xrechnung.xml",
|
|
"zugferd-invoice.xml",
|
|
]
|
|
|
|
attachments = self._extrahiere_attachments(pdf_pfad)
|
|
for dateiname, _ in attachments:
|
|
dateiname_lower = dateiname.lower()
|
|
if any(zf.lower() in dateiname_lower for zf in zugferd_dateinamen):
|
|
return True
|
|
|
|
return False
|
|
|
|
def _extrahiere_attachments(self, pdf_pfad: str) -> list:
|
|
"""
|
|
Extrahiert alle eingebetteten Dateien (Attachments) aus einer PDF
|
|
|
|
Returns:
|
|
Liste von Tuples: (dateiname, daten_bytes)
|
|
"""
|
|
attachments = []
|
|
|
|
if not PYPDF_AVAILABLE:
|
|
return attachments
|
|
|
|
try:
|
|
reader = PdfReader(pdf_pfad)
|
|
|
|
if reader.trailer and "/Root" in reader.trailer:
|
|
root = reader.trailer["/Root"]
|
|
if hasattr(root, "get_object"):
|
|
root = root.get_object()
|
|
|
|
if "/Names" in root:
|
|
names = root["/Names"]
|
|
if hasattr(names, "get_object"):
|
|
names = names.get_object()
|
|
|
|
if "/EmbeddedFiles" in names:
|
|
embedded = names["/EmbeddedFiles"]
|
|
if hasattr(embedded, "get_object"):
|
|
embedded = embedded.get_object()
|
|
|
|
if "/Names" in embedded:
|
|
names_array = embedded["/Names"]
|
|
# Format: [name1, filespec1, name2, filespec2, ...]
|
|
for i in range(0, len(names_array), 2):
|
|
if i + 1 < len(names_array):
|
|
dateiname = str(names_array[i])
|
|
filespec = names_array[i + 1]
|
|
if hasattr(filespec, "get_object"):
|
|
filespec = filespec.get_object()
|
|
|
|
if "/EF" in filespec:
|
|
ef = filespec["/EF"]
|
|
if hasattr(ef, "get_object"):
|
|
ef = ef.get_object()
|
|
|
|
if "/F" in ef:
|
|
stream = ef["/F"]
|
|
if hasattr(stream, "get_object"):
|
|
stream = stream.get_object()
|
|
|
|
daten = stream.get_data()
|
|
attachments.append((dateiname, daten))
|
|
logger.debug(f"Attachment extrahiert: {dateiname}")
|
|
|
|
except Exception as e:
|
|
logger.debug(f"Attachment-Extraktion Fehler: {e}")
|
|
|
|
return attachments
|
|
|
|
def _fuege_attachments_ein(self, pdf_pfad: str, attachments: list) -> bool:
|
|
"""
|
|
Fügt Attachments in eine PDF ein
|
|
|
|
Args:
|
|
pdf_pfad: Pfad zur PDF
|
|
attachments: Liste von Tuples (dateiname, daten_bytes)
|
|
|
|
Returns:
|
|
True bei Erfolg
|
|
"""
|
|
if not attachments:
|
|
return True
|
|
|
|
if not PYPDF_AVAILABLE:
|
|
return False
|
|
|
|
try:
|
|
from pypdf import PdfWriter
|
|
|
|
# PDF lesen
|
|
reader = PdfReader(pdf_pfad)
|
|
writer = PdfWriter()
|
|
|
|
# Alle Seiten kopieren
|
|
for page in reader.pages:
|
|
writer.add_page(page)
|
|
|
|
# Metadaten kopieren
|
|
if reader.metadata:
|
|
writer.add_metadata(reader.metadata)
|
|
|
|
# Attachments hinzufügen
|
|
for dateiname, daten in attachments:
|
|
writer.add_attachment(dateiname, daten)
|
|
logger.debug(f"Attachment eingefügt: {dateiname}")
|
|
|
|
# Temporäre Datei schreiben
|
|
temp_pfad = Path(pdf_pfad).with_suffix(".attached.pdf")
|
|
with open(temp_pfad, "wb") as f:
|
|
writer.write(f)
|
|
|
|
# Original ersetzen
|
|
Path(pdf_pfad).unlink()
|
|
temp_pfad.rename(pdf_pfad)
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Attachment-Einfügung Fehler: {e}")
|
|
return False
|
|
|
|
def fuehre_ocr_aus(self, pdf_pfad: str) -> Tuple[str, bool]:
|
|
"""
|
|
Führt OCR mit ocrmypdf durch, erhält dabei eingebettete Attachments
|
|
|
|
Returns:
|
|
Tuple von (text, erfolg)
|
|
"""
|
|
pfad = Path(pdf_pfad)
|
|
temp_pfad = pfad.with_suffix(".ocr.pdf")
|
|
|
|
# Attachments VOR OCR extrahieren (ocrmypdf verliert diese sonst)
|
|
attachments = self._extrahiere_attachments(pdf_pfad)
|
|
if attachments:
|
|
logger.info(f"{len(attachments)} Attachment(s) gesichert vor OCR")
|
|
|
|
try:
|
|
# ocrmypdf ausführen
|
|
result = subprocess.run(
|
|
[
|
|
"ocrmypdf",
|
|
"--language", self.ocr_language,
|
|
"--deskew", # Schräge Scans korrigieren
|
|
"--rotate-pages", # Automatische Seitenrotation (90°, 180°, 270°)
|
|
"--clean", # Bild verbessern
|
|
"--skip-text", # Seiten mit Text überspringen
|
|
str(pfad),
|
|
str(temp_pfad)
|
|
],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=180 # 3 Minuten Timeout (Rotation braucht etwas länger)
|
|
)
|
|
|
|
if result.returncode == 0 and temp_pfad.exists():
|
|
# Original mit OCR-Version ersetzen
|
|
pfad.unlink()
|
|
temp_pfad.rename(pfad)
|
|
|
|
# Attachments wieder einfügen
|
|
if attachments:
|
|
if self._fuege_attachments_ein(str(pfad), attachments):
|
|
logger.info(f"Attachments wiederhergestellt nach OCR")
|
|
else:
|
|
logger.warning(f"Attachments konnten nicht wiederhergestellt werden")
|
|
|
|
# Text aus OCR-PDF extrahieren
|
|
text, _ = self.extrahiere_text(str(pfad))
|
|
return text, True
|
|
else:
|
|
logger.error(f"OCR Fehler: {result.stderr}")
|
|
if temp_pfad.exists():
|
|
temp_pfad.unlink()
|
|
return "", False
|
|
|
|
except subprocess.TimeoutExpired:
|
|
logger.error(f"OCR Timeout für {pfad.name}")
|
|
if temp_pfad.exists():
|
|
temp_pfad.unlink()
|
|
return "", False
|
|
except FileNotFoundError:
|
|
logger.error("ocrmypdf nicht installiert")
|
|
return "", False
|
|
except Exception as e:
|
|
logger.error(f"OCR Fehler: {e}")
|
|
if temp_pfad.exists():
|
|
temp_pfad.unlink()
|
|
return "", False
|
|
|
|
def extrahiere_metadaten(self, pdf_pfad: str) -> Dict:
|
|
"""Extrahiert PDF-Metadaten"""
|
|
metadaten = {}
|
|
|
|
if PYPDF_AVAILABLE:
|
|
try:
|
|
reader = PdfReader(pdf_pfad)
|
|
if reader.metadata:
|
|
metadaten = {
|
|
"titel": reader.metadata.get("/Title", ""),
|
|
"autor": reader.metadata.get("/Author", ""),
|
|
"ersteller": reader.metadata.get("/Creator", ""),
|
|
"erstellt": reader.metadata.get("/CreationDate", ""),
|
|
}
|
|
except Exception as e:
|
|
logger.debug(f"Metadaten-Fehler: {e}")
|
|
|
|
return metadaten
|