"""FastAPI service that queues video files and transcodes them with ffmpeg.

Files are registered through ``POST /``.  At most ``process_count``
conversions run at once; each one probes the audio streams with ffprobe
and then encodes with ffmpeg (SVT-AV1 video, Opus audio, copied
subtitles).  ffmpeg's stderr progress output is parsed into the
``Video`` objects and pushed to websocket clients as JSON snapshots.

``Video.finished`` is used as a state code throughout this module:
0 = not finished yet (pending or running), 1 = converted successfully,
2 = failed or aborted.
"""
import asyncio
import re
import subprocess
import json
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates

import app.video_class as vc


@asynccontextmanager
async def lifespan(_: FastAPI):
    """Start any conversions that are already registered when the app boots."""
    await queue_video()
    yield


app = FastAPI(lifespan=lifespan)
app.mount("/webs", StaticFiles(directory="app/webs"), name="webs")
templates = Jinja2Templates(directory="app/templates")

# JSON progress snapshots waiting to be sent to a websocket client.
queue = asyncio.Queue()
# Background task running read_output(); created lazily.
read_output_task = None
# Number of conversions currently occupying a slot.
convert_task = 0

# Settings
language = ["ger", "eng"]  # audio languages that are kept in the output
process_count = 1  # maximum number of simultaneous conversions

# Registered videos: source path -> vc.Video
video_files = {}

# Zero-width split points directly after a media extension that is followed
# by one whitespace character, so a single string holding several paths
# (which may themselves contain spaces) can be split into individual paths.
pattern = r"(?<=\.mkv\s|\.mp4\s|\.avi\s)|(?<=\.webm\s)"

progress = {}

# id(video) -> asyncio.Task for conversions that are in flight.  Keeps a
# strong reference to each task and prevents starting a video twice.
_running_tasks = {}

# Upper bound for `queue`; older snapshots are dropped when nobody consumes.
_MAX_PENDING_MESSAGES = 200

# Fields of ffmpeg's periodic status line.
_FRAME_RE = re.compile(r"frame=\s*(\d+)")
_FPS_RE = re.compile(r"fps=\s*(\d+(?:\.\d+)?)")
_Q_RE = re.compile(r"q=\s*(-?\d+(?:\.\d+)?)")
_SIZE_RE = re.compile(r"size=\s*(\d+)")
_TIME_RE = re.compile(r"time=\s*(\d+:\d+:\d+)")
_BITRATE_RE = re.compile(r"bitrate=\s*(\d+)")
_SPEED_RE = re.compile(r"speed=\s*(\d+(?:\.\d+)?)")


def _ensure_reader() -> None:
    """Start the read_output() background task unless it is already running."""
    global read_output_task
    if read_output_task is None or read_output_task.done():
        read_output_task = asyncio.create_task(read_output())


def _publish(message: str) -> None:
    """Queue a snapshot for the websocket, discarding the oldest when full."""
    while queue.qsize() >= _MAX_PENDING_MESSAGES:
        queue.get_nowait()
    queue.put_nowait(message)


async def queue_video():
    """Start conversions for pending videos while free slots are available.

    A video is pending when ``finished == 0`` and no conversion task is
    running for it.  Safe to call repeatedly.
    """
    global convert_task
    for obj in video_files.values():
        if convert_task >= process_count:
            break
        if obj.finished != 0 or id(obj) in _running_tasks:
            continue
        convert_task += 1
        _running_tasks[id(obj)] = asyncio.create_task(video_convert(obj))


