208 lines
6.2 KiB
Python
Executable file
208 lines
6.2 KiB
Python
Executable file
import logging
|
|
import re
|
|
import os
|
|
import json
|
|
import yaml
|
|
import subprocess
|
|
from app.class_media_file import Media
|
|
|
|
|
|
class Path:
    """
    Queue of media files that were handed to the application.

    Every accepted file is probed with ffprobe and stored as a ``Media``
    object in ``self.paths`` (keyed by the object's ``id``). The source paths
    of all entries whose ``status`` is ``None`` are persisted to a YAML file
    below ``app/cfg/``.

    Methods: save_paths / read_paths / delete_path / receive_paths
    """

    # Zero-width split points directly behind a known media extension that is
    # followed by one whitespace character. ".webm" needs its own look-behind
    # because Python requires all alternatives of a look-behind to have the
    # same width.
    _PATH_SPLIT_PATTERN = re.compile(r"(?<=\.mkv\s|\.mp4\s|\.avi\s)|(?<=\.webm\s)")

    def __init__(self, cfg):
        """
        Receive paths, extract them from a string and keep them in a dict.

        :param cfg: Global settings object; ``cfg['path_file']`` is read as the
                    name of the YAML file below ``app/cfg/``
        """
        # Settings
        self.yaml = cfg

        # Variables: maps Media.id -> Media object
        self.paths: dict = {}

        # Autostart is disabled on purpose; call read_paths() explicitly to
        # restore the queue from the path file.

    def save_paths(self):
        """
        Save the source paths of all entries with ``status is None`` to the
        path file (the file is overwritten).

        :return: 1 on success, 0 if the file could not be written
        """
        pending = [
            obj.source_file
            for obj in self.paths.values()
            if obj.status is None
        ]

        try:
            with open(f"app/cfg/{self.yaml['path_file']}", "w", encoding="utf8") as file:
                yaml.dump({"paths": pending}, file, default_flow_style=False, indent=4)
        except FileNotFoundError:
            logging.error(f"File {self.yaml['path_file']} not found")
            return 0
        except OSError:
            logging.critical(f"Error file {self.yaml['path_file']} maybe damaged")
            return 0

        # Report the number of paths actually written, not the queue size.
        logging.info(f"{len(pending)} paths were saved to file")
        return 1

    # NOTE: the original author flagged restoring the queue from file as
    # unfinished. Every stored path is re-probed with ffprobe through
    # receive_paths().
    def read_paths(self):
        """
        Read media paths from the path file and feed them to receive_paths().

        :return: 1 on success (including an empty file), 0 if the file could
                 not be read or parsed
        """
        try:
            with open(f"app/cfg/{self.yaml['path_file']}", "r", encoding="utf8") as file:
                loaded = yaml.safe_load(file)
        except FileNotFoundError:
            logging.error(f"File {self.yaml['path_file']} not found")
            return 0
        except OSError:
            logging.critical(f"Error file {self.yaml['path_file']} maybe damaged")
            return 0
        except yaml.YAMLError:
            logging.critical(f"Error file {self.yaml['path_file']} maybe damaged")
            return 0

        # safe_load() yields None for an empty file; treat anything that is
        # not the expected {"paths": [...]} mapping as "no stored paths".
        stored = loaded.get("paths") if isinstance(loaded, dict) else None
        stored = stored or []
        count = len(stored)

        # The file is closed at this point, so receive_paths() may safely
        # rewrite it through save_paths(). Pass the list itself, not the
        # surrounding mapping.
        if count > 0:
            self.receive_paths(stored)

        logging.info(f"{count} paths were read from file")
        return 1

    def delete_path(self, obj_id: int):
        """
        Delete a media entry from the queue and overwrite the path file.

        :param obj_id: Key of the entry in ``self.paths`` (the Media id)
        :return: 1 if the entry was removed, 0 if no such entry exists
        """
        try:
            # A missing key raises KeyError; ValueError is still caught for
            # backward compatibility with the previous handler.
            del self.paths[obj_id]
            self.save_paths()
            return 1
        except (KeyError, ValueError):
            return 0

    def receive_paths(self, var_paths):
        """
        Split a path string into single media paths and queue each of them.

        :param var_paths: String containing one or more whitespace-separated
                          media paths, or an iterable of single paths
        :return: Always 1
        """
        logging.info(f"Empfangen{var_paths}")

        if isinstance(var_paths, str):
            paths = self._PATH_SPLIT_PATTERN.split(var_paths)
        else:
            paths = list(var_paths)

        for path in paths:
            self.get_with_ffprobe(path)

        if paths:
            self.save_paths()

        return 1

    @staticmethod
    def extract_media_info(path: str, select: str):
        """
        Run ffprobe for the selected stream type and parse its JSON output.

        :param path: Path to the media file
        :param select: v (for video), a (for audio), s (for subtitle)
        :return: For "v" a tuple ``(streams, format_list)``; for "a" and "s"
                 the list of streams; an empty string for any other selector
        :raises json.JSONDecodeError: if ffprobe printed no valid JSON
        """
        command = [
            "ffprobe", "-v",
            "error",
            "-select_streams",
            str(select),
            "-show_entries",
            "stream=index,channels,codec_name,codec_type,pix_fmt,level,"
            "film_grain,r_frame_rate,bit_rate,sample_rate,width,height,size,tags:stream_tags=language,duration",
            "-show_entries",
            "format=size,bit_rate,nb_streams,duration",
            "-of", "json",
            path,
        ]

        result = subprocess.run(command, stdout=subprocess.PIPE, text=True)
        json_data = json.loads(result.stdout)

        if select == "v":
            # Always hand back the format section as a list.
            format_data = json_data.get("format", "")
            if isinstance(format_data, list):
                list_format = format_data
            else:
                list_format = [format_data]
            return json_data.get("streams", []), list_format

        if select in ("a", "s"):
            return json_data.get("streams", [])

        return ""

    def get_with_ffprobe(self, path: str):
        """
        Probe a single media file and add it to the queue.

        Paths that do not exist on disk are skipped silently. Paths already
        present in the queue are only logged; ffprobe is not started for them.

        :param path: Path to the media file (surrounding whitespace is removed)
        :return: 1 if no error occurred, 0 if probing or object creation failed
        """
        try:
            path = path.strip()

            if not os.path.exists(path):
                return 1

            # Check for duplicates before spawning three ffprobe processes.
            if self.search_paths(path):
                logging.info(f"File exists in Waiting Snake :D: {path}")
                return 1

            streams_video, streams_format = self.extract_media_info(path, "v")
            streams_audio = self.extract_media_info(path, "a")
            streams_subtitle = self.extract_media_info(path, "s")

            obj = Media(path, streams_video, streams_audio, streams_subtitle, streams_format)
            self.paths[obj.id] = obj
            logging.info(obj)
            return 1
        except Exception as e:
            # Best effort: one broken file must not abort the whole batch.
            logging.error(f"Get Video Information: {e}")
            return 0

    def search_paths(self, path):
        """
        Check whether a source path is already part of the queue.

        :param path: Source path to look for
        :return: 1 if an entry with this ``source_file`` exists, otherwise 0
        """
        if any(obj.source_file == path for obj in self.paths.values()):
            return 1
        return 0

    def count_paths(self, status):
        """
        Count the queue entries that have the given status.

        :param status: Status value compared with ``==`` against each entry
        :return: Number of matching entries
        """
        return sum(1 for obj in self.paths.values() if obj.status == status)

    def active_path_to_dict(self):
        """
        Collect all entries with status 3.

        :return: ``{"data_convert": {id: entry.to_dict_active_paths(), ...}}``
        """
        active = {
            obj.id: obj.to_dict_active_paths()
            for obj in self.paths.values()
            if obj.status == 3
        }
        return {"data_convert": active}

    def queue_path_to_dict(self):
        """
        Collect all entries whose status is None, 1, 2 or 3.

        :return: ``{"data_queue": {id: entry.to_dict_active_paths(), ...}}``
        """
        queued = {
            obj.id: obj.to_dict_active_paths()
            for obj in self.paths.values()
            if obj.status is None or obj.status in (1, 2, 3)
        }
        return {"data_queue": queued}
|
|
|
|
|