diff --git a/__main__.py b/__main__.py index ca2ad3d..6e82795 100644 --- a/__main__.py +++ b/__main__.py @@ -1,4 +1,12 @@ +import asyncio +import logging + from app.main_server import Server if __name__ == "__main__": - obj_server = Server() \ No newline at end of file + obj_server = Server() + + try: + asyncio.run(obj_server.start()) + except Exception as e: + logging.critical(f"Global error: {e}") \ No newline at end of file diff --git a/app/cfg/settings.yaml b/app/cfg/settings.yaml index 3f2a751..3dc7125 100644 --- a/app/cfg/settings.yaml +++ b/app/cfg/settings.yaml @@ -1,5 +1,18 @@ log_file: "server.log" -log_level: "DEBUG" +log_level: DEBUG +path_file: "media_path.yaml" server_ip: "0.0.0.0" server_port: 8000 -task_max: 1 \ No newline at end of file +task_max: 1 +autostart: true +subtitle: + language: + - ger + - eng + blacklist: + - hdmv_pgs_subtitle + - dvd_subtitle +audio: + language: + - ger + - eng \ No newline at end of file diff --git a/app/class_file_convert.py b/app/class_file_convert.py new file mode 100644 index 0000000..db17247 --- /dev/null +++ b/app/class_file_convert.py @@ -0,0 +1,110 @@ +import logging +import asyncio +import time +from app.class_file_convert_read_out import Process + +class Convert: + def __init__(self, websocket, cfg, obj_path): + self.yaml = cfg + self.obj_path = obj_path + self.obj_websocket = websocket + self.obj_process = Process(websocket) + + self.semaphore = asyncio.Semaphore(self.yaml["task_max"]) + self.active_tasks = set() + self.active_process = set() + + logging.info("Video Convert Start here") + + async def snake_waiting(self): + async with self.semaphore: + for obj_id, obj in list(self.obj_path.paths.items()): + if obj.status is None: + obj.task = asyncio.create_task(self.convert_video(obj)) + self.active_tasks.add(obj) + logging.info(f"Warteschlange started Auftrag - {obj.task}") + + print(self.obj_path.paths) + + async def convert_video(self, obj): + """Startet die Videokonvertierung asynchron.""" + + obj.convert_start = time.time() + command = self.convert_cmd(obj) + + logging.info(f"Starte Konvertierung: {command}") + + try: + # Starte den Subprozess asynchron + obj.process = await asyncio.create_subprocess_exec( + *command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + + self.active_process.add(obj) + obj.process_start = time.time() + + await self.obj_process.read_out(obj) + await obj.process.wait() + + # Prozess beendet, Status auswerten + if obj.process.returncode == 0: + obj.status = 0 + result = "Finished" + else: + obj.status = 1 + result = "Failure" + + # Statistik + #obj_stat = Sc() + #obj_stat.save_stat(obj.get_vars()) + + except Exception as e: + obj.status = 2 + logging.error(f"Fehler in video_convert(): {e}") + finally: + logging.info(f"Prozess {result}({obj.process.returncode}): {obj.source_file_name}") + await self.obj_websocket.send_websocket(obj.to_dict()) + + self.active_process.discard(obj) + self.active_tasks.discard(obj) + + obj.convert_end = time.time() + + def convert_cmd(self, obj): + command_convert = [ + "ffmpeg", "-y", "-i", obj.source_file, + "-map", "0:0", + "-c:v", "libsvtav1", + "-preset", "5", + "-crf", "30", + "-g", "240", + "-pix_fmt", "yuv420p10le", + "-svtav1-params", "tune=0:film-grain=8", + ] + + if len(obj.streams_audio): + for audio_stream in obj.streams_audio: + if audio_stream.get("tags", {}).get("language", None) in self.yaml["audio"]["language"]: + command_convert.extend([ + "-map", f"0:{audio_stream['index']}", + f"-c:a", "libopus", + f"-b:a", "320k", + f"-ac", str(audio_stream['channels']) + ]) + + # Subtitle-Streams einbinden + if len(obj.streams_subtitle): + for subtitle_stream in obj.streams_subtitle: + if subtitle_stream.get("codec_name") not in self.yaml["subtitle"]["blacklist"]: + if subtitle_stream.get("tags", {}).get("language", None) in self.yaml["subtitle"]["language"]: + command_convert.extend([ + "-map", f"0:{subtitle_stream['index']}", + ]) + command_convert.append(obj.target_file) + + return command_convert + + def snake_update(self): + print(self.obj_path.paths) \ No newline at end of file diff --git a/app/class_file_convert_read_out.py b/app/class_file_convert_read_out.py new file mode 100644 index 0000000..4f18a9b --- /dev/null +++ b/app/class_file_convert_read_out.py @@ -0,0 +1,129 @@ +import re + +class Process: + def __init__(self, obj_websocket): + self.obj_websocket = obj_websocket + self.line_empty = 0 + + # Data + self.id = None + self.fps: float = 0.0 + self.speed: float = 0.0 + + self.quantizer: int = 0 + self.bitrate: list = [0, "kbits/s"] + self.size: list = [0, "KiB"] + + self.time: int = 0 + self.frames: int = 0 + + + async def read_out(self, obj): + self.id = obj.id + + while True: + line = await obj.process.stderr.read(1024) + line_decoded = line.decode() + + self.process_line_extract(obj, line_decoded) + + await self.obj_websocket.send_websocket(obj.to_dict()) + + if self.line_empty > 20: + break + elif not line: + self.line_empty += 1 + elif line: + self.line_empty = 0 + + def process_line_extract(self, obj, line:str): + # FPS + fps = re.findall(r"fps=\s*(\d+.\d*)", line) + self.fps = float(fps[0]) if fps else 0 + obj.stat_fps = [obj.stats_fps[0] + self.fps, obj.stats_fps[1] + 1] + + # Quantizer + q = re.findall(r"q=\s*(\d+).\d+", line) + self.quantizer = int(q[0]) if q else 0 + obj.stat_quantizer = [obj.stat_quantizer[0] + self.quantizer, obj.stat_quantizer[1] + 1] + + # Bitrate + bitrate = re.findall(r"bitrate=\s*(\d+)", line) + self.bitrate[0] = int(bitrate[0]) if bitrate else 0 + obj.stat_bitrate = [obj.stat_bitrate[0] + self.bitrate, obj.stat_bitrate[1] + 1] + + # Speed + speed = re.findall(r"speed=\s*(\d+\.\d+)", line) + self.speed = float(speed[0]) if speed else 0 + obj.stat_speed = [obj.stat_speed[0] + self.speed, obj.stat_speed[1] + 1] + + # File Size + size = re.findall(r"size=\s*(\d+)", line) + if size and int(size[0]) > self.size[0]: + self.size[0] = int(size[0]) + obj.process_size = self.size + + # Time + media_time = re.findall(r"time=\s*(\d+:\d+:\d+)", line) + time_v = media_time[0] if media_time else "00:00:00" + if self.time < self.time_in_sec(time_v): + self.time = self.time_in_sec(time_v) + obj.process_time = self.time + + # Frames + frame = re.findall(r"frame=\s*(\d+)", line) + if frame and int(frame[0]) > self.frames: + self.frames = int(frame[0]) + obj.process_frames = self.frames + + def to_dict(self): + return {"data_flow": {self.id: { + "frames": self.frames, + "fps": self.fps, + "quantizer": self.quantizer, + "size": self.size, + "time": self.time, + "bitrate": self.bitrate, + "speed": self.speed + }}} + + # Data convert + @staticmethod + def time_in_sec(time_str: str) -> int: + h, m, s = map(int, time_str.split(":")) + return h * 3600 + m * 60 + s + + @staticmethod + def size_convert(source: str, target, unit: str, size=0): + list_unit: list = [] + + if unit == "storage": + list_unit = ["B", "KiB", "MiB", "GiB", "TiB", "PiB"] + elif unit == "data_rate": + list_unit = ["bps", "Kbps", "Mbps", "Gbps", "Tbps", "Pbps"] + elif unit == "binary_data_rate": + list_unit = ["b/s", "Kib/s", "Mib/s", "Gib/s", "Tib/s", "Pib/s"] + + factor = 1024 # Binäre Umrechnung + + if source not in list_unit: + raise ValueError("Ungültige Quell-Einheit!") + + source_index = list_unit.index(source) + + if target: + if target not in list_unit: + raise ValueError("Ungültige Ziel-Einheit!") + target_index = list_unit.index(target) + + if source_index < target_index: + return size / (factor ** (target_index - source_index)), target + else: + return size * (factor ** (source_index - target_index)), target + + # Automatische Umrechnung + while size >= 1000 and source_index < len(list_unit) - 1: + size /= factor + source_index += 1 + + return [round(size,1), list_unit[source_index]] diff --git a/app/class_file_path.py b/app/class_file_path.py new file mode 100644 index 0000000..ae5f6d2 --- /dev/null +++ b/app/class_file_path.py @@ -0,0 +1,155 @@ +import logging +import re +import os +import json +import yaml +import subprocess +from app.class_media_file import Media + + +class Path: + def __init__(self, cfg): + """ + Receive Paths extract from String and save to a List Variable. + Methoden: save_paths / read_paths / delete_path / receive_path + + :param cfg: Global Settings Object + """ + + # Settings + self.yaml = cfg + + # Autostart + # self.read_paths() + + # Variablen + self.paths:dict = {} + + def save_paths(self): + """ + Saves the extrated Paths in a File + :return: True or False + """ + paths_extrat_dict:dict = {} + + for obj_id, obj in self.paths.items(): + paths_extrat_dict.update({id: obj.__dict__}) + + try: + with open(f"app/cfg/{self.yaml["path_file"]}", "w", encoding="utf8") as file: + yaml.dump(paths_extrat_dict, file, default_flow_style=False, indent=4) + logging.info(f"{len(self.paths)} paths were saved to file") + return 1 + except FileNotFoundError: + logging.error(f"File {self.yaml["path_file"]} not found") + return 0 + except IOError: + logging.critical(f"Error file {self.yaml["path_file"]} maybe damaged") + return 0 + + # Achtung Abrufen aus Datei und neu einlesen der ffprobe Daten fehlt noch + def read_paths(self): + """ + Read Media Paths from a File + :return: True or False + """ + try: + with open(f"app/cfg/{self.yaml["path_file"]}", "r", encoding="utf8") as file: + list_paths = yaml.safe_load(file) + if len(list_paths) > 0: + paths = list_paths + + logging.info(f"{len(paths)} paths were read from file") + return 1 + except FileNotFoundError: + logging.error(f"File {self.yaml["path_file"]} not found") + return 0 + except IOError: + logging.critical(f"Error file {self.yaml["path_file"]} maybe damaged") + return 0 + + def delete_path(self, obj_id:int): + """ + Delete a Media Path from a List Variable and overwrite the path file + :param obj_id: Path from a Media File which to delete + :return: True or False + """ + + try: + del self.paths[obj_id] + self.save_paths() + return 1 + except ValueError: + return 0 + + def receive_paths(self, var_paths:str): + """ + Splitting the Path String to List Single Media Paths + :param var_paths: String of a single or more Media Paths + :return: True or False + """ + pattern = r"(?<=\.mkv\s|\.mp4\s|\.avi\s)|(?<=\.webm\s)" + paths = re.split(pattern, var_paths) + + print(paths) + + for path in paths: + self.get_with_ffprobe(path) + + return 1 + + @staticmethod + def extract_media_info(path:str, select:str): + """ + Erstellt ffprobe command for selected Streams + :param path: path to media file + :param select: v (for video), a (for audio), s (for subtitle) + :return: + """ + command = [ + "ffprobe", "-v", + "error", + "-select_streams", + f"{select}", + "-show_entries", + "stream=index,channels,codec_name,codec_type,pix_fmt,level," + "film_grain,r_frame_rate,bit_rate,sample_rate,width,height,size,tags:stream_tags=language,duration", + "-show_entries", + "format=size,bit_rate,nb_streams", + "-of", "json", + path + ] + + result = subprocess.run(command, stdout=subprocess.PIPE, text=True) + json_data = json.loads(result.stdout) + + if select == "v": + if json_data.get("format") is not list: + list_format = [json_data.get("format", "")] + else: + list_format = json_data.get("format") + + return json_data.get("streams", []), list_format + elif select == "a": + return json_data.get("streams", []) + elif select == "s": + return json_data.get("streams", []) + + def get_with_ffprobe(self, path:str): + try: + path = path.strip() + + if os.path.exists(path): + streams_video, streams_format = self.extract_media_info(path, "v") + streams_audio = self.extract_media_info(path, "a") + streams_subtitle = self.extract_media_info(path, "s") + + obj = Media(path, streams_video, streams_audio, streams_subtitle, streams_format) + self.paths.update({obj.id: obj}) + + logging.info(obj) + + return 1 + except Exception as e: + logging.error(f"Get Video Information: {e}") + return 0 \ No newline at end of file diff --git a/app/class_media_file.py b/app/class_media_file.py new file mode 100644 index 0000000..9993a4d --- /dev/null +++ b/app/class_media_file.py @@ -0,0 +1,80 @@ +import os + + +class Media: + _id_counter = 0 + + def __init__(self, path, streams_video, streams_audio, streams_subtitle, streams_format): + # misc + self.id = Media._id_counter + # source + self.source_file: str = path + self.source_file_name: str = os.path.basename(self.source_file) + self.source_size: int = 0 + self.source_frames: int = 0 + self.source_time: int = 0 + + # target + self.target_file: str = f"{path.rsplit(".", 1)[0]}.webm" + self.target_size: int = 0 + + # process + self.status = None + self.process_start: int = 0 + self.process_end: int = 0 + self.process_time: int = 0 + self.process_size: int = 0 + self.process_frames: int = 0 + + # statistic + self.stat_fps: list = [0, 0] + self.stat_bitrate: list = [0, 0] + self.stat_quantizer: list = [0, 0] + self.stat_speed: list = [0, 0] + + # raw + self.streams_video = streams_video + self.streams_audio = streams_audio + self.streams_subtitle = streams_subtitle + self.streams_format = streams_format + + # -------------------------------------------------------------------------------------------------------------- + + Media._id_counter += 1 + + def __str__(self): + def stream_output(stream_list): + count = 1 + string = "" + for video_stream in stream_list: + string += f"{video_stream.get("codec_type").capitalize()} {count}" if video_stream.get( + "codec_type") else "Format" + for key, value in video_stream.items(): + string += f" -- {key}: {value}" + + string += "\n" + count += 1 + + return string + + # Ausgabe + output_string = f"\n{self.source_file}\n" + output_string += "------------------------------------\n" + output_string += stream_output(self.streams_format) + output_string += "------------------------------------\n" + output_string += stream_output(self.streams_video) + output_string += "------------------------------------\n" + output_string += stream_output(self.streams_audio) + output_string += "------------------------------------\n" + output_string += stream_output(self.streams_subtitle) + output_string += "------------------------------------\n" + output_string += f"{self.target_file}\n" + output_string += "------------------------------------\n" + output_string += f"{self.id} -- {self.status}" + output_string += "\n************************************\n" + + return output_string + + @staticmethod + def to_dict(): + return "Fertig mit der Welt" \ No newline at end of file diff --git a/app/main_server.py b/app/main_server.py index 15dd5f2..bf823a3 100644 --- a/app/main_server.py +++ b/app/main_server.py @@ -1,23 +1,33 @@ import websockets -import asyncio import json +import logging from app.class_settings import Settings +from app.class_file_path import Path +from app.class_file_convert import Convert + +var_convert_active = 0 class Server: def __init__(self): + self.websocket = None obj_settings = Settings() obj_settings.set_logging() self.yaml = obj_settings.yaml + self.obj_path = Path(self.yaml) + self.obj_convert = Convert(self, self.yaml, self.obj_path) + async def send_websocket(self, message): try: - asyncio.run(self.start()) - except KeyboardInterrupt: - print("Server sagt: Server wurde beendet.") + await self.websocket.send(message) + except websockets.exceptions.ConnectionClosed: + logging.warning("No websocket client connected!") + + async def handle_client(self, websocket): + self.websocket = websocket + global var_convert_active - @staticmethod - async def handle_client(websocket): print("Server sagt: Client verbunden") try: async for message in websocket: @@ -26,7 +36,14 @@ class Server: data = json.loads(message) if data.get("data_path"): - pass + self.obj_path.receive_paths(data.get("data_path")) + + if var_convert_active == 0 and self.yaml['autostart']: + await self.obj_convert.snake_waiting() + var_convert_active = 1 + else: + self.obj_convert.snake_update() + elif data.get("data_command"): pass