Convert, Read Out und die Warteschlange überarbeitet.

This commit is contained in:
Eduard Wisch 2025-03-31 20:12:06 +02:00
parent a70bd2c796
commit 456852399e
6 changed files with 163 additions and 36 deletions

View file

@ -8,5 +8,7 @@ if __name__ == "__main__":
try:
asyncio.run(obj_server.start())
except asyncio.exceptions.CancelledError:
pass
except Exception as e:
logging.critical(f"Global error: {e}")

View file

@ -10,20 +10,31 @@ class Convert:
self.obj_websocket = websocket
self.obj_process = Process(websocket)
self.semaphore = asyncio.Semaphore(self.yaml["task_max"])
self.active_tasks = set()
self.active_process = set()
logging.info("Video Convert Start here")
async def snake_waiting(self):
async with self.semaphore:
while True:
if len(self.active_tasks) < self.yaml["task_max"] and self.obj_path.count_paths(None) > 0:
for obj_id, obj in list(self.obj_path.paths.items()):
if obj.status is None:
obj.task = asyncio.create_task(self.convert_video(obj))
self.active_tasks.add(obj)
logging.info(f"Warteschlange started Auftrag - {obj.task}")
if len(self.active_tasks) >= self.yaml["task_max"]:
break
if len(self.active_tasks) >= 0:
logging.info(f"{len(self.active_tasks)} is active.")
await asyncio.sleep(600)
continue
if self.obj_path.count_paths(None) == 0 and len(self.active_tasks) == 0:
break
print(self.obj_path.paths)
async def convert_video(self, obj):
@ -31,6 +42,7 @@ class Convert:
obj.convert_start = time.time()
command = self.convert_cmd(obj)
result = None
logging.info(f"Starte Konvertierung: {command}")
@ -62,7 +74,7 @@ class Convert:
except Exception as e:
obj.status = 2
logging.error(f"Fehler in video_convert(): {e}")
logging.error(f"Fehler in video_convert(): {e}", exc_info=True)
finally:
logging.info(f"Prozess {result}({obj.process.returncode}): {obj.source_file_name}")
await self.obj_websocket.send_websocket(obj.to_dict())

View file

@ -1,4 +1,6 @@
import logging
import re
import asyncio
class Process:
def __init__(self, obj_websocket):
@ -20,6 +22,7 @@ class Process:
async def read_out(self, obj):
self.id = obj.id
i = 100
while True:
line = await obj.process.stderr.read(1024)
@ -27,35 +30,45 @@ class Process:
self.process_line_extract(obj, line_decoded)
await self.obj_websocket.send_websocket(obj.to_dict())
# in json umwandeln
#await self.obj_websocket.send_websocket(self.to_dict())
if self.line_empty > 20:
if self.line_empty > 30:
break
elif not line:
self.line_empty += 1
await asyncio.sleep(2)
continue
elif line:
self.line_empty = 0
if i == 100 or i == 200 or i == 300 or i == 400 or i == 500:
logging.info(self.to_dict())
self.save_stat_value(obj)
elif i == 101 or i == 501:
i = 0
time = obj.format_time(obj.time_remaining())
if time != " ":
logging.info(f"Time remaining: {time}")
i += 1
def process_line_extract(self, obj, line:str):
# FPS
fps = re.findall(r"fps=\s*(\d+.\d*)", line)
self.fps = float(fps[0]) if fps else 0
obj.stat_fps = [obj.stats_fps[0] + self.fps, obj.stats_fps[1] + 1]
self.fps = float(fps[0]) if fps else 0.0
# Quantizer
q = re.findall(r"q=\s*(\d+).\d+", line)
self.quantizer = int(q[0]) if q else 0
obj.stat_quantizer = [obj.stat_quantizer[0] + self.quantizer, obj.stat_quantizer[1] + 1]
# Bitrate
bitrate = re.findall(r"bitrate=\s*(\d+)", line)
self.bitrate[0] = int(bitrate[0]) if bitrate else 0
obj.stat_bitrate = [obj.stat_bitrate[0] + self.bitrate, obj.stat_bitrate[1] + 1]
# Speed
speed = re.findall(r"speed=\s*(\d+\.\d+)", line)
self.speed = float(speed[0]) if speed else 0
obj.stat_speed = [obj.stat_speed[0] + self.speed, obj.stat_speed[1] + 1]
self.speed = float(speed[0]) if speed else 0.0
# File Size
size = re.findall(r"size=\s*(\d+)", line)
@ -66,8 +79,8 @@ class Process:
# Time
media_time = re.findall(r"time=\s*(\d+:\d+:\d+)", line)
time_v = media_time[0] if media_time else "00:00:00"
if self.time < self.time_in_sec(time_v):
self.time = self.time_in_sec(time_v)
if self.time < obj.time_in_sec(time_v):
self.time = obj.time_in_sec(time_v)
obj.process_time = self.time
# Frames
@ -76,6 +89,19 @@ class Process:
self.frames = int(frame[0])
obj.process_frames = self.frames
def save_stat_value(self, obj):
if self.fps:
obj.stat_fps = [obj.stat_fps[0] + self.fps, obj.stat_fps[1] + 1]
if self.quantizer:
obj.stat_quantizer = [obj.stat_quantizer[0] + self.quantizer, obj.stat_quantizer[1] + 1]
if self.bitrate[0]:
obj.stat_bitrate = [obj.stat_bitrate[0] + self.bitrate[0], obj.stat_bitrate[1] + 1]
if self.speed:
obj.stat_speed = [obj.stat_speed[0] + self.speed, obj.stat_speed[1] + 1]
def to_dict(self):
return {"data_flow": {self.id: {
"frames": self.frames,
@ -87,12 +113,6 @@ class Process:
"speed": self.speed
}}}
# Data convert
@staticmethod
def time_in_sec(time_str: str) -> int:
h, m, s = map(int, time_str.split(":"))
return h * 3600 + m * 60 + s
@staticmethod
def size_convert(source: str, target, unit: str, size=0):
list_unit: list = []

