Adding customized version of pyprobe to libs.

pull/551/head
Louis Vézina 5 years ago
parent 9a7372c880
commit 07bf13c87e

@ -0,0 +1,2 @@
from pyprobe import VideoFileParser

@ -0,0 +1,41 @@
class BaseParser:
@classmethod
def parse(cls, data, rawMode, includeMissing):
"""Core of the parser classes
Collects all methods prefixed with "value_" and builds a dict of
their return values. Parser classes will inherit from this class.
All methods that begin with "value_" in a parser class will be given
the same `data` argument and are expected to pull their corresponding
value from the collection.
These methods return a tuple - their raw value and formatted value.
The raw value is a string or tuple of string and the formatted value
be of type string, int, float, or tuple.
If no data is found in a method, the raw value is expected to be None,
and for the formatted value, strings will be "null", ints will be 0,
floats will be 0.0.
Args:
data (dict): Raw video data
rawMode (bool): Returns raw values instead of formatted values
includeMissing (bool): If value is missing, return "empty" value
Returns:
dict<str, dict<str, var>>: Parsed data from class methods, may not have every value.
"""
parsers = [getattr(cls, p) for p in dir(cls) if p.startswith("value_")]
info = {}
for parser in parsers:
parsed_raw, parsed_formatted = parser(data)
if parsed_raw == None and not includeMissing:
continue
name = parser.__name__[6:]
if rawMode:
info[name] = parsed_raw
else:
info[name] = parsed_formatted
return info

@ -0,0 +1,215 @@
from os import path
from baseparser import BaseParser
class StreamParser(BaseParser):
@staticmethod
def value_codec(data):
"""Returns a string"""
info = data.get("codec_name", None)
return info, (info or "null")
@staticmethod
def value_format(data):
"""Returns a string"""
info = data.get("format_name", None)
return info, (info or "null")
@staticmethod
def value_bit_rate(data):
"""Returns an int"""
info = data.get("bit_rate", None)
try:
return info, int(float(info))
except (ValueError, TypeError):
return info, 0
class VideoStreamParser(BaseParser):
@staticmethod
def value_codec(data):
return StreamParser.value_codec(data)
@staticmethod
def value_format(data):
return StreamParser.value_format(data)
@staticmethod
def value_bit_rate(data):
return StreamParser.value_bit_rate(data)
@staticmethod
def value_resolution(data):
"""Returns a tuple (width, height)"""
width = data.get("width", None)
height = data.get("height", None)
if width is None and height is None:
return None, (0, 0)
try:
return (width, height), (int(float(width)), int(float(height)))
except (ValueError, TypeError):
return (width, height), (0, 0)
@staticmethod
def average_framerate(data):
"""Returns an int"""
frames = data.get("nb_frames", None)
duration = data.get("duration", None)
try:
return float(frames) / float(duration)
except (ValueError, TypeError, ZeroDivisionError):
return 0.0
@classmethod
def value_framerate(cls, data):
"""Returns a float"""
input_str = data.get("avg_frame_rate", None)
try:
num, den = input_str.split("/")
return input_str, round(float(num) / float(den), 3)
except (ValueError, ZeroDivisionError, AttributeError):
info = cls.average_framerate(data)
return input_str, info
@staticmethod
def value_aspect_ratio(data):
"""Returns a string"""
info = data.get("display_aspect_ratio", None)
return info, (info or "null")
@staticmethod
def value_pixel_format(data):
"""Returns a string"""
info = data.get("pix_fmt", None)
return info, (info or "null")
class AudioStreamParser(StreamParser):
@staticmethod
def value_sample_rate(data):
"""Returns an int - audio sample rate in Hz"""
info = data.get("sample_rate", None)
try:
return info, int(float(info))
except (ValueError, TypeError):
return info, 0
@staticmethod
def value_channel_count(data):
"""Returns an int"""
info = data.get("channels", None)
try:
return info, int(float(info))
except (ValueError, TypeError):
return info, 0
@staticmethod
def value_channel_layout(data):
"""Returns a string"""
info = data.get("channel_layout", None)
return info, (info or "null")
class SubtitleStreamParser(BaseParser):
@staticmethod
def value_codec(data):
return StreamParser.value_codec(data)
@staticmethod
def value_language(data):
"""Returns a string """
tags = data.get("tags", None)
if tags:
info = tags.get("language", None)
return info, (info or "null")
return None, "null"
@staticmethod
def value_forced(data):
"""Returns a bool """
disposition = data.get("disposition", None)
if disposition:
info = disposition.get("forced", None)
return bool(info), (bool(info) or False)
return None, "null"
class ChapterParser(BaseParser):
@staticmethod
def value_start(data):
"""Returns an int"""
info = data.get("start_time", None)
try:
return info, float(data.get("start_time"))
except (ValueError, TypeError):
return info, 0
@classmethod
def value_end(cls, data):
"""Returns a float"""
info = data.get("end_time", None)
try:
return info, float(info)
except (ValueError, TypeError):
return info, 0
@staticmethod
def value_title(data):
"""Returns a string"""
info = data.get("tags", {}).get("title", None)
return info, (info or "null")
@staticmethod
def fillEmptyTitles(chapters):
"""Add text in place of empty titles
If a chapter doesn't have a title, this will add a basic
string in the form "Chapter `index+1`"
Args:
chapters(list<dict>): The list of parsed chapters
"""
index = 0
for chapter in chapters:
if not chapter["title"]:
chapter["title"] = "Chapter " + str(index)
index += 1
class RootParser(BaseParser):
@staticmethod
def value_duration(data):
"""Returns an int"""
info = data.get("duration", None)
try:
return info, float(info)
except (ValueError, TypeError):
return info, 0.0
@staticmethod
def value_size(data):
"""Returns an int"""
info = data.get("size", None)
if info is None:
file_path = data.get("filename", "")
if path.isfile(file_path):
info = str(path.getsize(file_path))
try:
return info, int(float(info))
except (ValueError, TypeError):
return info, 0
@classmethod
def value_bit_rate(cls, data):
"""Returns an int"""
info = data.get("bit_rate", None)
if info is None:
_, size = cls.value_size(data)
_, duration = cls.value_duration(data)
if size and duration:
info = size / (duration / 60 * 0.0075) / 1000
try:
return info, int(float(info))
except (ValueError, TypeError):
return info, 0

