python.fast-api-converter/app/main.py

247 lines
No EOL
7.3 KiB
Python
Executable file

import asyncio
import os.path
import re
import subprocess
import json
import logging
import threading
import time
from datetime import date
import uvicorn
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from starlette.websockets import WebSocketState
import app.video_class as vc
from app.ffmpeg_class import FfmpegCommands as fc
from app.config_class import Config as c
# Globale Variablen
config = c.Config()
ffmpeg: ffmpeg = fc.Ffmpeg(config)
queue: queue = queue.Queue()
semaphore = threading.Semaphore(config["convert"]["max_process"])
thread_output = None
video_files = {}
date = date.today()
# Prozess Objects
active_process = set()
active_tasks = set()
clients = set()
if not os.path.exists("./logs"):
os.mkdir("./logs")
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler(f"./logs/{date}.log")
]
)
app = FastAPI()
app.mount("/webs", StaticFiles(directory="app/webs"), name="webs")
templates = Jinja2Templates(directory="app/templates")
def obj_list():
list_json = []
for obj_video in video_files.values():
list_json.append(obj_video.get_vars())
return json.dumps({"list_video":list_json})
# Media ----------------------------------------------------------------------------------------------------------------
def get_video_information(media_path):
pattern = r"(?<=\.mkv\s|\.mp4\s|\.avi\s)|(?<=\.webm\s)"
file_paths = media_path.get("files", [])[0]
var_list_file = re.split(pattern, file_paths)
try:
for source_file in var_list_file:
source_file = source_file.strip()
video_streams, video_format = get_ffprobe("v", source_file)
audio_streams = get_ffprobe("a",source_file)
subtitle_streams = get_ffprobe("s", source_file)
obj = vc.Video(source_file, video_streams, video_format, audio_streams, subtitle_streams)
video_files.update({source_file: obj})
logging.info(obj)
return 1
except Exception as e:
logging.error(f"Get Video Information: {e}")
return 0
def get_ffprobe(select, source_file):
command = ffmpeg.video_info()
result = subprocess.run(command, stdout=subprocess.PIPE, text=True)
json_data = json.loads(result.stdout)
if select == "v":
return json_data.get("streams", []),json_data.get("format", [])
elif select == "a":
return json_data.get("streams", [])
elif select == "s":
return json_data.get("streams", [])
# Convert Process ------------------------------------------------------------------------------------------------------
def queue_video():
for key, obj in video_files.items():
if obj.finished == 0:
obj.task = threading.Thread(target=video_convert, args=(obj,))
obj.task.start()
active_tasks.add(obj)
logging.info(f"Warteschlange started Auftrag - {obj.task}")
def video_convert(obj):
global active_process
semaphore.acquire()
result: str = None
obj.convert_start = time.time()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
command = ffmpeg.video_convert()
logging.info(f"{command}")
loop.run_until_complete(queue.put(video_list()))
# Prozess
try:
obj.process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
active_process.add(obj)
obj.finished = 3
while obj.process.poll() is None:
logging.info(f"{obj.file_name} ... Running")
time.sleep(10)
if obj.process.poll() == 0:
obj.finished = 1
result = "Finished"
elif obj.process.poll() != 0:
obj.finished = 2
result = "Failure"
logging.info(f"Process {result}({obj.process.returncode}): {obj.file_name}")
loop.run_until_complete(queue.put(obj.to_dict()))
active_process.discard(obj)
active_tasks.discard(obj)
obj.convert_end = time.time()
semaphore.release()
except Exception as e:
obj.finished = 2
semaphore.release()
logging.error(f"Convert Process Failure: {e}")
#UviCorn WebServer Teil
#----------------------------------------------------------------------------------------------------------------------
def read_output():
"""Thread-Funktion, die Prozessausgaben in die Queue legt."""
global active_process
while True:
if not len(active_process):
time.sleep(5)
continue
for obj in list(active_process):
line_error = obj.process.stderr.read(1024).decode()
logging.info(f"Data ffmpeg: {line_error}")
obj.extract_convert_data(line_error)
# Verwende den Event-Loop, um die Daten in die asyncio Queue zu legen
logging.info(f"Data Packet created: {obj.to_dict()}")
queue.put(obj.to_dict())
def video_list():
vlist = []
for video in video_files.values():
vlist.append(video.get_vars())
return json.dumps({"video_list": vlist})
@app.post("/")
async def receive_video_file(data: dict):
if get_video_information(data):
queue_video()
else:
logging.error(f"Videos konnten nicht verarbeitet werden! Warteschleife wurde nicht gestarted")
@app.get("/progress", response_class=HTMLResponse)
async def display_paths(request: Request):
return templates.TemplateResponse("progress.html", {"request": request, "videos": video_files})
@app.get("/webs-ui", response_class=HTMLResponse)
async def display_paths(request: Request):
return templates.TemplateResponse("webs-ui.html", {"request": request, "videos": video_files})
@app.websocket("/ws")
async def websocket_v(websocket: WebSocket):
"""WebSocket-Verbindung für die Kommunikation mit Clients."""
global thread_output, clients
clients.add(websocket)
await websocket.accept()
if thread_output is None:
# Startet den Thread zum Verarbeiten der Prozessausgaben
thread_output = threading.Thread(target=read_output, args=(queue,))
thread_output.start()
try:
var_first_sending = 0
while True:
if not var_first_sending:
queue.put(video_list())
var_first_sending = 1
if websocket not in clients:
break
message = queue.get() # Warten auf neue Nachricht aus der Queue
for client in clients:
await client.send_text(message)
except WebSocketDisconnect:
logging.info("WebSocket disconnected")
except Exception as e:
logging.error(f"WebSocket error: {e}")
finally:
clients.discard(websocket)
if websocket.client_state == WebSocketState.CONNECTED:
await websocket.close()
@app.get("/clients")
async def get_clients_count():
return {"active_clients": len(clients), "active_processes": len(active_process), "active_tasks": len(active_tasks)}
if __name__ == "__main__":
uvicorn.run("app.main:app", host="127.0.0.1", port=8000, reload=False)