diff --git a/app/config.yaml b/app/config.yaml new file mode 100644 index 0000000..664fa95 --- /dev/null +++ b/app/config.yaml @@ -0,0 +1,10 @@ +convert: + max_process: 1 + language: + - deu + - eng + +subtitle: + blacklist: + - hdmv_pgs_subtitle + - dvd_subtitle \ No newline at end of file diff --git a/app/config_class.py b/app/config_class.py new file mode 100644 index 0000000..12d397e --- /dev/null +++ b/app/config_class.py @@ -0,0 +1,6 @@ +import yaml + +class Config: + def __init__(self): + with open("app/config.yaml") as file: + self.config = yaml.safe_load(file) diff --git a/app/ffmpeg_class.py b/app/ffmpeg_class.py new file mode 100644 index 0000000..ca14a6e --- /dev/null +++ b/app/ffmpeg_class.py @@ -0,0 +1,54 @@ +class Ffmpeg: + def __init__(config): + self.config = config + + def video_info(): + command_info = [ + "ffprobe", "-v", + "error", + "-select_streams", + f"{select}", + "-show_entries", + "stream=index,channels,codec_name,codec_type,pix_fmt,level," + "film_grain,r_frame_rate,bit_rate,sample_rate,width,height,size,tags:stream_tags=language,duration", + "-show_entries", + "format=size,bit_rate,nb_streams", + "-of", "json", + source_file + ] + + return command_info + + def video_convert(): + command_convert = [ + "ffmpeg", "-y", "-i", obj.source_file, + "-map", "0:0", + "-c:v", "libsvtav1", + "-preset", "5", + "-crf", "30", + "-g", "240", + "-pix_fmt", "yuv420p10le", + "-svtav1-params", "tune=0:film-grain=8", + ] + + if len(obj.streams_audio): + for audio_stream in obj.streams_audio: + if audio_stream.get("tags", {}).get("language", None) in config["convert"]["language"]: + command_convert.extend([ + "-map", f"0:{audio_stream['index']}", + f"-c:a", "libopus", + f"-b:a", "320k", + f"-ac", str(audio_stream['channels']) + ]) + + # Subtitle-Streams einbinden + if len(obj.streams_subtitle): + for subtitle_stream in obj.streams_subtitle: + if subtitle_stream.get("codec_name") not in config["subtitle"]["blacklist"]: + if subtitle_stream.get("tags", {}).get("language", None) in config["convert"]["language"]: + command_convert.extend([ + "-map", f"0:{subtitle_stream['index']}", + ]) + command_convert.append(obj.output_file) + + return command_convert diff --git a/app/main.py b/app/main.py index 3aa62b2..d568a50 100755 --- a/app/main.py +++ b/app/main.py @@ -16,21 +16,23 @@ from fastapi.templating import Jinja2Templates from starlette.websockets import WebSocketState import app.video_class as vc - -# Settings -language = ["ger", "eng"] -subtitle_codec_blacklist = ["hdmv_pgs_subtitle", "dvd_subtitle"] -max_tasks = 1 +from app.ffmpeg_class import FfmpegCommands as fc +from app.config_class import Config as c # Globale Variablen -queue = asyncio.Queue() -read_output_task = None +config = c.Config() +ffmpeg: ffmpeg = fc.Ffmpeg(config) +queue: queue = queue.Queue() +semaphore = threading.Semaphore(config["convert"]["max_process"]) + +thread_output = None video_files = {} +date = date.today() + +# Prozess Objects active_process = set() active_tasks = set() -connected_clients = set() -semaphore = threading.Semaphore(max_tasks) -date = date.today() +clients = set() if not os.path.exists("./logs"): os.mkdir("./logs") @@ -80,19 +82,7 @@ def get_video_information(media_path): return 0 def get_ffprobe(select, source_file): - command = [ - "ffprobe", "-v", - "error", - "-select_streams", - f"{select}", - "-show_entries", - "stream=index,channels,codec_name,codec_type,pix_fmt,level," - "film_grain,r_frame_rate,bit_rate,sample_rate,width,height,size,tags:stream_tags=language,duration", - "-show_entries", - "format=size,bit_rate,nb_streams", - "-of", "json", - source_file - ] + command = ffmpeg.video_info() result = subprocess.run(command, stdout=subprocess.PIPE, text=True) json_data = json.loads(result.stdout) @@ -118,41 +108,14 @@ def video_convert(obj): semaphore.acquire() + result: str = None obj.convert_start = time.time() - # Erstelle und setze einen Event-Loop für diesen Thread + loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - command = [ - "ffmpeg", "-y", "-i", obj.source_file, - "-map", "0:0", - "-c:v", "libsvtav1", - "-preset", "5", - "-crf", "30", - "-g", "240", - "-pix_fmt", "yuv420p10le", - "-svtav1-params", "tune=0:film-grain=8", - ] + command = ffmpeg.video_convert() - if len(obj.streams_audio): - for audio_stream in obj.streams_audio: - if audio_stream.get("tags", {}).get("language", None) in language: - command.extend([ - "-map", f"0:{audio_stream['index']}", - f"-c:a", "libopus", - f"-b:a", "320k", - f"-ac", str(audio_stream['channels']) - ]) - - # Subtitle-Streams einbinden - if len(obj.streams_subtitle): - for subtitle_stream in obj.streams_subtitle: - if subtitle_stream.get("codec_name") not in subtitle_codec_blacklist: - if subtitle_stream.get("tags", {}).get("language", None) in language: - command.extend([ - "-map", f"0:{subtitle_stream['index']}", - ]) - command.append(obj.output_file) logging.info(f"{command}") loop.run_until_complete(queue.put(video_list())) @@ -173,17 +136,19 @@ def video_convert(obj): if obj.process.poll() == 0: obj.finished = 1 - logging.info(f"Process Finished({obj.process.returncode}): {obj.file_name}") - loop.run_until_complete(queue.put(obj.to_dict())) - + result = "Finished" elif obj.process.poll() != 0: obj.finished = 2 - logging.info(f"Process Failure({obj.process.returncode}): {obj.file_name}") - loop.run_until_complete(queue.put(obj.to_dict())) + result = "Failure" + logging.info(f"Process {result}({obj.process.returncode}): {obj.file_name}") + + loop.run_until_complete(queue.put(obj.to_dict())) active_process.discard(obj) active_tasks.discard(obj) + obj.convert_end = time.time() + semaphore.release() except Exception as e: @@ -193,30 +158,24 @@ def video_convert(obj): #UviCorn WebServer Teil #---------------------------------------------------------------------------------------------------------------------- -def read_output(qu): +def read_output(): """Thread-Funktion, die Prozessausgaben in die Queue legt.""" global active_process - # Erstelle und setze einen Event-Loop für diesen Thread - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - while True: if not len(active_process): time.sleep(5) continue for obj in list(active_process): - line_error = obj.process.stderr.read(1024) + line_error = obj.process.stderr.read(1024).decode() - line_error_decoded = line_error.decode() - - logging.info(f"Data ffmpeg: {line_error_decoded}") - obj.extract_convert_data(line_error_decoded) + logging.info(f"Data ffmpeg: {line_error}") + obj.extract_convert_data(line_error) # Verwende den Event-Loop, um die Daten in die asyncio Queue zu legen logging.info(f"Data Packet created: {obj.to_dict()}") - loop.run_until_complete(qu.put(obj.to_dict())) + queue.put(obj.to_dict()) def video_list(): vlist = [] @@ -244,40 +203,45 @@ async def display_paths(request: Request): @app.websocket("/ws") async def websocket_v(websocket: WebSocket): """WebSocket-Verbindung für die Kommunikation mit Clients.""" - global read_output_task, connected_clients + global thread_output, clients - connected_clients.add(websocket) + clients.add(websocket) await websocket.accept() - if read_output_task is None: + if thread_output is None: # Startet den Thread zum Verarbeiten der Prozessausgaben - read_output_task = threading.Thread(target=read_output, args=(queue,)) - read_output_task.start() + thread_output = threading.Thread(target=read_output, args=(queue,)) + thread_output.start() try: var_first_sending = 0 + while True: - if websocket not in connected_clients: - break - - message = await queue.get() # Warten auf neue Nachricht aus der Queue - await websocket.send_text(message) if not var_first_sending: - await queue.put(video_list()) + queue.put(video_list()) var_first_sending = 1 + if websocket not in clients: + break + + message = queue.get() # Warten auf neue Nachricht aus der Queue + + for client in clients: + await client.send_text(message) + + except WebSocketDisconnect: logging.info("WebSocket disconnected") except Exception as e: logging.error(f"WebSocket error: {e}") finally: - connected_clients.discard(websocket) + clients.discard(websocket) if websocket.client_state == WebSocketState.CONNECTED: await websocket.close() @app.get("/clients") async def get_clients_count(): - return {"active_clients": len(connected_clients), "active_processes": len(active_process), "active_tasks": len(active_tasks)} + return {"active_clients": len(clients), "active_processes": len(active_process), "active_tasks": len(active_tasks)} if __name__ == "__main__": uvicorn.run("app.main:app", host="127.0.0.1", port=8000, reload=False) \ No newline at end of file diff --git a/app/video_class.py b/app/video_class.py index 2d75223..a2aec00 100644 --- a/app/video_class.py +++ b/app/video_class.py @@ -40,7 +40,7 @@ class Video: self.size: int = 0 self.time: int = 0 self.bitrate: int = 0 - self.speed: int = 0 + self.speed: float = 0 self.loading: int = 0 # Process @@ -124,27 +124,36 @@ class Video: return obj_vars def extract_convert_data(self, line_decoded): + # Frames frame = re.findall(r"frame=\s*(\d+)", line_decoded) - self.frame = int(frame[0]) if frame else 0 + if frame and int(frame[0]) > self.frame: + self.frame = int(frame[0]) + # FPS fps = re.findall(r"fps=\s*(\d+)", line_decoded) self.fps = int(fps[0]) if fps else 0 + # Quantizer q = re.findall(r"q=\s*(\d+.\d+)", line_decoded) - self.q = q[0] if q else 0 + self.q = int([0]) if q else 0 + # File Size size = re.findall(r"size=\s*(\d+)", line_decoded) - self.size = self.convert_kb_mb(size[0]) if size else 0 + if size and self.convert_kb_mb(size[0]) > self.size: + self.size = self.convert_kb_mb(size[0]) + # Time time = re.findall(r"time=\s*(\d+:\d+:\d+)", line_decoded) time_v = time[0] if time else "00:00:00" self.time = self.time_in_sec(time_v) + # Bitrate bitrate = re.findall(r"bitrate=\s*(\d+)", line_decoded) self.bitrate = self.convert_kb_mb(bitrate[0]) if bitrate else 0 + # Speed speed = re.findall(r"speed=\s*(\d+\.\d+)", line_decoded) - self.speed = speed[0] if speed else 0 + self.speed = float(speed[0]) if speed else 0 @staticmethod def time_in_sec(time_str): @@ -153,9 +162,7 @@ class Video: if len(hs_ms_s[0]) >= 3: if hs_ms_s[0][0].isdigit() and hs_ms_s[0][1].isdigit() and hs_ms_s[0][2].isdigit(): try: - time = int(hs_ms_s[0][0]) * 60 + int(hs_ms_s[0][1]) * 60 + int(hs_ms_s[0][2]) - print(time) - return time + return int(hs_ms_s[0][0]) * 60 + int(hs_ms_s[0][1]) * 60 + int(hs_ms_s[0][2]) except ValueError as e: logging.error(f"Wert: {time_str} Fehler: {e}") return 0