Video Konvertierung integriert und Prozess Auslesung komplett geändert. Datengewinnung und Verarbeitung verfeinert

This commit is contained in:
Eduard Wisch 2025-03-24 21:33:09 +01:00
parent deb7a85474
commit a70bd2c796
7 changed files with 522 additions and 10 deletions

View file

@ -1,4 +1,12 @@
import asyncio
import logging
from app.main_server import Server
if __name__ == "__main__":
obj_server = Server()
try:
asyncio.run(obj_server.start())
except Exception as e:
logging.critical(f"Global error: {e}")

View file

@ -1,5 +1,18 @@
log_file: "server.log"
log_level: "DEBUG"
log_level: DEBUG
path_file: "media_path.yaml"
server_ip: "0.0.0.0"
server_port: 8000
task_max: 1
autostart: true
subtitle:
language:
- ger
- eng
blacklist:
- hdmv_pgs_subtitle
- dvd_subtitle
audio:
language:
- ger
- eng

110
app/class_file_convert.py Normal file
View file

@ -0,0 +1,110 @@
import logging
import asyncio
import time
from app.class_file_convert_read_out import Process
class Convert:
def __init__(self, websocket, cfg, obj_path):
self.yaml = cfg
self.obj_path = obj_path
self.obj_websocket = websocket
self.obj_process = Process(websocket)
self.semaphore = asyncio.Semaphore(self.yaml["task_max"])
self.active_tasks = set()
self.active_process = set()
logging.info("Video Convert Start here")
async def snake_waiting(self):
async with self.semaphore:
for obj_id, obj in list(self.obj_path.paths.items()):
if obj.status is None:
obj.task = asyncio.create_task(self.convert_video(obj))
self.active_tasks.add(obj)
logging.info(f"Warteschlange started Auftrag - {obj.task}")
print(self.obj_path.paths)
async def convert_video(self, obj):
"""Startet die Videokonvertierung asynchron."""
obj.convert_start = time.time()
command = self.convert_cmd(obj)
logging.info(f"Starte Konvertierung: {command}")
try:
# Starte den Subprozess asynchron
obj.process = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
self.active_process.add(obj)
obj.process_start = time.time()
await self.obj_process.read_out(obj)
await obj.process.wait()
# Prozess beendet, Status auswerten
if obj.process.returncode == 0:
obj.status = 0
result = "Finished"
else:
obj.status = 1
result = "Failure"
# Statistik
#obj_stat = Sc()
#obj_stat.save_stat(obj.get_vars())
except Exception as e:
obj.status = 2
logging.error(f"Fehler in video_convert(): {e}")
finally:
logging.info(f"Prozess {result}({obj.process.returncode}): {obj.source_file_name}")
await self.obj_websocket.send_websocket(obj.to_dict())
self.active_process.discard(obj)
self.active_tasks.discard(obj)
obj.convert_end = time.time()
def convert_cmd(self, obj):
command_convert = [
"ffmpeg", "-y", "-i", obj.source_file,
"-map", "0:0",
"-c:v", "libsvtav1",
"-preset", "5",
"-crf", "30",
"-g", "240",
"-pix_fmt", "yuv420p10le",
"-svtav1-params", "tune=0:film-grain=8",
]
if len(obj.streams_audio):
for audio_stream in obj.streams_audio:
if audio_stream.get("tags", {}).get("language", None) in self.yaml["audio"]["language"]:
command_convert.extend([
"-map", f"0:{audio_stream['index']}",
f"-c:a", "libopus",
f"-b:a", "320k",
f"-ac", str(audio_stream['channels'])
])
# Subtitle-Streams einbinden
if len(obj.streams_subtitle):
for subtitle_stream in obj.streams_subtitle:
if subtitle_stream.get("codec_name") not in self.yaml["subtitle"]["blacklist"]:
if subtitle_stream.get("tags", {}).get("language", None) in self.yaml["subtitle"]["language"]:
command_convert.extend([
"-map", f"0:{subtitle_stream['index']}",
])
command_convert.append(obj.target_file)
return command_convert
def snake_update(self):
print(self.obj_path.paths)

