From 456852399ef1756abe26f0b1a0dac4fb7d00ad6c Mon Sep 17 00:00:00 2001 From: data Date: Mon, 31 Mar 2025 20:12:06 +0200 Subject: [PATCH] =?UTF-8?q?Convert,=20Read=20Out=20und=20die=20Warteschlan?= =?UTF-8?q?ge=20=C3=BCberarbeitet.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- __main__.py | 2 + app/class_file_convert.py | 28 ++++++++---- app/class_file_convert_read_out.py | 52 ++++++++++++++++------- app/class_file_path.py | 30 ++++++++++--- app/class_media_file.py | 68 ++++++++++++++++++++++++++++-- app/main_server.py | 19 +++++++-- 6 files changed, 163 insertions(+), 36 deletions(-) diff --git a/__main__.py b/__main__.py index 6e82795..fec2766 100644 --- a/__main__.py +++ b/__main__.py @@ -8,5 +8,7 @@ if __name__ == "__main__": try: asyncio.run(obj_server.start()) + except asyncio.exceptions.CancelledError: + pass except Exception as e: logging.critical(f"Global error: {e}") \ No newline at end of file diff --git a/app/class_file_convert.py b/app/class_file_convert.py index db17247..351e4e3 100644 --- a/app/class_file_convert.py +++ b/app/class_file_convert.py @@ -10,19 +10,30 @@ class Convert: self.obj_websocket = websocket self.obj_process = Process(websocket) - self.semaphore = asyncio.Semaphore(self.yaml["task_max"]) self.active_tasks = set() self.active_process = set() logging.info("Video Convert Start here") async def snake_waiting(self): - async with self.semaphore: - for obj_id, obj in list(self.obj_path.paths.items()): - if obj.status is None: - obj.task = asyncio.create_task(self.convert_video(obj)) - self.active_tasks.add(obj) - logging.info(f"Warteschlange started Auftrag - {obj.task}") + while True: + if len(self.active_tasks) < self.yaml["task_max"] and self.obj_path.count_paths(None) > 0: + for obj_id, obj in list(self.obj_path.paths.items()): + if obj.status is None: + obj.task = asyncio.create_task(self.convert_video(obj)) + self.active_tasks.add(obj) + logging.info(f"Warteschlange started Auftrag - {obj.task}") + + if len(self.active_tasks) >= self.yaml["task_max"]: + break + + if len(self.active_tasks) >= 0: + logging.info(f"{len(self.active_tasks)} is active.") + await asyncio.sleep(600) + continue + + if self.obj_path.count_paths(None) == 0 and len(self.active_tasks) == 0: + break print(self.obj_path.paths) @@ -31,6 +42,7 @@ class Convert: obj.convert_start = time.time() command = self.convert_cmd(obj) + result = None logging.info(f"Starte Konvertierung: {command}") @@ -62,7 +74,7 @@ class Convert: except Exception as e: obj.status = 2 - logging.error(f"Fehler in video_convert(): {e}") + logging.error(f"Fehler in video_convert(): {e}", exc_info=True) finally: logging.info(f"Prozess {result}({obj.process.returncode}): {obj.source_file_name}") await self.obj_websocket.send_websocket(obj.to_dict()) diff --git a/app/class_file_convert_read_out.py b/app/class_file_convert_read_out.py index 4f18a9b..1f4048f 100644 --- a/app/class_file_convert_read_out.py +++ b/app/class_file_convert_read_out.py @@ -1,4 +1,6 @@ +import logging import re +import asyncio class Process: def __init__(self, obj_websocket): @@ -20,6 +22,7 @@ class Process: async def read_out(self, obj): self.id = obj.id + i = 100 while True: line = await obj.process.stderr.read(1024) @@ -27,35 +30,45 @@ class Process: self.process_line_extract(obj, line_decoded) - await self.obj_websocket.send_websocket(obj.to_dict()) + # in json umwandeln + #await self.obj_websocket.send_websocket(self.to_dict()) - if self.line_empty > 20: + if self.line_empty > 30: break elif not line: self.line_empty += 1 + await asyncio.sleep(2) + continue elif line: self.line_empty = 0 + if i == 100 or i == 200 or i == 300 or i == 400 or i == 500: + logging.info(self.to_dict()) + self.save_stat_value(obj) + elif i == 101 or i == 501: + i = 0 + time = obj.format_time(obj.time_remaining()) + if time != " ": + logging.info(f"Time remaining: {time}") + + i += 1 + def process_line_extract(self, obj, line:str): # FPS fps = re.findall(r"fps=\s*(\d+.\d*)", line) - self.fps = float(fps[0]) if fps else 0 - obj.stat_fps = [obj.stats_fps[0] + self.fps, obj.stats_fps[1] + 1] + self.fps = float(fps[0]) if fps else 0.0 # Quantizer q = re.findall(r"q=\s*(\d+).\d+", line) self.quantizer = int(q[0]) if q else 0 - obj.stat_quantizer = [obj.stat_quantizer[0] + self.quantizer, obj.stat_quantizer[1] + 1] # Bitrate bitrate = re.findall(r"bitrate=\s*(\d+)", line) self.bitrate[0] = int(bitrate[0]) if bitrate else 0 - obj.stat_bitrate = [obj.stat_bitrate[0] + self.bitrate, obj.stat_bitrate[1] + 1] # Speed speed = re.findall(r"speed=\s*(\d+\.\d+)", line) - self.speed = float(speed[0]) if speed else 0 - obj.stat_speed = [obj.stat_speed[0] + self.speed, obj.stat_speed[1] + 1] + self.speed = float(speed[0]) if speed else 0.0 # File Size size = re.findall(r"size=\s*(\d+)", line) @@ -66,8 +79,8 @@ class Process: # Time media_time = re.findall(r"time=\s*(\d+:\d+:\d+)", line) time_v = media_time[0] if media_time else "00:00:00" - if self.time < self.time_in_sec(time_v): - self.time = self.time_in_sec(time_v) + if self.time < obj.time_in_sec(time_v): + self.time = obj.time_in_sec(time_v) obj.process_time = self.time # Frames @@ -76,6 +89,19 @@ class Process: self.frames = int(frame[0]) obj.process_frames = self.frames + def save_stat_value(self, obj): + if self.fps: + obj.stat_fps = [obj.stat_fps[0] + self.fps, obj.stat_fps[1] + 1] + + if self.quantizer: + obj.stat_quantizer = [obj.stat_quantizer[0] + self.quantizer, obj.stat_quantizer[1] + 1] + + if self.bitrate[0]: + obj.stat_bitrate = [obj.stat_bitrate[0] + self.bitrate[0], obj.stat_bitrate[1] + 1] + + if self.speed: + obj.stat_speed = [obj.stat_speed[0] + self.speed, obj.stat_speed[1] + 1] + def to_dict(self): return {"data_flow": {self.id: { "frames": self.frames, @@ -87,12 +113,6 @@ class Process: "speed": self.speed }}} - # Data convert - @staticmethod - def time_in_sec(time_str: str) -> int: - h, m, s = map(int, time_str.split(":")) - return h * 3600 + m * 60 + s - @staticmethod def size_convert(source: str, target, unit: str, size=0): list_unit: list = [] diff --git a/app/class_file_path.py b/app/class_file_path.py index ae5f6d2..c11732f 100644 --- a/app/class_file_path.py +++ b/app/class_file_path.py @@ -145,11 +145,31 @@ class Path: streams_subtitle = self.extract_media_info(path, "s") obj = Media(path, streams_video, streams_audio, streams_subtitle, streams_format) - self.paths.update({obj.id: obj}) - - logging.info(obj) + if not self.search_paths(path): + self.paths.update({obj.id: obj}) + logging.info(obj) + else: + logging.info(f"File exists in Waiting Snake :D: {path}") return 1 except Exception as e: - logging.error(f"Get Video Information: {e}") - return 0 \ No newline at end of file + logging.error(f"Get Video Information: {e}", exc_info=True) + return 0 + + def search_paths(self, path): + for obj in self.paths.values(): + if obj.source_file == path: + return 1 + + return 0 + + def count_paths(self, status): + count = 0 + + for obj in self.paths.values(): + if obj.status == status: + count += 1 + + return count + + diff --git a/app/class_media_file.py b/app/class_media_file.py index 9993a4d..48dd70e 100644 --- a/app/class_media_file.py +++ b/app/class_media_file.py @@ -1,5 +1,5 @@ import os - +import math class Media: _id_counter = 0 @@ -10,8 +10,10 @@ class Media: # source self.source_file: str = path self.source_file_name: str = os.path.basename(self.source_file) + self.source_duration: int = self.time_in_sec(streams_video[0]["tags"].get("DURATION" or "duration", "00:00:00")) self.source_size: int = 0 - self.source_frames: int = 0 + self.source_frame_rate: int = self.frame_rate(streams_video) + self.source_frames_total: int = self.source_frame_rate * self.source_duration self.source_time: int = 0 # target @@ -25,6 +27,7 @@ class Media: self.process_time: int = 0 self.process_size: int = 0 self.process_frames: int = 0 + self.process_time_remaining: int = 0 # statistic self.stat_fps: list = [0, 0] @@ -77,4 +80,63 @@ class Media: @staticmethod def to_dict(): - return "Fertig mit der Welt" \ No newline at end of file + return "Fertig mit der Welt" + + @staticmethod + def frame_rate(video_streams): + var_frame_rate = video_streams[0].get("r_frame_rate", "0/0").split("/") + if int(var_frame_rate[1]) > 0: + int_frame_rate = round(int(var_frame_rate[0]) / int(var_frame_rate[1])) + else: + int_frame_rate = 0 + + return int_frame_rate + + def time_remaining(self): + if self.stat_fps[0] > 0: + var_time_remaining = (self.source_frames_total - self.process_frames) / (self.stat_fps[0] / self.stat_fps[1]) + self.process_time_remaining = round(var_time_remaining) + elif self.stat_fps[0] == 0: + self.process_time_remaining = 0 + + return self.process_time_remaining + + @staticmethod + def format_time(seconds): + days = round(seconds // (24 * 3600)) + seconds %= (24 * 3600) + if days: + d = (f"{days} Tag" + f"") + else: + d = "" + + hours = round(seconds // 3600) + seconds %= 3600 + if hours: + h = f"{hours} Stunden" + else: + h = "" + + minutes = math.ceil(seconds // 60) + seconds %= 60 + if minutes: + m = f"{minutes} Minuten" + else: + m = "" + + return f"{d} {h} {m}" + + # Data convert + @staticmethod + def time_in_sec(time_str: str) -> int: + parts = time_str.split(":") + + if len(parts) == 1: # Falls nur Sekunden mit Nachkommastellen vorliegen + return int(float(parts[0])) # Erst in float, dann in int umwandeln + + if len(parts) == 3: # Normales HH:MM:SS-Format + h, m, s = map(float, parts) # In float umwandeln, falls Nachkommastellen im Sekundenwert sind + return int(h * 3600 + m * 60 + s) # Alles in Sekunden umrechnen + + raise ValueError(f"Ungültiges Zeitformat: {time_str}") \ No newline at end of file diff --git a/app/main_server.py b/app/main_server.py index bf823a3..e6f6827 100644 --- a/app/main_server.py +++ b/app/main_server.py @@ -1,3 +1,5 @@ +import asyncio + import websockets import json import logging @@ -6,7 +8,7 @@ from app.class_settings import Settings from app.class_file_path import Path from app.class_file_convert import Convert -var_convert_active = 0 +var_convert_active = False class Server: def __init__(self): @@ -38,9 +40,9 @@ class Server: if data.get("data_path"): self.obj_path.receive_paths(data.get("data_path")) - if var_convert_active == 0 and self.yaml['autostart']: + if var_convert_active == False and self.yaml['autostart']: await self.obj_convert.snake_waiting() - var_convert_active = 1 + var_convert_active = True else: self.obj_convert.snake_update() @@ -49,10 +51,19 @@ class Server: #response = f"Server antwortet: {message.upper()}" #await websocket.send(response) - + except websockets.exceptions.ConnectionClosedError: + pass + except websockets.exceptions.InvalidUpgrade: + pass except websockets.exceptions.ConnectionClosed: print("Server sagt: Client getrennt") + @staticmethod + def set_var_convert_active(value: bool): + global var_convert_active + + var_convert_active = value + async def start(self): server = await websockets.serve(self.handle_client, self.yaml['server_ip'], self.yaml['server_port']) print(f"Websocket Server läuft auf IP: {self.yaml['server_ip']} Port: {self.yaml['server_port']}")