""" API Routes - Getrennte Bereiche: Mail-Abruf und Datei-Sortierung """ from fastapi import APIRouter, Depends, HTTPException, UploadFile, File from fastapi.responses import StreamingResponse from sqlalchemy.orm import Session from typing import List, Optional from pydantic import BaseModel from datetime import datetime from pathlib import Path import json import asyncio from ..models.database import get_db, Postfach, QuellOrdner, SortierRegel, VerarbeiteteDatei, VerarbeiteteMail from ..modules.mail_fetcher import MailFetcher, abruf_manager from ..modules.pdf_processor import PDFProcessor from ..modules.sorter import Sorter router = APIRouter(prefix="/api", tags=["api"]) # ============ Pydantic Models ============ class PostfachCreate(BaseModel): name: str imap_server: str imap_port: int = 993 email: str passwort: str ordner: str = "INBOX" alle_ordner: bool = False # Alle IMAP-Ordner durchsuchen nur_ungelesen: bool = False # Nur ungelesene Mails (False = alle) ab_datum: Optional[datetime] = None # Nur Mails ab diesem Datum ziel_ordner: str erlaubte_typen: List[str] = [".pdf"] max_groesse_mb: int = 25 class PostfachResponse(BaseModel): id: int name: str imap_server: str email: str ordner: str alle_ordner: bool nur_ungelesen: bool ab_datum: Optional[datetime] ziel_ordner: str erlaubte_typen: List[str] max_groesse_mb: int letzter_abruf: Optional[datetime] letzte_anzahl: int class Config: from_attributes = True class OrdnerCreate(BaseModel): name: str pfad: str ziel_ordner: str rekursiv: bool = True dateitypen: List[str] = [".pdf", ".jpg", ".jpeg", ".png", ".tiff"] class OrdnerResponse(BaseModel): id: int name: str pfad: str ziel_ordner: str rekursiv: bool dateitypen: List[str] aktiv: bool class Config: from_attributes = True class RegelCreate(BaseModel): name: str prioritaet: int = 100 muster: dict = {} extraktion: dict = {} schema: str = "{datum} - Dokument.pdf" unterordner: Optional[str] = None class RegelResponse(BaseModel): id: int name: str prioritaet: int aktiv: bool muster: dict extraktion: dict schema: str unterordner: Optional[str] class Config: from_attributes = True class RegelTestRequest(BaseModel): regel: dict text: str # ============ Verzeichnis-Browser ============ @router.get("/browse") def browse_directory(path: str = "/"): """Listet Verzeichnisse für File-Browser""" import os # Sicherheit: Nur bestimmte Basispfade erlauben allowed_bases = ["/srv", "/home", "/mnt", "/media", "/data", "/tmp"] path = os.path.abspath(path) # Prüfen ob Pfad erlaubt is_allowed = any(path.startswith(base) for base in allowed_bases) or path == "/" if not is_allowed: return {"error": "Pfad nicht erlaubt", "entries": []} if not os.path.exists(path): return {"error": "Pfad existiert nicht", "entries": []} if not os.path.isdir(path): return {"error": "Kein Verzeichnis", "entries": []} # Berechtigungen des aktuellen Verzeichnisses prüfen current_readable = os.access(path, os.R_OK) current_writable = os.access(path, os.W_OK) try: entries = [] for entry in sorted(os.listdir(path)): full_path = os.path.join(path, entry) if os.path.isdir(full_path): # Berechtigungen für jeden Unterordner prüfen readable = os.access(full_path, os.R_OK) writable = os.access(full_path, os.W_OK) entries.append({ "name": entry, "path": full_path, "type": "directory", "readable": readable, "writable": writable }) return { "current": path, "parent": os.path.dirname(path) if path != "/" else None, "entries": entries, "readable": current_readable, "writable": current_writable } except PermissionError: return {"error": "Zugriff verweigert", "entries": []} # ============ BEREICH 1: Postfächer ============ @router.get("/postfaecher", response_model=List[PostfachResponse]) def liste_postfaecher(db: Session = Depends(get_db)): return db.query(Postfach).all() @router.post("/postfaecher", response_model=PostfachResponse) def erstelle_postfach(data: PostfachCreate, db: Session = Depends(get_db)): postfach = Postfach(**data.dict()) db.add(postfach) db.commit() db.refresh(postfach) return postfach @router.put("/postfaecher/{id}", response_model=PostfachResponse) def aktualisiere_postfach(id: int, data: PostfachCreate, db: Session = Depends(get_db)): postfach = db.query(Postfach).filter(Postfach.id == id).first() if not postfach: raise HTTPException(status_code=404, detail="Nicht gefunden") update_data = data.dict() # Passwort nur aktualisieren wenn nicht leer if not update_data.get("passwort"): del update_data["passwort"] for key, value in update_data.items(): setattr(postfach, key, value) db.commit() db.refresh(postfach) return postfach @router.delete("/postfaecher/{id}") def loesche_postfach(id: int, db: Session = Depends(get_db)): postfach = db.query(Postfach).filter(Postfach.id == id).first() if not postfach: raise HTTPException(status_code=404, detail="Nicht gefunden") db.delete(postfach) db.commit() return {"message": "Gelöscht"} @router.post("/postfaecher/{id}/test") def teste_postfach(id: int, db: Session = Depends(get_db)): postfach = db.query(Postfach).filter(Postfach.id == id).first() if not postfach: raise HTTPException(status_code=404, detail="Nicht gefunden") fetcher = MailFetcher({ "imap_server": postfach.imap_server, "imap_port": postfach.imap_port, "email": postfach.email, "passwort": postfach.passwort, "ordner": postfach.ordner }) return fetcher.test_connection() @router.get("/postfaecher/{id}/abrufen/stream") def rufe_postfach_ab_stream(id: int, db: Session = Depends(get_db)): """Streaming-Endpoint für Mail-Abruf mit Live-Updates""" postfach = db.query(Postfach).filter(Postfach.id == id).first() if not postfach: raise HTTPException(status_code=404, detail="Nicht gefunden") # Prüfen ob bereits ein Abruf läuft if not abruf_manager.starten(id): raise HTTPException(status_code=409, detail="Ein Abruf läuft bereits für dieses Postfach") # Daten kopieren für Generator (Session ist nach return nicht mehr verfügbar) pf_data = { "id": postfach.id, "name": postfach.name, "imap_server": postfach.imap_server, "imap_port": postfach.imap_port, "email": postfach.email, "passwort": postfach.passwort, "ordner": postfach.ordner, "alle_ordner": postfach.alle_ordner, "erlaubte_typen": postfach.erlaubte_typen, "max_groesse_mb": postfach.max_groesse_mb, "ziel_ordner": postfach.ziel_ordner, "ab_datum": postfach.ab_datum } # Bereits verarbeitete Message-IDs laden bereits_verarbeitet = set( row.message_id for row in db.query(VerarbeiteteMail.message_id) .filter(VerarbeiteteMail.postfach_id == id) .all() ) def event_generator(): from ..models.database import SessionLocal def send_event(data): return f"data: {json.dumps(data)}\n\n" yield send_event({"type": "start", "postfach": pf_data["name"], "bereits_verarbeitet": len(bereits_verarbeitet)}) # Zielordner erstellen ziel = Path(pf_data["ziel_ordner"]) ziel.mkdir(parents=True, exist_ok=True) fetcher = MailFetcher({ "imap_server": pf_data["imap_server"], "imap_port": pf_data["imap_port"], "email": pf_data["email"], "passwort": pf_data["passwort"], "ordner": pf_data["ordner"], "erlaubte_typen": pf_data["erlaubte_typen"], "max_groesse_mb": pf_data["max_groesse_mb"] }) attachments = [] abgebrochen = False try: # Generator für streaming mit Abbruch-Callback for event in fetcher.fetch_attachments_generator( ziel, nur_ungelesen=False, alle_ordner=pf_data["alle_ordner"], bereits_verarbeitet=bereits_verarbeitet, abbruch_callback=lambda: abruf_manager.soll_abbrechen(pf_data["id"]), ab_datum=pf_data["ab_datum"] ): yield send_event(event) if event.get("type") == "datei": attachments.append(event) elif event.get("type") == "abgebrochen": abgebrochen = True # DB-Session für Speicherung session = SessionLocal() try: verarbeitete_msg_ids = set() for att in attachments: msg_id = att.get("message_id") if msg_id and msg_id not in verarbeitete_msg_ids: verarbeitete_msg_ids.add(msg_id) session.add(VerarbeiteteMail( postfach_id=pf_data["id"], message_id=msg_id, ordner=att.get("ordner", ""), betreff=att.get("betreff", "")[:500] if att.get("betreff") else None, absender=att.get("absender", "")[:255] if att.get("absender") else None, anzahl_attachments=1 )) # Postfach aktualisieren pf = session.query(Postfach).filter(Postfach.id == pf_data["id"]).first() if pf: pf.letzter_abruf = datetime.utcnow() pf.letzte_anzahl = len(attachments) session.commit() finally: session.close() if abgebrochen: yield send_event({"type": "fertig", "anzahl": len(attachments), "abgebrochen": True}) else: yield send_event({"type": "fertig", "anzahl": len(attachments)}) except Exception as e: yield send_event({"type": "fehler", "nachricht": str(e)}) finally: fetcher.disconnect() abruf_manager.beenden(pf_data["id"]) return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } ) @router.post("/postfaecher/{id}/abrufen/stoppen") def stoppe_postfach_abruf(id: int): """Stoppt einen laufenden Mail-Abruf""" if abruf_manager.stoppen(id): return {"erfolg": True, "nachricht": "Abruf wird gestoppt"} return {"erfolg": False, "nachricht": "Kein aktiver Abruf für dieses Postfach"} @router.get("/postfaecher/status") def postfach_status(): """Gibt Status aller laufenden Abrufe zurück""" aktive = abruf_manager.alle_aktiven() return { "aktive_abrufe": [ { "postfach_id": pid, "gestartet": info["gestartet"].isoformat(), "wird_abgebrochen": info["abbrechen"] } for pid, info in aktive.items() ] } @router.post("/postfaecher/{id}/abrufen") def rufe_postfach_ab(id: int, db: Session = Depends(get_db)): postfach = db.query(Postfach).filter(Postfach.id == id).first() if not postfach: raise HTTPException(status_code=404, detail="Nicht gefunden") # Bereits verarbeitete Message-IDs laden bereits_verarbeitet = set( row.message_id for row in db.query(VerarbeiteteMail.message_id) .filter(VerarbeiteteMail.postfach_id == id) .all() ) # Zielordner erstellen ziel = Path(postfach.ziel_ordner) ziel.mkdir(parents=True, exist_ok=True) fetcher = MailFetcher({ "imap_server": postfach.imap_server, "imap_port": postfach.imap_port, "email": postfach.email, "passwort": postfach.passwort, "ordner": postfach.ordner, "erlaubte_typen": postfach.erlaubte_typen, "max_groesse_mb": postfach.max_groesse_mb }) try: attachments = fetcher.fetch_attachments( ziel, nur_ungelesen=False, # Alle Mails durchsuchen alle_ordner=postfach.alle_ordner, bereits_verarbeitet=bereits_verarbeitet ) # Verarbeitete Mails in DB speichern verarbeitete_msg_ids = set() for att in attachments: msg_id = att.get("message_id") if msg_id and msg_id not in verarbeitete_msg_ids: verarbeitete_msg_ids.add(msg_id) db.add(VerarbeiteteMail( postfach_id=id, message_id=msg_id, ordner=att.get("ordner", ""), betreff=att.get("betreff", "")[:500] if att.get("betreff") else None, absender=att.get("absender", "")[:255] if att.get("absender") else None, anzahl_attachments=1 )) postfach.letzter_abruf = datetime.utcnow() postfach.letzte_anzahl = len(attachments) db.commit() return { "ergebnisse": [{ "postfach": postfach.name, "anzahl": len(attachments), "dateien": [a["original_name"] for a in attachments], "bereits_verarbeitet": len(bereits_verarbeitet) }] } except Exception as e: return { "ergebnisse": [{ "postfach": postfach.name, "fehler": str(e) }] } finally: fetcher.disconnect() @router.post("/postfaecher/abrufen-alle") def rufe_alle_postfaecher_ab(db: Session = Depends(get_db)): postfaecher = db.query(Postfach).filter(Postfach.aktiv == True).all() ergebnisse = [] for postfach in postfaecher: ziel = Path(postfach.ziel_ordner) ziel.mkdir(parents=True, exist_ok=True) fetcher = MailFetcher({ "imap_server": postfach.imap_server, "imap_port": postfach.imap_port, "email": postfach.email, "passwort": postfach.passwort, "ordner": postfach.ordner, "erlaubte_typen": postfach.erlaubte_typen, "max_groesse_mb": postfach.max_groesse_mb }) try: attachments = fetcher.fetch_attachments(ziel) postfach.letzter_abruf = datetime.utcnow() postfach.letzte_anzahl = len(attachments) ergebnisse.append({ "postfach": postfach.name, "anzahl": len(attachments), "dateien": [a["original_name"] for a in attachments] }) except Exception as e: ergebnisse.append({ "postfach": postfach.name, "fehler": str(e) }) finally: fetcher.disconnect() db.commit() return {"ergebnisse": ergebnisse} # ============ BEREICH 2: Quell-Ordner ============ @router.get("/ordner", response_model=List[OrdnerResponse]) def liste_ordner(db: Session = Depends(get_db)): return db.query(QuellOrdner).all() @router.post("/ordner", response_model=OrdnerResponse) def erstelle_ordner(data: OrdnerCreate, db: Session = Depends(get_db)): ordner = QuellOrdner(**data.dict()) db.add(ordner) db.commit() db.refresh(ordner) return ordner @router.delete("/ordner/{id}") def loesche_ordner(id: int, db: Session = Depends(get_db)): ordner = db.query(QuellOrdner).filter(QuellOrdner.id == id).first() if not ordner: raise HTTPException(status_code=404, detail="Nicht gefunden") db.delete(ordner) db.commit() return {"message": "Gelöscht"} @router.get("/ordner/{id}/scannen") def scanne_ordner(id: int, db: Session = Depends(get_db)): ordner = db.query(QuellOrdner).filter(QuellOrdner.id == id).first() if not ordner: raise HTTPException(status_code=404, detail="Nicht gefunden") pfad = Path(ordner.pfad) if not pfad.exists(): return {"anzahl": 0, "fehler": "Ordner existiert nicht"} # Dateien sammeln (rekursiv oder nicht) dateien = [] pattern = "**/*" if ordner.rekursiv else "*" for f in pfad.glob(pattern): if f.is_file() and f.suffix.lower() in [t.lower() for t in ordner.dateitypen]: dateien.append(f) return {"anzahl": len(dateien), "dateien": [str(f.relative_to(pfad)) for f in dateien[:30]]} # ============ Regeln ============ @router.get("/regeln", response_model=List[RegelResponse]) def liste_regeln(db: Session = Depends(get_db)): return db.query(SortierRegel).order_by(SortierRegel.prioritaet).all() @router.post("/regeln", response_model=RegelResponse) def erstelle_regel(data: RegelCreate, db: Session = Depends(get_db)): regel = SortierRegel(**data.dict()) db.add(regel) db.commit() db.refresh(regel) return regel @router.put("/regeln/{id}", response_model=RegelResponse) def aktualisiere_regel(id: int, data: RegelCreate, db: Session = Depends(get_db)): regel = db.query(SortierRegel).filter(SortierRegel.id == id).first() if not regel: raise HTTPException(status_code=404, detail="Nicht gefunden") for key, value in data.dict().items(): setattr(regel, key, value) db.commit() db.refresh(regel) return regel @router.delete("/regeln/{id}") def loesche_regel(id: int, db: Session = Depends(get_db)): regel = db.query(SortierRegel).filter(SortierRegel.id == id).first() if not regel: raise HTTPException(status_code=404, detail="Nicht gefunden") db.delete(regel) db.commit() return {"message": "Gelöscht"} class PrioritaetUpdate(BaseModel): prioritaet: int @router.put("/regeln/{id}/prioritaet") def aendere_prioritaet(id: int, data: PrioritaetUpdate, db: Session = Depends(get_db)): """Ändert die Priorität einer Regel""" regel = db.query(SortierRegel).filter(SortierRegel.id == id).first() if not regel: raise HTTPException(status_code=404, detail="Nicht gefunden") regel.prioritaet = data.prioritaet db.commit() return {"id": regel.id, "prioritaet": regel.prioritaet} class RegelReihenfolge(BaseModel): reihenfolge: List[int] # Liste von Regel-IDs in gewünschter Reihenfolge @router.post("/regeln/reihenfolge") def setze_reihenfolge(data: RegelReihenfolge, db: Session = Depends(get_db)): """ Setzt die Reihenfolge aller Regeln basierend auf der übergebenen ID-Liste. Prioritäten werden automatisch vergeben (10, 20, 30, ...) """ for index, regel_id in enumerate(data.reihenfolge): regel = db.query(SortierRegel).filter(SortierRegel.id == regel_id).first() if regel: regel.prioritaet = (index + 1) * 10 db.commit() return {"erfolg": True, "nachricht": f"{len(data.reihenfolge)} Regeln neu sortiert"} @router.post("/regeln/test") def teste_regel(data: RegelTestRequest): regel = data.regel regel["aktiv"] = True regel["prioritaet"] = 1 sorter = Sorter([regel]) doc_info = {"text": data.text, "original_name": "test.pdf", "absender": ""} passend = sorter.finde_passende_regel(doc_info) if passend: extrahiert = sorter.extrahiere_felder(passend, doc_info) dateiname = sorter.generiere_dateinamen(passend, extrahiert) return {"passt": True, "extrahiert": extrahiert, "dateiname": dateiname} return {"passt": False} # ============ Sortierung ============ def sammle_dateien(ordner: QuellOrdner) -> list: """Sammelt alle Dateien aus einem Ordner (rekursiv oder nicht)""" pfad = Path(ordner.pfad) if not pfad.exists(): return [] dateien = [] pattern = "**/*" if ordner.rekursiv else "*" erlaubte = [t.lower() for t in (ordner.dateitypen or [".pdf"])] for f in pfad.glob(pattern): if f.is_file() and f.suffix.lower() in erlaubte: dateien.append(f) return dateien # Sortierungs-Manager für Abbruch class SortierungsManager: """Verwaltet laufende Sortierungen""" def __init__(self): self._aktiv = False self._abbrechen = False self._lock = threading.Lock() def starten(self) -> bool: with self._lock: if self._aktiv: return False self._aktiv = True self._abbrechen = False return True def stoppen(self): with self._lock: self._abbrechen = True def beenden(self): with self._lock: self._aktiv = False self._abbrechen = False def soll_abbrechen(self) -> bool: with self._lock: return self._abbrechen def ist_aktiv(self) -> bool: with self._lock: return self._aktiv import threading sortierungs_manager = SortierungsManager() @router.get("/sortierung/stream") def starte_sortierung_stream( db: Session = Depends(get_db), testmodus: bool = False, ocr_backup_ordner: str = None ): """ Streaming-Endpoint für Sortierung mit Live-Updates Args: testmodus: Wenn True, werden Dateien nur analysiert aber NICHT verschoben. Perfekt zum Testen von Regeln ohne Dateien zu bewegen. ocr_backup_ordner: Optionaler Pfad für Backups von PDFs vor OCR-Einbettung. Wenn gesetzt, werden Originale vor OCR dorthin kopiert. """ ordner_liste = db.query(QuellOrdner).filter(QuellOrdner.aktiv == True).all() regeln = db.query(SortierRegel).filter(SortierRegel.aktiv == True).order_by(SortierRegel.prioritaet).all() if not ordner_liste: raise HTTPException(status_code=400, detail="Keine Quell-Ordner konfiguriert") if not regeln: raise HTTPException(status_code=400, detail="Keine Regeln definiert") if not sortierungs_manager.starten(): raise HTTPException(status_code=409, detail="Eine Sortierung läuft bereits") # Daten kopieren für Generator ordner_data = [ {"id": o.id, "name": o.name, "pfad": o.pfad, "ziel_ordner": o.ziel_ordner, "rekursiv": o.rekursiv, "dateitypen": o.dateitypen} for o in ordner_liste ] regeln_dicts = [ {"id": r.id, "name": r.name, "prioritaet": r.prioritaet, "muster": r.muster, "extraktion": r.extraktion, "schema": r.schema, "unterordner": r.unterordner} for r in regeln ] # Flags für Generator ist_testmodus = testmodus backup_ordner = ocr_backup_ordner def event_generator(): from ..models.database import SessionLocal def send_event(data): return f"data: {json.dumps(data)}\n\n" sorter = Sorter(regeln_dicts) # PDF Processor mit optionalem Backup-Ordner pdf_processor = PDFProcessor(backup_ordner=backup_ordner) stats = {"gesamt": 0, "sortiert": 0, "zugferd": 0, "fehler": 0} # Start-Event mit Testmodus-Info yield send_event({ "type": "start", "ordner": len(ordner_data), "regeln": len(regeln_dicts), "testmodus": ist_testmodus }) try: for quell_ordner in ordner_data: if sortierungs_manager.soll_abbrechen(): yield send_event({"type": "abgebrochen"}) break pfad = Path(quell_ordner["pfad"]) if not pfad.exists(): yield send_event({"type": "warnung", "nachricht": f"Ordner existiert nicht: {quell_ordner['pfad']}"}) continue yield send_event({"type": "ordner", "name": quell_ordner["name"], "pfad": quell_ordner["pfad"]}) ziel_basis = Path(quell_ordner["ziel_ordner"]) # Dateien sammeln dateien = [] pattern = "**/*" if quell_ordner["rekursiv"] else "*" erlaubte = [t.lower() for t in (quell_ordner["dateitypen"] or [".pdf"])] for f in pfad.glob(pattern): if f.is_file() and f.suffix.lower() in erlaubte: dateien.append(f) yield send_event({"type": "dateien_gefunden", "anzahl": len(dateien)}) for datei in dateien: if sortierungs_manager.soll_abbrechen(): yield send_event({"type": "abgebrochen"}) break stats["gesamt"] += 1 try: rel_pfad = str(datei.relative_to(pfad)) except: rel_pfad = datei.name try: ist_pdf = datei.suffix.lower() == ".pdf" text = "" ist_zugferd = False ist_signiert = False hat_text = False ocr_gemacht = False if ist_pdf: # Im Testmodus: OCR nur extrahieren, NICHT einbetten if ist_testmodus: # Text extrahieren ohne PDF zu verändern text_result, seiten = pdf_processor.extrahiere_text(str(datei)) text = text_result hat_text = bool(text and len(text.strip()) > 50) # ZUGFeRD prüfen zugferd_result = pdf_processor.pruefe_zugferd(str(datei)) ist_zugferd = zugferd_result["ist_zugferd"] # Signatur prüfen ist_signiert = pdf_processor.hat_digitale_signatur(str(datei)) else: # Normale Verarbeitung mit OCR-Einbettung pdf_result = pdf_processor.verarbeite(str(datei)) if pdf_result.get("fehler"): raise Exception(pdf_result["fehler"]) text = pdf_result.get("text", "") ist_zugferd = pdf_result.get("ist_zugferd", False) ist_signiert = pdf_result.get("ist_signiert", False) hat_text = pdf_result.get("hat_text", False) ocr_gemacht = pdf_result.get("ocr_durchgefuehrt", False) # Dokument-Info für Regel-Matching inkl. aller Eigenschaften doc_info = { "text": text, "original_name": datei.name, "absender": "", "dateityp": datei.suffix.lower(), # PDF-Eigenschaften für Typ-basierte Regeln "ist_zugferd": ist_zugferd, "ist_signiert": ist_signiert, "hat_text": hat_text, "ist_pdf": ist_pdf, "ist_bild": datei.suffix.lower() in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".tif", ".webp"] } regel = sorter.finde_passende_regel(doc_info) if not regel: stats["fehler"] += 1 yield send_event({ "type": "datei", "original": rel_pfad, "status": "keine_regel", "testmodus": ist_testmodus }) continue extrahiert = sorter.extrahiere_felder(regel, doc_info) schema = regel.get("schema", "{datum} - Dokument.pdf") if schema.endswith(".pdf"): schema = schema[:-4] + datei.suffix neuer_name = sorter.generiere_dateinamen({"schema": schema, **regel}, extrahiert) ziel = ziel_basis if regel.get("unterordner"): ziel = ziel / regel["unterordner"] if ist_testmodus: # Testmodus: Nur simulieren, nicht verschieben stats["sortiert"] += 1 yield send_event({ "type": "datei", "original": rel_pfad, "neuer_name": neuer_name, "ziel_ordner": str(ziel), "status": "sortiert", "regel": regel.get("name"), "extrahiert": extrahiert, "testmodus": True }) else: # Echte Sortierung ziel.mkdir(parents=True, exist_ok=True) neuer_pfad = sorter.verschiebe_datei(str(datei), str(ziel), neuer_name) stats["sortiert"] += 1 # DB speichern session = SessionLocal() try: session.add(VerarbeiteteDatei( original_pfad=str(datei), original_name=datei.name, neuer_pfad=neuer_pfad, neuer_name=neuer_name, ist_zugferd=ist_zugferd, ocr_durchgefuehrt=ocr_gemacht, status="sortiert", extrahierte_daten=extrahiert )) session.commit() finally: session.close() yield send_event({ "type": "datei", "original": rel_pfad, "neuer_name": neuer_name, "ziel_ordner": str(ziel), "status": "sortiert", "regel": regel.get("name"), "extrahiert": extrahiert, "testmodus": False }) except Exception as e: stats["fehler"] += 1 yield send_event({ "type": "datei", "original": rel_pfad, "status": "fehler", "fehler": str(e), "testmodus": ist_testmodus }) yield send_event({"type": "fertig", "testmodus": ist_testmodus, **stats}) finally: sortierungs_manager.beenden() return StreamingResponse( event_generator(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"} ) @router.post("/sortierung/stoppen") def stoppe_sortierung(): """Stoppt die laufende Sortierung""" if sortierungs_manager.ist_aktiv(): sortierungs_manager.stoppen() return {"erfolg": True, "nachricht": "Sortierung wird gestoppt"} return {"erfolg": False, "nachricht": "Keine aktive Sortierung"} @router.get("/sortierung/status") def sortierung_status(): """Gibt Status der Sortierung zurück""" return {"aktiv": sortierungs_manager.ist_aktiv()} @router.post("/sortierung/starten") def starte_sortierung(db: Session = Depends(get_db)): ordner_liste = db.query(QuellOrdner).filter(QuellOrdner.aktiv == True).all() regeln = db.query(SortierRegel).filter(SortierRegel.aktiv == True).order_by(SortierRegel.prioritaet).all() if not ordner_liste: return {"fehler": "Keine Quell-Ordner konfiguriert", "verarbeitet": []} if not regeln: return {"fehler": "Keine Regeln definiert", "verarbeitet": []} # Regeln in Dict-Format regeln_dicts = [] for r in regeln: regeln_dicts.append({ "id": r.id, "name": r.name, "prioritaet": r.prioritaet, "muster": r.muster, "extraktion": r.extraktion, "schema": r.schema, "unterordner": r.unterordner }) sorter = Sorter(regeln_dicts) pdf_processor = PDFProcessor() ergebnis = { "gesamt": 0, "sortiert": 0, "zugferd": 0, "fehler": 0, "verarbeitet": [] } for quell_ordner in ordner_liste: pfad = Path(quell_ordner.pfad) if not pfad.exists(): continue ziel_basis = Path(quell_ordner.ziel_ordner) dateien = sammle_dateien(quell_ordner) for datei in dateien: ergebnis["gesamt"] += 1 # Relativer Pfad für Anzeige try: rel_pfad = str(datei.relative_to(pfad)) except: rel_pfad = datei.name datei_info = {"original": rel_pfad} try: ist_pdf = datei.suffix.lower() == ".pdf" text = "" ist_zugferd = False ocr_gemacht = False # Nur PDFs durch den PDF-Processor if ist_pdf: pdf_result = pdf_processor.verarbeite(str(datei)) if pdf_result.get("fehler"): raise Exception(pdf_result["fehler"]) text = pdf_result.get("text", "") ist_zugferd = pdf_result.get("ist_zugferd", False) ocr_gemacht = pdf_result.get("ocr_durchgefuehrt", False) # ZUGFeRD separat behandeln if ist_zugferd: zugferd_ziel = ziel_basis / "zugferd" zugferd_ziel.mkdir(parents=True, exist_ok=True) neuer_pfad = zugferd_ziel / datei.name counter = 1 while neuer_pfad.exists(): neuer_pfad = zugferd_ziel / f"{datei.stem}_{counter}{datei.suffix}" counter += 1 datei.rename(neuer_pfad) ergebnis["zugferd"] += 1 datei_info["zugferd"] = True datei_info["neuer_name"] = neuer_pfad.name db.add(VerarbeiteteDatei( original_pfad=str(datei), original_name=datei.name, neuer_pfad=str(neuer_pfad), neuer_name=neuer_pfad.name, ist_zugferd=True, status="zugferd" )) ergebnis["verarbeitet"].append(datei_info) continue # Regel finden (für PDFs mit Text, für andere nur Dateiname) doc_info = { "text": text, "original_name": datei.name, "absender": "", "dateityp": datei.suffix.lower() } regel = sorter.finde_passende_regel(doc_info) if not regel: datei_info["fehler"] = "Keine passende Regel" ergebnis["fehler"] += 1 ergebnis["verarbeitet"].append(datei_info) continue # Felder extrahieren extrahiert = sorter.extrahiere_felder(regel, doc_info) # Dateiendung beibehalten schema = regel.get("schema", "{datum} - Dokument.pdf") # Endung aus Schema entfernen und Original-Endung anhängen if schema.endswith(".pdf"): schema = schema[:-4] + datei.suffix neuer_name = sorter.generiere_dateinamen({"schema": schema, **regel}, extrahiert) # Zielordner ziel = ziel_basis if regel.get("unterordner"): ziel = ziel / regel["unterordner"] ziel.mkdir(parents=True, exist_ok=True) # Verschieben neuer_pfad = sorter.verschiebe_datei(str(datei), str(ziel), neuer_name) ergebnis["sortiert"] += 1 datei_info["neuer_name"] = neuer_name db.add(VerarbeiteteDatei( original_pfad=str(datei), original_name=datei.name, neuer_pfad=neuer_pfad, neuer_name=neuer_name, ist_zugferd=False, ocr_durchgefuehrt=ocr_gemacht, status="sortiert", extrahierte_daten=extrahiert )) except Exception as e: ergebnis["fehler"] += 1 datei_info["fehler"] = str(e) ergebnis["verarbeitet"].append(datei_info) db.commit() return ergebnis @router.get("/health") def health(): return {"status": "ok"} # ============ Datenbank-Management ============ @router.post("/db/reset") def reset_database(db: Session = Depends(get_db)): """Setzt die Datenbank zurück (löscht alle verarbeiteten Mails/Dateien)""" try: # Verarbeitete Mails löschen mails_count = db.query(VerarbeiteteMail).count() db.query(VerarbeiteteMail).delete() # Verarbeitete Dateien löschen dateien_count = db.query(VerarbeiteteDatei).count() db.query(VerarbeiteteDatei).delete() db.commit() return { "erfolg": True, "nachricht": f"Datenbank zurückgesetzt", "geloescht": { "mails": mails_count, "dateien": dateien_count } } except Exception as e: db.rollback() return {"erfolg": False, "nachricht": str(e)} @router.get("/db/statistik") def db_statistik(db: Session = Depends(get_db)): """Gibt Statistiken über die Datenbank zurück""" return { "postfaecher": db.query(Postfach).count(), "quell_ordner": db.query(QuellOrdner).count(), "regeln": db.query(SortierRegel).count(), "verarbeitete_mails": db.query(VerarbeiteteMail).count(), "verarbeitete_dateien": db.query(VerarbeiteteDatei).count() } # ============ Einfache Regeln (UI-freundlich) ============ @router.get("/dokumenttypen") def liste_dokumenttypen(): """Gibt alle verfügbaren Dokumenttypen für das UI zurück""" from ..modules.sorter import DOKUMENTTYPEN return [ {"id": key, "name": config["name"], "schema": config["schema"], "unterordner": config["unterordner"]} for key, config in DOKUMENTTYPEN.items() ] @router.get("/typ-regeln") def liste_typ_regeln_api(nur_fallback: bool = None): """ Gibt alle verfügbaren Typ-basierten Regeln für das UI zurück Args: nur_fallback: None = alle, true = nur Fallbacks, false = keine Fallbacks """ from ..modules.sorter import liste_typ_regeln return liste_typ_regeln(nur_fallback=nur_fallback) class TypRegelCreate(BaseModel): typ_id: str # z.B. "zugferd", "bilder", "signierte_pdfs" unterordner: Optional[str] = None prioritaet: Optional[int] = None @router.post("/regeln/typ") def erstelle_typ_regel_api(data: TypRegelCreate, db: Session = Depends(get_db)): """Erstellt eine Typ-basierte Regel (Grob-Sortierung nach Dateityp/Eigenschaften)""" from ..modules.sorter import erstelle_typ_regel try: regel_dict = erstelle_typ_regel( data.typ_id, unterordner=data.unterordner, prioritaet=data.prioritaet ) regel = SortierRegel(**regel_dict) db.add(regel) db.commit() db.refresh(regel) return { "id": regel.id, "name": regel.name, "typ_id": data.typ_id, "prioritaet": regel.prioritaet, "unterordner": regel.unterordner } except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) class EinfacheRegelCreate(BaseModel): name: str dokumenttyp: str # z.B. "rechnung", "vertrag" keywords: str # Komma-getrennt firma: Optional[str] = None # Fester Firmenwert unterordner: Optional[str] = None prioritaet: int = 50 @router.post("/regeln/einfach") def erstelle_einfache_regel_api(data: EinfacheRegelCreate, db: Session = Depends(get_db)): """Erstellt eine Regel basierend auf Dokumenttyp - für einfaches UI""" from ..modules.sorter import DOKUMENTTYPEN typ_config = DOKUMENTTYPEN.get(data.dokumenttyp, DOKUMENTTYPEN["sonstiges"]) # Muster als Dict (keywords werden vom Sorter geparst) muster = {"keywords": data.keywords} # Extraktion (nur Firma wenn angegeben) extraktion = {} if data.firma: extraktion["firma"] = {"wert": data.firma} regel = SortierRegel( name=data.name, prioritaet=data.prioritaet, aktiv=True, muster=muster, extraktion=extraktion, schema=typ_config["schema"], unterordner=data.unterordner or typ_config["unterordner"] ) db.add(regel) db.commit() db.refresh(regel) return { "id": regel.id, "name": regel.name, "dokumenttyp": data.dokumenttyp, "keywords": data.keywords, "schema": regel.schema } class ExtraktionTestRequest(BaseModel): text: str dateiname: Optional[str] = "test.pdf" class CustomExtraktionRequest(BaseModel): text: str firma: Optional[str] = None datum_regex: Optional[str] = None betrag_regex: Optional[str] = None nummer_regex: Optional[str] = None @router.post("/extraktion/test") def teste_extraktion(data: ExtraktionTestRequest): """Testet die automatische Extraktion auf einem Text""" from ..modules.extraktoren import extrahiere_alle_felder, baue_dateiname dokument_info = { "original_name": data.dateiname, "absender": "" } # Felder extrahieren felder = extrahiere_alle_felder(data.text, dokument_info) # Beispiel-Dateinamen für verschiedene Typen generieren beispiele = {} from ..modules.sorter import DOKUMENTTYPEN for typ_id, typ_config in DOKUMENTTYPEN.items(): beispiele[typ_id] = baue_dateiname(typ_config["schema"], felder, ".pdf") return { "extrahiert": felder, "beispiel_dateinamen": beispiele } @router.post("/extraktion/test-custom") def teste_custom_extraktion(data: CustomExtraktionRequest): """Testet Extraktion mit benutzerdefinierten Regex-Mustern""" import re from ..modules.extraktoren import _parse_betrag felder = {} fehler = [] # Firma (fester Wert) if data.firma: felder["firma"] = data.firma # Datum mit Custom-Regex if data.datum_regex: try: match = re.search(data.datum_regex, data.text, re.IGNORECASE) if match: datum_str = match.group(1) # Versuche verschiedene Formate zu parsen for fmt in ["%d.%m.%Y", "%d/%m/%Y", "%Y-%m-%d", "%d-%m-%Y"]: try: from datetime import datetime dt = datetime.strptime(datum_str, fmt) felder["datum"] = dt.strftime("%Y-%m-%d") break except: continue if "datum" not in felder: felder["datum"] = datum_str # Rohwert wenn Parsing fehlschlägt else: fehler.append(f"Datum-Regex matched nicht") except Exception as e: fehler.append(f"Datum-Regex Fehler: {str(e)}") else: # Fallback auf automatische Extraktion from ..modules.extraktoren import extrahiere_datum datum = extrahiere_datum(data.text) if datum: felder["datum"] = datum # Betrag mit Custom-Regex if data.betrag_regex: try: match = re.search(data.betrag_regex, data.text, re.IGNORECASE) if match: betrag_str = match.group(1) betrag = _parse_betrag(betrag_str) if betrag is not None: if betrag == int(betrag): felder["betrag"] = str(int(betrag)) else: felder["betrag"] = f"{betrag:.2f}".replace(".", ",") else: felder["betrag"] = betrag_str else: fehler.append(f"Betrag-Regex matched nicht") except Exception as e: fehler.append(f"Betrag-Regex Fehler: {str(e)}") else: from ..modules.extraktoren import extrahiere_betrag betrag = extrahiere_betrag(data.text) if betrag: felder["betrag"] = betrag # Nummer mit Custom-Regex if data.nummer_regex: try: match = re.search(data.nummer_regex, data.text, re.IGNORECASE) if match: felder["nummer"] = match.group(1) else: fehler.append(f"Nummer-Regex matched nicht") except Exception as e: fehler.append(f"Nummer-Regex Fehler: {str(e)}") else: from ..modules.extraktoren import extrahiere_nummer nummer = extrahiere_nummer(data.text) if nummer: felder["nummer"] = nummer # Dokumenttyp automatisch from ..modules.extraktoren import extrahiere_dokumenttyp typ = extrahiere_dokumenttyp(data.text) if typ: felder["typ"] = typ return { "extrahiert": felder, "fehler": "; ".join(fehler) if fehler else None } class DateiExtraktionRequest(BaseModel): pfad: str @router.post("/extraktion/upload-pdf") async def upload_pdf_extraktion(file: UploadFile = File(...)): """Extrahiert Text aus einer hochgeladenen PDF-Datei""" import tempfile import os if not file.filename.lower().endswith('.pdf'): raise HTTPException(status_code=400, detail="Nur PDF-Dateien erlaubt") # Temporäre Datei erstellen with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp: content = await file.read() tmp.write(content) tmp_path = tmp.name try: pdf_processor = PDFProcessor() result = pdf_processor.verarbeite(tmp_path) if result.get("fehler"): raise HTTPException(status_code=500, detail=result["fehler"]) return { "text": result.get("text", ""), "ocr_durchgefuehrt": result.get("ocr_durchgefuehrt", False), "ist_zugferd": result.get("ist_zugferd", False), "dateiname": file.filename } finally: # Temporäre Datei löschen try: os.unlink(tmp_path) except: pass @router.post("/extraktion/from-file") def extraktion_from_file(data: DateiExtraktionRequest): """Extrahiert Text aus einer Datei auf dem Server""" import os pfad = data.pfad # Sicherheitsprüfung pfad = os.path.abspath(pfad) allowed_bases = ["/srv", "/home", "/mnt", "/media", "/data", "/tmp"] is_allowed = any(pfad.startswith(base) for base in allowed_bases) if not is_allowed: raise HTTPException(status_code=403, detail="Pfad nicht erlaubt") if not os.path.exists(pfad): raise HTTPException(status_code=404, detail="Datei nicht gefunden") if not pfad.lower().endswith('.pdf'): raise HTTPException(status_code=400, detail="Nur PDF-Dateien werden unterstützt") pdf_processor = PDFProcessor() result = pdf_processor.verarbeite(pfad) if result.get("fehler"): raise HTTPException(status_code=500, detail=result["fehler"]) return { "text": result.get("text", ""), "ocr_durchgefuehrt": result.get("ocr_durchgefuehrt", False), "ist_zugferd": result.get("ist_zugferd", False), "pfad": pfad } @router.post("/regeln/{id}/vorschau") def regel_vorschau(id: int, data: ExtraktionTestRequest, db: Session = Depends(get_db)): """Zeigt Vorschau wie eine Regel auf einen Text angewendet würde""" regel = db.query(SortierRegel).filter(SortierRegel.id == id).first() if not regel: raise HTTPException(status_code=404, detail="Regel nicht gefunden") from ..modules.sorter import Sorter sorter = Sorter([{ "id": regel.id, "name": regel.name, "prioritaet": regel.prioritaet, "aktiv": True, "muster": regel.muster, "extraktion": regel.extraktion, "schema": regel.schema, "unterordner": regel.unterordner }]) dokument_info = { "text": data.text, "original_name": data.dateiname or "test.pdf", "absender": "" } # Prüfen ob Regel matched passende_regel = sorter.finde_passende_regel(dokument_info) if not passende_regel: return { "matched": False, "grund": "Keywords nicht gefunden" } # Felder extrahieren felder = sorter.extrahiere_felder(passende_regel, dokument_info) # Dateiname generieren dateiname = sorter.generiere_dateinamen(passende_regel, felder) return { "matched": True, "extrahiert": felder, "dateiname": dateiname, "unterordner": passende_regel.get("unterordner") } # ============ Dateimanager / File Operations ============ @router.get("/browse/files") def browse_files(path: str = "/"): """Listet Ordner und Dateien für den Dateimanager""" import os # Sicherheit: Nur bestimmte Basispfade erlauben allowed_bases = ["/srv", "/home", "/mnt", "/media", "/data", "/tmp"] path = os.path.abspath(path) # Prüfen ob Pfad erlaubt is_allowed = any(path.startswith(base) for base in allowed_bases) or path == "/" if not is_allowed: return {"error": "Pfad nicht erlaubt", "folders": [], "files": []} if not os.path.exists(path): return {"error": "Pfad existiert nicht", "folders": [], "files": []} if not os.path.isdir(path): return {"error": "Kein Verzeichnis", "folders": [], "files": []} def natuerliche_sortierung(name): """ Sortiert natürlich: Sonderzeichen zuerst, dann Zahlen numerisch, dann Buchstaben. Beispiel: !datei, #datei, 1.pdf, 2.pdf, 10.pdf, a.pdf, z.pdf """ import re # Prüfe erstes Zeichen first_char = name[0] if name else '' # Kategorie: 0 = Sonderzeichen, 1 = Zahlen, 2 = Buchstaben if first_char.isdigit(): kategorie = 1 elif first_char.isalpha(): kategorie = 2 else: kategorie = 0 # Sonderzeichen zuerst # Für natürliche Sortierung: Zahlen als Integers behandeln teile = re.split(r'(\d+)', name.lower()) sortier_key = [] for teil in teile: if teil.isdigit(): sortier_key.append((0, int(teil))) # Zahlen numerisch else: sortier_key.append((1, teil)) # Text alphabetisch return (kategorie, sortier_key) try: folders = [] files = [] for entry in os.listdir(path): full_path = os.path.join(path, entry) try: if os.path.isdir(full_path): folders.append({ "name": entry, "path": full_path }) else: stat = os.stat(full_path) files.append({ "name": entry, "path": full_path, "size": stat.st_size, "modified": stat.st_mtime }) except (OSError, PermissionError): continue # Sortierung: Sonderzeichen -> Zahlen -> Buchstaben folders.sort(key=lambda x: natuerliche_sortierung(x["name"])) files.sort(key=lambda x: natuerliche_sortierung(x["name"])) return { "current": path, "parent": os.path.dirname(path) if path != "/" else None, "folders": folders, "files": files } except PermissionError: return {"error": "Zugriff verweigert", "folders": [], "files": []} class FileRenameRequest(BaseModel): pfad: str neuer_name: str @router.post("/file/rename") def rename_file(data: FileRenameRequest): """Benennt eine Datei um""" import os import shutil pfad = os.path.abspath(data.pfad) neuer_name = data.neuer_name # Sicherheit allowed_bases = ["/srv", "/home", "/mnt", "/media", "/data", "/tmp"] is_allowed = any(pfad.startswith(base) for base in allowed_bases) if not is_allowed: return {"erfolg": False, "fehler": "Pfad nicht erlaubt"} if not os.path.exists(pfad): return {"erfolg": False, "fehler": "Datei nicht gefunden"} # Ungültige Zeichen prüfen if '/' in neuer_name or '\\' in neuer_name or '\0' in neuer_name: return {"erfolg": False, "fehler": "Ungültiger Dateiname"} ordner = os.path.dirname(pfad) neuer_pfad = os.path.join(ordner, neuer_name) if os.path.exists(neuer_pfad): return {"erfolg": False, "fehler": "Eine Datei mit diesem Namen existiert bereits"} try: shutil.move(pfad, neuer_pfad) return {"erfolg": True, "neuer_pfad": neuer_pfad} except Exception as e: return {"erfolg": False, "fehler": str(e)} class FileMoveRequest(BaseModel): pfad: str ziel_ordner: str @router.post("/file/move") def move_file(data: FileMoveRequest): """Verschiebt eine Datei in einen anderen Ordner""" import os import shutil pfad = os.path.abspath(data.pfad) ziel_ordner = os.path.abspath(data.ziel_ordner) # Sicherheit allowed_bases = ["/srv", "/home", "/mnt", "/media", "/data", "/tmp"] is_allowed_source = any(pfad.startswith(base) for base in allowed_bases) is_allowed_target = any(ziel_ordner.startswith(base) for base in allowed_bases) if not is_allowed_source or not is_allowed_target: return {"erfolg": False, "fehler": "Pfad nicht erlaubt"} if not os.path.exists(pfad): return {"erfolg": False, "fehler": "Datei nicht gefunden"} if not os.path.isdir(ziel_ordner): return {"erfolg": False, "fehler": "Zielordner existiert nicht"} dateiname = os.path.basename(pfad) neuer_pfad = os.path.join(ziel_ordner, dateiname) if os.path.exists(neuer_pfad): return {"erfolg": False, "fehler": "Eine Datei mit diesem Namen existiert bereits im Zielordner"} try: shutil.move(pfad, neuer_pfad) return {"erfolg": True, "neuer_pfad": neuer_pfad} except Exception as e: return {"erfolg": False, "fehler": str(e)} class FileDeleteRequest(BaseModel): pfad: str @router.delete("/file/delete") def delete_file(data: FileDeleteRequest): """Löscht eine Datei""" import os pfad = os.path.abspath(data.pfad) # Sicherheit allowed_bases = ["/srv", "/home", "/mnt", "/media", "/data", "/tmp"] is_allowed = any(pfad.startswith(base) for base in allowed_bases) if not is_allowed: return {"erfolg": False, "fehler": "Pfad nicht erlaubt"} if not os.path.exists(pfad): return {"erfolg": False, "fehler": "Datei nicht gefunden"} if os.path.isdir(pfad): return {"erfolg": False, "fehler": "Ordner können nicht gelöscht werden (nur Dateien)"} try: os.remove(pfad) return {"erfolg": True} except Exception as e: return {"erfolg": False, "fehler": str(e)} @router.get("/file/preview") def preview_file(path: str): """Liefert eine Datei für die Vorschau (ohne Sperrung)""" from fastapi.responses import FileResponse, Response import os import mimetypes pfad = os.path.abspath(path) # Sicherheit allowed_bases = ["/srv", "/home", "/mnt", "/media", "/data", "/tmp"] is_allowed = any(pfad.startswith(base) for base in allowed_bases) if not is_allowed: raise HTTPException(status_code=403, detail="Pfad nicht erlaubt") if not os.path.exists(pfad): raise HTTPException(status_code=404, detail="Datei nicht gefunden") # Mimetype bestimmen mime_type, _ = mimetypes.guess_type(pfad) if not mime_type: mime_type = "application/octet-stream" # Datei als Kopie lesen um Sperrung zu vermeiden try: with open(pfad, 'rb') as f: content = f.read() return Response( content=content, media_type=mime_type, headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0" } ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/file/text") def get_file_text(path: str): """Liefert den Textinhalt einer Datei""" import os pfad = os.path.abspath(path) # Sicherheit allowed_bases = ["/srv", "/home", "/mnt", "/media", "/data", "/tmp"] is_allowed = any(pfad.startswith(base) for base in allowed_bases) if not is_allowed: raise HTTPException(status_code=403, detail="Pfad nicht erlaubt") if not os.path.exists(pfad): raise HTTPException(status_code=404, detail="Datei nicht gefunden") try: # Max 1MB lesen max_size = 1024 * 1024 with open(pfad, 'r', encoding='utf-8', errors='replace') as f: content = f.read(max_size) return {"content": content} except Exception as e: return {"content": None, "error": str(e)} @router.get("/file/download") def download_file(path: str): """Datei zum Download""" from fastapi.responses import FileResponse import os pfad = os.path.abspath(path) # Sicherheit allowed_bases = ["/srv", "/home", "/mnt", "/media", "/data", "/tmp"] is_allowed = any(pfad.startswith(base) for base in allowed_bases) if not is_allowed: raise HTTPException(status_code=403, detail="Pfad nicht erlaubt") if not os.path.exists(pfad): raise HTTPException(status_code=404, detail="Datei nicht gefunden") dateiname = os.path.basename(pfad) return FileResponse( path=pfad, filename=dateiname, media_type="application/octet-stream" )