View file

@ -0,0 +1,129 @@
import re
class Process:
def __init__(self, obj_websocket):
self.obj_websocket = obj_websocket
self.line_empty = 0
# Data
self.id = None
self.fps: float = 0.0
self.speed: float = 0.0
self.quantizer: int = 0
self.bitrate: list = [0, "kbits/s"]
self.size: list = [0, "KiB"]
self.time: int = 0
self.frames: int = 0
async def read_out(self, obj):
self.id = obj.id
while True:
line = await obj.process.stderr.read(1024)
line_decoded = line.decode()
self.process_line_extract(obj, line_decoded)
await self.obj_websocket.send_websocket(obj.to_dict())
if self.line_empty > 20:
break
elif not line:
self.line_empty += 1
elif line:
self.line_empty = 0
def process_line_extract(self, obj, line:str):
# FPS
fps = re.findall(r"fps=\s*(\d+.\d*)", line)
self.fps = float(fps[0]) if fps else 0
obj.stat_fps = [obj.stats_fps[0] + self.fps, obj.stats_fps[1] + 1]
# Quantizer
q = re.findall(r"q=\s*(\d+).\d+", line)
self.quantizer = int(q[0]) if q else 0
obj.stat_quantizer = [obj.stat_quantizer[0] + self.quantizer, obj.stat_quantizer[1] + 1]
# Bitrate
bitrate = re.findall(r"bitrate=\s*(\d+)", line)
self.bitrate[0] = int(bitrate[0]) if bitrate else 0
obj.stat_bitrate = [obj.stat_bitrate[0] + self.bitrate, obj.stat_bitrate[1] + 1]
# Speed
speed = re.findall(r"speed=\s*(\d+\.\d+)", line)
self.speed = float(speed[0]) if speed else 0
obj.stat_speed = [obj.stat_speed[0] + self.speed, obj.stat_speed[1] + 1]
# File Size
size = re.findall(r"size=\s*(\d+)", line)
if size and int(size[0]) > self.size[0]:
self.size[0] = int(size[0])
obj.process_size = self.size
# Time
media_time = re.findall(r"time=\s*(\d+:\d+:\d+)", line)
time_v = media_time[0] if media_time else "00:00:00"
if self.time < self.time_in_sec(time_v):
self.time = self.time_in_sec(time_v)
obj.process_time = self.time
# Frames
frame = re.findall(r"frame=\s*(\d+)", line)
if frame and int(frame[0]) > self.frames:
self.frames = int(frame[0])
obj.process_frames = self.frames
def to_dict(self):
return {"data_flow": {self.id: {
"frames": self.frames,
"fps": self.fps,
"quantizer": self.quantizer,
"size": self.size,
"time": self.time,
"bitrate": self.bitrate,
"speed": self.speed
}}}
# Data convert
@staticmethod
def time_in_sec(time_str: str) -> int:
h, m, s = map(int, time_str.split(":"))
return h * 3600 + m * 60 + s
@staticmethod
def size_convert(source: str, target, unit: str, size=0):
list_unit: list = []
if unit == "storage":
list_unit = ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]
elif unit == "data_rate":
list_unit = ["bps", "Kbps", "Mbps", "Gbps", "Tbps", "Pbps"]
elif unit == "binary_data_rate":
list_unit = ["b/s", "Kib/s", "Mib/s", "Gib/s", "Tib/s", "Pib/s"]
factor = 1024 # Binäre Umrechnung
if source not in list_unit:
raise ValueError("Ungültige Quell-Einheit!")
source_index = list_unit.index(source)
if target:
if target not in list_unit:
raise ValueError("Ungültige Ziel-Einheit!")
target_index = list_unit.index(target)
if source_index < target_index:
return size / (factor ** (target_index - source_index)), target
else:
return size * (factor ** (source_index - target_index)), target
# Automatische Umrechnung
while size >= 1000 and source_index < len(list_unit) - 1:
size /= factor
source_index += 1
return [round(size,1), list_unit[source_index]]

