import json
import logging
import math
import os
import re
from collections import deque


class Video:
    """One video conversion job: stream metadata plus live progress state.

    The constructor derives duration, frame rate and the ``.webm`` output path
    from the supplied stream dictionaries.  ``extract_convert_data`` updates
    the progress counters from one text line of encoder output, and
    ``to_dict`` serialises the current progress as JSON.

    NOTE(review): the stream dictionaries look like ffprobe JSON output and the
    progress lines like ffmpeg status lines -- confirm against the caller.
    """

    # Compiled once; each pattern captures the value of one ``key=value``
    # field of a progress line.
    _PROGRESS_PATTERNS = {
        "frame": re.compile(r"frame=\s*(\d+)"),
        "fps": re.compile(r"fps=\s*(\d+)"),
        "q": re.compile(r"q=\s*(\d+\.\d+)"),
        "size": re.compile(r"size=\s*(\d+)"),
        "time": re.compile(r"time=\s*(\d+:\d+:\d+)"),
        "bitrate": re.compile(r"bitrate=\s*(\d+)"),
        # The fractional part is optional so that e.g. "speed=2x" is parsed.
        "speed": re.compile(r"speed=\s*(\d+(?:\.\d+)?)"),
    }
    _TIME_PATTERN = re.compile(r"\s*(\d+):(\d+):(\d+)")

    def __init__(self, path, video_streams, video_format, audio_streams, subtitle_streams):
        """Initialise the job from a source path and its stream descriptions.

        Args:
            path: Path of the source file.
            video_streams: List of video stream dicts; the first one supplies
                the ``tags`` duration and ``r_frame_rate``.
            video_format: Container/format dict, stored as a one-element list.
            audio_streams: List of audio stream dicts.
            subtitle_streams: List of subtitle stream dicts.
        """
        self.id = id(self)
        self.source_file = path
        self.file_name = os.path.basename(self.source_file)

        # Tolerate a missing stream list or a stream without "tags", and try
        # both spellings of the duration tag.
        first_stream = video_streams[0] if video_streams else {}
        tags = first_stream.get("tags") or {}
        duration_text = tags.get("DURATION") or tags.get("duration") or "00:00:00"
        self.duration = self.time_in_sec(duration_text)

        # The instance attribute deliberately keeps the historical name and
        # therefore shadows the static method of the same name on instances.
        self.frame_rate = type(self).frame_rate(video_streams)
        self.frames_max = self.frame_rate * self.duration

        # splitext only strips a real file extension, never a dot that
        # belongs to a directory name.
        self.output_file = os.path.splitext(path)[0] + ".webm"

        self.convert_start = 0
        self.convert_end = 0
        self.time_estimated = 0
        self.time_deque = deque(maxlen=20)  # most recent `time` samples
        self.time_remaining = 0
        self.finished = 0

        # Video / audio data
        self.streams_video = video_streams
        self.streams_audio = audio_streams
        self.streams_subtitle = subtitle_streams
        self.format = [video_format]

        # Progress data packet
        self.frame = 0
        self.fps = 0
        self.q = 0
        self.size = 0
        self.time = 0
        self.bitrate = 0
        self.speed = 0
        self.loading = 0
        self.count_empty_data = 0

        # Process
        self.task = None
        self.process = None

    def __str__(self):
        """Return a multi-line, human-readable dump of the job."""

        def stream_output(stream_list):
            # One line per stream: a header followed by every key/value pair.
            lines = []
            for count, stream in enumerate(stream_list, start=1):
                codec_type = stream.get("codec_type")
                header = f"{codec_type.capitalize()} {count}" if codec_type else "Format"
                fields = "".join(f" -- {key}: {value}" for key, value in stream.items())
                lines.append(f"{header}{fields}\n")
            return "".join(lines)

        # Output
        separator = "------------------------------------\n"
        sections = [
            f"\n{self.source_file}\n",
            stream_output(self.format),
            stream_output(self.streams_video),
            stream_output(self.streams_audio),
            stream_output(self.streams_subtitle),
            f"{self.output_file}\n",
            f"{self.id} -- {self.finished} -- {self.task} -- {self.process}",
        ]
        return separator.join(sections) + "\n************************************\n"

    def to_dict(self):
        """Update the derived progress values and return them as a JSON string.

        Side effects: refreshes ``time_remaining``, ``loading`` and
        ``time_estimated`` and appends the current ``time`` to ``time_deque``.

        Returns:
            JSON text of the form ``{"data_flow": {...}}``.
        """
        frames_left = self.frames_max - self.frame

        if self.fps > 0:
            self.time_remaining = self.format_time(frames_left / self.fps)
        else:
            self.time_remaining = "..."

        # Order matters: the percentage is computed from the samples and the
        # estimate of the previous call, then both are refreshed.
        self.calc_loading()
        self.time_deque.append(self.time)
        if self.frame_rate:
            self.time_estimated = self.duration - frames_left / self.frame_rate
        else:
            # Without a frame rate no frame-based estimate exists; fall back
            # to the reported position instead of dividing by zero.
            self.time_estimated = self.time

        data_packet = {
            "source": os.path.basename(self.source_file),
            "target": os.path.basename(self.output_file),
            "path": os.path.dirname(self.source_file),
            "id": self.id,
            "frame": self.frame,
            "fps": self.fps,
            "q": self.q,
            "size": self.size,
            "time": self.time,
            "bitrate": self.bitrate,
            "speed": self.speed,
            "finished": self.finished,
            "duration": self.duration,
            "loading": self.loading,
            "convert_start": self.convert_start,
            "time_remaining": self.time_remaining,
        }
        return json.dumps({"data_flow": data_packet})

    @staticmethod
    def _json_fallback(value):
        """Convert attribute values the json module cannot encode itself."""
        if isinstance(value, deque):
            return list(value)
        # Task / process handles and similar objects have no JSON form.
        return str(value)

    def get_vars(self):
        """Return all instance attributes as JSON text ``{"obj_list": {...}}``.

        Values that are not JSON-native are converted: the sample deque
        becomes a list, anything else its ``str()`` representation.
        """
        return json.dumps({"obj_list": vars(self)}, default=self._json_fallback)

    @classmethod
    def _progress_field(cls, name, line):
        """Return the captured text of field *name* in *line*, or ``None``."""
        match = cls._PROGRESS_PATTERNS[name].search(line)
        return match.group(1) if match else None

    def extract_convert_data(self, line_decoded):
        """Parse one decoded progress line and store its values.

        Fields missing from the line are reset to ``0``.  ``q`` and ``speed``
        are stored as the matched text, ``size`` and ``bitrate`` are divided
        by 1024, ``time`` is stored in whole seconds.

        Args:
            line_decoded: Text containing ``key=value`` fields such as
                ``frame=``, ``fps=``, ``q=``, ``size=``, ``time=``,
                ``bitrate=`` and ``speed=``.
        """
        frame = self._progress_field("frame", line_decoded)
        self.frame = int(frame) if frame else 0

        fps = self._progress_field("fps", line_decoded)
        self.fps = int(fps) if fps else 0

        self.q = self._progress_field("q", line_decoded) or 0

        size = self._progress_field("size", line_decoded)
        self.size = self.convert_kb_mb(size) if size else 0

        time_text = self._progress_field("time", line_decoded) or "00:00:00"
        self.time = self.time_in_sec(time_text)

        bitrate = self._progress_field("bitrate", line_decoded)
        self.bitrate = self.convert_kb_mb(bitrate) if bitrate else 0

        self.speed = self._progress_field("speed", line_decoded) or 0

    @staticmethod
    def time_in_sec(time_str):
        """Convert the first ``H:M:S`` occurrence in *time_str* to seconds.

        Anything after the seconds (e.g. a fractional part) is ignored.

        Returns:
            Whole seconds as ``int``; ``0`` when no time is found or the
            argument is not text.
        """
        try:
            match = Video._TIME_PATTERN.search(time_str)
        except TypeError as e:
            logging.error(f"Wert: {time_str} Fehler: {e}")
            return 0
        if match is None:
            return 0
        hours, minutes, seconds = (int(part) for part in match.groups())
        return hours * 3600 + minutes * 60 + seconds

    def calc_loading(self):
        """Raise ``loading`` (percent, 0-100) to the current progress.

        While every sample in ``time_deque`` is identical (including the empty
        deque) the frame-based ``time_estimated`` is used, otherwise the
        reported ``time``.  ``loading`` never decreases.  Nothing happens when
        the duration is unknown (zero).
        """
        if not self.duration or self.duration <= 0:
            return
        stalled = len(set(self.time_deque)) <= 1
        position = self.time_estimated if stalled else self.time
        loading = min(round(position / self.duration * 100), 100)
        if loading > self.loading:
            self.loading = loading

    @staticmethod
    def convert_kb_mb(digits):
        """Divide a digit string by 1024, rounded to two decimals.

        Returns ``0`` when *digits* does not consist of digits only.
        """
        text = str(digits)
        if not text.isdigit():
            return 0
        return round(int(text) / 1024, 2)

    @staticmethod
    def frame_rate(video_streams):
        """Return the frame rate of the first video stream.

        Reads ``r_frame_rate`` as ``"numerator/denominator"`` (a bare number is
        accepted as well).  Returns ``0`` when the stream list is empty, the
        value is malformed or the denominator is not positive.
        """
        if not video_streams:
            return 0
        raw = str(video_streams[0].get("r_frame_rate", "0/0"))
        numerator, _, denominator = raw.partition("/")
        try:
            top = int(numerator)
            bottom = int(denominator) if denominator else 1
        except ValueError:
            return 0
        return top / bottom if bottom > 0 else 0

    @staticmethod
    def format_time(seconds):
        """Format a duration as days, hours and minutes.

        The value is rounded up to whole minutes so that a remainder below one
        minute is still shown; negative input is treated as zero.

        Returns:
            E.g. ``"1 Tage 2 Std 4 Min"``; zero-valued units are omitted and a
            zero duration yields ``"0 Min"``.
        """
        total_minutes = math.ceil(max(seconds, 0) / 60)
        days, remainder = divmod(total_minutes, 24 * 60)  # 1 day = 1440 minutes
        hours, minutes = divmod(remainder, 60)

        parts = []
        if days:
            parts.append(f"{days} Tage")
        if hours:
            parts.append(f"{hours} Std")
        if minutes:
            parts.append(f"{minutes} Min")
        return " ".join(parts) if parts else "0 Min"