import asyncio import os.path import re import subprocess import json import logging import threading import time from datetime import date import uvicorn from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect from fastapi.staticfiles import StaticFiles from fastapi.responses import HTMLResponse from fastapi.templating import Jinja2Templates from starlette.websockets import WebSocketState import app.video_class as vc # Settings language = ["ger", "eng"] subtitle_codec_blacklist = ["hdmv_pgs_subtitle", "dvd_subtitle"] max_tasks = 1 # Globale Variablen queue = asyncio.Queue() read_output_task = None video_files = {} active_process = set() active_tasks = set() connected_clients = set() semaphore = threading.Semaphore(max_tasks) date = date.today() if not os.path.exists("./logs"): os.mkdir("./logs") logging.basicConfig( level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[ logging.StreamHandler(), logging.FileHandler(f"./logs/{date}.log") ] ) app = FastAPI() app.mount("/webs", StaticFiles(directory="app/webs"), name="webs") templates = Jinja2Templates(directory="app/templates") def obj_list(): list_json = [] for obj_video in video_files.values(): list_json.append(obj_video.get_vars()) return json.dumps({"list_video":list_json}) # Media ---------------------------------------------------------------------------------------------------------------- def get_video_information(media_path): pattern = r"(?<=\.mkv\s|\.mp4\s|\.avi\s)|(?<=\.webm\s)" file_paths = media_path.get("files", [])[0] var_list_file = re.split(pattern, file_paths) try: for source_file in var_list_file: source_file = source_file.strip() video_streams, video_format = get_ffprobe("v", source_file) audio_streams = get_ffprobe("a",source_file) subtitle_streams = get_ffprobe("s", source_file) obj = vc.Video(source_file, video_streams, video_format, audio_streams, subtitle_streams) video_files.update({source_file: obj}) logging.info(obj) return 1 except Exception as e: logging.error(f"Get Video Information: {e}") return 0 def get_ffprobe(select, source_file): command = [ "ffprobe", "-v", "error", "-select_streams", f"{select}", "-show_entries", "stream=index,channels,codec_name,codec_type,pix_fmt,level," "film_grain,r_frame_rate,bit_rate,sample_rate,width,height,size,tags:stream_tags=language,duration", "-show_entries", "format=size,bit_rate,nb_streams", "-of", "json", source_file ] result = subprocess.run(command, stdout=subprocess.PIPE, text=True) json_data = json.loads(result.stdout) if select == "v": return json_data.get("streams", []),json_data.get("format", []) elif select == "a": return json_data.get("streams", []) elif select == "s": return json_data.get("streams", []) # Convert Process ------------------------------------------------------------------------------------------------------ def queue_video(): for key, obj in video_files.items(): if obj.finished == 0: obj.task = threading.Thread(target=video_convert, args=(obj,)) obj.task.start() active_tasks.add(obj) logging.info(f"Warteschlange started Auftrag - {obj.task}") def video_convert(obj): global active_process semaphore.acquire() obj.convert_start = time.time() # Erstelle und setze einen Event-Loop für diesen Thread loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) command = [ "ffmpeg", "-y", "-i", obj.source_file, "-map", "0:0", "-c:v", "libsvtav1", "-preset", "5", "-crf", "30", "-g", "240", "-pix_fmt", "yuv420p10le", "-svtav1-params", "tune=0:film-grain=8", ] if len(obj.streams_audio): for audio_stream in obj.streams_audio: if audio_stream.get("tags", {}).get("language", None) in language: command.extend([ "-map", f"0:{audio_stream['index']}", f"-c:a", "libopus", f"-b:a", "320k", f"-ac", str(audio_stream['channels']) ]) # Subtitle-Streams einbinden if len(obj.streams_subtitle): for subtitle_stream in obj.streams_subtitle: if subtitle_stream.get("codec_name") not in subtitle_codec_blacklist: if subtitle_stream.get("tags", {}).get("language", None) in language: command.extend([ "-map", f"0:{subtitle_stream['index']}", ]) command.append(obj.output_file) logging.info(f"{command}") loop.run_until_complete(queue.put(video_list())) # Prozess try: obj.process = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) active_process.add(obj) obj.finished = 3 while obj.process.poll() is None: logging.info(f"{obj.file_name} ... Running") time.sleep(10) if obj.process.poll() == 0: obj.finished = 1 logging.info(f"Process Finished({obj.process.returncode}): {obj.file_name}") loop.run_until_complete(queue.put(obj.to_dict())) elif obj.process.poll() != 0: obj.finished = 2 logging.info(f"Process Failure({obj.process.returncode}): {obj.file_name}") loop.run_until_complete(queue.put(obj.to_dict())) active_process.discard(obj) active_tasks.discard(obj) obj.convert_end = time.time() semaphore.release() except Exception as e: obj.finished = 2 semaphore.release() logging.error(f"Convert Process Failure: {e}") #UviCorn WebServer Teil #---------------------------------------------------------------------------------------------------------------------- def read_output(qu): """Thread-Funktion, die Prozessausgaben in die Queue legt.""" global active_process # Erstelle und setze einen Event-Loop für diesen Thread loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) while True: if not len(active_process): time.sleep(5) continue for obj in list(active_process): line_error = obj.process.stderr.read(1024) line_error_decoded = line_error.decode() logging.info(f"Data ffmpeg: {line_error_decoded}") obj.extract_convert_data(line_error_decoded) # Verwende den Event-Loop, um die Daten in die asyncio Queue zu legen logging.info(f"Data Packet created: {obj.to_dict()}") loop.run_until_complete(qu.put(obj.to_dict())) def video_list(): vlist = [] for video in video_files.values(): vlist.append(video.get_vars()) return json.dumps({"video_list": vlist}) @app.post("/") async def receive_video_file(data: dict): if get_video_information(data): queue_video() else: logging.error(f"Videos konnten nicht verarbeitet werden! Warteschleife wurde nicht gestarted") @app.get("/progress", response_class=HTMLResponse) async def display_paths(request: Request): return templates.TemplateResponse("progress.html", {"request": request, "videos": video_files}) @app.get("/webs-ui", response_class=HTMLResponse) async def display_paths(request: Request): return templates.TemplateResponse("webs-ui.html", {"request": request, "videos": video_files}) @app.websocket("/ws") async def websocket_v(websocket: WebSocket): """WebSocket-Verbindung für die Kommunikation mit Clients.""" global read_output_task, connected_clients connected_clients.add(websocket) await websocket.accept() if read_output_task is None: # Startet den Thread zum Verarbeiten der Prozessausgaben read_output_task = threading.Thread(target=read_output, args=(queue,)) read_output_task.start() try: var_first_sending = 0 while True: if websocket not in connected_clients: break message = await queue.get() # Warten auf neue Nachricht aus der Queue await websocket.send_text(message) if not var_first_sending: await queue.put(video_list()) var_first_sending = 1 except WebSocketDisconnect: logging.info("WebSocket disconnected") except Exception as e: logging.error(f"WebSocket error: {e}") finally: connected_clients.discard(websocket) if websocket.client_state == WebSocketState.CONNECTED: await websocket.close() @app.get("/clients") async def get_clients_count(): return {"active_clients": len(connected_clients), "active_processes": len(active_process), "active_tasks": len(active_tasks)} if __name__ == "__main__": uvicorn.run("app.main:app", host="127.0.0.1", port=8000, reload=False)