docker.dateiverwaltung/Source/backend/app/modules/pdf_processor.py

510 lines
18 KiB
Python
Executable file

"""
PDF-Processor Modul
Text-Extraktion, OCR und ZUGFeRD-Erkennung
"""
import subprocess
from pathlib import Path
from typing import Dict, Optional, Tuple
import logging
import re
logger = logging.getLogger(__name__)
# Versuche Libraries zu importieren
try:
import pdfplumber
PDFPLUMBER_AVAILABLE = True
except ImportError:
PDFPLUMBER_AVAILABLE = False
logger.warning("pdfplumber nicht installiert")
try:
from pypdf import PdfReader
PYPDF_AVAILABLE = True
except ImportError:
PYPDF_AVAILABLE = False
logger.warning("pypdf nicht installiert")
class PDFProcessor:
"""Verarbeitet PDFs: Text-Extraktion, OCR, ZUGFeRD-Erkennung"""
def __init__(self, ocr_language: str = "deu", ocr_dpi: int = 300):
self.ocr_language = ocr_language
self.ocr_dpi = ocr_dpi
def verarbeite(self, pdf_pfad: str, ocr_erlaubt: bool = True, original_backup_pfad: str = None) -> Dict:
"""
Vollständige PDF-Verarbeitung
Args:
pdf_pfad: Pfad zur PDF
ocr_erlaubt: OCR durchführen wenn nötig
original_backup_pfad: Ordner für Original-Backup vor OCR
Returns:
Dict mit: text, ist_zugferd, zugferd_xml, hat_text, ocr_durchgefuehrt
"""
pfad = Path(pdf_pfad)
if not pfad.exists():
return {"fehler": f"Datei nicht gefunden: {pdf_pfad}"}
ergebnis = {
"pfad": str(pfad),
"text": "",
"ist_zugferd": False,
"zugferd_xml": None,
"ist_signiert": False,
"hat_text": False,
"ocr_durchgefuehrt": False,
"original_gesichert": None,
"seiten": 0
}
# 1. ZUGFeRD prüfen
zugferd_result = self.pruefe_zugferd(pdf_pfad)
ergebnis["ist_zugferd"] = zugferd_result["ist_zugferd"]
ergebnis["zugferd_xml"] = zugferd_result.get("xml")
# 2. Signatur prüfen
ergebnis["ist_signiert"] = self.pruefe_signatur(pdf_pfad)
# 3. Text extrahieren
text, seiten = self.extrahiere_text(pdf_pfad)
ergebnis["text"] = text
ergebnis["seiten"] = seiten
ergebnis["hat_text"] = bool(text and len(text.strip()) > 50)
# 4. OCR falls kein Text (aber NICHT bei ZUGFeRD oder signierten PDFs!)
if ocr_erlaubt and not ergebnis["hat_text"] and not ergebnis["ist_zugferd"] and not ergebnis["ist_signiert"]:
# Zusätzliche Sicherheitsprüfung: Attachments auf ZUGFeRD prüfen
# (falls die normale ZUGFeRD-Erkennung fehlgeschlagen ist)
hat_zugferd_attachment = self._hat_zugferd_attachment(pdf_pfad)
if hat_zugferd_attachment:
ergebnis["ist_zugferd"] = True
logger.info(f"ZUGFeRD-Attachment gefunden, überspringe OCR: {pfad.name}")
else:
logger.info(f"Kein Text gefunden, starte OCR für {pfad.name}")
# Original sichern falls gewünscht
if original_backup_pfad:
backup_pfad = self.sichere_original(pdf_pfad, original_backup_pfad)
if backup_pfad:
ergebnis["original_gesichert"] = backup_pfad
logger.info(f"Original gesichert: {backup_pfad}")
ocr_text, ocr_erfolg = self.fuehre_ocr_aus(pdf_pfad)
if ocr_erfolg:
ergebnis["text"] = ocr_text
ergebnis["hat_text"] = bool(ocr_text and len(ocr_text.strip()) > 50)
ergebnis["ocr_durchgefuehrt"] = True
return ergebnis
def sichere_original(self, pdf_pfad: str, backup_ordner: str) -> Optional[str]:
"""Sichert das Original-PDF vor OCR"""
try:
import shutil
pfad = Path(pdf_pfad)
backup_dir = Path(backup_ordner)
backup_dir.mkdir(parents=True, exist_ok=True)
# Eindeutigen Namen generieren
backup_pfad = backup_dir / pfad.name
counter = 1
while backup_pfad.exists():
backup_pfad = backup_dir / f"{pfad.stem}_{counter}{pfad.suffix}"
counter += 1
shutil.copy2(str(pfad), str(backup_pfad))
return str(backup_pfad)
except Exception as e:
logger.error(f"Original-Sicherung fehlgeschlagen: {e}")
return None
def extrahiere_text(self, pdf_pfad: str) -> Tuple[str, int]:
"""
Extrahiert Text aus PDF
Returns:
Tuple von (text, seitenanzahl)
"""
text_parts = []
seiten = 0
# Methode 1: pdfplumber (besser für Tabellen)
if PDFPLUMBER_AVAILABLE:
try:
with pdfplumber.open(pdf_pfad) as pdf:
seiten = len(pdf.pages)
for page in pdf.pages:
page_text = page.extract_text()
if page_text:
text_parts.append(page_text)
if text_parts:
return "\n\n".join(text_parts), seiten
except Exception as e:
logger.debug(f"pdfplumber Fehler: {e}")
# Methode 2: pypdf (Fallback)
if PYPDF_AVAILABLE:
try:
reader = PdfReader(pdf_pfad)
seiten = len(reader.pages)
for page in reader.pages:
page_text = page.extract_text()
if page_text:
text_parts.append(page_text)
if text_parts:
return "\n\n".join(text_parts), seiten
except Exception as e:
logger.debug(f"pypdf Fehler: {e}")
# Methode 3: pdftotext CLI (Fallback)
try:
result = subprocess.run(
["pdftotext", "-layout", pdf_pfad, "-"],
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout, seiten
except Exception as e:
logger.debug(f"pdftotext Fehler: {e}")
return "", seiten
def pruefe_zugferd(self, pdf_pfad: str) -> Dict:
"""
Prüft ob PDF eine ZUGFeRD/Factur-X Rechnung ist
Returns:
Dict mit: ist_zugferd, xml (falls vorhanden)
"""
ergebnis = {"ist_zugferd": False, "xml": None}
# Methode 1: factur-x Library
try:
from facturx import get_facturx_xml_from_pdf
xml_bytes = get_facturx_xml_from_pdf(pdf_pfad)
if xml_bytes:
ergebnis["ist_zugferd"] = True
ergebnis["xml"] = xml_bytes.decode("utf-8") if isinstance(xml_bytes, bytes) else xml_bytes
logger.info(f"ZUGFeRD erkannt: {Path(pdf_pfad).name}")
return ergebnis
except ImportError:
logger.debug("factur-x nicht installiert")
except Exception as e:
logger.debug(f"factur-x Fehler: {e}")
# Methode 2: Manuell nach eingebettetem ZUGFeRD-XML suchen
if PYPDF_AVAILABLE:
try:
reader = PdfReader(pdf_pfad)
# ZUGFeRD/Factur-X XML-Dateinamen
zugferd_dateinamen = [
"factur-x.xml",
"zugferd-invoice.xml",
"xrechnung.xml",
"ZUGFeRD-invoice.xml",
]
# Eingebettete Dateien aus dem Catalog extrahieren
if reader.trailer and "/Root" in reader.trailer:
root = reader.trailer["/Root"]
if hasattr(root, "get_object"):
root = root.get_object()
if "/Names" in root:
names = root["/Names"]
if hasattr(names, "get_object"):
names = names.get_object()
if "/EmbeddedFiles" in names:
embedded = names["/EmbeddedFiles"]
if hasattr(embedded, "get_object"):
embedded = embedded.get_object()
# Namen-Array durchsuchen
if "/Names" in embedded:
names_array = embedded["/Names"]
# Format: [name1, ref1, name2, ref2, ...]
for i in range(0, len(names_array), 2):
if i < len(names_array):
dateiname = str(names_array[i]).lower()
if any(zf.lower() in dateiname for zf in zugferd_dateinamen):
ergebnis["ist_zugferd"] = True
logger.info(f"ZUGFeRD-XML gefunden: {Path(pdf_pfad).name}")
return ergebnis
except Exception as e:
logger.debug(f"ZUGFeRD-Prüfung Fehler: {e}")
return ergebnis
def pruefe_signatur(self, pdf_pfad: str) -> bool:
"""
Prüft ob PDF digital signiert ist
Returns:
True wenn signiert, False sonst
"""
if not PYPDF_AVAILABLE:
return False
try:
reader = PdfReader(pdf_pfad)
# Methode 1: AcroForm mit Sig-Feldern prüfen
if "/AcroForm" in reader.trailer.get("/Root", {}):
root = reader.trailer["/Root"]
if "/AcroForm" in root:
acro_form = root["/AcroForm"]
if "/SigFlags" in acro_form:
sig_flags = acro_form["/SigFlags"]
if sig_flags and int(sig_flags) > 0:
logger.info(f"Signatur erkannt (SigFlags): {Path(pdf_pfad).name}")
return True
# Methode 2: Nach /Sig Objekten suchen
for page in reader.pages:
if "/Annots" in page:
annots = page["/Annots"]
if annots:
for annot in annots:
try:
annot_obj = annot.get_object() if hasattr(annot, 'get_object') else annot
if annot_obj.get("/Subtype") == "/Widget":
ft = annot_obj.get("/FT")
if ft == "/Sig":
logger.info(f"Signatur erkannt (Annot): {Path(pdf_pfad).name}")
return True
except:
pass
except Exception as e:
logger.debug(f"Signatur-Prüfung Fehler: {e}")
return False
def _hat_zugferd_attachment(self, pdf_pfad: str) -> bool:
"""
Prüft ob die PDF ein ZUGFeRD/Factur-X XML-Attachment enthält.
Zusätzliche Sicherheitsprüfung vor OCR.
Returns:
True wenn ZUGFeRD-Attachment gefunden
"""
zugferd_dateinamen = [
"factur-x.xml",
"zugferd-invoice.xml",
"xrechnung.xml",
"zugferd-invoice.xml",
]
attachments = self._extrahiere_attachments(pdf_pfad)
for dateiname, _ in attachments:
dateiname_lower = dateiname.lower()
if any(zf.lower() in dateiname_lower for zf in zugferd_dateinamen):
return True
return False
def _extrahiere_attachments(self, pdf_pfad: str) -> list:
"""
Extrahiert alle eingebetteten Dateien (Attachments) aus einer PDF
Returns:
Liste von Tuples: (dateiname, daten_bytes)
"""
attachments = []
if not PYPDF_AVAILABLE:
return attachments
try:
reader = PdfReader(pdf_pfad)
if reader.trailer and "/Root" in reader.trailer:
root = reader.trailer["/Root"]
if hasattr(root, "get_object"):
root = root.get_object()
if "/Names" in root:
names = root["/Names"]
if hasattr(names, "get_object"):
names = names.get_object()
if "/EmbeddedFiles" in names:
embedded = names["/EmbeddedFiles"]
if hasattr(embedded, "get_object"):
embedded = embedded.get_object()
if "/Names" in embedded:
names_array = embedded["/Names"]
# Format: [name1, filespec1, name2, filespec2, ...]
for i in range(0, len(names_array), 2):
if i + 1 < len(names_array):
dateiname = str(names_array[i])
filespec = names_array[i + 1]
if hasattr(filespec, "get_object"):
filespec = filespec.get_object()
if "/EF" in filespec:
ef = filespec["/EF"]
if hasattr(ef, "get_object"):
ef = ef.get_object()
if "/F" in ef:
stream = ef["/F"]
if hasattr(stream, "get_object"):
stream = stream.get_object()
daten = stream.get_data()
attachments.append((dateiname, daten))
logger.debug(f"Attachment extrahiert: {dateiname}")
except Exception as e:
logger.debug(f"Attachment-Extraktion Fehler: {e}")
return attachments
def _fuege_attachments_ein(self, pdf_pfad: str, attachments: list) -> bool:
"""
Fügt Attachments in eine PDF ein
Args:
pdf_pfad: Pfad zur PDF
attachments: Liste von Tuples (dateiname, daten_bytes)
Returns:
True bei Erfolg
"""
if not attachments:
return True
if not PYPDF_AVAILABLE:
return False
try:
from pypdf import PdfWriter
# PDF lesen
reader = PdfReader(pdf_pfad)
writer = PdfWriter()
# Alle Seiten kopieren
for page in reader.pages:
writer.add_page(page)
# Metadaten kopieren
if reader.metadata:
writer.add_metadata(reader.metadata)
# Attachments hinzufügen
for dateiname, daten in attachments:
writer.add_attachment(dateiname, daten)
logger.debug(f"Attachment eingefügt: {dateiname}")
# Temporäre Datei schreiben
temp_pfad = Path(pdf_pfad).with_suffix(".attached.pdf")
with open(temp_pfad, "wb") as f:
writer.write(f)
# Original ersetzen
Path(pdf_pfad).unlink()
temp_pfad.rename(pdf_pfad)
return True
except Exception as e:
logger.error(f"Attachment-Einfügung Fehler: {e}")
return False
def fuehre_ocr_aus(self, pdf_pfad: str) -> Tuple[str, bool]:
"""
Führt OCR mit ocrmypdf durch, erhält dabei eingebettete Attachments
Returns:
Tuple von (text, erfolg)
"""
pfad = Path(pdf_pfad)
temp_pfad = pfad.with_suffix(".ocr.pdf")
# Attachments VOR OCR extrahieren (ocrmypdf verliert diese sonst)
attachments = self._extrahiere_attachments(pdf_pfad)
if attachments:
logger.info(f"{len(attachments)} Attachment(s) gesichert vor OCR")
try:
# ocrmypdf ausführen
result = subprocess.run(
[
"ocrmypdf",
"--language", self.ocr_language,
"--deskew", # Schräge Scans korrigieren
"--clean", # Bild verbessern
"--skip-text", # Seiten mit Text überspringen
"--force-ocr", # OCR erzwingen falls nötig
str(pfad),
str(temp_pfad)
],
capture_output=True,
text=True,
timeout=120 # 2 Minuten Timeout
)
if result.returncode == 0 and temp_pfad.exists():
# Original mit OCR-Version ersetzen
pfad.unlink()
temp_pfad.rename(pfad)
# Attachments wieder einfügen
if attachments:
if self._fuege_attachments_ein(str(pfad), attachments):
logger.info(f"Attachments wiederhergestellt nach OCR")
else:
logger.warning(f"Attachments konnten nicht wiederhergestellt werden")
# Text aus OCR-PDF extrahieren
text, _ = self.extrahiere_text(str(pfad))
return text, True
else:
logger.error(f"OCR Fehler: {result.stderr}")
if temp_pfad.exists():
temp_pfad.unlink()
return "", False
except subprocess.TimeoutExpired:
logger.error(f"OCR Timeout für {pfad.name}")
if temp_pfad.exists():
temp_pfad.unlink()
return "", False
except FileNotFoundError:
logger.error("ocrmypdf nicht installiert")
return "", False
except Exception as e:
logger.error(f"OCR Fehler: {e}")
if temp_pfad.exists():
temp_pfad.unlink()
return "", False
def extrahiere_metadaten(self, pdf_pfad: str) -> Dict:
"""Extrahiert PDF-Metadaten"""
metadaten = {}
if PYPDF_AVAILABLE:
try:
reader = PdfReader(pdf_pfad)
if reader.metadata:
metadaten = {
"titel": reader.metadata.get("/Title", ""),
"autor": reader.metadata.get("/Author", ""),
"ersteller": reader.metadata.get("/Creator", ""),
"erstellt": reader.metadata.get("/CreationDate", ""),
}
except Exception as e:
logger.debug(f"Metadaten-Fehler: {e}")
return metadaten