async def _probe_media(source_file) -> dict:
    """Return ffprobe's JSON description of the audio streams of *source_file*.

    Raises:
        RuntimeError: ffprobe exited with a non-zero status.
        json.JSONDecodeError: ffprobe produced no valid JSON.
        OSError: ffprobe could not be started.
    """
    process = await asyncio.create_subprocess_exec(
        "ffprobe",
        "-v", "error",
        "-select_streams", "a",
        "-show_entries", "stream=index,channels,tags:stream_tags=language,tags:format=duration",
        "-of", "json",
        source_file,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout, stderr = await process.communicate()
    if process.returncode != 0:
        detail = stderr.decode(errors="replace").strip()
        raise RuntimeError(detail or f"exit code {process.returncode}")
    return json.loads(stdout)


def _build_ffmpeg_command(obj, probe_data: dict) -> list:
    """Assemble the ffmpeg argument list for *obj* from the ffprobe result."""
    command = [
        "ffmpeg", "-y", "-i", obj.source_file,
        "-map", "0:0",
        "-c:v", "libsvtav1",
        "-preset", "5",
        "-crf", "24",
        "-g", "240",
        "-pix_fmt", "yuv420p",
        "-svtav1-params", "tune=0:film-grain=8",
    ]

    out_index = 0  # position of the audio track in the output file
    for audio_stream in probe_data.get("streams", []):
        if audio_stream.get("tags", {}).get("language", None) not in language:
            continue
        # Options carry an output stream specifier so that every track keeps
        # its own channel count instead of inheriting the last "-ac" given.
        command.extend([
            "-map", f"0:{audio_stream['index']}",
            f"-c:a:{out_index}", "libopus",
            f"-b:a:{out_index}", "320k",
        ])
        channels = audio_stream.get("channels")
        if channels is not None:
            command.extend([f"-ac:a:{out_index}", str(channels)])
        out_index += 1

    command.extend(["-map", "0:s?", "-c:s", "copy"])
    command.append(obj.output_file)
    return command


async def _run_conversion(obj) -> None:
    """Probe and transcode *obj*, recording the outcome on the object."""
    # Audio ------------------------------------------------------------------
    try:
        probe_data = await _probe_media(obj.source_file)
    except Exception as e:
        # Without a readable source there is nothing to convert.
        obj.finished = 2
        obj.error.append(f"ffprobe ---- {e}")
        print(obj.error)
        return
    print(probe_data)
    obj.duration = probe_data.get("format", {"duration": 999}).get("duration")

    # Conversion -------------------------------------------------------------
    command = _build_ffmpeg_command(obj, probe_data)

    # Process
    process_video = None
    try:
        process_video = await asyncio.create_subprocess_exec(
            *command,
            stdin=subprocess.DEVNULL,
            # stdout is never read; a pipe here would only risk a stall.
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
        )
        obj.progress = process_video
        # stderr has to be drained even when no websocket client is
        # connected, otherwise ffmpeg blocks once the pipe buffer is full.
        _ensure_reader()
        returncode = await process_video.wait()
    except Exception as e:
        obj.finished = 2
        obj.error.append(f"ffmpeg ---- {e}")
        return
    finally:
        # Reached with a live process only on error or cancellation.
        if process_video is not None and process_video.returncode is None:
            try:
                process_video.kill()
            except ProcessLookupError:
                pass

    if returncode == 0:
        obj.finished = 1
    else:
        obj.finished = 2
        obj.error.append(f"ffmpeg ---- exit code {returncode}")


async def video_convert(obj):
    """Convert one video and then hand its slot to the next pending video.

    On return ``obj.finished`` is 1 (success) or 2 (failure); failure
    details are appended to ``obj.error``.  The conversion slot counted in
    ``convert_task`` is released exactly once, whatever the outcome.
    """
    global convert_task
    requeue = True
    try:
        await _run_conversion(obj)
    except asyncio.CancelledError:
        # Cancelled (e.g. at shutdown): do not start further work.
        requeue = False
        raise
    finally:
        if obj.finished == 0:
            # Never leave an aborted video pending, it would be restarted.
            obj.finished = 2
        _running_tasks.pop(id(obj), None)
        convert_task -= 1
        if requeue:
            await queue_video()


def _is_streaming(obj) -> bool:
    """Return True while *obj* has an ffmpeg stderr stream left to read."""
    process = obj.progress
    if not process:
        return False
    stream = getattr(process, "stderr", None)
    return stream is not None and not stream.at_eof()


def _update_progress(obj, text: str) -> None:
    """Copy the newest ffmpeg status values found in *text* onto *obj*.

    Fields that do not occur in *text* keep their previous value.  A chunk
    can contain several status lines, so the last match of each field wins.
    """
    frame = _FRAME_RE.findall(text)
    if frame:
        obj.frame = frame[-1]

    fps = _FPS_RE.findall(text)
    if fps:
        obj.fps = fps[-1]

    q = _Q_RE.findall(text)
    if q:
        obj.q = q[-1]

    size = _SIZE_RE.findall(text)
    if size:
        obj.size = round(int(size[-1]) / 1024, 2)

    timestamp = _TIME_RE.findall(text)
    if timestamp:
        obj.time = obj.time_in_sec(timestamp[-1])

    bitrate = _BITRATE_RE.findall(text)
    if bitrate:
        obj.bitrate = round(int(bitrate[-1]) / 1024, 2)

    speed = _SPEED_RE.findall(text)
    if speed:
        obj.speed = speed[-1]


async def read_output():
    """Drain ffmpeg's stderr, update the videos and publish JSON snapshots.

    Runs forever as a background task.  For every chunk read it queues
    ``json.dumps(obj.to_dict())`` for the websocket handler.
    """
    while True:
        streaming = [obj for obj in video_files.values() if _is_streaming(obj)]
        if not streaming:
            await asyncio.sleep(10)  # No active video -> wait briefly
            continue

        for obj in streaming:
            try:
                chunk = await obj.progress.stderr.read(2048)
                if not chunk:
                    # EOF: ffmpeg has closed stderr; filtered out next pass.
                    continue
                # A chunk boundary can fall inside a multi-byte character.
                text = chunk.decode(errors="replace").strip()
                print(text)
                _update_progress(obj, text)
                _publish(json.dumps(obj.to_dict()))
            except Exception as e:
                # One bad snapshot must not stop the draining of stderr.
                print(f"read_output error: {e}")

        # Always give other tasks a chance, even when data is buffered.
        await asyncio.sleep(0)


@app.post("/")
async def receive_video_file(data: dict):
    """Register the posted video paths and start converting them.

    ``data["files"]`` is a list of strings (a single string is accepted
    too); each string may hold several paths separated by whitespace after
    the file extension.  Paths that are already registered and not yet
    finished are left untouched.
    """
    files = data.get("files") or []
    if isinstance(files, str):
        files = [files]

    for entry in files:
        if not isinstance(entry, str):
            continue
        for raw_path in re.split(pattern, entry):
            path = raw_path.strip()
            if not path:
                continue
            existing = video_files.get(path)
            if existing is not None and existing.finished == 0:
                # Pending or running: replacing it would lose the process.
                continue
            video_files[path] = vc.Video(path)

    await queue_video()
    # Test
    print(video_files)


@app.get("/progress", response_class=HTMLResponse)
async def display_paths(request: Request):
    """Render the progress page for all registered videos."""
    return templates.TemplateResponse("progress.html", {"request": request, "videos": video_files})


@app.get("/webs-ui", response_class=HTMLResponse)
async def display_webs_ui(request: Request):
    """Render the websocket-driven UI for all registered videos."""
    return templates.TemplateResponse("webs-ui.html", {"request": request, "videos": video_files})


@app.websocket("/ws")
async def websocket_v(websocket: WebSocket):
    """Forward queued progress snapshots to the connected websocket client."""
    await websocket.accept()
    _ensure_reader()

    try:
        while True:
            message = await queue.get()
            await websocket.send_text(message)
    except WebSocketDisconnect:
        print("WebSocket disconnected")  # Optional: logging
    except Exception as e:
        print(f"WebSocket error: {e}")  # Error handling
    finally:
        try:
            await websocket.close()
        except (RuntimeError, WebSocketDisconnect, OSError):
            # The connection is already closed; nothing left to do.
            pass