python.video_converter_v3/app/class_file_path.py

197 lines
5.9 KiB
Python
Executable file

import logging
import re
import os
import json
import yaml
import subprocess
from app.class_media_file import Media
class Path:
    """Registry of media files that were handed to the converter.

    Paths are extracted from a raw input string, probed with ``ffprobe`` and
    kept as ``Media`` objects in ``self.paths`` (keyed by the object's ``id``).
    Entries whose ``status`` is ``None`` are written to the YAML file named by
    the ``path_file`` setting below ``app/cfg/``.

    Methods: save_paths / read_paths / delete_path / receive_paths
    """

    def __init__(self, cfg):
        """
        Store the settings and start with an empty registry.

        :param cfg: Global settings object; must support ``cfg["path_file"]``
        """
        # Settings
        self.yaml = cfg
        # Autostart (disabled)
        # self.read_paths()
        # Registry: Media.id -> Media object
        self.paths: dict = {}

    def save_paths(self):
        """
        Write the source paths of all entries with ``status is None`` to the
        path file as ``{"paths": [...]}``.

        :return: 1 on success, 0 if the file could not be written
        """
        file_name = self.yaml["path_file"]
        queued = [
            obj.source_file for obj in self.paths.values() if obj.status is None
        ]
        try:
            with open(f"app/cfg/{file_name}", "w", encoding="utf8") as file:
                yaml.dump(
                    {"paths": queued}, file, default_flow_style=False, indent=4
                )
        except FileNotFoundError:
            logging.error(f"File {file_name} not found")
            return 0
        except IOError:
            logging.critical(f"Error file {file_name} maybe damaged")
            return 0
        # Report what was actually written, not the size of the whole registry.
        logging.info(f"{len(queued)} paths were saved to file")
        return 1

    def read_paths(self):
        """
        Read media paths from the path file and probe each one again with
        ffprobe so that it ends up in ``self.paths``.

        Accepts the ``{"paths": [...]}`` layout written by ``save_paths`` as
        well as a bare YAML list.

        :return: 1 if at least one path was read, otherwise 0
        """
        file_name = self.yaml["path_file"]
        try:
            with open(f"app/cfg/{file_name}", "r", encoding="utf8") as file:
                content = yaml.safe_load(file)
        except FileNotFoundError:
            logging.error(f"File {file_name} not found")
            return 0
        except IOError:
            logging.critical(f"Error file {file_name} maybe damaged")
            return 0
        except yaml.YAMLError:
            # Unparseable content is reported like any other damaged file.
            logging.critical(f"Error file {file_name} maybe damaged")
            return 0

        # safe_load returns None for an empty file.
        if isinstance(content, dict):
            stored = content.get("paths") or []
        else:
            stored = content or []
        if not isinstance(stored, list):
            logging.critical(f"Error file {file_name} maybe damaged")
            return 0
        if not stored:
            return 0

        logging.info(f"{len(stored)} paths were read from file")
        for stored_path in stored:
            # get_with_ffprobe logs and swallows its own errors, so one bad
            # entry does not stop the remaining paths from being restored.
            self.get_with_ffprobe(str(stored_path))
        return 1

    def delete_path(self, obj_id: int):
        """
        Remove one entry from the registry and rewrite the path file.

        :param obj_id: Key of the entry in ``self.paths`` (the Media id)
        :return: 1 if the entry existed and was removed, 0 if it is unknown
        """
        try:
            del self.paths[obj_id]
        except KeyError:
            # A missing dict key raises KeyError (not ValueError).
            return 0
        self.save_paths()
        return 1

    def receive_paths(self, var_paths: str):
        """
        Split a string of one or more media paths into single paths, probe
        each of them and persist the queue afterwards.

        The string is cut after every ``.mkv``, ``.mp4``, ``.avi`` or ``.webm``
        that is followed by a whitespace character, so paths may themselves
        contain spaces.

        :param var_paths: String with a single or several media paths
        :return: Always 1
        """
        pattern = r"(?<=\.mkv\s|\.mp4\s|\.avi\s)|(?<=\.webm\s)"
        # Drop empty fragments, e.g. the one after a trailing separator.
        paths = [part for part in re.split(pattern, var_paths) if part.strip()]
        for path in paths:
            self.get_with_ffprobe(path)
        logging.debug(f"Received paths: {paths}")
        self.save_paths()
        return 1

    @staticmethod
    def extract_media_info(path: str, select: str):
        """
        Run ffprobe for the selected stream type and return the parsed JSON.

        :param path: path to media file
        :param select: v (for video), a (for audio), s (for subtitle)
        :return: for "v" a tuple ``(streams, format_list)``; for "a" and "s"
                 the list of streams; ``None`` for any other selector
        :raises json.JSONDecodeError: if ffprobe printed no valid JSON
        """
        command = [
            "ffprobe", "-v",
            "error",
            "-select_streams",
            f"{select}",
            "-show_entries",
            "stream=index,channels,codec_name,codec_type,pix_fmt,level,"
            "film_grain,r_frame_rate,bit_rate,sample_rate,width,height,size,tags:stream_tags=language,duration",
            "-show_entries",
            "format=size,bit_rate,nb_streams",
            "-of", "json",
            path
        ]
        result = subprocess.run(command, stdout=subprocess.PIPE, text=True)
        json_data = json.loads(result.stdout)
        streams = json_data.get("streams", [])
        if select == "v":
            format_info = json_data.get("format", "")
            # Callers get the format section as a list; wrap a single entry.
            if not isinstance(format_info, list):
                format_info = [format_info]
            return streams, format_info
        if select in ("a", "s"):
            return streams
        return None

    def get_with_ffprobe(self, path: str):
        """
        Probe one media file and add it to the registry if it is not in it yet.

        Any error while probing or building the Media object is logged and
        turned into a 0 return value, so a batch of paths keeps running.

        :param path: Path to a media file (surrounding whitespace is ignored)
        :return: 1 if the file was added or is already registered,
                 0 if it does not exist or could not be probed
        """
        try:
            path = path.strip()
            if not os.path.exists(path):
                logging.warning(f"File not found, skipped: {path}")
                return 0
            # Check for duplicates first: avoids three ffprobe runs and a
            # Media object that would be thrown away again.
            if self.search_paths(path):
                logging.info(f"File exists in Waiting Snake :D: {path}")
                return 1
            streams_video, streams_format = self.extract_media_info(path, "v")
            streams_audio = self.extract_media_info(path, "a")
            streams_subtitle = self.extract_media_info(path, "s")
            obj = Media(path, streams_video, streams_audio, streams_subtitle, streams_format)
            self.paths[obj.id] = obj
            logging.info(obj)
            return 1
        except Exception as e:
            # Deliberate best effort: log and report failure to the caller.
            logging.error(f"Get Video Information: {e}")
            return 0

    def search_paths(self, path):
        """
        Check whether a source path is already registered.

        :param path: Source path to look for
        :return: 1 if an entry with this ``source_file`` exists, otherwise 0
        """
        return int(any(obj.source_file == path for obj in self.paths.values()))

    def count_paths(self, status):
        """
        Count the registered entries with the given status.

        :param status: Status value to compare ``obj.status`` with
        :return: Number of matching entries
        """
        return sum(1 for obj in self.paths.values() if obj.status == status)

    def active_path_to_dict(self):
        """
        Collect the entries with status 3 (presumably "converting" -- confirm
        against Media) as dictionaries.

        :return: ``{"data_convert": {id: obj.to_dict_active_paths(), ...}}``
        """
        active = {
            obj.id: obj.to_dict_active_paths()
            for obj in self.paths.values()
            if obj.status == 3
        }
        return {"data_convert": active}

    def queue_path_to_dict(self):
        """
        Collect the entries whose status is ``None`` (not started yet) as
        dictionaries.

        :return: ``{"data_queue": {id: obj.to_dict_active_paths(), ...}}``
        """
        queued = {
            obj.id: obj.to_dict_active_paths()
            for obj in self.paths.values()
            if obj.status is None
        }
        return {"data_queue": queued}