import logging import re import os import json import yaml import subprocess from app.class_media_file import Media class Path: def __init__(self, cfg): """ Receive Paths extract from String and save to a List Variable. Methoden: save_paths / read_paths / delete_path / receive_path :param cfg: Global Settings Object """ # Settings self.yaml = cfg # Autostart # self.read_paths() # Variablen self.paths:dict = {} def save_paths(self): """ Saves the extrated Paths in a File :return: True or False """ paths:list = [] for obj in self.paths.values(): if obj.status is None: paths.append(obj.source_file) paths_extrat_dict = {"paths": paths} try: with open(f"app/cfg/{self.yaml["path_file"]}", "w", encoding="utf8") as file: yaml.dump(paths_extrat_dict, file, default_flow_style=False, indent=4) logging.info(f"{len(self.paths)} paths were saved to file") return 1 except FileNotFoundError: logging.error(f"File {self.yaml["path_file"]} not found") return 0 except IOError: logging.critical(f"Error file {self.yaml["path_file"]} maybe damaged") return 0 # Achtung Abrufen aus Datei und neu einlesen der ffprobe Daten fehlt noch def read_paths(self): """ Read Media Paths from a File :return: True or False """ try: with open(f"app/cfg/{self.yaml["path_file"]}", "r", encoding="utf8") as file: list_paths = yaml.safe_load(file) if len(list_paths) > 0: paths = list_paths logging.info(f"{len(paths)} paths were read from file") return 1 except FileNotFoundError: logging.error(f"File {self.yaml["path_file"]} not found") return 0 except IOError: logging.critical(f"Error file {self.yaml["path_file"]} maybe damaged") return 0 def delete_path(self, obj_id:int): """ Delete a Media Path from a List Variable and overwrite the path file :param obj_id: Path from a Media File which to delete :return: True or False """ try: del self.paths[obj_id] self.save_paths() return 1 except ValueError: return 0 def receive_paths(self, var_paths:str): """ Splitting the Path String to List Single Media Paths :param var_paths: String of a single or more Media Paths :return: True or False """ pattern = r"(?<=\.mkv\s|\.mp4\s|\.avi\s)|(?<=\.webm\s)" paths = re.split(pattern, var_paths) for path in paths: self.get_with_ffprobe(path) print(paths) self.save_paths() return 1 @staticmethod def extract_media_info(path:str, select:str): """ Erstellt ffprobe command for selected Streams :param path: path to media file :param select: v (for video), a (for audio), s (for subtitle) :return: """ command = [ "ffprobe", "-v", "error", "-select_streams", f"{select}", "-show_entries", "stream=index,channels,codec_name,codec_type,pix_fmt,level," "film_grain,r_frame_rate,bit_rate,sample_rate,width,height,size,tags:stream_tags=language,duration", "-show_entries", "format=size,bit_rate,nb_streams,duration", "-of", "json", path ] result = subprocess.run(command, stdout=subprocess.PIPE, text=True) json_data = json.loads(result.stdout) if select == "v": if json_data.get("format") is not list: list_format = [json_data.get("format", "")] else: list_format = json_data.get("format") return json_data.get("streams", []), list_format elif select == "a": return json_data.get("streams", []) elif select == "s": return json_data.get("streams", []) def get_with_ffprobe(self, path:str): try: path = path.strip() if os.path.exists(path): streams_video, streams_format = self.extract_media_info(path, "v") streams_audio = self.extract_media_info(path, "a") streams_subtitle = self.extract_media_info(path, "s") obj = Media(path, streams_video, streams_audio, streams_subtitle, streams_format) if not self.search_paths(path): self.paths.update({obj.id: obj}) logging.info(obj) else: logging.info(f"File exists in Waiting Snake :D: {path}") return 1 except Exception as e: logging.error(f"Get Video Information: {e}") return 0 def search_paths(self, path): for obj in self.paths.values(): if obj.source_file == path: return 1 return 0 def count_paths(self, status): count = 0 for obj in self.paths.values(): if obj.status == status: count += 1 return count def active_path_to_dict(self): list_active_paths = {} for obj in self.paths.values(): if obj.status == 3: list_active_paths.update({obj.id: obj.to_dict_active_paths()}) return {"data_convert": list_active_paths} def queue_path_to_dict(self): list_active_paths = {} for obj in self.paths.values(): if obj.status is None: list_active_paths.update({obj.id: obj.to_dict_active_paths()}) return {"data_queue": list_active_paths}