"""WebSocket and HTTP server that accepts paths from clients and hands them to the converter."""

import asyncio
import json
import logging
import shlex
from asyncio import CancelledError

import websockets
from aiohttp import web
from websockets import InvalidUpgrade, ConnectionClosed, ConnectionClosedError

from app.class_settings import Settings
from app.class_file_path import Path
from app.class_file_convert import Convert
from app.class_media_file_stat import Stat

# Module-wide flag: True while a start_convert() call is awaiting the converter.
var_convert_active = False


class Server:
    """Serve the browser client over HTTP and exchange queue state over WebSocket."""

    def __init__(self):
        # Most recently connected websocket; overwritten on every new connection.
        self.websocket = None
        # All currently connected websocket clients (broadcast targets).
        self.clients = set()
        # Strong references to fire-and-forget tasks so they are not
        # garbage-collected before they finish.
        self._background_tasks = set()

        obj_settings = Settings()
        obj_settings.set_logging()
        self.yaml = obj_settings.yaml
        self.obj_path = Path(self.yaml)
        self.obj_convert = Convert(self, self.yaml, self.obj_path)

    async def start_convert(self):
        """Start the converter, or only refresh it when it must not be started.

        The converter is awaited (``snake_waiting``) only when no other call
        is already doing so and the ``autostart`` setting is truthy; in every
        other case ``snake_update`` is called instead.
        """
        if var_convert_active or not self.yaml['autostart']:
            # Not the owner of the flag: leave it untouched so a running
            # conversion stays marked as active.
            self.obj_convert.snake_update()
            return

        # Mark as active *before* awaiting, otherwise a second caller arriving
        # during the await would start the converter again.
        self.set_var_convert_active(True)
        try:
            await self.obj_convert.snake_waiting()
        finally:
            self.set_var_convert_active(False)

    async def send_websocket(self, message):
        """Broadcast ``message`` (JSON-serialised) to every connected client.

        Clients whose connection turns out to be closed are dropped from the
        client set.
        """
        message_json = json.dumps(message)
        # Iterate over a snapshot: the set can change while we await send().
        for client in self.clients.copy():
            try:
                await client.send(message_json)
            except ConnectionClosed:
                # discard(), not remove(): handle_client() may already have
                # dropped this client while send() was pending.
                self.clients.discard(client)

    async def handle_client(self, websocket):
        """Serve one websocket connection until it closes.

        Registers the client, pushes the current active/queue state, then
        processes incoming JSON messages. The client is always unregistered
        on exit.
        """
        self.websocket = websocket
        self.clients.add(websocket)
        try:
            # Inside the try block so a failure here still unregisters the client.
            await self.send_websocket(self.obj_path.active_path_to_dict())
            await self.send_websocket(self.obj_path.queue_path_to_dict())

            async for message in websocket:
                try:
                    data = json.loads(message)
                except ValueError:
                    # Covers json.JSONDecodeError and undecodable bytes; one
                    # bad message should not end the connection.
                    logging.warning("Ungültige JSON Nachricht empfangen")
                    continue
                if not isinstance(data, dict):
                    logging.warning("Unerwartetes Nachrichtenformat empfangen")
                    continue
                await self._handle_message(data)
        except (ConnectionClosedError, ConnectionClosed):
            logging.warning("Client Verbindung geschlossen")
        except InvalidUpgrade:
            logging.warning("Ungültiger Websocket Upgrade versuch")
        except Exception as e:
            logging.error(f"Unerwarteter Fehler {e}")
        finally:
            self.clients.discard(websocket)

    async def _handle_message(self, data):
        """Dispatch one decoded client message by the first truthy known key."""
        if data.get("data_path"):
            self.obj_path.receive_paths(data.get("data_path"))
            await self.start_convert()
            await self.send_websocket(self.obj_path.active_path_to_dict())
            await self.send_websocket(self.obj_path.queue_path_to_dict())
        elif data.get("data_command"):
            if data["data_command"]["cmd"] == "delete":
                self.obj_path.delete_path(data["data_command"]["id"])
                await self.send_websocket(self.obj_path.queue_path_to_dict())
        elif data.get("data_message"):
            logging.info(f"Server hat Empfangen: \n{data.get('data_message')}")

    @staticmethod
    def set_var_convert_active(value: bool):
        """Set the module-wide ``var_convert_active`` flag."""
        global var_convert_active
        var_convert_active = value

    async def server_websocket(self):
        """Load stored paths, kick off the converter and run the websocket server."""
        self.obj_path.read_paths()

        # The event loop only keeps weak references to tasks, so hold one
        # ourselves until the task is done.
        startup_task = asyncio.create_task(self.start_convert())
        self._background_tasks.add(startup_task)
        startup_task.add_done_callback(self._background_tasks.discard)

        server = await websockets.serve(self.handle_client, self.yaml['server_ip'], self.yaml['websocket_port'])
        logging.info(f"Websocket Server läuft auf IP: {self.yaml['server_ip']} Port: {self.yaml['websocket_port']}")
        await server.wait_closed()

    # WebServer --------------------------------------------------------------------------------------------------------
    # noinspection PyUnusedLocal
    @staticmethod
    async def handle_index(request):
        """Serve the client's index page."""
        return web.FileResponse("./client/index.html")

    # noinspection PyUnusedLocal
    async def handle_ip(self, request):
        """Return the websocket connection settings from the configuration as JSON."""
        url = self.yaml["extern_websocket_url"]
        port = self.yaml["websocket_port"]
        websocket_https = self.yaml["websocket_https"]
        return web.json_response({"extern_websocket_url": url,
                                  "websocket_port": port,
                                  "websocket_https": websocket_https})

    # noinspection PyUnusedLocal
    @staticmethod
    async def handle_stat(request):
        """Return the result of ``Stat().read_stat()`` as JSON."""
        obj_stat = Stat()
        return web.json_response(obj_stat.read_stat())

    # noinspection PyUnusedLocal
    async def handle_cmd(self, request):
        """Return the command line of every active converter task as text/html."""
        # Each command is rendered as one shell-quoted string (so arguments
        # containing spaces etc. stay unambiguous), followed by a blank line.
        rendered = [
            " ".join(shlex.quote(arg) for arg in task.cmd) + "\n\n"
            for task in self.obj_convert.active_tasks
        ]
        return web.Response(text="".join(rendered), content_type="text/html")

    async def server_http(self):
        """Set up the aiohttp routes and start listening on the configured port."""
        app = web.Application()
        app.router.add_get("/", self.handle_index)
        app.router.add_get("/api/ip", self.handle_ip)
        app.router.add_get("/api/stats", self.handle_stat)
        app.router.add_get("/api/cmd", self.handle_cmd)
        app.router.add_static("/client/", path="./client", name="client")

        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, "0.0.0.0", self.yaml["webserver_port"])
        await site.start()
        logging.info(f"HTTP Server läuft auf Port {self.yaml['webserver_port']}")

    # Start Server -----------------------------------------------------------------------------------------------------
    async def start_server(self):
        """Run the websocket and HTTP servers together; log instead of raising on failure."""
        try:
            await asyncio.gather(
                self.server_websocket(),
                self.server_http()
            )
        except CancelledError:
            logging.warning("Server wurde durch Keyboard Interrupt gestoppt.")
        except Exception as e:
            logging.error(f"Unerwarteter Fehler beim Starten des Servers {e}")