python.fast-api-converter/app/main.py

261 lines
8.6 KiB
Python
Executable file

import asyncio
import os.path
import re
import subprocess
import json
import logging
import threading
import time
from datetime import date
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
import app.video_class as vc
# Held by queue_video() while it starts a conversion thread.
semaphore = threading.Semaphore(1)
# NOTE: this rebinds the imported ``date`` class to today's date. The value is
# fixed at import time, so the log file name does not roll over at midnight.
date = date.today()

# exist_ok avoids the check-then-create race of os.path.exists() + os.mkdir().
os.makedirs("./logs", exist_ok=True)
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(f"./logs/{date}.log"),
    ],
)

app = FastAPI()
app.mount("/webs", StaticFiles(directory="app/webs"), name="webs")
templates = Jinja2Templates(directory="app/templates")

# Carries JSON status messages from the worker threads to the WebSocket handler.
queue = asyncio.Queue()
# Thread running read_output(); created lazily by the WebSocket endpoint.
read_output_task = None

# Settings
language = ["ger", "eng"]  # stream language tags that are kept
subtitle_codec_blacklist = ["hdmv_pgs_subtitle", "dvd_subtitle"]  # subtitle codecs that are skipped
max_tasks = 2  # NOTE(review): not referenced anywhere in the visible part of this module

# Shared state
video_files = {}  # source path -> Video object
active_process = set()  # Video objects whose ffmpeg process has been started
active_tasks = set()  # Video objects whose conversion thread has been started
connected_clients = set()  # WebSocket connections
# Media ----------------------------------------------------------------------------------------------------------------
def get_video_information(media_path):
    """Probe every video named in the request and register it in ``video_files``.

    Only the first entry of ``media_path["files"]`` is read. It is one string
    that may contain several paths; the string is split after each
    ``.mkv``/``.mp4``/``.avi``/``.webm`` extension that is followed by
    whitespace, so paths containing spaces stay intact.

    Args:
        media_path: Mapping with a ``"files"`` list.

    Returns:
        1 if every path was probed and registered, 0 otherwise (missing or
        empty ``"files"``, no usable path, or any error while probing).
    """
    # Zero-width split points directly after "<extension><whitespace>".
    pattern = r"(?<=\.mkv\s|\.mp4\s|\.avi\s)|(?<=\.webm\s)"
    try:
        files = media_path.get("files") or []
        if not files:
            # Previously files[0] raised IndexError outside the try block.
            logging.error("Get Video Information: no file paths received")
            return 0
        # Drop empty fragments (e.g. from trailing whitespace) so ffprobe is
        # never called with an empty path.
        source_files = [
            part.strip() for part in re.split(pattern, files[0]) if part.strip()
        ]
        if not source_files:
            logging.error("Get Video Information: no file paths received")
            return 0
        for source_file in source_files:
            video_streams, video_format = get_ffprobe("v", source_file)
            audio_streams = get_ffprobe("a", source_file)
            subtitle_streams = get_ffprobe("s", source_file)
            obj = vc.Video(source_file, video_streams, video_format, audio_streams, subtitle_streams)
            video_files.update({source_file: obj})
            logging.info(obj)
        return 1
    except Exception as e:
        logging.error(f"Get Video Information: {e}")
        return 0
def get_ffprobe(select, source_file):
    """Run ffprobe on ``source_file`` for one stream type and parse its JSON output.

    Args:
        select: Value passed to ``-select_streams``; the callers in this file
            use ``"v"``, ``"a"`` and ``"s"``.
        source_file: Path handed to ffprobe.

    Returns:
        For ``"v"`` a tuple ``(streams, format)``; for ``"a"`` and ``"s"`` the
        ``streams`` list; ``None`` for any other ``select`` value.

    Raises:
        json.JSONDecodeError: If ffprobe's stdout is not valid JSON.
    """
    stream_entries = (
        "stream=index,channels,codec_name,codec_type,pix_fmt,level,"
        "film_grain,r_frame_rate,bit_rate,sample_rate,width,height,size,tags:stream_tags=language,duration"
    )
    ffprobe_args = [
        "ffprobe", "-v", "error",
        "-select_streams", f"{select}",
        "-show_entries", stream_entries,
        "-show_entries", "format=size,bit_rate,nb_streams",
        "-of", "json",
        source_file,
    ]
    completed = subprocess.run(ffprobe_args, stdout=subprocess.PIPE, text=True)
    probe = json.loads(completed.stdout)

    if select == "v":
        return probe.get("streams", []), probe.get("format", [])
    if select in ("a", "s"):
        return probe.get("streams", [])
    return None
