python.fast-api-converter/app/main.py

205 lines
5.9 KiB
Python
Executable file

import asyncio
import re
import subprocess
import json
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from contextlib import asynccontextmanager
import app.video_class as vc
@asynccontextmanager
async def lifespan(_: FastAPI):
    """Application lifespan hook.

    Everything before ``yield`` runs once while the application starts up:
    the conversion queue is scanned so that any registered, unfinished video
    is scheduled.  Nothing is executed after ``yield`` (no shutdown work).
    """
    await queue_video()
    yield
# ASGI application; `lifespan` runs queue_video() once at startup.
app = FastAPI(lifespan=lifespan)
# Serve the files below app/webs under the /webs URL prefix.
app.mount("/webs", StaticFiles(directory="app/webs"), name="webs")
# Jinja2 environment used by the HTML routes below.
templates = Jinja2Templates(directory="app/templates")
# JSON progress snapshots: filled by read_output(), drained by the /ws handler.
queue = asyncio.Queue()
# Handle of the background read_output() task; created lazily by the /ws handler.
read_output_task = None
# Counter (not a task object) of conversions currently occupying a slot:
# incremented in queue_video(), decremented when a conversion ends or fails.
convert_task = 0
# Settings
# Language tags of the audio/subtitle streams that are kept in the output.
language = ["ger", "eng"]
# Subtitle codecs that are never mapped into the output.
subtitle_codec_blacklist = ["hdmv_pgs_subtitle", "dvd_subtitle"]
# Maximum number of conversions queue_video() starts in parallel.
process_count = 1
# Registry of submitted videos: path string -> vc.Video object.
video_files = {}
# NOTE(review): not referenced anywhere in the visible part of this file.
progress = {}
# Strong references to the running conversion tasks, keyed by id() of the
# video object each task works on.  The event loop itself only keeps weak
# references to tasks, so a task that is not stored anywhere may be
# garbage-collected before it finishes.  The mapping doubles as the
# "already being converted" marker, because `finished == 0` alone cannot
# distinguish a waiting video from one that is currently running.
_conversion_tasks = {}


async def queue_video():
    """Start conversions for waiting videos while free slots are available.

    Walks ``video_files`` in insertion order and launches ``video_convert``
    for every video that is still unfinished (``finished == 0``) and is not
    already being converted, until ``process_count`` slots are occupied.
    Each started conversion increments the module-level ``convert_task``
    counter; releasing the slot again is the job of the conversion itself.
    """
    global convert_task
    for obj in video_files.values():
        if convert_task >= process_count:
            # Every slot is taken; nothing more can be started right now.
            break
        if obj.finished != 0 or id(obj) in _conversion_tasks:
            # Already done/failed, or a conversion task for it is still alive.
            continue
        convert_task += 1
        key = id(obj)
        task = asyncio.create_task(video_convert(obj))
        _conversion_tasks[key] = task
        # Drop the reference as soon as the task has completed.
        task.add_done_callback(
            lambda _task, key=key: _conversion_tasks.pop(key, None)
        )
async def get_ffprobe(select, obj):
    """Probe ``obj.source_file`` with ffprobe and return the parsed JSON.

    Args:
        select: Stream specifier handed to ``-select_streams``.
        obj: Video object.  ``source_file`` is read; ``duration`` is updated
            when ffprobe reports one.

    Returns:
        The parsed ffprobe output as a dict, or ``None`` when probing failed.

    On failure the conversion slot of *obj* is released here (``convert_task``
    is decremented and ``queue_video()`` is called), exactly as before, so a
    caller receiving ``None`` must stop working on *obj*.  In addition the
    video is now marked as failed (``finished = 2``, the value the ffmpeg
    error path uses as well) and the reason is appended to ``obj.error``;
    without that mark the re-queue would immediately start the same broken
    file again.
    """
    global convert_task
    command = [
        "ffprobe", "-v",
        "error",
        "-select_streams",
        f"{select}", "-show_entries",
        "stream=index,channels,codec_name,tags:stream_tags=language,tags:format=duration",
        "-of", "json",
        obj.source_file
    ]
    try:
        # Run ffprobe through asyncio instead of the blocking subprocess.run
        # so the event loop keeps serving requests while the probe runs.
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
        )
        stdout, _ = await process.communicate()
        if process.returncode != 0:
            raise ValueError(f"ffprobe exited with code {process.returncode}")
        # json.loads accepts UTF-8 bytes directly; empty or invalid output
        # raises JSONDecodeError, which is a ValueError.
        json_data = json.loads(stdout)
        if not isinstance(json_data, dict):
            raise ValueError("unexpected ffprobe output")
    except (OSError, ValueError) as e:
        obj.finished = 2
        obj.error.append(f"ffprobe ---- {e}")
        convert_task -= 1
        await queue_video()
        return None

    print(json_data)
    # The fallback is a string now: the previous integer 999 made the
    # `.replace` call below raise whenever the "format" section was missing.
    duration = json_data.get("format", {"duration": "999"}).get("duration")
    if duration:
        duration_text = str(duration)
        if duration_text.replace(".", "", 1).isdigit():
            obj.duration = round(float(duration_text))
        else:
            obj.duration = 0
    return json_data
