python.fast-api-converter/app/video_class.py

221 lines
7.3 KiB
Python

import json
import logging
import os
import re
import math
from collections import deque
class Video:
    """One video conversion job: probe metadata plus live encoder progress.

    The constructor stores the stream/format dictionaries it is given
    (presumably ffprobe JSON output -- confirm against the caller) and derives
    duration, frame rate and the ``.webm`` target path from them.
    ``extract_convert_data`` parses one encoder status line
    (``frame= ... fps= ... time= ...``) into the progress attributes, and
    ``to_dict`` serialises the current progress as a JSON string.

    Note: the instance attribute ``frame_rate`` (a number) shadows the static
    method ``Video.frame_rate`` on instances; the static method stays reachable
    through the class.
    """

    _SEPARATOR = "------------------------------------\n"

    # Patterns for the individual fields of an encoder status line.
    _FRAME_RE = re.compile(r"frame=\s*(\d+)")
    _FPS_RE = re.compile(r"fps=\s*(\d+(?:\.\d+)?)")
    _Q_RE = re.compile(r"q=\s*(\d+)\.\d+")
    _SIZE_RE = re.compile(r"size=\s*(\d+)")
    _TIME_RE = re.compile(r"time=\s*(\d+:\d+:\d+)")
    _BITRATE_RE = re.compile(r"bitrate=\s*(\d+)")
    _SPEED_RE = re.compile(r"speed=\s*(\d+(?:\.\d+)?)")
    _CLOCK_RE = re.compile(r"(\d+):(\d+):(\d+)")

    def __init__(self, path: str, video_streams: list, video_format: dict,
                 audio_streams: list, subtitle_streams: list) -> None:
        """Create a job for the file at ``path``.

        Args:
            path: Path of the source file.
            video_streams: Video stream dictionaries; the first entry supplies
                the duration tag and ``r_frame_rate``.
            video_format: Container-level dictionary; ``size`` is read from it.
            audio_streams: Audio stream dictionaries (stored as-is).
            subtitle_streams: Subtitle stream dictionaries (stored as-is).

        Raises:
            IndexError: If ``video_streams`` is empty.
        """
        self.id = id(self)
        # Source
        self.source_file = path
        self.file_name = os.path.basename(self.source_file)
        # Accept both spellings of the duration tag; a stream without tags or
        # without the tag yields a duration of 0 seconds.
        tags = video_streams[0].get("tags") or {}
        duration_tag = tags.get("DURATION") or tags.get("duration") or "00:00:00"
        self.duration = self.time_in_sec(duration_tag)
        # Resolve through the class: after this assignment the instance
        # attribute hides the static method of the same name.
        self.frame_rate = type(self).frame_rate(video_streams)
        self.frames_max = self.frame_rate * self.duration
        self.source_file_size = video_format.get("size", 0)
        # Target: same location and base name, ".webm" extension.  splitext
        # only strips a real file extension, never a dot in a directory name.
        self.output_file = f"{os.path.splitext(path)[0]}.webm"
        self.convert_start = 0
        self.convert_end = 0
        self.time_estimated = 0
        self.time_deque = deque(maxlen=20)
        self.time_remaining = 0
        self.finished = 0
        # Video / audio data
        self.streams_video = video_streams
        self.streams_audio = audio_streams
        self.streams_subtitle = subtitle_streams
        self.format = [video_format]
        # Progress data packet
        self.frame: int = 0
        self.fps: float = 0
        self.q: int = 0
        self.size: int = 0
        self.time: int = 0
        self.bitrate: int = 0
        self.speed: float = 0
        self.loading: int = 0
        # Process
        self.task = None
        self.process = None

    def __str__(self) -> str:
        """Return a multi-line dump of format, streams, target and job state."""

        def stream_output(stream_list):
            # One line per entry: "<Codec type> <n>" (or "Format" when the
            # entry has no codec_type) followed by every key/value pair.
            rendered = []
            for count, stream in enumerate(stream_list, start=1):
                codec_type = stream.get("codec_type")
                label = f"{codec_type.capitalize()} {count}" if codec_type else "Format"
                fields = "".join(f" -- {key}: {value}" for key, value in stream.items())
                rendered.append(f"{label}{fields}\n")
            return "".join(rendered)

        separator = self._SEPARATOR
        parts = [
            f"\n{self.source_file}\n",
            separator,
            stream_output(self.format),
            separator,
            stream_output(self.streams_video),
            separator,
            stream_output(self.streams_audio),
            separator,
            stream_output(self.streams_subtitle),
            separator,
            f"{self.output_file}\n",
            separator,
            f"{self.id} -- {self.finished} -- {self.task} -- {self.process}",
            "\n************************************\n",
        ]
        return "".join(parts)

    def to_dict(self) -> str:
        """Update the derived progress values and return them as a JSON string.

        Despite its name this returns ``json.dumps({"data_flow": {...}})``.
        Side effects: refreshes ``time_remaining``, ``loading`` and
        ``time_estimated`` and appends the current ``time`` to ``time_deque``.
        """
        # The frame total is only an estimate, so never report a negative rest.
        frames_left = max(self.frames_max - self.frame, 0)
        if self.frame_rate > 0:
            source_seconds_left = frames_left / self.frame_rate
        else:
            # Unknown frame rate: no frame-based estimate is possible.
            source_seconds_left = self.duration
        if self.fps > 0:
            self.time_remaining = self.format_time(frames_left / self.fps)
        else:
            self.time_remaining = "..."
        # NOTE(review): calc_loading runs before the current sample is stored,
        # so it works on the previous call's history and estimate.  The order
        # is kept from the original implementation -- confirm it is intended.
        self.calc_loading()
        self.time_deque.append(self.time)
        self.time_estimated = self.duration - source_seconds_left
        data_packet = {
            "source": os.path.basename(self.source_file),
            "target": os.path.basename(self.output_file),
            "path": os.path.dirname(self.source_file),
            "id": self.id,
            "frame": self.frame,
            "fps": self.fps,
            "q": self.q,
            "size": self.size,
            "time": self.time,
            "bitrate": self.bitrate,
            "speed": self.speed,
            "finished": self.finished,
            "duration": self.duration,
            "loading": self.loading,
            "convert_start": self.convert_start,
            "time_remaining": self.time_remaining,
        }
        return json.dumps({"data_flow": data_packet})

    def get_vars(self) -> dict:
        """Return the static, display-ready facts about this job."""
        return {
            "id": self.id,
            "file_name": self.file_name,
            "duration": self.format_time(self.duration),
            "source_file_size": self.convert_kb_mb(self.source_file_size),
            "finished": self.finished,
        }

    def extract_convert_data(self, line_decoded: str) -> None:
        """Parse one encoder status line into the progress attributes.

        ``frame`` and ``size`` only ever grow; every other field is reset to 0
        when it is missing from the line.
        """
        # Frames
        frame = self._FRAME_RE.search(line_decoded)
        if frame and int(frame.group(1)) > self.frame:
            self.frame = int(frame.group(1))
        # FPS (integer or decimal)
        fps = self._FPS_RE.search(line_decoded)
        self.fps = float(fps.group(1)) if fps else 0
        # Quantizer (integer part only)
        q = self._Q_RE.search(line_decoded)
        self.q = int(q.group(1)) if q else 0
        # File size
        size = self._SIZE_RE.search(line_decoded)
        if size:
            converted_size = self.convert_kb_mb(size.group(1))
            if converted_size > self.size:
                self.size = converted_size
        # Time
        time = self._TIME_RE.search(line_decoded)
        self.time = self.time_in_sec(time.group(1) if time else "00:00:00")
        # Bitrate (integer part only)
        bitrate = self._BITRATE_RE.search(line_decoded)
        self.bitrate = self.convert_kb_mb(bitrate.group(1)) if bitrate else 0
        # Speed (integer or decimal)
        speed = self._SPEED_RE.search(line_decoded)
        self.speed = float(speed.group(1)) if speed else 0

    @staticmethod
    def time_in_sec(time_str) -> int:
        """Convert the first ``H:M:S`` group found in ``time_str`` to seconds.

        Fractional seconds are ignored.  Returns 0 when no such group is found
        or when ``time_str`` is not a string (the latter is logged).
        """
        try:
            match = Video._CLOCK_RE.search(time_str)
        except TypeError as e:
            logging.error("Wert: %s Fehler: %s", time_str, e)
            return 0
        if match is None:
            return 0
        hours, minutes, seconds = (int(part) for part in match.groups())
        return hours * 3600 + minutes * 60 + seconds

    def calc_loading(self) -> None:
        """Raise ``loading`` (percent, 0-100) to the current progress.

        When every sample in ``time_deque`` is identical (including an empty
        or single-entry deque) the frame-based ``time_estimated`` is used,
        otherwise the parsed ``time``.  ``loading`` never decreases.  Nothing
        happens while the duration is unknown (0).
        """
        if not self.duration or self.duration <= 0:
            return
        samples = self.time_deque
        stalled = all(sample == samples[0] for sample in samples)
        elapsed = self.time_estimated if stalled else self.time
        loading = min(round(elapsed / self.duration * 100), 100)
        if loading > self.loading:
            self.loading = loading

    @staticmethod
    def convert_kb_mb(digits):
        """Divide a non-negative amount by 1024, rounded to two decimals.

        Accepts a string of decimal digits or a non-negative int/float.
        Anything else yields 0.
        """
        if isinstance(digits, str):
            if not digits.isdecimal():
                return 0
            value = int(digits)
        elif isinstance(digits, (int, float)) and not isinstance(digits, bool):
            if digits < 0:
                return 0
            value = digits
        else:
            return 0
        return round(value / 1024, 2)

    @staticmethod
    def frame_rate(video_streams):
        """Return the first stream's ``r_frame_rate`` ("num/den") as a number.

        A value without "/" is read as a whole number.  Returns 0 when the
        entry is missing, malformed, or has a non-positive denominator.

        Raises:
            IndexError: If ``video_streams`` is empty.
        """
        raw_rate = str(video_streams[0].get("r_frame_rate", "0/0"))
        numerator, _, denominator = raw_rate.partition("/")
        try:
            top = int(numerator)
            bottom = int(denominator) if denominator else 1
        except ValueError:
            return 0
        if bottom > 0:
            return top / bottom
        return 0

    @staticmethod
    def format_time(seconds) -> str:
        """Format seconds as ``"<d> T <h> S <m> M"``.

        Zero components are left empty but the separating spaces stay, so 59
        seconds formats as two spaces.  Minutes are rounded down.  Negative
        input is treated as 0.
        """
        seconds = max(seconds, 0)
        days, remainder = divmod(seconds, 24 * 3600)
        hours, remainder = divmod(remainder, 3600)
        minutes = remainder // 60
        # int() turns the integral floats produced by float input into "1",
        # not "1.0".
        d = f"{int(days)} T" if days else ""
        h = f"{int(hours)} S" if hours else ""
        m = f"{int(minutes)} M" if minutes else ""
        return f"{d} {h} {m}"