# Convert Process ------------------------------------------------------------------------------------------------------
def queue_video():
    """Start a conversion thread for every registered video that still needs one.

    Videos whose thread is still running (present in ``active_tasks``) or whose
    conversion already succeeded (``finished == 1``) are skipped; previously
    every call restarted a thread for all entries of ``video_files``.

    NOTE(review): the semaphore is released as soon as the thread has been
    started, so it does not limit how many conversions run at the same time.
    """
    for obj in video_files.values():
        if obj in active_tasks or getattr(obj, "finished", 0) == 1:
            continue
        with semaphore:
            obj.task = threading.Thread(target=video_convert, args=(obj,))
            # Register before starting so a thread that ends immediately cannot
            # discard the entry before it has been added.
            active_tasks.add(obj)
            obj.task.start()
            logging.info(f"Warteschlange started Auftrag - {obj.task}")
def _build_ffmpeg_command(obj):
    """Return the ffmpeg argument list that converts ``obj.source_file``."""
    command = [
        "ffmpeg", "-y", "-i", obj.source_file,
        "-map", "0:0",
        "-c:v", "libsvtav1",
        "-preset", "5",
        "-crf", "24",
        "-g", "240",
        "-pix_fmt", "yuv420p",
        "-svtav1-params", "tune=0:film-grain=8",
    ]
    # Audio streams in the configured languages. The options carry an output
    # stream specifier (a:0, a:1, ...) so each stream keeps its own channel
    # count; a bare "-ac" repeated per stream lets the last value win for all.
    audio_out = 0
    for audio_stream in obj.streams_audio or []:
        if audio_stream.get("tags", {}).get("language", None) not in language:
            continue
        command.extend([
            "-map", f"0:{audio_stream['index']}",
            f"-c:a:{audio_out}", "libopus",
            f"-b:a:{audio_out}", "320k",
            f"-ac:a:{audio_out}", str(audio_stream["channels"]),
        ])
        audio_out += 1
    # Subtitle streams in the configured languages, minus blacklisted codecs.
    for subtitle_stream in obj.streams_subtitle or []:
        if subtitle_stream.get("codec_name") in subtitle_codec_blacklist:
            continue
        if subtitle_stream.get("tags", {}).get("language", None) in language:
            command.extend(["-map", f"0:{subtitle_stream['index']}"])
    command.append(obj.output_file)
    return command


def video_convert(obj):
    """Convert one video with ffmpeg and publish its final state on ``queue``.

    This is the thread target used by ``queue_video``. It sets
    ``obj.convert_start``/``obj.convert_end``, stores the ffmpeg process in
    ``obj.process`` and sets ``obj.finished`` to 1 when ffmpeg exits with
    code 0 and to 2 on any failure. ``obj`` is always removed from
    ``active_process`` and ``active_tasks`` before the function returns,
    including when an exception occurred.

    Args:
        obj: Video object providing ``source_file``, ``output_file``,
            ``file_name``, ``streams_audio``, ``streams_subtitle`` and
            ``to_dict()``.
    """
    obj.convert_start = time.time()
    # Create and set an event loop for this thread; it drives queue.put().
    # NOTE(review): asyncio queues are documented as not thread-safe, so a
    # consumer waiting on another loop may not be woken promptly - confirm.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Built inside the try block so malformed stream metadata is recorded
        # as a failed conversion instead of killing the thread silently.
        command = _build_ffmpeg_command(obj)
        logging.info(f"{command}")
        # Process
        obj.process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        active_process.add(obj)
        logging.info(f"{obj.file_name}")
        return_code = obj.process.poll()
        while return_code is None:
            logging.info(f"{obj.file_name} ... Running")
            try:
                # Returns as soon as ffmpeg exits instead of sleeping a full 30 s.
                return_code = obj.process.wait(timeout=30)
            except subprocess.TimeoutExpired:
                pass  # still converting
        if return_code == 0:
            obj.finished = 1
            logging.info(f"Process Finished({return_code}): {obj.file_name}")
        else:
            obj.finished = 2
            logging.info(f"Process Failure({return_code}): {obj.file_name}")
        json_data = json.dumps(obj.to_dict())
        loop.run_until_complete(queue.put(json_data))
    except Exception as e:
        obj.finished = 2
        logging.error(f"Convert Process Failure: {e}")
    finally:
        # Runs on success and failure so no stale entry is left for
        # read_output() or the /clients counters.
        active_process.discard(obj)
        active_tasks.discard(obj)
        obj.convert_end = time.time()
        loop.close()