155
app/class_file_path.py Normal file
View file

@ -0,0 +1,155 @@
import logging
import re
import os
import json
import yaml
import subprocess
from app.class_media_file import Media
class Path:
def __init__(self, cfg):
"""
Receive Paths extract from String and save to a List Variable.
Methoden: save_paths / read_paths / delete_path / receive_path
:param cfg: Global Settings Object
"""
# Settings
self.yaml = cfg
# Autostart
# self.read_paths()
# Variablen
self.paths:dict = {}
def save_paths(self):
"""
Saves the extrated Paths in a File
:return: True or False
"""
paths_extrat_dict:dict = {}
for obj_id, obj in self.paths.items():
paths_extrat_dict.update({id: obj.__dict__})
try:
with open(f"app/cfg/{self.yaml["path_file"]}", "w", encoding="utf8") as file:
yaml.dump(paths_extrat_dict, file, default_flow_style=False, indent=4)
logging.info(f"{len(self.paths)} paths were saved to file")
return 1
except FileNotFoundError:
logging.error(f"File {self.yaml["path_file"]} not found")
return 0
except IOError:
logging.critical(f"Error file {self.yaml["path_file"]} maybe damaged")
return 0
# Achtung Abrufen aus Datei und neu einlesen der ffprobe Daten fehlt noch
def read_paths(self):
"""
Read Media Paths from a File
:return: True or False
"""
try:
with open(f"app/cfg/{self.yaml["path_file"]}", "r", encoding="utf8") as file:
list_paths = yaml.safe_load(file)
if len(list_paths) > 0:
paths = list_paths
logging.info(f"{len(paths)} paths were read from file")
return 1
except FileNotFoundError:
logging.error(f"File {self.yaml["path_file"]} not found")
return 0
except IOError:
logging.critical(f"Error file {self.yaml["path_file"]} maybe damaged")
return 0
def delete_path(self, obj_id:int):
"""
Delete a Media Path from a List Variable and overwrite the path file
:param obj_id: Path from a Media File which to delete
:return: True or False
"""
try:
del self.paths[obj_id]
self.save_paths()
return 1
except ValueError:
return 0
def receive_paths(self, var_paths:str):
"""
Splitting the Path String to List Single Media Paths
:param var_paths: String of a single or more Media Paths
:return: True or False
"""
pattern = r"(?<=\.mkv\s|\.mp4\s|\.avi\s)|(?<=\.webm\s)"
paths = re.split(pattern, var_paths)
print(paths)
for path in paths:
self.get_with_ffprobe(path)
return 1
@staticmethod
def extract_media_info(path:str, select:str):
"""
Erstellt ffprobe command for selected Streams
:param path: path to media file
:param select: v (for video), a (for audio), s (for subtitle)
:return:
"""
command = [
"ffprobe", "-v",
"error",
"-select_streams",
f"{select}",
"-show_entries",
"stream=index,channels,codec_name,codec_type,pix_fmt,level,"
"film_grain,r_frame_rate,bit_rate,sample_rate,width,height,size,tags:stream_tags=language,duration",
"-show_entries",
"format=size,bit_rate,nb_streams",
"-of", "json",
path
]
result = subprocess.run(command, stdout=subprocess.PIPE, text=True)
json_data = json.loads(result.stdout)
if select == "v":
if json_data.get("format") is not list:
list_format = [json_data.get("format", "")]
else:
list_format = json_data.get("format")
return json_data.get("streams", []), list_format
elif select == "a":
return json_data.get("streams", [])
elif select == "s":
return json_data.get("streams", [])
def get_with_ffprobe(self, path:str):
try:
path = path.strip()
if os.path.exists(path):
streams_video, streams_format = self.extract_media_info(path, "v")
streams_audio = self.extract_media_info(path, "a")
streams_subtitle = self.extract_media_info(path, "s")
obj = Media(path, streams_video, streams_audio, streams_subtitle, streams_format)
self.paths.update({obj.id: obj})
logging.info(obj)
return 1
except Exception as e:
logging.error(f"Get Video Information: {e}")
return 0

