""" PDF-Processor Modul Text-Extraktion, OCR und ZUGFeRD-Erkennung """ import subprocess from pathlib import Path from typing import Dict, Optional, Tuple import logging import re logger = logging.getLogger(__name__) # Versuche Libraries zu importieren try: import pdfplumber PDFPLUMBER_AVAILABLE = True except ImportError: PDFPLUMBER_AVAILABLE = False logger.warning("pdfplumber nicht installiert") try: from pypdf import PdfReader PYPDF_AVAILABLE = True except ImportError: PYPDF_AVAILABLE = False logger.warning("pypdf nicht installiert") class PDFProcessor: """Verarbeitet PDFs: Text-Extraktion, OCR, ZUGFeRD-Erkennung""" def __init__(self, ocr_language: str = "deu", ocr_dpi: int = 300, backup_ordner: str = None): self.ocr_language = ocr_language self.ocr_dpi = ocr_dpi self.backup_ordner = backup_ordner # Optional: Ordner für Original-Backups vor OCR def verarbeite(self, pdf_pfad: str, ocr_einbetten: bool = True, backup_erstellen: bool = None) -> Dict: """ Vollständige PDF-Verarbeitung Args: pdf_pfad: Pfad zur PDF-Datei ocr_einbetten: Wenn True, wird OCR-Text permanent in die PDF eingebettet. ACHTUNG: Wird bei signierten PDFs und ZUGFeRD automatisch deaktiviert! backup_erstellen: Wenn True, wird vor OCR-Einbettung ein Backup erstellt. None = verwendet self.backup_ordner als Indikator Returns: Dict mit: text, ist_zugferd, zugferd_xml, hat_text, ocr_durchgefuehrt, ist_signiert, backup_pfad """ pfad = Path(pdf_pfad) if not pfad.exists(): return {"fehler": f"Datei nicht gefunden: {pdf_pfad}"} ergebnis = { "pfad": str(pfad), "text": "", "ist_zugferd": False, "zugferd_xml": None, "hat_text": False, "ocr_durchgefuehrt": False, "ist_signiert": False, "ocr_uebersprungen_grund": None, "seiten": 0, "backup_pfad": None } # 1. ZUGFeRD prüfen zugferd_result = self.pruefe_zugferd(pdf_pfad) ergebnis["ist_zugferd"] = zugferd_result["ist_zugferd"] ergebnis["zugferd_xml"] = zugferd_result.get("xml") # 2. Digitale Signatur prüfen ergebnis["ist_signiert"] = self.hat_digitale_signatur(pdf_pfad) # 3. Text extrahieren text, seiten = self.extrahiere_text(pdf_pfad) ergebnis["text"] = text ergebnis["seiten"] = seiten ergebnis["hat_text"] = bool(text and len(text.strip()) > 50) # 4. OCR falls kein Text - aber NICHT bei geschützten PDFs! if not ergebnis["hat_text"]: # Prüfen ob OCR-Einbettung sicher ist if ergebnis["ist_zugferd"]: ergebnis["ocr_uebersprungen_grund"] = "ZUGFeRD-Rechnung - keine Modifikation erlaubt" logger.info(f"OCR übersprungen (ZUGFeRD): {pfad.name}") # Trotzdem versuchen Text zu extrahieren ohne einzubetten ocr_text, _ = self.fuehre_ocr_aus(pdf_pfad, in_place=False) if ocr_text: ergebnis["text"] = ocr_text ergebnis["hat_text"] = True elif ergebnis["ist_signiert"]: ergebnis["ocr_uebersprungen_grund"] = "Digital signiert - keine Modifikation erlaubt" logger.info(f"OCR übersprungen (signiert): {pfad.name}") # Trotzdem versuchen Text zu extrahieren ohne einzubetten ocr_text, _ = self.fuehre_ocr_aus(pdf_pfad, in_place=False) if ocr_text: ergebnis["text"] = ocr_text ergebnis["hat_text"] = True elif ocr_einbetten: # Sicher zu modifizieren - OCR einbetten logger.info(f"Kein Text gefunden, starte OCR mit Einbettung für {pfad.name}") # Backup erstellen wenn gewünscht soll_backup = backup_erstellen if backup_erstellen is not None else bool(self.backup_ordner) if soll_backup and self.backup_ordner: backup_pfad = self._erstelle_backup(pdf_pfad) if backup_pfad: ergebnis["backup_pfad"] = backup_pfad logger.info(f"Backup erstellt: {backup_pfad}") ocr_text, ocr_erfolg = self.fuehre_ocr_aus(pdf_pfad, in_place=True) if ocr_erfolg: ergebnis["text"] = ocr_text ergebnis["hat_text"] = bool(ocr_text and len(ocr_text.strip()) > 50) ergebnis["ocr_durchgefuehrt"] = True else: # OCR ohne Einbettung (nur Text extrahieren) ocr_text, ocr_erfolg = self.fuehre_ocr_aus(pdf_pfad, in_place=False) if ocr_erfolg: ergebnis["text"] = ocr_text ergebnis["hat_text"] = bool(ocr_text and len(ocr_text.strip()) > 50) return ergebnis def _erstelle_backup(self, pdf_pfad: str) -> Optional[str]: """ Erstellt ein Backup der Original-PDF vor der OCR-Einbettung. Returns: Pfad zum Backup oder None bei Fehler """ import shutil from datetime import datetime if not self.backup_ordner: return None try: pfad = Path(pdf_pfad) backup_dir = Path(self.backup_ordner) backup_dir.mkdir(parents=True, exist_ok=True) # Dateiname mit Timestamp für Eindeutigkeit timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_name = f"{pfad.stem}_original_{timestamp}{pfad.suffix}" backup_pfad = backup_dir / backup_name # Kopieren (nicht verschieben!) shutil.copy2(pdf_pfad, backup_pfad) logger.info(f"Backup erstellt: {backup_pfad}") return str(backup_pfad) except Exception as e: logger.error(f"Backup-Erstellung fehlgeschlagen: {e}") return None def hat_digitale_signatur(self, pdf_pfad: str) -> bool: """ Prüft ob eine PDF eine digitale Signatur enthält. Signierte PDFs dürfen NICHT verändert werden, da dies die Signatur ungültig macht! Returns: True wenn signiert, False sonst """ if not PYPDF_AVAILABLE: return False try: reader = PdfReader(pdf_pfad) # Methode 1: AcroForm mit SigFlags prüfen if reader.trailer.get("/Root"): root = reader.trailer["/Root"] if hasattr(root, "get_object"): root = root.get_object() acro_form = root.get("/AcroForm") if acro_form: if hasattr(acro_form, "get_object"): acro_form = acro_form.get_object() sig_flags = acro_form.get("/SigFlags") if sig_flags and int(sig_flags) > 0: logger.info(f"Digitale Signatur gefunden (SigFlags): {Path(pdf_pfad).name}") return True # Methode 2: Nach Signatur-Feldern in Seiten suchen for page in reader.pages: if "/Annots" in page: annots = page["/Annots"] if hasattr(annots, "get_object"): annots = annots.get_object() if annots: for annot in annots: if hasattr(annot, "get_object"): annot = annot.get_object() if annot.get("/FT") == "/Sig": logger.info(f"Signatur-Feld gefunden: {Path(pdf_pfad).name}") return True # Methode 3: Nach typischen Signatur-Strings suchen # (Manche Signaturen sind nicht in AcroForm) with open(pdf_pfad, 'rb') as f: content = f.read(50000) # Erste 50KB lesen if b'/Type /Sig' in content or b'/SubFilter /adbe.pkcs7' in content: logger.info(f"Signatur-Marker gefunden: {Path(pdf_pfad).name}") return True except Exception as e: logger.debug(f"Signaturprüfung Fehler: {e}") return False def extrahiere_text(self, pdf_pfad: str) -> Tuple[str, int]: """ Extrahiert Text aus PDF Returns: Tuple von (text, seitenanzahl) """ text_parts = [] seiten = 0 # Methode 1: pdfplumber (besser für Tabellen) if PDFPLUMBER_AVAILABLE: try: with pdfplumber.open(pdf_pfad) as pdf: seiten = len(pdf.pages) for page in pdf.pages: page_text = page.extract_text() if page_text: text_parts.append(page_text) if text_parts: return "\n\n".join(text_parts), seiten except Exception as e: logger.debug(f"pdfplumber Fehler: {e}") # Methode 2: pypdf (Fallback) if PYPDF_AVAILABLE: try: reader = PdfReader(pdf_pfad) seiten = len(reader.pages) for page in reader.pages: page_text = page.extract_text() if page_text: text_parts.append(page_text) if text_parts: return "\n\n".join(text_parts), seiten except Exception as e: logger.debug(f"pypdf Fehler: {e}") # Methode 3: pdftotext CLI (Fallback) try: result = subprocess.run( ["pdftotext", "-layout", pdf_pfad, "-"], capture_output=True, text=True, timeout=30 ) if result.returncode == 0 and result.stdout.strip(): return result.stdout, seiten except Exception as e: logger.debug(f"pdftotext Fehler: {e}") return "", seiten def pruefe_zugferd(self, pdf_pfad: str) -> Dict: """ Prüft ob PDF eine ZUGFeRD/Factur-X Rechnung ist Returns: Dict mit: ist_zugferd, xml (falls vorhanden) """ ergebnis = {"ist_zugferd": False, "xml": None} # Methode 1: factur-x Library try: from facturx import get_facturx_xml_from_pdf xml_bytes = get_facturx_xml_from_pdf(pdf_pfad) if xml_bytes: ergebnis["ist_zugferd"] = True ergebnis["xml"] = xml_bytes.decode("utf-8") if isinstance(xml_bytes, bytes) else xml_bytes logger.info(f"ZUGFeRD erkannt: {Path(pdf_pfad).name}") return ergebnis except ImportError: logger.debug("factur-x nicht installiert") except Exception as e: logger.debug(f"factur-x Fehler: {e}") # Methode 2: Manuell nach XML-Attachment suchen if PYPDF_AVAILABLE: try: reader = PdfReader(pdf_pfad) if "/Names" in reader.trailer.get("/Root", {}): # Embedded Files prüfen pass # Komplexere Logik hier # Alternativ: Im Text nach ZUGFeRD-Markern suchen for page in reader.pages[:1]: # Nur erste Seite text = page.extract_text() or "" if any(marker in text.upper() for marker in ["ZUGFERD", "FACTUR-X", "EN 16931"]): ergebnis["ist_zugferd"] = True logger.info(f"ZUGFeRD-Marker gefunden: {Path(pdf_pfad).name}") break except Exception as e: logger.debug(f"ZUGFeRD-Prüfung Fehler: {e}") return ergebnis def fuehre_ocr_aus(self, pdf_pfad: str, in_place: bool = True) -> Tuple[str, bool]: """ Führt OCR mit ocrmypdf durch und bettet den Text permanent in die PDF ein. Danach ist die PDF durchsuchbar und Copy&Paste funktioniert. Args: pdf_pfad: Pfad zur PDF-Datei in_place: Wenn True, wird die Original-PDF ersetzt (Standard) Returns: Tuple von (text, erfolg) """ pfad = Path(pdf_pfad) temp_pfad = pfad.with_suffix(".ocr.pdf") try: # ocrmypdf ausführen - Text wird permanent eingebettet result = subprocess.run( [ "ocrmypdf", "--language", self.ocr_language, "--deskew", # Schräge Scans korrigieren "--clean", # Bild verbessern "--rotate-pages", # Seiten automatisch drehen "--skip-text", # Seiten mit vorhandenem Text überspringen "--output-type", "pdfa", # PDF/A für bessere Kompatibilität str(pfad), str(temp_pfad) ], capture_output=True, text=True, timeout=180 # 3 Minuten Timeout ) if result.returncode == 0 and temp_pfad.exists(): if in_place: # Original mit OCR-Version ersetzen pfad.unlink() temp_pfad.rename(pfad) logger.info(f"OCR erfolgreich eingebettet: {pfad.name}") # Text aus OCR-PDF extrahieren text, _ = self.extrahiere_text(str(pfad)) return text, True else: # Nur Text extrahieren, temp löschen text, _ = self.extrahiere_text(str(temp_pfad)) temp_pfad.unlink() return text, True else: logger.error(f"OCR Fehler: {result.stderr}") if temp_pfad.exists(): temp_pfad.unlink() return "", False except subprocess.TimeoutExpired: logger.error(f"OCR Timeout für {pfad.name}") if temp_pfad.exists(): temp_pfad.unlink() return "", False except FileNotFoundError: logger.error("ocrmypdf nicht installiert") return "", False except Exception as e: logger.error(f"OCR Fehler: {e}") if temp_pfad.exists(): temp_pfad.unlink() return "", False def ocr_einbetten(self, pdf_pfad: str) -> Dict: """ Bettet OCR-Text permanent in eine PDF ein (macht sie durchsuchbar). Kann unabhängig von der Sortierung verwendet werden. Returns: Dict mit: erfolg, text, nachricht """ pfad = Path(pdf_pfad) if not pfad.exists(): return {"erfolg": False, "nachricht": f"Datei nicht gefunden: {pdf_pfad}"} # Prüfen ob bereits Text vorhanden text, seiten = self.extrahiere_text(pdf_pfad) if text and len(text.strip()) > 50: return { "erfolg": True, "text": text, "nachricht": "PDF enthält bereits durchsuchbaren Text", "ocr_durchgefuehrt": False } # OCR durchführen und einbetten ocr_text, erfolg = self.fuehre_ocr_aus(pdf_pfad, in_place=True) if erfolg: return { "erfolg": True, "text": ocr_text, "nachricht": "OCR erfolgreich eingebettet - PDF ist jetzt durchsuchbar", "ocr_durchgefuehrt": True } else: return { "erfolg": False, "text": "", "nachricht": "OCR fehlgeschlagen", "ocr_durchgefuehrt": False } def extrahiere_metadaten(self, pdf_pfad: str) -> Dict: """Extrahiert PDF-Metadaten""" metadaten = {} if PYPDF_AVAILABLE: try: reader = PdfReader(pdf_pfad) if reader.metadata: metadaten = { "titel": reader.metadata.get("/Title", ""), "autor": reader.metadata.get("/Author", ""), "ersteller": reader.metadata.get("/Creator", ""), "erstellt": reader.metadata.get("/CreationDate", ""), } except Exception as e: logger.debug(f"Metadaten-Fehler: {e}") return metadaten