@ -0,0 +1,213 @@
import json
import subprocess
from os import path
from sys import getfilesystemencoding
import ffprobeparsers
class VideoFileParser:
def __init__(
self,
ffprobe="ffprobe",
includeMissing=True,
rawMode=False,
):
self._ffprobe = ffprobe
self._includeMissing = includeMissing
self._rawMode = rawMode
########################################
# Main Method
def parseFfprobe(self, inputFile):
"""Takes an input file and returns the parsed data using ffprobe.
Args:
inputFile (str): Video file path
Returns:
dict<str, dict<str, var>>: Parsed video info
Raises:
FileNotFoundError: The input video file or input executable was not found
IOError: Execution failed
"""
if not path.isfile(inputFile):
raise FileNotFoundError(inputFile + " not found")
self._checkExecutable(self._ffprobe)
fdict = self._executeFfprobe(inputFile)
return self._parseFfprobe(fdict, inputFile)
########################################
# ffprobe Parsing
def _executeFfprobe(self, inputFile):
"""Executes ffprobe program on input file to get raw info
fdict = dict<str, fdict> or dict<str, str>
Args:
inputFile (str): Video file path
Returns:
fdict: Parsed data
"""
commandArgs = [
"-v",
"quiet",
"-hide_banner",
"-show_error",
"-show_format",
"-show_streams",
"-show_programs",
"-show_chapters",
"-show_private_data",
"-print_format",
"json",
]
outputJson = self._executeParser(self._ffprobe, commandArgs, inputFile)
try:
data = json.loads(outputJson)
except json.JSONDecodeError:
raise IOError("Could not decode ffprobe output for file " + inputFile)
return data
def _parseFfprobe(self, fOutput, inputFile):
"""Parse all data from fOutput to organized format
fdict = dict<str, fdict> or dict<str, str>
Args:
fOutput (fdict): Stream data from ffprobe
inputFile (str): Video file path
Returns:
dict<str, dict<str, str>>: Parsed video data
"""
videoInfo = {}
videoInfo["path"] = path.abspath(inputFile)
videoInfo.update(
ffprobeparsers.RootParser.parse(
fOutput["format"], self._rawMode, self._includeMissing
)
)
videoInfo.update(self._parseFfprobeStreams(fOutput))
videoInfo.update(self._parseFfprobeChapters(fOutput))
if not self._rawMode:
ffprobeparsers.ChapterParser.fillEmptyTitles(videoInfo["chapters"])
return videoInfo
def _parseFfprobeStreams(self, fOutput):
"""Parses video, audio, and subtitle streams
fdict = dict<str, fdict> or dict<str, str>
Args:
streams_data (fdict): Stream data from ffprobe
Returns:
dict<str, dict<str, var>>: Parsed streams - video, audio, and subtitle
"""
parsedInfo = {"videos": [], "audios": [], "subtitles": []}
for stream in fOutput["streams"]:
streamType = stream["codec_type"]
data = None
if streamType == "video":
data = ffprobeparsers.VideoStreamParser.parse(
stream, self._rawMode, self._includeMissing
)
parsedInfo["videos"].append(data)
elif streamType == "audio":
data = ffprobeparsers.AudioStreamParser.parse(
stream, self._rawMode, self._includeMissing
)
parsedInfo["audios"].append(data)
elif streamType == "subtitle":
data = ffprobeparsers.SubtitleStreamParser.parse(
stream, self._rawMode, self._includeMissing
)
parsedInfo["subtitles"].append(data)
return parsedInfo
def _parseFfprobeChapters(self, fOutput):
"""Parses chapters
fdict = dict<str, fdict> or dict<str, str>
Args:
chapters_data (fdict): Stream data from ffprobe
Returns:
dict<str, dict<str, var>>: Parsed chapters
"""
parsedInfo = {"chapters": []}
if fOutput["chapters"] is None:
return parsedInfo
for chapter in fOutput["chapters"]:
parsedInfo["chapters"].append(
ffprobeparsers.ChapterParser.parse(
chapter, self._rawMode, self._includeMissing
)
)
return parsedInfo
########################################
# Misc Methods
@staticmethod
def _executeParser(parser, commandArgs, inputFile):
"""Executes parser on the input file
Args:
parser (str): Executable location or command
commandArgs (list of strings): Extra command arguments
inputFile (str): the input file location
Raises:
IOError: ffprobe execution failed
"""
command = [parser] + commandArgs + [inputFile.encode(getfilesystemencoding())]
try:
completedProcess = subprocess.check_output(
command, stderr=subprocess.STDOUT
)
except subprocess.CalledProcessError as e:
raise IOError(
"Error occurred during execution - " + e.output
)
return completedProcess
@staticmethod
def _checkExecutable(executable):
"""Checks if target is executable
Args:
executable (str): Executable location, can be file or command
Raises:
FileNotFoundError: Executable was not found
"""
try:
subprocess.check_output(
[executable, "--help"],
stderr=subprocess.STDOUT
)
except OSError:
raise FileNotFoundError(executable + " not found")
class FileNotFoundError(Exception):
pass
class IOError(Exception):
pass

@ -15,6 +15,7 @@ guessit=2.1.4
langdetect=1.0.7
py-pretty=1
pycountry=18.2.23
pyprobe=0.1.2 <-- modified version: do not update!!!
pysrt=1.1.1
pytz=2018.4
rarfile=3.0

Loading…
Cancel
Save