diff --git a/app/main.py b/app/main.py index 17395ea..796c276 100755 --- a/app/main.py +++ b/app/main.py @@ -1,82 +1,118 @@ import asyncio +import os.path import re import subprocess import json +import logging +import threading +import time +from datetime import date from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect from fastapi.staticfiles import StaticFiles from fastapi.responses import HTMLResponse from fastapi.templating import Jinja2Templates -from contextlib import asynccontextmanager import app.video_class as vc -@asynccontextmanager -async def lifespan(_: FastAPI): - await queue_video() - yield +semaphore = threading.Semaphore(1) +date = date.today() -app = FastAPI(lifespan=lifespan) +if not os.path.exists("./logs"): + os.mkdir("./logs") + +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(levelname)s - %(message)s", + handlers=[ + logging.StreamHandler(), + logging.FileHandler(f"./logs/{date}.log") + ] +) + +app = FastAPI() app.mount("/webs", StaticFiles(directory="app/webs"), name="webs") templates = Jinja2Templates(directory="app/templates") queue = asyncio.Queue() read_output_task = None -convert_task = 0 # Settings language = ["ger", "eng"] subtitle_codec_blacklist = ["hdmv_pgs_subtitle", "dvd_subtitle"] +max_tasks = 2 -process_count = 1 video_files = {} +active_process = set() +active_tasks = set() +connected_clients = set() -progress = {} -async def queue_video(): - global convert_task - for key, obj in video_files.items(): - if process_count > convert_task: - if obj.finished == 0: - convert_task += 1 - asyncio.create_task(video_convert(obj)) +# Media ---------------------------------------------------------------------------------------------------------------- +def get_video_information(media_path): + pattern = r"(?<=\.mkv\s|\.mp4\s|\.avi\s)|(?<=\.webm\s)" -async def get_ffprobe(select, obj): - global convert_task + file_paths = media_path.get("files", [])[0] + var_list_file = re.split(pattern, file_paths) + try: + for source_file in var_list_file: + source_file = source_file.strip() + + video_streams, video_format = get_ffprobe("v", source_file) + audio_streams = get_ffprobe("a",source_file) + subtitle_streams = get_ffprobe("s", source_file) + + obj = vc.Video(source_file, video_streams, video_format, audio_streams, subtitle_streams) + video_files.update({source_file: obj}) + + logging.info(obj) + return 1 + except Exception as e: + logging.error(f"Get Video Information: {e}") + return 0 + +def get_ffprobe(select, source_file): command = [ "ffprobe", "-v", "error", "-select_streams", - f"{select}", "-show_entries", - "stream=index,channels,codec_name,tags:stream_tags=language,tags:format=duration", + f"{select}", + "-show_entries", + "stream=index,channels,codec_name,codec_type,pix_fmt,level," + "film_grain,r_frame_rate,bit_rate,sample_rate,width,height,size,tags:stream_tags=language,duration", + "-show_entries", + "format=size,bit_rate,nb_streams", "-of", "json", - obj.source_file + source_file ] - try: - result = subprocess.run(command, stdout=subprocess.PIPE, text=True) - json_data = json.loads(result.stdout) - print(json_data) + result = subprocess.run(command, stdout=subprocess.PIPE, text=True) + json_data = json.loads(result.stdout) - duration = json_data.get("format", {"duration": 999}).get("duration") - if duration: - if duration.replace(".","", 1).isdigit(): - obj.duration = round(float(duration)) - else: - obj.duration = 0 - return json_data + if select == "v": + return json_data.get("streams", []),json_data.get("format", []) + elif select == "a": + return json_data.get("streams", []) + elif select == "s": + return json_data.get("streams", []) - except Exception as e: - convert_task -= 1 - await queue_video() +# Convert Process ------------------------------------------------------------------------------------------------------ +def queue_video(): + for key, obj in video_files.items(): + with semaphore: + obj.task = threading.Thread(target=video_convert, args=(obj,)) + obj.task.start() + active_tasks.add(obj) + logging.info(f"Warteschlange started Auftrag - {obj.task}") -async def video_convert(obj): - global convert_task +def video_convert(obj): + global active_process - json_data_audio = await get_ffprobe("a", obj) - json_data_subtitles = await get_ffprobe("s", obj) + obj.convert_start = time.time() + # Erstelle und setze einen Event-Loop für diesen Thread + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) - # Konvertierung ---------------------------------------------------------------------------------------------------- command = [ "ffmpeg", "-y", "-i", obj.source_file, "-map", "0:0", @@ -88,9 +124,8 @@ async def video_convert(obj): "-svtav1-params", "tune=0:film-grain=8", ] - if "streams" in json_data_audio: - i = 0 - for audio_stream in json_data_audio["streams"]: + if len(obj.streams_audio): + for audio_stream in obj.streams_audio: if audio_stream.get("tags", {}).get("language", None) in language: command.extend([ "-map", f"0:{audio_stream['index']}", @@ -98,81 +133,95 @@ async def video_convert(obj): f"-b:a", "320k", f"-ac", str(audio_stream['channels']) ]) - i += 1 # Subtitle-Streams einbinden - if "streams" in json_data_subtitles: - for subtitle_stream in json_data_subtitles["streams"]: + if len(obj.streams_subtitle): + for subtitle_stream in obj.streams_subtitle: if subtitle_stream.get("codec_name") not in subtitle_codec_blacklist: if subtitle_stream.get("tags", {}).get("language", None) in language: command.extend([ "-map", f"0:{subtitle_stream['index']}", ]) - command.append(obj.output_file) - - """ - ffmpeg_cm = "" - for cm in command: - ffmpeg_cm += f"{cm} " - - print(ffmpeg_cm) - """ + logging.info(f"{command}") # Prozess try: - process_video = await asyncio.create_subprocess_exec( - *command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE + obj.process = subprocess.Popen( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE ) - obj.progress = process_video - if process_video.returncode == 0: + active_process.add(obj) + logging.info(f"{obj.file_name}") + + print(obj.process.poll()) + while obj.process.poll() is None: + logging.info(f"{obj.file_name} ... Running") + time.sleep(30) + print(obj.process.poll()) + + if obj.process.poll() == 0: obj.finished = 1 - convert_task -= 1 - await queue_video() + logging.info(f"Process Finished({obj.process.returncode}): {obj.file_name}") + json_data = json.dumps(obj.to_dict()) + loop.run_until_complete(queue.put(json_data)) + + elif obj.process.poll() != 0: + obj.finished = 2 + logging.info(f"Process Failure({obj.process.returncode}): {obj.file_name}") + json_data = json.dumps(obj.to_dict()) + loop.run_until_complete(queue.put(json_data)) + + active_process.discard(obj) + active_tasks.discard(obj) + obj.convert_end = time.time() except Exception as e: - convert_task -= 1 obj.finished = 2 - await queue_video() - obj.error.append(f"ffmpeg ---- {e}") + logging.error(f"Convert Process Failure: {e}") + + +#test = {"files":["/mnt/Storage/11 - Downloads - JDownloader/01 - Fertig/Star Trek: Deep Space Nine - S01E03 - Die Khon-Ma.mkv /mnt/Storage/11 - Downloads - JDownloader/01 - Fertig/Star Trek: Deep Space Nine - S01E04 - Unter Verdacht.mkv"]} +#get_video_information(test) + +#UviCorn WebServer Teil +#---------------------------------------------------------------------------------------------------------------------- +def read_output(qu): + """Thread-Funktion, die Prozessausgaben in die Queue legt.""" + global active_process + + # Erstelle und setze einen Event-Loop für diesen Thread + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) -async def read_output(): while True: - active_processes = [obj for obj in video_files.values() if obj.progress] - - if not active_processes: - await asyncio.sleep(10) # Kein aktives Video -> kurz warten + if not len(active_process): + loop.run_until_complete(asyncio.sleep(30)) # Keine aktiven Prozesse → kurze Pause continue - for obj in active_processes: - if obj.progress: - line = await obj.progress.stderr.read(1024) + for obj in list(active_process): + line_error = obj.process.stderr.read(1024) + if not line_error: + continue - line_decoded = line.decode().strip() - print(line_decoded) + line_error_decoded = line_error.decode() - obj.extract_convert_data(line_decoded) + logging.info(f"Datenpaket {obj.file_name}: {line_error_decoded}") + obj.extract_convert_data(line_error_decoded) - json_data = json.dumps(obj.to_dict()) - await queue.put(json_data) + json_data = json.dumps(obj.to_dict()) + + # Verwende den Event-Loop, um die Daten in die asyncio Queue zu legen + loop.run_until_complete(qu.put(json_data)) @app.post("/") async def receive_video_file(data: dict): - pattern = r"(?<=\.mkv\s|\.mp4\s|\.avi\s)|(?<=\.webm\s)" - - file_paths = data.get("files", [])[0] - var_list_file = re.split(pattern, file_paths) - - for path in var_list_file: - obj_file = vc.Video(path.strip()) - video_files.update({path: obj_file}) - - await queue_video() - #Test - print(video_files) + if get_video_information(data): + queue_video() + else: + logging.error(f"Videos konnten nicht verarbeitet werden! Warteschleife wurde nicht gestarted") @app.get("/progress", response_class=HTMLResponse) async def display_paths(request: Request): @@ -184,22 +233,29 @@ async def display_paths(request: Request): @app.websocket("/ws") async def websocket_v(websocket: WebSocket): - global read_output_task + """WebSocket-Verbindung für die Kommunikation mit Clients.""" + global read_output_task, connected_clients + connected_clients.add(websocket) await websocket.accept() - if read_output_task is None or read_output_task.done(): - read_output_task = asyncio.create_task(read_output()) + if read_output_task is None: + # Startet den Thread zum Verarbeiten der Prozessausgaben + read_output_task = threading.Thread(target=read_output, args=(queue,)) + read_output_task.start() try: while True: - message = await queue.get() + message = await queue.get() # Warten auf neue Nachricht aus der Queue await websocket.send_text(message) except WebSocketDisconnect: - print("WebSocket disconnected") # Optional: Logging + logging.info("WebSocket disconnected") except Exception as e: - print(f"WebSocket error: {e}") # Fehlerbehandlung + logging.error(f"WebSocket error: {e}") finally: + connected_clients.discard(websocket) await websocket.close() - +@app.get("/clients") +async def get_clients_count(): + return {"active_clients": len(connected_clients), "active_processes": len(active_process), "active_tasks": len(active_tasks)} diff --git a/app/video_class.py b/app/video_class.py index 9662b36..0fcfd1c 100644 --- a/app/video_class.py +++ b/app/video_class.py @@ -1,13 +1,34 @@ +import logging import os import re -from datetime import datetime +import math +from collections import deque class Video: - def __init__(self, path): + def __init__(self, path, video_streams, video_format, audio_streams, subtitle_streams): self.id = id(self) self.source_file = path + self.file_name = os.path.basename(self.source_file) + self.duration = self.time_in_sec(video_streams[0]["tags"].get("DURATION" or "duration", "00:00:00")) + self.frame_rate = int(video_streams[0].get("r_frame_rate", "0/0").split("/")[0]) + self.frames_max = self.frame_rate * self.duration + self.output_file = f"{path.rsplit(".", 1)[0]}.webm" + self.convert_start = 0 + self.convert_end = 0 + self.time_estimated = 0 + self.time_deque = deque(maxlen=20) + self.time_remaining = 0 + self.finished = 0 + + # Video / Audio Daten + self.streams_video = video_streams + self.streams_audio = audio_streams + self.streams_subtitle = subtitle_streams + self.format = [video_format] + + # Datenpaket self.frame = 0 self.fps = 0 self.q = 0 @@ -15,14 +36,55 @@ class Video: self.time = 0 self.bitrate = 0 self.speed = 0 - self.finished = 0 - self.progress = None - self.duration = 0 self.loading = 0 - self.error = [] + self.count_empty_data = 0 + + # Process + self.task = None + self.process = None + + def __str__(self): + def stream_output(stream_list): + count = 1 + string = "" + for video_stream in stream_list: + string += f"{video_stream.get("codec_type").capitalize()} {count}" if video_stream.get("codec_type") else "Format" + for key, value in video_stream.items(): + string += f" -- {key}: {value}" + + string += "\n" + count += 1 + + return string + + # Ausgabe + output_string = f"\n{self.source_file}\n" + output_string += "------------------------------------\n" + output_string += stream_output(self.format) + output_string += "------------------------------------\n" + output_string += stream_output(self.streams_video) + output_string += "------------------------------------\n" + output_string += stream_output(self.streams_audio) + output_string += "------------------------------------\n" + output_string += stream_output(self.streams_subtitle) + output_string += "------------------------------------\n" + output_string += f"{self.output_file}\n" + output_string += "------------------------------------\n" + output_string += f"{self.id} -- {self.finished} -- {self.task} -- {self.process}" + output_string += "\n************************************\n" + + return output_string def to_dict(self): + time_estimated = ((self.frames_max - self.frame) / self.frame_rate) + if self.fps > 0: + self.time_remaining = self.format_time(((self.frames_max - self.frame) / self.fps)) + elif self.fps == 0: + self.time_remaining = "..." + self.calc_loading() + self.time_deque.append(self.time) + self.time_estimated = self.duration - time_estimated return { "source": os.path.basename(self.source_file), @@ -38,15 +100,17 @@ class Video: "speed": self.speed, "finished": self.finished, "duration": self.duration, - "loading": self.loading + "loading": self.loading, + "convert_start": self.convert_start, + "time_remaining": self.time_remaining } def extract_convert_data(self, line_decoded): frame = re.findall(r"frame=\s*(\d+)", line_decoded) - self.frame = frame[0] if frame else 0 + self.frame = int(frame[0]) if frame else 0 - fps = re.findall(r"fps=\s*(\d+.\d+)", line_decoded) - self.fps = fps[0] if fps else 0 + fps = re.findall(r"fps=\s*(\d+)", line_decoded) + self.fps = int(fps[0]) if fps else 0 q = re.findall(r"q=\s*(\d+.\d+)", line_decoded) self.q = q[0] if q else 0 @@ -56,7 +120,7 @@ class Video: time = re.findall(r"time=\s*(\d+:\d+:\d+)", line_decoded) time_v = time[0] if time else "00:00:00" - self.time = self.time_in_sec(time_v) + self.time_in_sec(time_v) bitrate = re.findall(r"bitrate=\s*(\d+)", line_decoded) self.bitrate = self.convert_kb_mb(bitrate[0]) if bitrate else 0 @@ -67,22 +131,57 @@ class Video: @staticmethod def time_in_sec(time_str): hs_ms_s = re.findall(r"\s*(\d+):(\d+):(\d+)", time_str) - if len(hs_ms_s[0]) >= 3: - if hs_ms_s[0][0].isdigit() and hs_ms_s[0][1].isdigit() and hs_ms_s[0][2].isdigit(): - try: - return int(hs_ms_s[0][0]) * 3600 + int(hs_ms_s[0][1]) * 3600 + int(hs_ms_s[0][2]) - except ValueError: - return 0 + if len(hs_ms_s) > 0: + if len(hs_ms_s[0]) >= 3: + if hs_ms_s[0][0].isdigit() and hs_ms_s[0][1].isdigit() and hs_ms_s[0][2].isdigit(): + try: + time = int(hs_ms_s[0][0]) * 60 + int(hs_ms_s[0][1]) * 60 + int(hs_ms_s[0][2]) + print(time) + return time + except ValueError as e: + logging.error(f"Wert: {time_str} Fehler: {e}") + return 0 def calc_loading(self): if self.duration.is_integer(): - self.loading = round(self.time / self.duration * 100) - else: - self.loading = 0 + if all(x == self.time_deque[0] for x in self.time_deque): + loading = round(self.time_estimated / self.duration * 100) + if loading > self.loading: + self.loading = loading + else: + loading = round(self.time / self.duration * 100) + if loading > self.loading: + self.loading = loading @staticmethod def convert_kb_mb(digits): if digits.isdigit(): return round(int(digits) / 1024, 2) else: - return 0 \ No newline at end of file + return 0 + + @staticmethod + def format_time(seconds): + # Berechne die Anzahl der Tage, Stunden, Minuten und Sekunden + days = round(seconds // (24 * 3600)) # 1 Tag = 24 Stunden * 3600 Sekunden + seconds %= (24 * 3600) # Restliche Sekunden nach Tagen + if days: + d = f"{days} Tage" + else: + d = "" + + hours = round(seconds // 3600) # 1 Stunde = 3600 Sekunden + seconds %= 3600 # Restliche Sekunden nach Stunden + if hours: + h = f"{hours} Std" + else: + h = "" + + minutes = math.ceil(seconds // 60) # 1 Minute = 60 Sekunden + seconds %= 60 # Restliche Sekunden nach Minuten + if minutes: + m = f"{minutes} Min" + else: + m = "" + + return f"{d} {h} {m}" diff --git a/app/webs/timer-50.png b/app/webs/timer-50.png new file mode 100644 index 0000000..786674c Binary files /dev/null and b/app/webs/timer-50.png differ diff --git a/app/webs/webs.js b/app/webs/webs.js index 6545aaf..ae48cb0 100644 --- a/app/webs/webs.js +++ b/app/webs/webs.js @@ -21,6 +21,14 @@ ws.onerror = function(event) { console.error("WebSocket Fehler:", event); }; +function sekundenInStundenMinuten(sekunden) { + const stunden = Math.floor(sekunden / 3600); // 1 Stunde = 3600 Sekunden + const minuten = Math.floor((sekunden % 3600) / 60); // Restsekunden in Minuten umrechnen + const verbleibendeSekunden = sekunden % 60; // Übrige Sekunden + + return `${stunden} Stunden, ${minuten} Minuten, ${verbleibendeSekunden} Sekunden`; +} + function createVideoElement(id, source, target, path) { let container = document.createElement("div"); container.className = "video"; @@ -45,7 +53,7 @@ function createVideoElement(id, source, target, path) {