def _build_ffmpeg_command(obj, audio_info, subtitle_info):
    """Assemble the ffmpeg argument list for converting *obj*."""
    command = [
        "ffmpeg", "-y", "-i", obj.source_file,
        "-map", "0:0",
        "-c:v", "libsvtav1",
        "-preset", "5",
        "-crf", "24",
        "-g", "240",
        "-pix_fmt", "yuv420p",
        "-svtav1-params", "tune=0:film-grain=8",
    ]

    # Audio streams: keep only the configured languages.  The options carry
    # an explicit output-stream index (a:0, a:1, ...); without it every
    # repeated -ac/-c:a/-b:a applies to all audio streams and the last one
    # wins, forcing the same channel count onto every track.
    audio_out = 0
    for audio_stream in audio_info.get("streams", []):
        if audio_stream.get("tags", {}).get("language", None) not in language:
            continue
        command.extend([
            "-map", f"0:{audio_stream['index']}",
            f"-c:a:{audio_out}", "libopus",
            f"-b:a:{audio_out}", "320k",
        ])
        channels = audio_stream.get("channels")
        if channels:
            command.extend([f"-ac:a:{audio_out}", str(channels)])
        audio_out += 1

    # Include subtitle streams: configured languages only, and none of the
    # blacklisted codecs.
    for subtitle_stream in subtitle_info.get("streams", []):
        if subtitle_stream.get("codec_name") in subtitle_codec_blacklist:
            continue
        if subtitle_stream.get("tags", {}).get("language", None) in language:
            command.extend([
                "-map", f"0:{subtitle_stream['index']}",
            ])

    command.append(obj.output_file)
    return command


async def video_convert(obj):
    """Convert one video with ffmpeg and release its queue slot afterwards.

    Probes the audio and subtitle streams, builds the ffmpeg command, starts
    the process (stored in ``obj.progress`` so its stderr can be read
    elsewhere) and waits for it to exit.  ``obj.finished`` becomes ``1`` on
    exit code 0 and ``2`` otherwise; failures are appended to ``obj.error``.
    Finally ``convert_task`` is decremented and ``queue_video()`` is called
    so the next waiting video can start.

    Args:
        obj: Video object providing ``source_file`` and ``output_file``.
    """
    global convert_task
    # get_ffprobe returns None on failure and has then already released the
    # slot and re-queued, so stop here without touching the counter again.
    json_data_audio = await get_ffprobe("a", obj)
    if json_data_audio is None:
        obj.finished = 2
        return
    json_data_subtitles = await get_ffprobe("s", obj)
    if json_data_subtitles is None:
        obj.finished = 2
        return

    # Conversion
    command = _build_ffmpeg_command(obj, json_data_audio, json_data_subtitles)

    # Process
    try:
        process_video = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        obj.progress = process_video
        # returncode is None until the process has exited, so wait for it
        # before judging the result and before freeing the slot.
        # NOTE(review): stderr is a pipe that only read_output() drains; if
        # nothing reads it, ffmpeg may block once the pipe buffer is full.
        returncode = await process_video.wait()
        if returncode == 0:
            obj.finished = 1
        else:
            obj.finished = 2
            obj.error.append(f"ffmpeg ---- exit code {returncode}")
    except Exception as e:
        # Boundary of a background task: record the failure on the video
        # instead of letting it vanish as an unretrieved task exception.
        obj.finished = 2
        obj.error.append(f"ffmpeg ---- {e}")
    finally:
        convert_task -= 1
        await queue_video()
