import logging import asyncio import time import traceback from typing import TYPE_CHECKING from app.class_file_convert_read_out import Process from app.class_media_file_stat import Stat if TYPE_CHECKING: from app.class_media_file import Media class Convert: def __init__(self, websocket, cfg, obj_path): self.yaml = cfg self.obj_path = obj_path self.obj_websocket = websocket self.active_tasks = set() self.active_process = set() async def snake_waiting(self): while True: if len(self.active_tasks) < self.yaml["task_max"] and self.obj_path.count_paths(None) > 0: for obj_id, obj in list(self.obj_path.paths.items()): if obj.status is None: obj.task = asyncio.create_task(self.convert_video(obj)) self.active_tasks.add(obj) logging.info(f"Warteschlange started Auftrag - {obj.task}") obj.status = 3 if len(self.active_tasks) >= self.yaml["task_max"]: break if len(self.active_tasks) > 0: logging.info(f"{len(self.active_tasks)} is active.") await asyncio.sleep(500) continue if self.obj_path.count_paths(None) == 0 and len(self.active_tasks) == 0: break print(self.obj_path.paths) async def convert_video(self, obj): """Startet die Videokonvertierung asynchron.""" obj_process = Process(self.obj_websocket) obj_stat = Stat() obj.convert_start = time.time() command = self.convert_cmd(obj) result = None logging.info(f"Starte Konvertierung: {command}") await self.obj_websocket.send_websocket(self.obj_path.active_path_to_dict()) await self.obj_websocket.send_websocket(self.obj_path.queue_path_to_dict()) try: # Starte den Subprozess asynchron obj.process = await asyncio.create_subprocess_exec( *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) self.active_process.add(obj) obj.process_start = time.time() await obj_process.read_out(obj) await obj.process.wait() # Prself.obj_websocket.send_websocket(self.obj_path.active_path_to_dict())ozess beendet, Status auswerten if obj.process.returncode == 0: obj.status = 0 result = "Finished" else: obj.status = 1 result = "Failure" except Exception as e: obj.status = 2 logging.error(f"Fehler in video_convert(): {e}") logging.error(traceback.format_exc()) finally: logging.info(f"Prozess {result}({obj.process.returncode}): {obj.source_file_name}") await self.obj_websocket.send_websocket(self.obj_path.active_path_to_dict()) await self.obj_websocket.send_websocket(self.obj_path.queue_path_to_dict()) self.active_process.discard(obj) self.active_tasks.discard(obj) self.obj_path.save_paths() obj.convert_end = time.time() obj_stat.save_stat(obj) def convert_cmd(self, obj): command_convert = [ "ffmpeg", "-y", "-i", obj.source_file, # "-init_hw_device", "vaapi=va:/dev/dri/renderD128", "-map", "0:0", "-c:v", "libsvtav1", #"-c:v", "av1_qsv", "-preset", "5", "-crf", "30", "-g", "240", "-pix_fmt", "yuv420p10le", "-svtav1-params", "tune=0:film-grain=8", ] if len(obj.streams_audio): for audio_stream in obj.streams_audio: if audio_stream.get("tags", {}).get("language", None) in self.yaml["audio"]["language"]: command_convert.extend([ "-map", f"0:{audio_stream['index']}", f"-c:a", "libopus", f"-b:a", "320k", f"-ac", str(audio_stream['channels']) ]) # Subtitle-Streams einbinden if len(obj.streams_subtitle): for subtitle_stream in obj.streams_subtitle: if subtitle_stream.get("codec_name") not in self.yaml["subtitle"]["blacklist"]: if subtitle_stream.get("tags", {}).get("language", None) in self.yaml["subtitle"]["language"]: command_convert.extend([ "-map", f"0:{subtitle_stream['index']}", ]) command_convert.append(obj.target_file) obj.cmd = command_convert return command_convert def snake_update(self): print(self.obj_path.paths)