import os
import math
import time


class Media:
    """Describe one source media file and track its conversion to a ``.webm`` target.

    An instance stores facts derived from the probed stream descriptions
    (duration, size, frame rate), the target path, and mutable progress and
    statistic fields that are initialised here and expected to be updated
    from outside this class.

    NOTE(review): the stream dictionaries look like ``ffprobe`` JSON output
    (``codec_type``, ``r_frame_rate``, ``tags``) -- confirm against the caller.
    """

    # Running counter used to hand out a unique, increasing id per instance.
    _id_counter = 0

    def __init__(self, path, streams_video, streams_audio, streams_subtitle, streams_format):
        """Build the media description from a file path and its stream lists.

        Args:
            path: Path of the source file.
            streams_video: List of video stream dicts; the first entry supplies
                the duration tag and the frame rate, so it must not be empty.
            streams_audio: List of audio stream dicts (stored unchanged).
            streams_subtitle: List of subtitle stream dicts (stored unchanged).
            streams_format: List whose first dict supplies ``size`` in bytes.

        Raises:
            IndexError: If ``streams_video`` or ``streams_format`` is empty.
            ValueError: If the duration tag has an unsupported format.
        """
        # misc
        self.id = Media._id_counter

        # source
        self.source_file: str = path
        self.source_path: str = os.path.dirname(path)
        self.source_file_name: str = os.path.basename(self.source_file)
        # The duration tag may be spelled in upper or lower case; a missing
        # "tags" dict or missing tag falls back to a zero duration.
        tags = streams_video[0].get("tags") or {}
        duration_text = tags.get("DURATION") or tags.get("duration") or "00:00:00"
        self.source_duration: int = self.time_in_sec(duration_text)
        # A missing "size" entry is treated as 0 bytes instead of crashing.
        self.source_size: list = self.size_convert(
            "B", None, "storage", int(streams_format[0].get("size") or 0)
        )
        self.source_frame_rate: int = self.frame_rate(streams_video)
        self.source_frames_total: int = self.source_frame_rate * self.source_duration
        self.source_time: int = 0

        # target
        # splitext only strips an extension from the final path component, so
        # a dot inside a directory name cannot truncate the path.
        self.target_file: str = os.path.splitext(path)[0] + ".webm"
        self.target_file_name: str = os.path.basename(self.target_file)
        self.target_size: int = 0

        # process
        self.status = None
        self.process_start: int = 0
        self.process_end: int = 0
        self.process_time: int = 0
        self.process_size: list = [0, "KiB"]
        self.process_frames: int = 0
        self.process_time_remaining: int = 0

        # statistic
        # NOTE(review): each pair is presumably [accumulated value, sample
        # count]; time_remaining() divides element 0 by element 1 -- confirm.
        self.stat_fps: list = [0, 0]
        self.stat_bitrate: list = [0, 0]
        self.stat_quantizer: list = [0, 0]
        self.stat_speed: list = [0, 0]

        # raw
        self.streams_video = streams_video
        self.streams_audio = streams_audio
        self.streams_subtitle = streams_subtitle
        self.streams_format = streams_format

        Media._id_counter += 1

    def __str__(self):
        """Return a multi-line dump of all raw streams, the target file, id and status."""

        def stream_output(stream_list):
            # One line per stream: a "<Codec type> <n>" label (or "Format" when
            # the dict has no codec_type) followed by every key/value pair.
            lines = []
            for index, stream in enumerate(stream_list, start=1):
                codec_type = stream.get("codec_type")
                label = f"{codec_type.capitalize()} {index}" if codec_type else "Format"
                fields = "".join(f" -- {key}: {value}" for key, value in stream.items())
                lines.append(f"{label}{fields}\n")
            return "".join(lines)

        separator = "------------------------------------\n"

        # Output
        return "".join((
            f"\n{self.source_file}\n",
            separator,
            stream_output(self.streams_format),
            separator,
            stream_output(self.streams_video),
            separator,
            stream_output(self.streams_audio),
            separator,
            stream_output(self.streams_subtitle),
            separator,
            f"{self.target_file}\n",
            separator,
            f"{self.id} -- {self.status}",
            "\n************************************\n",
        ))

    def to_dict_active_paths(self):
        """Return the source and target attributes as a flat dictionary."""
        return {
            # source
            "source_file_name": self.source_file_name,
            "source_file": self.source_file,
            "source_path": self.source_path,
            "source_duration": self.source_duration,
            "source_size": self.source_size,
            "source_frame_rate": self.source_frame_rate,
            "source_frames_total": self.source_frames_total,
            "source_time": self.source_time,
            # target
            "target_file_name": self.target_file_name,
            "target_file": self.target_file,
            "target_size": self.target_size,
        }

    def to_dict_stat(self):
        """Return all tracked attributes, keyed by the current ``time.time()`` value."""
        return {time.time(): {
            # source
            "id": self.id,
            "source_file_name": self.source_file_name,
            "source_file": self.source_file,
            "source_duration": self.source_duration,
            "source_size": self.source_size,
            "source_frame_rate": self.source_frame_rate,
            "source_frames_total": self.source_frames_total,
            "source_time": self.source_time,
            # target
            "target_file_name": self.target_file_name,
            "target_file": self.target_file,
            "target_size": self.target_size,
            # process
            "status": self.status,
            "process_start": self.process_start,
            "process_end": self.process_end,
            "process_time": self.process_time,
            "process_size": self.process_size,
            "process_frames": self.process_frames,
            "process_time_remaining": self.process_time_remaining,
            # statistic
            "stat_fps": self.stat_fps,
            "stat_bitrate": self.stat_bitrate,
            "stat_quantizer": self.stat_quantizer,
            "stat_speed": self.stat_speed,
        }}

    @staticmethod
    def frame_rate(video_streams):
        """Return the rounded frame rate of the first video stream.

        ``r_frame_rate`` is read as ``"numerator/denominator"``. A value
        without a ``/`` is treated as having denominator 1; a missing value or
        a non-positive denominator yields 0.
        """
        parts = video_streams[0].get("r_frame_rate", "0/0").split("/")
        denominator = int(parts[1]) if len(parts) > 1 else 1
        if denominator > 0:
            return round(int(parts[0]) / denominator)
        return 0

    def time_remaining(self):
        """Update and return ``process_time_remaining``.

        The remaining frame count is divided by ``stat_fps[0] / stat_fps[1]``.
        When ``stat_fps[0]`` is 0, or the divisor ``stat_fps[1]`` is 0, the
        estimate is set to 0. A negative ``stat_fps[0]`` leaves the previous
        value untouched.
        """
        fps_value, fps_divisor = self.stat_fps[0], self.stat_fps[1]
        if fps_value > 0 and fps_divisor:
            frames_left = self.source_frames_total - self.process_frames
            self.process_time_remaining = round(frames_left / (fps_value / fps_divisor))
        elif fps_value >= 0:
            # Either no fps data yet, or a zero divisor that would otherwise
            # raise ZeroDivisionError.
            self.process_time_remaining = 0
        return self.process_time_remaining

    @staticmethod
    def format_time(seconds: int, str_day, str_hour, str_min, str_s):
        """Format a duration as ``"<d> <h> <m> <s>"`` using the given unit labels.

        A component is left empty when its value is 0 or its label is ``None``.
        The single spaces between the four slots are always emitted, so empty
        components leave padding spaces in the result.
        """
        days, seconds = divmod(seconds, 24 * 3600)
        hours, seconds = divmod(seconds, 3600)
        minutes, seconds = divmod(seconds, 60)

        d = f"{int(days)} {str_day}" if days and str_day is not None else ""
        h = f"{int(hours)} {str_hour}" if hours and str_hour is not None else ""
        m = f"{int(minutes)} {str_min}" if minutes and str_min is not None else ""
        s = f"{seconds} {str_s}" if seconds and str_s is not None else ""
        return f"{d} {h} {m} {s}"

    # Data convert
    @staticmethod
    def time_in_sec(time_str: str) -> int:
        """Convert ``"SS[.fff]"`` or ``"HH:MM:SS[.fff]"`` to whole seconds (truncated).

        Raises:
            ValueError: If the string has neither one nor three ``:``-separated
                parts, or a part is not numeric.
        """
        parts = time_str.split(":")
        if len(parts) == 1:
            # Seconds only, possibly with a fractional part: go through float.
            return int(float(parts[0]))
        if len(parts) == 3:
            # Regular HH:MM:SS; float handles fractional seconds.
            h, m, s = map(float, parts)
            return int(h * 3600 + m * 60 + s)
        raise ValueError(f"Ungültiges Zeitformat: {time_str}")

    @staticmethod
    def size_convert(source: str, target, unit: str, size=0):
        """Convert ``size`` between units of one family using a factor of 1024.

        Args:
            source: Unit of ``size``; must belong to the chosen family.
            target: Desired unit, or a falsy value for automatic scaling.
            unit: Unit family: ``"storage"``, ``"data_rate"`` or
                ``"binary_data_rate"``.
            size: Value to convert.

        Returns:
            With ``target``: the tuple ``(converted_size, target)``.
            Without ``target``: the list ``[rounded_size, unit_name]``, where
            the value is scaled up while it is >= 1000 and rounded to one
            decimal place.

        Raises:
            ValueError: If ``source`` or ``target`` is not in the family (an
                unknown ``unit`` has no members, so ``source`` is rejected).
        """
        unit_families = {
            "storage": ["B", "KiB", "MiB", "GiB", "TiB", "PiB"],
            "data_rate": ["bps", "Kbps", "Mbps", "Gbps", "Tbps", "Pbps"],
            "binary_data_rate": ["b/s", "Kib/s", "Mib/s", "Gib/s", "Tib/s", "Pib/s"],
        }
        list_unit: list = unit_families.get(unit, [])

        # Binary conversion factor, applied to every unit family.
        factor = 1024

        if source not in list_unit:
            raise ValueError("Ungültige Quell-Einheit!")
        source_index = list_unit.index(source)

        if target:
            if target not in list_unit:
                raise ValueError("Ungültige Ziel-Einheit!")
            target_index = list_unit.index(target)
            if source_index < target_index:
                return size / (factor ** (target_index - source_index)), target
            return size * (factor ** (source_index - target_index)), target

        # Automatic conversion: step up one unit at a time.
        while size >= 1000 and source_index < len(list_unit) - 1:
            size /= factor
            source_index += 1
        return [round(size, 1), list_unit[source_index]]