80
app/class_media_file.py Normal file
View file

@ -0,0 +1,80 @@
import os
class Media:
_id_counter = 0
def __init__(self, path, streams_video, streams_audio, streams_subtitle, streams_format):
# misc
self.id = Media._id_counter
# source
self.source_file: str = path
self.source_file_name: str = os.path.basename(self.source_file)
self.source_size: int = 0
self.source_frames: int = 0
self.source_time: int = 0
# target
self.target_file: str = f"{path.rsplit(".", 1)[0]}.webm"
self.target_size: int = 0
# process
self.status = None
self.process_start: int = 0
self.process_end: int = 0
self.process_time: int = 0
self.process_size: int = 0
self.process_frames: int = 0
# statistic
self.stat_fps: list = [0, 0]
self.stat_bitrate: list = [0, 0]
self.stat_quantizer: list = [0, 0]
self.stat_speed: list = [0, 0]
# raw
self.streams_video = streams_video
self.streams_audio = streams_audio
self.streams_subtitle = streams_subtitle
self.streams_format = streams_format
# --------------------------------------------------------------------------------------------------------------
Media._id_counter += 1
def __str__(self):
def stream_output(stream_list):
count = 1
string = ""
for video_stream in stream_list:
string += f"{video_stream.get("codec_type").capitalize()} {count}" if video_stream.get(
"codec_type") else "Format"
for key, value in video_stream.items():
string += f" -- {key}: {value}"
string += "\n"
count += 1
return string
# Ausgabe
output_string = f"\n{self.source_file}\n"
output_string += "------------------------------------\n"
output_string += stream_output(self.streams_format)
output_string += "------------------------------------\n"
output_string += stream_output(self.streams_video)
output_string += "------------------------------------\n"
output_string += stream_output(self.streams_audio)
output_string += "------------------------------------\n"
output_string += stream_output(self.streams_subtitle)
output_string += "------------------------------------\n"
output_string += f"{self.target_file}\n"
output_string += "------------------------------------\n"
output_string += f"{self.id} -- {self.status}"
output_string += "\n************************************\n"
return output_string
@staticmethod
def to_dict():
return "Fertig mit der Welt"

View file

@ -1,23 +1,33 @@
import websockets
import asyncio
import json
import logging
from app.class_settings import Settings
from app.class_file_path import Path
from app.class_file_convert import Convert
var_convert_active = 0
class Server:
def __init__(self):
self.websocket = None
obj_settings = Settings()
obj_settings.set_logging()
self.yaml = obj_settings.yaml
self.obj_path = Path(self.yaml)
self.obj_convert = Convert(self, self.yaml, self.obj_path)
async def send_websocket(self, message):
try:
asyncio.run(self.start())
except KeyboardInterrupt:
print("Server sagt: Server wurde beendet.")
await self.websocket.send(message)
except websockets.exceptions.ConnectionClosed:
logging.warning("No websocket client connected!")
async def handle_client(self, websocket):
self.websocket = websocket
global var_convert_active
@staticmethod
async def handle_client(websocket):
print("Server sagt: Client verbunden")
try:
async for message in websocket:
@ -26,7 +36,14 @@ class Server:
data = json.loads(message)
if data.get("data_path"):
pass
self.obj_path.receive_paths(data.get("data_path"))
if var_convert_active == 0 and self.yaml['autostart']:
await self.obj_convert.snake_waiting()
var_convert_active = 1
else:
self.obj_convert.snake_update()
elif data.get("data_command"):
pass