#test = {"files":["/mnt/Storage/11 - Downloads - JDownloader/01 - Fertig/Star Trek: Deep Space Nine - S01E03 - Die Khon-Ma.mkv /mnt/Storage/11 - Downloads - JDownloader/01 - Fertig/Star Trek: Deep Space Nine - S01E04 - Unter Verdacht.mkv"]}
#get_video_information(test)
# Uvicorn web server section
#----------------------------------------------------------------------------------------------------------------------
def read_output(qu):
    """Thread function that puts process output into the queue.

    Loops forever: reads up to 1024 bytes of stderr from every object in
    ``active_process``, passes the decoded text to
    ``obj.extract_convert_data()`` and puts ``obj.to_dict()`` as JSON into
    ``qu``. An error while handling one object is logged and does not stop
    the thread.

    Args:
        qu: The asyncio queue that receives the JSON status strings.
    """
    # Create and set an event loop for this thread; it drives qu.put().
    # NOTE(review): asyncio queues are documented as not thread-safe, so a
    # consumer waiting on another loop may not be woken promptly - confirm.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    while True:
        if not active_process:
            time.sleep(30)  # no active processes -> pause before checking again
            continue
        received_data = False
        # Iterate over a snapshot because worker threads modify the set.
        for obj in list(active_process):
            try:
                line_error = obj.process.stderr.read(1024)
                if not line_error:
                    continue
                received_data = True
                # A fixed-size chunk can end inside a multi-byte character;
                # "replace" keeps that from raising UnicodeDecodeError.
                line_error_decoded = line_error.decode(errors="replace")
                logging.info(f"Datenpaket {obj.file_name}: {line_error_decoded}")
                obj.extract_convert_data(line_error_decoded)
                json_data = json.dumps(obj.to_dict())
                # Use the event loop to put the data into the asyncio queue.
                loop.run_until_complete(qu.put(json_data))
            except Exception as e:
                # This thread is started only once, so it must survive errors.
                logging.error(f"Read Output Failure: {e}")
        if not received_data:
            # Every stream was at EOF or failed: avoid spinning at full speed
            # until the finished objects are removed from active_process.
            time.sleep(1)
@app.post("/")
async def receive_video_file(data: dict):
    """Register the posted video paths and start their conversion.

    The conversion queue is started only when ``get_video_information``
    reports success; otherwise an error is logged.
    """
    probed = get_video_information(data)
    if not probed:
        logging.error("Videos konnten nicht verarbeitet werden! Warteschleife wurde nicht gestarted")
        return
    queue_video()
@app.get("/progress", response_class=HTMLResponse)
async def display_paths(request: Request):
    """Render ``progress.html`` with the registered videos."""
    context = {"request": request, "videos": video_files}
    return templates.TemplateResponse("progress.html", context)
@app.get("/webs-ui", response_class=HTMLResponse)
async def display_paths(request: Request):
    """Render ``webs-ui.html`` with the registered videos."""
    # NOTE(review): shares its function name with the /progress handler.
    context = {"request": request, "videos": video_files}
    return templates.TemplateResponse("webs-ui.html", context)
@app.websocket("/ws")
async def websocket_v(websocket: WebSocket):
    """WebSocket connection for communicating with clients.

    Accepts the connection, starts the ``read_output`` thread on first use and
    then forwards every message taken from ``queue`` to this client until the
    connection ends.

    NOTE(review): all connections read from the same queue, so with several
    clients each message is delivered to only one of them.
    """
    global read_output_task
    await websocket.accept()
    # Register only after a successful accept so a failed handshake does not
    # leave a stale entry in connected_clients.
    connected_clients.add(websocket)
    if read_output_task is None:
        # Starts the thread that processes the process output. It is a daemon
        # thread because read_output() never returns and would otherwise keep
        # the interpreter alive on shutdown.
        read_output_task = threading.Thread(target=read_output, args=(queue,), daemon=True)
        read_output_task.start()
    try:
        while True:
            message = await queue.get()  # wait for a new message from the queue
            await websocket.send_text(message)
    except WebSocketDisconnect:
        logging.info("WebSocket disconnected")
    except Exception as e:
        logging.error(f"WebSocket error: {e}")
    finally:
        connected_clients.discard(websocket)
        try:
            await websocket.close()
        except RuntimeError:
            # The connection is already closed (e.g. after a disconnect);
            # there is nothing left to close.
            pass
@app.get("/clients")
async def get_clients_count():
    """Return the sizes of the client, process and task sets."""
    counts = {
        "active_clients": len(connected_clients),
        "active_processes": len(active_process),
        "active_tasks": len(active_tasks),
    }
    return counts