import asyncio import re import subprocess import json from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect from fastapi.staticfiles import StaticFiles from fastapi.responses import HTMLResponse from fastapi.templating import Jinja2Templates from contextlib import asynccontextmanager import app.video_class as vc @asynccontextmanager async def lifespan(_: FastAPI): await queue_video() yield app = FastAPI(lifespan=lifespan) app.mount("/webs", StaticFiles(directory="app/webs"), name="webs") templates = Jinja2Templates(directory="app/templates") queue = asyncio.Queue() read_output_task = None convert_task = 0 # Settings language = ["ger", "eng"] subtitle_codec_blacklist = ["hdmv_pgs_subtitle", "dvd_subtitle"] process_count = 1 video_files = {} progress = {} async def queue_video(): global convert_task for key, obj in video_files.items(): if process_count > convert_task: if obj.finished == 0: convert_task += 1 asyncio.create_task(video_convert(obj)) async def get_ffprobe(select, obj): global convert_task command = [ "ffprobe", "-v", "error", "-select_streams", f"{select}", "-show_entries", "stream=index,channels,codec_name,tags:stream_tags=language,tags:format=duration", "-of", "json", obj.source_file ] try: result = subprocess.run(command, stdout=subprocess.PIPE, text=True) json_data = json.loads(result.stdout) print(json_data) duration = json_data.get("format", {"duration": 999}).get("duration") if duration: if duration.replace(".","", 1).isdigit(): obj.duration = round(float(duration)) else: obj.duration = 0 return json_data except Exception as e: convert_task -= 1 await queue_video() async def video_convert(obj): global convert_task json_data_audio = await get_ffprobe("a", obj) json_data_subtitles = await get_ffprobe("s", obj) # Konvertierung ---------------------------------------------------------------------------------------------------- command = [ "ffmpeg", "-y", "-i", obj.source_file, "-map", "0:0", "-c:v", "libsvtav1", "-preset", "5", "-crf", "24", "-g", "240", "-pix_fmt", "yuv420p", "-svtav1-params", "tune=0:film-grain=8", ] if "streams" in json_data_audio: i = 0 for audio_stream in json_data_audio["streams"]: if audio_stream.get("tags", {}).get("language", None) in language: command.extend([ "-map", f"0:{audio_stream['index']}", f"-c:a", "libopus", f"-b:a", "320k", f"-ac", str(audio_stream['channels']) ]) i += 1 # Subtitle-Streams einbinden if "streams" in json_data_subtitles: for subtitle_stream in json_data_subtitles["streams"]: if subtitle_stream.get("codec_name") not in subtitle_codec_blacklist: if subtitle_stream.get("tags", {}).get("language", None) in language: command.extend([ "-map", f"0:{subtitle_stream['index']}", ]) command.append(obj.output_file) """ ffmpeg_cm = "" for cm in command: ffmpeg_cm += f"{cm} " print(ffmpeg_cm) """ # Prozess try: process_video = await asyncio.create_subprocess_exec( *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) obj.progress = process_video if process_video.returncode == 0: obj.finished = 1 convert_task -= 1 await queue_video() except Exception as e: convert_task -= 1 obj.finished = 2 await queue_video() obj.error.append(f"ffmpeg ---- {e}") async def read_output(): while True: active_processes = [obj for obj in video_files.values() if obj.progress] if not active_processes: await asyncio.sleep(10) # Kein aktives Video -> kurz warten continue for obj in active_processes: if obj.progress: line = await obj.progress.stderr.read(1024) line_decoded = line.decode().strip() print(line_decoded) obj.extract_convert_data(line_decoded) json_data = json.dumps(obj.to_dict()) await queue.put(json_data) @app.post("/") async def receive_video_file(data: dict): pattern = r"(?<=\.mkv\s|\.mp4\s|\.avi\s)|(?<=\.webm\s)" file_paths = data.get("files", [])[0] var_list_file = re.split(pattern, file_paths) for path in var_list_file: obj_file = vc.Video(path.strip()) video_files.update({path: obj_file}) await queue_video() #Test print(video_files) @app.get("/progress", response_class=HTMLResponse) async def display_paths(request: Request): return templates.TemplateResponse("progress.html", {"request": request, "videos": video_files}) @app.get("/webs-ui", response_class=HTMLResponse) async def display_paths(request: Request): return templates.TemplateResponse("webs-ui.html", {"request": request, "videos": video_files}) @app.websocket("/ws") async def websocket_v(websocket: WebSocket): global read_output_task await websocket.accept() if read_output_task is None or read_output_task.done(): read_output_task = asyncio.create_task(read_output()) try: while True: message = await queue.get() await websocket.send_text(message) except WebSocketDisconnect: print("WebSocket disconnected") # Optional: Logging except Exception as e: print(f"WebSocket error: {e}") # Fehlerbehandlung finally: await websocket.close()