View file

@ -145,11 +145,31 @@ class Path:
streams_subtitle = self.extract_media_info(path, "s")
obj = Media(path, streams_video, streams_audio, streams_subtitle, streams_format)
if not self.search_paths(path):
self.paths.update({obj.id: obj})
logging.info(obj)
else:
logging.info(f"File exists in Waiting Snake :D: {path}")
return 1
except Exception as e:
logging.error(f"Get Video Information: {e}")
logging.error(f"Get Video Information: {e}", exc_info=True)
return 0
def search_paths(self, path):
for obj in self.paths.values():
if obj.source_file == path:
return 1
return 0
def count_paths(self, status):
count = 0
for obj in self.paths.values():
if obj.status == status:
count += 1
return count

View file

@ -1,5 +1,5 @@
import os
import math
class Media:
_id_counter = 0
@ -10,8 +10,10 @@ class Media:
# source
self.source_file: str = path
self.source_file_name: str = os.path.basename(self.source_file)
self.source_duration: int = self.time_in_sec(streams_video[0]["tags"].get("DURATION" or "duration", "00:00:00"))
self.source_size: int = 0
self.source_frames: int = 0
self.source_frame_rate: int = self.frame_rate(streams_video)
self.source_frames_total: int = self.source_frame_rate * self.source_duration
self.source_time: int = 0
# target
@ -25,6 +27,7 @@ class Media:
self.process_time: int = 0
self.process_size: int = 0
self.process_frames: int = 0
self.process_time_remaining: int = 0
# statistic
self.stat_fps: list = [0, 0]
@ -78,3 +81,62 @@ class Media:
@staticmethod
def to_dict():
return "Fertig mit der Welt"
@staticmethod
def frame_rate(video_streams):
var_frame_rate = video_streams[0].get("r_frame_rate", "0/0").split("/")
if int(var_frame_rate[1]) > 0:
int_frame_rate = round(int(var_frame_rate[0]) / int(var_frame_rate[1]))
else:
int_frame_rate = 0
return int_frame_rate
def time_remaining(self):
if self.stat_fps[0] > 0:
var_time_remaining = (self.source_frames_total - self.process_frames) / (self.stat_fps[0] / self.stat_fps[1])
self.process_time_remaining = round(var_time_remaining)
elif self.stat_fps[0] == 0:
self.process_time_remaining = 0
return self.process_time_remaining
@staticmethod
def format_time(seconds):
days = round(seconds // (24 * 3600))
seconds %= (24 * 3600)
if days:
d = (f"{days} Tag"
f"")
else:
d = ""
hours = round(seconds // 3600)
seconds %= 3600
if hours:
h = f"{hours} Stunden"
else:
h = ""
minutes = math.ceil(seconds // 60)
seconds %= 60
if minutes:
m = f"{minutes} Minuten"
else:
m = ""
return f"{d} {h} {m}"
# Data convert
@staticmethod
def time_in_sec(time_str: str) -> int:
parts = time_str.split(":")
if len(parts) == 1: # Falls nur Sekunden mit Nachkommastellen vorliegen
return int(float(parts[0])) # Erst in float, dann in int umwandeln
if len(parts) == 3: # Normales HH:MM:SS-Format
h, m, s = map(float, parts) # In float umwandeln, falls Nachkommastellen im Sekundenwert sind
return int(h * 3600 + m * 60 + s) # Alles in Sekunden umrechnen
raise ValueError(f"Ungültiges Zeitformat: {time_str}")

View file

@ -1,3 +1,5 @@
import asyncio
import websockets
import json
import logging
@ -6,7 +8,7 @@ from app.class_settings import Settings
from app.class_file_path import Path
from app.class_file_convert import Convert
var_convert_active = 0
var_convert_active = False
class Server:
def __init__(self):
@ -38,9 +40,9 @@ class Server:
if data.get("data_path"):
self.obj_path.receive_paths(data.get("data_path"))
if var_convert_active == 0 and self.yaml['autostart']:
if var_convert_active == False and self.yaml['autostart']:
await self.obj_convert.snake_waiting()
var_convert_active = 1
var_convert_active = True
else:
self.obj_convert.snake_update()
@ -49,10 +51,19 @@ class Server:
#response = f"Server antwortet: {message.upper()}"
#await websocket.send(response)
except websockets.exceptions.ConnectionClosedError:
pass
except websockets.exceptions.InvalidUpgrade:
pass
except websockets.exceptions.ConnectionClosed:
print("Server sagt: Client getrennt")
@staticmethod
def set_var_convert_active(value: bool):
global var_convert_active
var_convert_active = value
async def start(self):
server = await websockets.serve(self.handle_client, self.yaml['server_ip'], self.yaml['server_port'])
print(f"Websocket Server läuft auf IP: {self.yaml['server_ip']} Port: {self.yaml['server_port']}")