async def read_output():
    """Forward ffmpeg stderr output of running conversions to the queue.

    Runs forever.  For every video whose process (``obj.progress``) still has
    stderr data pending, one chunk of up to 1024 bytes is read, handed to
    ``obj.extract_convert_data`` and the resulting ``obj.to_dict()`` snapshot
    is put on ``queue`` as a JSON string.  When no such video exists the loop
    sleeps for ten seconds.
    """
    while True:
        # Skip streams that have reached EOF: read() returns b"" on them
        # immediately without suspending, which previously turned this loop
        # into a busy spin that starved the event loop and flooded the queue
        # as soon as one conversion had ended.
        active_processes = [
            obj for obj in video_files.values()
            if obj.progress and not obj.progress.stderr.at_eof()
        ]
        if not active_processes:
            await asyncio.sleep(10)  # no active video -> wait a moment
            continue
        for obj in active_processes:
            chunk = await obj.progress.stderr.read(1024)
            # A fixed-size chunk can end in the middle of a multi-byte
            # character; replace undecodable bytes instead of raising and
            # killing this task.
            line_decoded = chunk.decode(errors="replace").strip()
            print(line_decoded)
            obj.extract_convert_data(line_decoded)
            json_data = json.dumps(obj.to_dict())
            await queue.put(json_data)
@app.post("/")
async def receive_video_file(data: dict):
    """Register the submitted video paths and kick the conversion queue.

    Only the first element of ``data["files"]`` is used; it is split after
    every ``.mkv``/``.mp4``/``.avi``/``.webm`` extension that is followed by
    whitespace (presumably the client sends all paths in one string - verify
    against the caller).  Each path is stripped, empty fragments are skipped,
    and the stripped path is used both for the ``vc.Video`` object and as the
    key in ``video_files``.  A missing or empty ``files`` entry is ignored.
    """
    pattern = r"(?<=\.mkv\s|\.mp4\s|\.avi\s)|(?<=\.webm\s)"
    files = data.get("files")
    if not files:
        # Nothing submitted; previously this raised IndexError.
        return
    for raw_path in re.split(pattern, files[0]):
        path = raw_path.strip()
        if not path:
            # A trailing separator produces an empty fragment.
            continue
        video_files[path] = vc.Video(path)
    # One pass is enough: queue_video() starts as many waiting videos as
    # there are free slots.
    await queue_video()
    # Debug output
    print(video_files)
@app.get("/progress", response_class=HTMLResponse)
async def display_paths(request: Request):
    """Render ``progress.html`` with the current video registry as ``videos``."""
    context = {"request": request, "videos": video_files}
    return templates.TemplateResponse("progress.html", context)
@app.get("/webs-ui", response_class=HTMLResponse)
async def display_paths(request: Request):
    """Render ``webs-ui.html`` with the current video registry as ``videos``.

    NOTE(review): this function reuses the name of the ``/progress`` handler.
    Both routes are registered by their decorators, but the module-level name
    ``display_paths`` ends up referring to this function only.
    """
    context = {"request": request, "videos": video_files}
    return templates.TemplateResponse("webs-ui.html", context)
@app.websocket("/ws")
async def websocket_v(websocket: WebSocket):
    """Stream queued progress snapshots to a WebSocket client.

    Accepts the connection, makes sure the background ``read_output`` task is
    running (it is started on first use and restarted if it has ended) and
    then forwards every message taken from ``queue`` as a text frame until
    the connection breaks.
    """
    global read_output_task
    await websocket.accept()
    if read_output_task is None or read_output_task.done():
        read_output_task = asyncio.create_task(read_output())
    try:
        while True:
            message = await queue.get()
            await websocket.send_text(message)
    except WebSocketDisconnect:
        print("WebSocket disconnected")  # Optional: Logging
    except Exception as e:
        print(f"WebSocket error: {e}")  # error handling
    finally:
        # After a disconnect the connection is already closed and a second
        # close attempt raises RuntimeError; that is not an error here.
        try:
            await websocket.close()
        except RuntimeError:
            pass