Merge remote-tracking branch 'origin/development' into development

pull/1888/head
morpheus65535 3 years ago
commit f760d3198b

@ -203,10 +203,8 @@ defaults = {
'approved_only': 'False'
},
'embeddedsubtitles': {
'include_ass': 'True',
'include_srt': 'True',
'included_codecs': '[]',
'hi_fallback': 'False',
'mergerfs_mode': 'False',
'timeout': '600',
},
'subsync': {
@ -261,6 +259,7 @@ raw_keys = ['movie_default_forced', 'serie_default_forced']
array_keys = ['excluded_tags',
'exclude',
'included_codecs',
'subzero_mods',
'excluded_series_types',
'enabled_providers',

@ -227,15 +227,13 @@ def get_providers_auth():
'hashed_password': settings.ktuvit.hashed_password,
},
'embeddedsubtitles': {
'include_ass': settings.embeddedsubtitles.getboolean('include_ass'),
'include_srt': settings.embeddedsubtitles.getboolean('include_srt'),
'included_codecs': get_array_from(settings.embeddedsubtitles.included_codecs),
'hi_fallback': settings.embeddedsubtitles.getboolean('hi_fallback'),
'mergerfs_mode': settings.embeddedsubtitles.getboolean('mergerfs_mode'),
'cache_dir': os.path.join(args.config_dir, "cache"),
'ffprobe_path': _FFPROBE_BINARY,
'ffmpeg_path': _FFMPEG_BINARY,
'timeout': settings.embeddedsubtitles.timeout,
}
},
}

@ -87,33 +87,22 @@ export const ProviderList: Readonly<ProviderInfo[]> = [
description: "Embedded Subtitles from your Media Files",
inputs: [
{
type: "switch",
key: "include_srt",
name: "Include SRT",
defaultValue: true,
type: "chips",
key: "included_codecs",
name: "Allowed codecs (subrip, ass, webvtt, mov_text). Leave empty to allow all.",
defaultValue: [],
},
{
type: "switch",
key: "include_ass",
name: "Include ASS (will be converted to SRT)",
defaultValue: true,
type: "text",
key: "timeout",
defaultValue: 600,
name: "Extraction timeout in seconds",
},
{
type: "switch",
key: "hi_fallback",
name: "Use HI subtitles as a fallback (don't enable it if you have a HI language profile)",
},
{
type: "switch",
key: "mergerfs_mode",
name: "[EXPERIMENTAL] Ignore cloud video files from rclone/mergerfs",
},
{
type: "text",
key: "timeout",
defaultValue: 600,
name: "Extraction timeout in seconds",
},
],
message:
"Warning for cloud users: this provider needs to read the entire file in order to extract subtitles.",

@ -1,443 +1,7 @@
# -*- coding: utf-8 -*-
# License: GPL
from __future__ import annotations
from .container import FFprobeVideoContainer
from .stream import FFprobeSubtitleStream
import json
import logging
import os
import re
import subprocess
from typing import List, Optional
from babelfish import Language
from babelfish.exceptions import LanguageError
import pysubs2
__version__ = "0.1.4"
logger = logging.getLogger(__name__)
# Paths to executables. Each one can be overridden through the environment
# variable of the same name; otherwise the bare command name is used.
FFPROBE_PATH = os.environ.get("FFPROBE_PATH", "ffprobe")
FFMPEG_PATH = os.environ.get("FFMPEG_PATH", "ffmpeg")
# When true, "-stats" is appended to the ffmpeg extraction command.
FFMPEG_STATS = True
# Value passed to the "-v" option of both ffprobe and ffmpeg.
FF_LOG_LEVEL = "quiet"
class FeseError(Exception):
    """Base class of every exception defined by this package."""

    pass


class ExtractionError(FeseError):
    """Raised when the ffmpeg extraction subprocess fails or can't be run."""

    pass


class InvalidFile(FeseError):
    """Raised when an extracted subtitle file fails the integrity check."""

    pass


class InvalidStream(FeseError):
    """Error for an invalid stream.

    NOTE(review): no raiser is visible in this file -- confirm usage.
    """

    pass


class InvalidSource(FeseError):
    """Raised when ffprobe can't provide stream information for a source."""

    pass


class ConversionError(FeseError):
    """Raised when a subtitle file can't be converted to SubRip."""

    pass


class LanguageNotFound(FeseError):
    """Raised when a stream's language can't be detected from its tags."""

    pass
# Extensions
# The two subtitle file extensions check_integrity() accepts.
SRT = "srt"
ASS = "ass"
class FFprobeSubtitleDisposition:
    """Boolean disposition flags of a subtitle stream.

    Every flag starts as False and is overwritten from the ``disposition``
    mapping given to the constructor; the stream title can later refine the
    content type through :meth:`update_from_tags`.
    """

    def __init__(self, data: dict):
        """
        :param data: mapping of flag name -> value; keys that don't match an
            existing attribute are ignored, matching ones are coerced to bool
        """
        for flag in (
            "default",
            "generic",
            "dub",
            "original",
            "comment",
            "lyrics",
            "karaoke",
            "forced",
            "hearing_impaired",
            "visual_impaired",
            "clean_effects",
            "attached_pic",
            "timed_thumbnails",
        ):
            setattr(self, flag, False)

        self._content_type = None

        for name, raw in data.items():
            if not hasattr(self, name):
                continue
            setattr(self, name, bool(raw))

    def update_from_tags(self, tags):
        """Refine the disposition using the ``title`` tag.

        The first content-type pattern found in the lowercased title wins; a
        missing or unrecognised title marks the disposition as generic.

        :param tags: mapping of stream tags
        """
        title = tags.get("title")
        if title is None:
            logger.debug("Title not found. Marking as generic")
            self.generic = True
            return None

        lowered = title.lower()
        matched = next(
            (
                name
                for name, pattern in _content_types.items()
                if pattern.search(lowered) is not None
            ),
            None,
        )

        if matched is None:
            logger.debug("Generic disposition title found: %s", lowered)
            self.generic = True
        else:
            logger.debug("Found %s: %s", matched, lowered)
            self._content_type = matched
            setattr(self, matched, True)

        return None

    @property
    def suffix(self):
        """Dash-prefixed content type, or an empty string when none is set."""
        return "" if self._content_type is None else f"-{self._content_type}"

    def __str__(self):
        label = self.suffix.lstrip("-").upper()
        return label or "GENERIC"
class FFprobeSubtitleStream:
    """Base class for FFprobe (FFmpeg) extractable subtitle streams."""

    def __init__(self, stream: dict):
        """Populate the instance from one entry of ffprobe's ``streams`` list.

        :param stream: mapping describing a single stream; missing keys fall
            back to neutral defaults (index 0, codec "Unknown", zero durations)
        :raises: LanguageNotFound
        """
        self.index = int(stream.get("index", 0))
        self.codec_name = stream.get("codec_name", "Unknown")
        # Codecs without a known extension use the codec name itself
        self.extension = _subtitle_extensions.get(self.codec_name, self.codec_name)
        self.r_frame_rate = stream.get("r_frame_rate")
        self.avg_frame_rate = stream.get("avg_frame_rate")
        self.time_base = stream.get("time_base")
        self.tags = stream.get("tags", {})
        self.start_time = float(stream.get("start_time", 0))
        # TODO: separate tags
        self.number_of_frames = int(self.tags.get("NUMBER_OF_FRAMES", 0))
        self.number_of_frames_eng = int(
            self.tags.get("NUMBER_OF_FRAMES-eng", self.number_of_frames)
        )
        self.duration, self.duration_ts = 0, 0

        # some subtitles streams miss the duration_ts field and only have tags->DURATION field
        # fixme: we still don't know if "DURATION" is a common tag/key
        if "DURATION" in self.tags:
            try:
                # Expected shape is "H:M:S" with either "," or "." as the
                # decimal separator; anything else raises ValueError
                h, m, s = [
                    ts.replace(",", ".").strip()
                    for ts in self.tags["DURATION"].split(":")
                ]
                self.duration = float(s) + float(m) * 60 + float(h) * 60 * 60
                self.duration_ts = int(self.duration * 1000)
            except ValueError as error:
                logger.warning("Couldn't get duration field: %s. Using 0", error)
        else:
            try:
                self.duration = float(stream.get("duration", "0").replace(",", "."))
                self.duration_ts = int(stream.get("duration_ts", self.duration * 1000))
            # some subtitles streams miss a duration completely and has "N/A" as value
            except ValueError as error:
                logger.warning("Couldn't get duration field: %s. Using 0", error)

        self.start_pts = int(stream.get("start_pts", 0))
        self.disposition = FFprobeSubtitleDisposition(stream.get("disposition", {}))

        if self.tags:
            self.disposition.update_from_tags(self.tags)

        self.language: Language = self._language()

    @property
    def suffix(self):
        """File-name suffix: language (plus country), disposition, extension."""
        lang = self.language.alpha2
        if self.language.country is not None:
            lang = f"{lang}-{self.language.country}"

        return f"{lang}{self.disposition.suffix}.{self.extension}"

    def _language(self) -> Language:
        """Detect the stream language from its tags.

        Regional variants listed in ``_extra_languages`` are recognised from
        the stream title before falling back to the plain ``language`` tag.

        :raises: LanguageNotFound
        """
        og_lang = self.tags.get("language")
        last_exc = None

        if og_lang is not None:
            if og_lang in _extra_languages:
                extra = _extra_languages[og_lang]
                title = self.tags.get("title", "n/a").lower()
                if any(possible in title for possible in extra["matches"]):
                    logger.debug("Found extra language %s", extra["language_args"])
                    return Language(*extra["language_args"])

            try:
                lang = Language.fromalpha3b(og_lang)
                # Test for suffix: ``suffix`` needs the two-letter code, so it
                # is read here. A real attribute access is used instead of
                # ``assert`` because asserts are removed under ``python -O``.
                # NOTE(review): presumably the lookup raises a LanguageError
                # subclass when no alpha2 code exists -- confirm.
                if lang.alpha2:
                    return lang
            except LanguageError as error:
                last_exc = error
                logger.debug("Error with '%s' language: %s", og_lang, error)

        raise LanguageNotFound(
            f"Couldn't detect language for stream: {self.tags}"
        ) from last_exc

    def __repr__(self) -> str:
        return f"<{self.codec_name.upper()}: {self.language}@{self.disposition}>"
class FFprobeVideoContainer:
    """A media file probed with ffprobe and demuxed with ffmpeg."""

    def __init__(self, path: str):
        self.path = path

    @property
    def extension(self):
        """Extension of the media file, without the leading dot."""
        return os.path.splitext(self.path)[-1].lstrip(".")

    def get_subtitles(self, timeout: int = 600) -> List[FFprobeSubtitleStream]:
        """Factory function to create subtitle instances from FFprobe.

        Streams whose language can't be detected are skipped.

        :param timeout: subprocess timeout in seconds (default: 600)
        :raises: InvalidSource"""

        ff_command = [
            FFPROBE_PATH,
            "-v",
            FF_LOG_LEVEL,
            "-print_format",
            "json",
            "-show_format",
            "-show_streams",
            self.path,
        ]
        try:
            result = subprocess.run(
                ff_command, stdout=subprocess.PIPE, check=True, timeout=timeout
            )
            streams = json.loads(result.stdout)["streams"]
        except _ffprobe_exceptions as error:
            raise InvalidSource(
                f"{error} trying to get information from {self.path}"
            ) from error  # We want to see the traceback

        subs = []
        for stream in streams:
            if stream.get("codec_type", "n/a") != "subtitle":
                continue
            try:
                subs.append(FFprobeSubtitleStream(stream))
            except LanguageNotFound:
                pass

        if not subs:
            logger.debug("Source doesn't have any subtitle valid streams")
            return []

        logger.debug("Found subtitle streams: %s", subs)
        return subs

    @staticmethod
    def _numbered_path(sub_path, suffix, number):
        """Insert ``-number`` before the trailing ``.suffix`` of ``sub_path``."""
        tail = f".{suffix}"
        # str.rstrip() removes a *set of characters*, which used to eat parts
        # of the file name; cut the exact suffix instead.
        stem = sub_path[: -len(tail)] if sub_path.endswith(tail) else sub_path
        return f"{stem}-{number}{tail}"

    def extract_subtitles(
        self,
        subtitles: List[FFprobeSubtitleStream],
        custom_dir=None,
        overwrite=True,
        timeout=600,
    ):
        """Extracts a list of subtitles. Returns a dictionary of the extracted
        filenames by index.

        :param subtitles: a list of FFprobeSubtitle instances
        :param custom_dir: a custom directory to save the subtitles. Defaults to
        same directory as the media file
        :param overwrite: overwrite files with the same name (default: True)
        :param timeout: subprocess timeout in seconds (default: 600)
        :raises: ExtractionError, OSError
        """
        extract_command = [FFMPEG_PATH, "-v", FF_LOG_LEVEL]
        if FFMPEG_STATS:
            extract_command.append("-stats")
        extract_command.extend(["-y", "-i", self.path])

        if custom_dir is not None:
            # May raise OSError
            os.makedirs(custom_dir, exist_ok=True)

        items = {}
        collected_paths = set()

        for subtitle in subtitles:
            sub_path = f"{os.path.splitext(self.path)[0]}.{subtitle.suffix}"
            if custom_dir is not None:
                sub_path = os.path.join(custom_dir, os.path.basename(sub_path))

            # Two streams with the same language/disposition/extension would
            # otherwise be written to the same file
            if sub_path in collected_paths:
                sub_path = self._numbered_path(
                    sub_path, subtitle.suffix, len(collected_paths)
                )

            if not overwrite and os.path.isfile(sub_path):
                logger.debug(
                    "Ignoring existing path (overwrite disabled): %s", sub_path
                )
                continue

            extract_command.extend(
                ["-map", f"0:{subtitle.index}", "-c", "copy", sub_path]
            )
            logger.debug("Appending subtitle path: %s", sub_path)

            collected_paths.add(sub_path)
            items[subtitle.index] = sub_path

        if not items:
            logger.debug("No subtitles to extract")
            return {}

        logger.debug("Extracting subtitle with command %s", " ".join(extract_command))

        try:
            subprocess.run(extract_command, timeout=timeout, check=True)
        except (subprocess.SubprocessError, FileNotFoundError) as error:
            raise ExtractionError(f"Error calling ffmpeg: {error}") from error

        # A missing output is only reported; the mapping is returned untouched
        for path in items.values():
            if not os.path.isfile(path):
                logger.debug("%s was not extracted", path)

        return items

    def __repr__(self) -> str:
        return f"<FFprobeVideoContainer {self.extension}: {self.path}>"
def check_integrity(
    subtitle: FFprobeSubtitleStream, path: str, sec_offset_threshold=900
):
    """A relative check for the integrity of a file. This can be used to find a failed
    ffmpeg extraction where the final file might not be complete or might be corrupted.
    Currently, only ASS and Subrip are supported.

    The end time of the last event in the file is compared against the duration
    reported for the stream; the check is skipped (with a warning) when the
    stream reports no duration.

    :param subtitle: FFprobeSubtitle instance
    :param path: the path of the subtitle file (ass or srt)
    :param sec_offset_threshold: the maximum seconds offset to determine if the file is complete
    :raises: InvalidFile
    """
    if subtitle.extension not in (ASS, SRT):
        raise InvalidFile(f"Extension not supported: {subtitle.extension}")

    try:
        sub = pysubs2.load(path)
    except (pysubs2.Pysubs2Error, UnicodeError, OSError, FileNotFoundError) as error:
        raise InvalidFile(error) from error

    # ignore the duration check if the stream has no duration listed at all
    if not subtitle.duration_ts:
        logger.warning(
            "Ignoring duration check, subtitle stream has bad duration values: %s",
            subtitle,
        )
        return

    try:
        last_end = sub[-1].end
    except IndexError as error:
        # A file without a single event used to escape as a bare IndexError
        raise InvalidFile(f"No subtitle events found in {path}") from error

    off = abs(int(last_end) - subtitle.duration_ts)
    if off > abs(sec_offset_threshold) * 1000:
        raise InvalidFile(
            f"The last subtitle timestamp ({last_end/1000} sec) is {off/1000} sec ahead"
            f" from the subtitle stream total duration ({subtitle.duration} sec)"
        )
    logger.debug("Integrity check passed (%d sec offset)", off / 1000)
def to_srt(
    source: str, output: Optional[str] = None, remove_source: bool = False
) -> str:
    """Convert a subtitle to SubRip. Currently, only ASS is supported. SubRip
    files will be silently ignored.

    :param source: path of the subtitle file to convert
    :param output: destination path. Defaults to the source path with its
        extension replaced by ``.srt``
    :param remove_source: delete the source file after a successful
        conversion (failures to delete are only logged)
    :return: the path of the SubRip file
    :raises: ConversionError, OSError"""
    if source.endswith(".srt"):
        return source

    split_path = os.path.splitext(source)

    # The former check was ``not in (".ass")``: a substring test against the
    # *string* ".ass", which let "", ".a" or ".s" through.
    if split_path[-1] != ".ass":
        raise ConversionError(
            f"No converter found for extension: {split_path[-1]}"
        ) from None

    output = output or f"{split_path[0]}.srt"

    try:
        parsed = pysubs2.load(source)
        parsed.save(output)
    except (pysubs2.Pysubs2Error, UnicodeError) as error:
        raise ConversionError(f"Exception converting {output}: {error}") from error

    logger.debug("Converted: %s", output)

    if remove_source and source != output:
        try:
            os.remove(source)
        except OSError as error:
            logger.debug("Can't remove source: %s (%s)", source, error)

    return output
# ffprobe codec name -> file extension used for the extracted file. Codecs
# missing here fall back to their own codec name as extension.
_subtitle_extensions = {
    "subrip": "srt",
    "ass": "ass",
    "hdmv_pgs_subtitle": "sup",
    "pgs": "sup",
}

# Disposition attribute -> pattern searched in the lowercased stream title.
# The first pattern that matches (in this order) decides the content type.
_content_types = {
    "hearing_impaired": re.compile(r"sdh|hearing impaired"),
    "forced": re.compile(r"forced"),
    "comment": re.compile(r"comment"),
    "visual_impaired": re.compile(r"signs|visual impair"),
    "karaoke": re.compile(r"karaoke|songs"),
}

# Errors from running ffprobe and reading its JSON output that get_subtitles()
# re-raises as InvalidSource.
_ffprobe_exceptions = (
    subprocess.SubprocessError,
    json.JSONDecodeError,
    FileNotFoundError,
    KeyError,
)

# Regional variants keyed by the stream's "language" tag. When any of the
# "matches" substrings appears in the lowercased stream title, the language is
# built from "language_args" instead of the plain tag.
_extra_languages = {
    "spa": {
        "matches": (
            "es-la",
            "spa-la",
            "spl",
            "mx",
            "latin",
            "mexic",
            "argent",
            "latam",
        ),
        "language_args": ("spa", "MX"),
    },
    "por": {
        "matches": ("pt-br", "pob", "pb", "brazilian", "brasil", "brazil"),
        "language_args": ("por", "BR"),
    },
}
__version__ = "0.2"

@ -0,0 +1,238 @@
# -*- coding: utf-8 -*-
# License: GPL
from __future__ import annotations
import json
import logging
import os
import subprocess
from .exceptions import ExtractionError
from .exceptions import InvalidSource
from .exceptions import LanguageNotFound
from .exceptions import UnsupportedCodec
from .stream import FFprobeSubtitleStream
logger = logging.getLogger(__name__)
# Paths to executables. Each one can be overridden through the environment
# variable of the same name; otherwise the bare command name is used.
FFPROBE_PATH = os.environ.get("FFPROBE_PATH", "ffprobe")
FFMPEG_PATH = os.environ.get("FFMPEG_PATH", "ffmpeg")
# When true, "-stats" is appended to the ffmpeg commands built in this module.
FFMPEG_STATS = True
# Value passed to the "-v" option of both ffprobe and ffmpeg.
FF_LOG_LEVEL = "quiet"
class FFprobeVideoContainer:
    """A media file whose subtitle streams are listed with ffprobe and
    written to disk with ffmpeg."""

    def __init__(self, path: str):
        self.path = path

    @property
    def extension(self):
        """Extension of the media file, without the leading dot."""
        return os.path.splitext(self.path)[-1].lstrip(".")

    def get_subtitles(self, timeout: int = 600):
        """Factory function to create subtitle (stream) instances from FFprobe.

        Streams with an undetectable language or an unsupported codec are
        logged and skipped.

        :param timeout: subprocess timeout in seconds (default: 600)
        :raises: InvalidSource"""

        ff_command = [
            FFPROBE_PATH,
            "-v",
            FF_LOG_LEVEL,
            "-print_format",
            "json",
            "-show_format",
            "-show_streams",
            self.path,
        ]
        try:
            result = subprocess.run(
                ff_command, stdout=subprocess.PIPE, check=True, timeout=timeout
            )
            streams = json.loads(result.stdout)["streams"]
        except _ffprobe_exceptions as error:
            raise InvalidSource(
                f"{error} trying to get information from {self.path}"
            ) from error  # We want to see the traceback

        subs = []
        for stream in streams:
            if stream.get("codec_type", "n/a") != "subtitle":
                continue
            try:
                subs.append(FFprobeSubtitleStream(stream))
            except (LanguageNotFound, UnsupportedCodec) as error:
                logger.debug("Ignoring %s: %s", stream.get("codec_name"), error)

        if not subs:
            logger.debug("Source doesn't have any subtitle valid streams")
            return []

        logger.debug("Found subtitle streams: %s", subs)
        return subs

    def extract_subtitles(
        self,
        subtitles,
        custom_dir=None,
        overwrite=True,
        timeout=600,
        convert_format=None,
    ):
        """Extracts a list of subtitles converting them. Returns a dictionary of the
        extracted filenames by index.

        Most bitmap subtitles will raise UnsupportedCodec as they don't support conversion.
        For such formats use copy instead.

        :param subtitles: a list of FFprobeSubtitle instances
        :param custom_dir: a custom directory to save the subtitles. Defaults to
        same directory as the media file
        :param overwrite: overwrite files with the same name (default: True)
        :param timeout: subprocess timeout in seconds (default: 600)
        :param convert_format: format to convert selected subtitles. Defaults to
        each stream's default convert format
        :raises: ExtractionError, UnsupportedCodec, OSError
        """
        extract_command = self._base_command()

        if custom_dir is not None:
            # May raise OSError
            os.makedirs(custom_dir, exist_ok=True)

        items = {}
        collected_paths = set()

        for subtitle in subtitles:
            extension_to_use = convert_format or subtitle.convert_default_format
            sub_path = self._output_path(
                subtitle, extension_to_use, custom_dir, collected_paths
            )

            if not overwrite and os.path.isfile(sub_path):
                logger.debug(
                    "Ignoring existing path (overwrite disabled): %s", sub_path
                )
                continue

            extract_command.extend(subtitle.convert_args(convert_format, sub_path))
            logger.debug("Appending subtitle path: %s", sub_path)

            collected_paths.add(sub_path)
            items[subtitle.index] = sub_path

        return self._run_extraction(extract_command, items, timeout)

    def copy_subtitles(
        self,
        subtitles,
        custom_dir=None,
        overwrite=True,
        timeout=600,
        fallback_to_convert=True,
    ):
        """Extracts a list of subtitles with ffmpeg's copy method. Returns a dictionary
        of the extracted filenames by index.

        :param subtitles: a list of FFprobeSubtitle instances
        :param custom_dir: a custom directory to save the subtitles. Defaults to
        same directory as the media file
        :param overwrite: overwrite files with the same name (default: True)
        :param timeout: subprocess timeout in seconds (default: 600)
        :param fallback_to_convert: fallback to stream's default convert format if it is
        incompatible with copy
        :raises: ExtractionError, UnsupportedCodec, OSError
        """
        extract_command = self._base_command()

        if custom_dir is not None:
            # May raise OSError
            os.makedirs(custom_dir, exist_ok=True)

        items = {}
        collected_paths = set()

        for subtitle in subtitles:
            sub_path = self._output_path(
                subtitle, subtitle.extension, custom_dir, collected_paths
            )

            if not overwrite and os.path.isfile(sub_path):
                logger.debug(
                    "Ignoring existing path (overwrite disabled): %s", sub_path
                )
                continue

            try:
                extract_command.extend(subtitle.copy_args(sub_path))
            except UnsupportedCodec:
                if not fallback_to_convert:
                    raise
                logger.warning("%s incompatible with copy. Using fallback", subtitle)
                extract_command.extend(subtitle.convert_args(None, sub_path))

            logger.debug("Appending subtitle path: %s", sub_path)

            collected_paths.add(sub_path)
            items[subtitle.index] = sub_path

        return self._run_extraction(extract_command, items, timeout)

    def _base_command(self):
        """Return the ffmpeg arguments shared by every extraction."""
        command = [FFMPEG_PATH, "-v", FF_LOG_LEVEL]
        if FFMPEG_STATS:
            command.append("-stats")
        command.extend(["-y", "-i", self.path])
        return command

    @staticmethod
    def _numbered_path(sub_path, number, extension):
        """Return ``sub_path`` with a two-digit number before its extension."""
        return f"{os.path.splitext(sub_path)[0]}.{number:02}.{extension}"

    def _output_path(self, subtitle, extension, custom_dir, collected_paths):
        """Return the output path for ``subtitle``, unique within this call."""
        sub_path = f"{os.path.splitext(self.path)[0]}.{subtitle.suffix}.{extension}"
        if custom_dir is not None:
            sub_path = os.path.join(custom_dir, os.path.basename(sub_path))

        # Two streams sharing language, disposition and extension map to the
        # same name. This is resolved regardless of ``overwrite`` (which is
        # about files already on disk): otherwise both streams would be sent
        # to a single output file.
        if sub_path in collected_paths:
            sub_path = self._numbered_path(sub_path, len(collected_paths), extension)

        return sub_path

    def _run_extraction(self, extract_command, items, timeout):
        """Run ffmpeg for the collected outputs and return ``items``."""
        if not items:
            logger.debug("No subtitles to extract")
            return {}

        logger.debug("Extracting subtitle with command %s", " ".join(extract_command))

        try:
            subprocess.run(extract_command, timeout=timeout, check=True)
        except (subprocess.SubprocessError, FileNotFoundError) as error:
            raise ExtractionError(f"Error calling ffmpeg: {error}") from error

        # A missing output is only reported; the mapping is returned untouched
        for path in items.values():
            if not os.path.isfile(path):
                logger.warning("%s was not extracted", path)

        return items

    def __repr__(self) -> str:
        return f"<FFprobeVideoContainer {self.extension}: {self.path}>"
# Errors from running ffprobe and reading its JSON output that get_subtitles()
# re-raises as InvalidSource.
_ffprobe_exceptions = (
    subprocess.SubprocessError,
    json.JSONDecodeError,
    FileNotFoundError,
    KeyError,
)

@ -0,0 +1,68 @@
# -*- coding: utf-8 -*-
import logging
import re
logger = logging.getLogger(__name__)
class FFprobeSubtitleDisposition:
    """Boolean disposition flags of a subtitle stream.

    Flags default to False and are overwritten from the ``disposition``
    mapping; the content type is derived from the flags and may later be
    refined from the stream title with :meth:`update_from_tags`.
    """

    def __init__(self, data: dict):
        """
        :param data: mapping of flag name -> value; keys that don't match an
            existing attribute are ignored, matching ones are coerced to bool
        """
        for flag in (
            "default",
            "generic",
            "dub",
            "original",
            "comment",
            "lyrics",
            "karaoke",
            "forced",
            "hearing_impaired",
            "visual_impaired",
            "clean_effects",
            "attached_pic",
            "timed_thumbnails",
        ):
            setattr(self, flag, False)

        self._content_type = None

        for name, raw in data.items():
            if not hasattr(self, name):
                continue
            setattr(self, name, bool(raw))

        # When several content-type flags are set, the last one in
        # _content_types order is kept
        for name in _content_types:
            if getattr(self, name, None):
                self._content_type = name

    def update_from_tags(self, tags):
        """Refine the disposition using the ``title`` tag.

        The first content-type pattern found in the lowercased title wins; a
        missing or unrecognised title marks the disposition as generic.

        :param tags: mapping of stream tags
        """
        title = tags.get("title")
        if title is None:
            logger.debug("Title not found. Marking as generic")
            self.generic = True
            return None

        lowered = title.lower()
        matched = next(
            (
                name
                for name, pattern in _content_types.items()
                if pattern.search(lowered) is not None
            ),
            None,
        )

        if matched is None:
            logger.debug("Generic disposition title found: %s", lowered)
            self.generic = True
        else:
            logger.debug("Found %s: %s", matched, lowered)
            self._content_type = matched
            setattr(self, matched, True)

        return None

    @property
    def suffix(self):
        """The content type, or an empty string when none is set."""
        if self._content_type:
            return self._content_type
        return ""

    def __str__(self):
        label = self.suffix.upper()
        return label if label else "GENERIC"


# Disposition attribute -> pattern searched in the lowercased stream title.
_content_types = {
    "hearing_impaired": re.compile(r"sdh|hearing impaired|cc"),
    "forced": re.compile(r"forced|non[- ]english"),
    "comment": re.compile(r"comment"),
    "visual_impaired": re.compile(r"signs|visual impair"),
    "karaoke": re.compile(r"karaoke|songs"),
}

@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
class FeseError(Exception):
    """Base class of every exception defined by this package."""

    pass


class ExtractionError(FeseError):
    """Error while extracting subtitles (e.g. the ffmpeg call failed)."""

    pass


class InvalidFile(FeseError):
    """Error for an invalid subtitle file.

    NOTE(review): no raiser is visible in this module -- confirm usage.
    """

    pass


class InvalidStream(FeseError):
    """Error for an invalid stream.

    NOTE(review): no raiser is visible in this module -- confirm usage.
    """

    pass


class InvalidSource(FeseError):
    """Error getting stream information from a source (e.g. ffprobe failed)."""

    pass


class ConversionError(FeseError):
    """Error converting a subtitle.

    NOTE(review): no raiser is visible in this module -- confirm usage.
    """

    pass


class LanguageNotFound(FeseError):
    """The language of a stream couldn't be detected from its tags."""

    pass


class UnsupportedCodec(FeseError):
    """The codec is unknown or doesn't support the requested operation."""

    pass

@ -0,0 +1,162 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from datetime import timedelta
import logging
from .disposition import FFprobeSubtitleDisposition
from .exceptions import UnsupportedCodec
from .tags import FFprobeGenericSubtitleTags
logger = logging.getLogger(__name__)
class FFprobeSubtitleStream:
    """Base class for FFprobe (FFmpeg) extractable subtitle streams."""

    def __init__(self, stream: dict):
        """Populate the instance from one entry of ffprobe's ``streams`` list.

        :param stream: mapping describing a single stream; ``index`` and
            ``codec_name`` are mandatory
        :raises: LanguageNotFound, UnsupportedCodec
        """
        self.index = int(stream["index"])
        self.codec_name = stream["codec_name"]

        # Only codecs described in _codecs can be handled
        try:
            self._codec = _codecs[self.codec_name]
        except KeyError:
            raise UnsupportedCodec(f"{self.codec_name} is not supported")

        self.r_frame_rate = stream.get("r_frame_rate")
        self.avg_frame_rate = stream.get("avg_frame_rate")

        start_seconds = float(stream.get("start_time", 0))
        self.start_time = timedelta(seconds=start_seconds)
        start_pts_ms = int(stream.get("start_pts", 0))
        self.start_pts = timedelta(milliseconds=start_pts_ms)
        duration_ms = int(stream.get("duration_ts", 0))
        self.duration_ts = timedelta(milliseconds=duration_ms)
        duration_seconds = float(stream.get("duration", 0))
        self.duration = timedelta(seconds=duration_seconds)

        self.tags = FFprobeGenericSubtitleTags.detect_cls_from_data(
            stream.get("tags", {})
        )
        self.disposition = FFprobeSubtitleDisposition(stream.get("disposition", {}))

        # The title tag may refine the disposition
        if stream.get("tags") is not None:
            self.disposition.update_from_tags(stream["tags"])

    def convert_args(self, convert_format, outfile):
        """Return the ffmpeg arguments that convert this stream into ``outfile``.

        convert_format: Union[str, None] = the codec format to convert. if None is set, defaults
        to 'convert_default_format' codec's key
        outfile: str = output file

        raises UnsupportedCodec if convert_format doesn't exist or if the codec doesn't
        support conversion
        """
        target = convert_format or self._codec["convert_default_format"]

        # A target is known when some codec lists it as its copy format
        known_formats = [item["copy_format"] for item in _codecs.values()]
        if target is None or target not in known_formats:
            raise UnsupportedCodec(f"Unknown convert format: {target}")

        if not self._codec["convert"]:
            raise UnsupportedCodec(
                f"{self.codec_name} codec doesn't support conversion"
            )

        stream_selector = f"0:{self.index}"
        return ["-map", stream_selector, "-f", target, outfile]

    def copy_args(self, outfile):
        "raises UnsupportedCodec if the codec doesn't support copy"
        copy_format = self._codec["copy_format"]
        if not (self._codec["copy"] and copy_format):
            raise UnsupportedCodec(f"{self.codec_name} doesn't support copy")

        stream_selector = f"0:{self.index}"
        return ["-map", stream_selector, "-c:s", "copy", "-f", copy_format, outfile]

    @property
    def language(self):
        # Legacy
        return self.tags.language

    @property
    def extension(self):
        """Copy format, else default convert format, else an empty string."""
        for key in ("copy_format", "convert_default_format"):
            if self._codec[key]:
                return self._codec[key]
        return ""

    @property
    def convert_default_format(self):
        return self._codec["convert_default_format"]

    @property
    def type(self):
        return self._codec["type"]

    @property
    def suffix(self):
        """Dot-joined tags suffix, disposition suffix and extension (empty
        parts are left out)."""
        parts = (self.tags.suffix, self.disposition.suffix, self.extension)
        return ".".join(filter(None, parts))

    def __repr__(self) -> str:
        return f"<{self.codec_name.upper()}: {self.tags}@{self.disposition}>"


# Capabilities of every supported codec, keyed by ffprobe codec name.
_codecs = {
    "ass": {
        "type": "text",
        "copy": True,
        "copy_format": "ass",
        "convert": True,
        "convert_default_format": "srt",
    },
    "subrip": {
        "type": "text",
        "copy": True,
        "copy_format": "srt",
        "convert": True,
        "convert_default_format": "srt",
    },
    "webvtt": {
        "type": "text",
        "copy": True,
        "copy_format": "webvtt",
        "convert": True,
        "convert_default_format": "srt",
    },
    "mov_text": {
        "type": "text",
        "copy": False,
        "copy_format": None,
        "convert": True,
        "convert_default_format": "srt",
    },
    "hdmv_pgs_subtitle": {
        "type": "bitmap",
        "copy": True,
        "copy_format": "sup",
        "convert": False,
        "convert_default_format": None,
    },
    "dvb_subtitle": {
        "type": "bitmap",
        "copy": True,
        "copy_format": "sup",
        "convert": False,
        "convert_default_format": None,
    },
    "dvd_subtitle": {
        "type": "bitmap",
        "copy": True,
        "copy_format": "sup",
        "convert": False,
        "convert_default_format": None,
    },
}

@ -0,0 +1,175 @@
from datetime import timedelta
import logging
from babelfish import Language
from babelfish.exceptions import LanguageError
from .exceptions import LanguageNotFound
logger = logging.getLogger(__name__)
class FFprobeGenericSubtitleTags:
    """Tags of a subtitle stream; only the language is interpreted."""

    # Tag keys whose presence identifies the flavour a subclass handles.
    # The generic class is the fallback, so it lists none.
    _DETECTABLE_TAGS = None

    def __init__(self, data: dict):
        """
        :param data: mapping of stream tags
        :raises: LanguageNotFound
        """
        self.language = _get_language(data)
        self._data = data

    @classmethod
    def detect_cls_from_data(cls, data):
        """Return an instance of the first tags class compatible with ``data``,
        or a generic instance when none is."""
        for cls_ in (FFprobeMkvSubtitleTags, FFprobeMp4SubtitleTags):
            if cls_.is_compatible(data):
                logger.debug("Detected tags class: %s", cls_)
                return cls_(data)

        logger.debug("Unable to detect tags class. Using generic")
        return FFprobeGenericSubtitleTags(data)

    @property
    def suffix(self):
        """Two-letter language code, followed by ``-COUNTRY`` when set."""
        lang = self.language.alpha2
        if self.language.country is not None:
            lang = f"{lang}-{self.language.country}"

        return str(lang)

    @property
    def frames(self):
        return 0

    @classmethod
    def is_compatible(cls, data):
        """Return True when ``data`` has at least one of the class'
        ``_DETECTABLE_TAGS`` keys (never for the generic class)."""
        # Single implementation for the whole hierarchy: subclasses only
        # declare their keys instead of repeating them in an override.
        detectable = cls._DETECTABLE_TAGS or ()
        return any(key in detectable for key in data.keys())

    def __str__(self) -> str:
        return f"{type(self).__name__}: {self.suffix}"


class FFprobeMkvSubtitleTags(FFprobeGenericSubtitleTags):
    """Tags carrying statistics keys such as BPS, DURATION or NUMBER_OF_FRAMES."""

    _DETECTABLE_TAGS = (
        "BPS",
        "BPS-eng",
        "DURATION",
        "DURATION-eng",
        "NUMBER_OF_FRAMES",
        "NUMBER_OF_FRAMES-eng",
        "NUMBER_OF_BYTES",
        "NUMBER_OF_BYTES-eng",
    )

    def __init__(self, data: dict):
        """
        :param data: mapping of stream tags; values that are missing or can't
            be parsed end up as None
        :raises: LanguageNotFound
        """
        super().__init__(data)
        self.title = data.get("title")
        self.bps = _safe_int(data.get("BPS"))
        self.bps_eng = _safe_int(data.get("BPS-eng"))
        self.duration = _safe_td(data.get("DURATION"))
        self.duration_eng = _safe_td(data.get("DURATION-eng"))
        self.number_of_frames = _safe_int(data.get("NUMBER_OF_FRAMES"))
        self.number_of_frames_eng = _safe_int(data.get("NUMBER_OF_FRAMES-eng"))
        self.number_of_bytes = _safe_int(data.get("NUMBER_OF_BYTES"))
        self.number_of_bytes_eng = _safe_int(data.get("NUMBER_OF_BYTES-eng"))

    @property
    def frames(self):
        """Frame count from either tag variant, 0 when neither is usable."""
        return self.number_of_frames or self.number_of_frames_eng or 0


class FFprobeMp4SubtitleTags(FFprobeGenericSubtitleTags):
    """Tags carrying the ``creation_time`` / ``handler_name`` keys."""

    _DETECTABLE_TAGS = ("creation_time", "handler_name")

    def __init__(self, data: dict):
        """
        :param data: mapping of stream tags
        :raises: LanguageNotFound
        """
        super().__init__(data)
        self.creation_time = data.get("creation_time")
        self.handler_name = data.get("handler_name")
def _get_language(tags) -> Language:
    """Return the language described by a stream's tags.

    Regional variants listed in ``_extra_languages`` are recognised from the
    stream title before falling back to the plain ``language`` tag.

    :param tags: mapping of stream tags
    :raises: LanguageNotFound
    """
    og_lang = tags.get("language")
    last_exc = None

    if og_lang is not None:
        if og_lang in _extra_languages:
            extra = _extra_languages[og_lang]
            title = tags.get("title", "n/a").lower()
            if any(possible in title for possible in extra["matches"]):
                logger.debug("Found extra language %s", extra["language_args"])
                return Language(*extra["language_args"])

        try:
            lang = Language.fromalpha3b(og_lang)
            # Test for suffix: file suffixes need the two-letter code, so it
            # is read here. A real attribute access is used instead of
            # ``assert`` because asserts are removed under ``python -O``.
            # NOTE(review): presumably the lookup raises a LanguageError
            # subclass when no alpha2 code exists -- confirm.
            if lang.alpha2:
                return lang
        except LanguageError as error:
            last_exc = error
            logger.debug("Error with '%s' language: %s", og_lang, error)

    raise LanguageNotFound(f"Couldn't detect language from tags: {tags}") from last_exc
def _safe_td(value, default=None):
    """Parse an ``H:M:S`` string into a timedelta.

    A comma is accepted as decimal separator. ``default`` is returned when
    ``value`` is None or can't be parsed (the latter is logged as a warning).
    """
    if value is None:
        return default

    try:
        hours, minutes, seconds = (
            float(part.replace(",", ".").strip()) for part in value.split(":")
        )
        return timedelta(hours=hours, minutes=minutes, seconds=seconds)
    except ValueError as error:
        logger.warning("Couldn't get duration field: %s. Returning %s", error, default)

    return default
def _safe_int(value, default=None):
    """Convert ``value`` with ``int()``.

    ``default`` is returned when ``value`` is None or the conversion raises
    ValueError (the latter is logged as a warning).
    """
    if value is not None:
        try:
            return int(value)
        except ValueError:
            logger.warning("Couldn't convert to int: %s. Returning %s", value, default)

    return default
# Regional variants keyed by the stream's "language" tag. When any of the
# "matches" substrings appears in the lowercased stream title, the language is
# built from "language_args" instead of the plain tag.
_extra_languages = {
    "spa": {
        "matches": (
            "es-la",
            "spa-la",
            "spl",
            "mx",
            "latin",
            "mexic",
            "argent",
            "latam",
        ),
        "language_args": ("spa", "MX"),
    },
    "por": {
        "matches": ("pt-br", "pob", "pb", "brazilian", "brasil", "brazil"),
        "language_args": ("por", "BR"),
    },
}

@ -7,16 +7,14 @@ import shutil
import tempfile
from babelfish import language_converters
import fese
from fese import check_integrity
from fese import tags
from fese import container
from fese import FFprobeSubtitleStream
from fese import FFprobeVideoContainer
from fese import InvalidFile
from fese import to_srt
from fese.exceptions import InvalidSource
from subliminal.subtitle import fix_line_ending
from subliminal_patch.core import Episode
from subliminal_patch.core import Movie
from subliminal_patch.exceptions import MustGetBlacklisted
from subliminal_patch.providers import Provider
from subliminal_patch.subtitle import Subtitle
from subzero.language import Language
@ -24,7 +22,7 @@ from subzero.language import Language
logger = logging.getLogger(__name__)
# Replace Babelfish's Language with Subzero's Language
fese.Language = Language
tags.Language = Language
class EmbeddedSubtitle(Subtitle):
@ -57,6 +55,9 @@ class EmbeddedSubtitle(Subtitle):
return f"{self.container.path}_{self.stream.index}"
_ALLOWED_CODECS = ("ass", "subrip", "webvtt", "mov_text")
class EmbeddedSubtitlesProvider(Provider):
provider_name = "embeddedsubtitles"
@ -72,33 +73,37 @@ class EmbeddedSubtitlesProvider(Provider):
def __init__(
self,
include_ass=True,
include_srt=True,
included_codecs=None,
cache_dir=None,
ffprobe_path=None,
ffmpeg_path=None,
hi_fallback=False,
mergerfs_mode=False,
timeout=600,
include_ass=None,
include_srt=None,
mergerfs_mode=None,
):
self._include_ass = include_ass
self._include_srt = include_srt
self._included_codecs = set(included_codecs or _ALLOWED_CODECS)
for codec in self._included_codecs:
if codec not in _ALLOWED_CODECS:
logger.warning("Unallowed codec: %s", codec)
self._cache_dir = os.path.join(
cache_dir or tempfile.gettempdir(), self.__class__.__name__.lower()
)
self._hi_fallback = hi_fallback
self._cached_paths = {}
self._mergerfs_mode = mergerfs_mode
self._timeout = float(timeout)
self._timeout = int(timeout)
fese.FFPROBE_PATH = ffprobe_path or fese.FFPROBE_PATH
fese.FFMPEG_PATH = ffmpeg_path or fese.FFMPEG_PATH
container.FFPROBE_PATH = ffprobe_path or container.FFPROBE_PATH
container.FFMPEG_PATH = ffmpeg_path or container.FFMPEG_PATH
if logger.getEffectiveLevel() == logging.DEBUG:
fese.FF_LOG_LEVEL = "warning"
container.FF_LOG_LEVEL = "warning"
else:
# Default is True
fese.FFMPEG_STATS = False
container.FFMPEG_STATS = False
def initialize(self):
os.makedirs(self._cache_dir, exist_ok=True)
@ -111,8 +116,8 @@ class EmbeddedSubtitlesProvider(Provider):
video = _get_memoized_video_container(path)
try:
streams = filter(_check_allowed_extensions, video.get_subtitles())
except fese.InvalidSource as error:
streams = filter(_check_allowed_codecs, video.get_subtitles())
except InvalidSource as error:
logger.error("Error trying to get subtitles for %s: %s", video, error)
self._blacklist.add(path)
streams = []
@ -128,12 +133,12 @@ class EmbeddedSubtitlesProvider(Provider):
allowed_streams = []
for stream in streams:
if not self._include_ass and stream.extension == "ass":
logger.debug("Ignoring ASS: %s", stream)
continue
if not self._include_srt and stream.extension == "srt":
logger.debug("Ignoring SRT: %s", stream)
if stream.codec_name not in self._included_codecs:
logger.debug(
"Ignoring %s (codec not included in %s)",
stream,
self._included_codecs,
)
continue
if stream.language not in languages:
@ -188,28 +193,19 @@ class EmbeddedSubtitlesProvider(Provider):
if container.path not in self._cached_paths:
# Extract all subtitle streams to avoid reading the entire
# container over and over
streams = filter(_check_allowed_extensions, container.get_subtitles())
extracted = container.extract_subtitles(
list(streams), self._cache_dir, timeout=self._timeout
streams = filter(_check_allowed_codecs, container.get_subtitles())
extracted = container.copy_subtitles(
list(streams),
self._cache_dir,
timeout=self._timeout,
fallback_to_convert=True,
)
# Add the extracted paths to the container path key
self._cached_paths[container.path] = extracted
cached_path = self._cached_paths[container.path]
# Get the subtitle file by index
subtitle_path = cached_path[subtitle.stream.index]
try:
check_integrity(subtitle.stream, subtitle_path)
except InvalidFile as error:
raise MustGetBlacklisted(subtitle.id, subtitle.media_type) from error
# Convert to SRT if the subtitle is ASS
new_subtitle_path = to_srt(subtitle_path, remove_source=True)
if new_subtitle_path != subtitle_path:
cached_path[subtitle.stream.index] = new_subtitle_path
return new_subtitle_path
return cached_path[subtitle.stream.index]
def _is_path_valid(self, path):
if path in self._blacklist:
@ -220,10 +216,6 @@ class EmbeddedSubtitlesProvider(Provider):
logger.debug("Inexistent file: %s", path)
return False
if self._mergerfs_mode and _is_fuse_rclone_mount(path):
logger.debug("Potential cloud file: %s", path)
return False
return True
@ -239,8 +231,12 @@ def _get_memoized_video_container(path: str):
return _MemoizedFFprobeVideoContainer(path)
def _check_allowed_extensions(subtitle: FFprobeSubtitleStream):
    """Tell whether the stream's ``extension`` is ``ass`` or ``srt``."""
    allowed_extensions = ("ass", "srt")
    return subtitle.extension in allowed_extensions
def _check_allowed_codecs(subtitle: FFprobeSubtitleStream):
    """Return True when the stream's ``codec_name`` is in ``_ALLOWED_CODECS``.

    Any other stream is reported at debug level and rejected.
    """
    is_allowed = subtitle.codec_name in _ALLOWED_CODECS
    if not is_allowed:
        logger.debug("Unallowed codec: %s", subtitle)
    return is_allowed
def _check_hi_fallback(streams, languages):
@ -270,10 +266,10 @@ def _check_hi_fallback(streams, languages):
def _discard_possible_incomplete_subtitles(streams):
"""Check number_of_frames attributes from subtitle streams in order to find
"""Check frame properties from subtitle streams in order to find
supposedly incomplete subtitles"""
try:
max_frames = max(stream.number_of_frames for stream in streams)
max_frames = max(stream.tags.frames for stream in streams)
except ValueError:
return []
@ -288,11 +284,11 @@ def _discard_possible_incomplete_subtitles(streams):
for stream in streams:
# 500 < 1200
if stream.number_of_frames < max_frames // 2:
if stream.tags.frames < max_frames // 2:
logger.debug(
"Possible bad subtitle found: %s (%s frames - %s frames)",
stream,
stream.number_of_frames,
stream.tags.frames,
max_frames,
)
continue
@ -302,20 +298,6 @@ def _discard_possible_incomplete_subtitles(streams):
return valid_streams
def _is_fuse_rclone_mount(path: str):
    """Guess whether ``path`` is a cloud file from the length of its inode number.

    Experimental! This only makes sense when a rclone mount is combined with
    a local mount through mergerfs or similar tools. Don't use it otherwise.

    A path is reported as a cloud mount when the decimal form of its inode
    number has more than 18 digits. See the following links for reference:
    https://forum.rclone.org/t/fuse-inode-number-aufs/215/5
    https://pkg.go.dev/bazil.org/fuse/fs?utm_source=godoc#GenerateDynamicInode
    """
    inode_number = os.stat(path).st_ino
    inode_digits = str(inode_number)
    return len(inode_digits) > 18
def _get_pretty_release_name(stream, container):
    """Build a release name from the container file name and the stream suffix.

    The base name of ``container.path``, with its extension removed, is
    joined to ``stream.suffix`` with a dot.
    """
    file_name = os.path.basename(container.path)
    stem, _extension = os.path.splitext(file_name)
    return f"{stem}.{stream.suffix}"

@ -24,3 +24,13 @@ def test_get_providers_auth_with_provider_registry():
raise ValueError(f"'{sub_key}' parameter not present in {provider}")
assert sign.parameters[sub_key] is not None
def test_get_providers_auth_embeddedsubtitles():
    """Check the value types of the ``embeddedsubtitles`` provider settings."""
    provider_settings = get_providers.get_providers_auth()["embeddedsubtitles"]

    # Checked in this order; the first mismatch fails the test.
    expected_types = (
        ("included_codecs", list),
        ("hi_fallback", bool),
        ("cache_dir", str),
        ("ffprobe_path", str),
        ("ffmpeg_path", str),
        ("timeout", str),
    )
    for key, expected_type in expected_types:
        assert isinstance(provider_settings[key], expected_type)

@ -1,23 +1,21 @@
# -*- coding: utf-8 -*-
import os
import tempfile
import fese
from fese import FFprobeSubtitleStream
from fese import FFprobeVideoContainer
from fese import tags
import pytest
import subliminal_patch
from subliminal_patch.core import Episode
from subliminal_patch.core import Movie
from subliminal_patch.exceptions import MustGetBlacklisted
from subliminal_patch.providers.embeddedsubtitles import _MemoizedFFprobeVideoContainer
from subliminal_patch.providers.embeddedsubtitles import EmbeddedSubtitlesProvider
from subliminal_patch.providers.embeddedsubtitles import (
_discard_possible_incomplete_subtitles,
)
from subliminal_patch.providers.embeddedsubtitles import _get_pretty_release_name
from subliminal_patch.providers.embeddedsubtitles import _MemoizedFFprobeVideoContainer
from subliminal_patch.providers.embeddedsubtitles import EmbeddedSubtitlesProvider
from subzero.language import Language
fese.Language = Language
tags.Language = Language
@pytest.fixture
@ -46,8 +44,7 @@ def video_multiple_languages(data):
@pytest.fixture
def config(tmpdir):
return {
"include_ass": True,
"include_srt": True,
"included_codecs": None,
"cache_dir": tmpdir,
"ffprobe_path": None,
"ffmpeg_path": None,
@ -65,11 +62,25 @@ def video_inexistent(tmpdir):
)
def test_language_is_subzero_type():
    """``tags.Language`` must be the ``Language`` class imported from subzero."""
    assert tags.Language == Language
def test_init(config):
    """The provider can be built from the ``config`` fixture and entered."""
    with EmbeddedSubtitlesProvider(**config) as provider:
        assert provider is not None
def test_init_empty_included_codecs():
    """An empty codec list makes the provider include all four codecs."""
    every_codec = {"ass", "subrip", "webvtt", "mov_text"}
    with EmbeddedSubtitlesProvider(included_codecs=[]) as provider:
        assert provider._included_codecs == every_codec
def test_init_custom_included_codecs():
    """An explicit codec list is stored as a set with the same members."""
    requested_codecs = ["ass"]
    with EmbeddedSubtitlesProvider(included_codecs=requested_codecs) as provider:
        assert provider._included_codecs == {"ass"}
def test_inexistent_video(video_inexistent):
with EmbeddedSubtitlesProvider() as provider:
subtitles = provider.list_subtitles(video_inexistent, {})
@ -124,7 +135,6 @@ def test_list_subtitles_hi_fallback_one_stream(
)
fake = _MemoizedFFprobeVideoContainer.get_subtitles("")[0]
assert fake.disposition.hearing_impaired == True
subs = provider.list_subtitles(video_single_language, {language})
assert subs
assert subs[0].hearing_impaired == False
@ -154,13 +164,17 @@ def test_list_subtitles_hi_fallback_multiple_language_streams(
mocker.patch(
# "fese.FFprobeVideoContainer.get_subtitles",
"subliminal_patch.providers.embeddedsubtitles._MemoizedFFprobeVideoContainer.get_subtitles",
return_value=[fake_streams["en_hi"], fake_streams["es"], fake_streams["es_hi"]],
return_value=[
fake_streams["en_hi"],
fake_streams["es"],
fake_streams["es_hi"],
],
)
subs = provider.list_subtitles(video_single_language, languages)
assert len(subs) == 3
assert subs[0].hearing_impaired == False # English subtitle
assert subs[1].hearing_impaired == False # Spanish subtitle
assert subs[2].hearing_impaired == True # Spanish HI subtitle
assert subs[2].hearing_impaired == True # Spanish HI subtitle
def test_list_subtitles_hi_fallback_multiple_hi_streams(
@ -218,7 +232,7 @@ def test_list_subtitles_multiple_languages(video_multiple_languages):
def test_list_subtitles_wo_ass(video_single_language):
with EmbeddedSubtitlesProvider(include_ass=False) as provider:
with EmbeddedSubtitlesProvider(included_codecs=("srt",)) as provider:
subs = provider.list_subtitles(
video_single_language, {Language.fromalpha2("en")}
)
@ -226,13 +240,25 @@ def test_list_subtitles_wo_ass(video_single_language):
def test_list_subtitles_wo_srt(video_multiple_languages):
with EmbeddedSubtitlesProvider(include_srt=False) as provider:
with EmbeddedSubtitlesProvider(included_codecs=("ass",)) as provider:
subs = provider.list_subtitles(
video_multiple_languages, {Language.fromalpha2("en")}
)
assert not subs
def test_get_pretty_release_name():
    """Check the release name built for a forced English subrip stream in ``foo.mkv``."""
    stream_properties = {
        "index": 1,
        "codec_name": "subrip",
        "tags": {"language": "eng", "title": "forced"},
    }
    subtitle_stream = FFprobeSubtitleStream(stream_properties)
    video_container = FFprobeVideoContainer("foo.mkv")

    pretty_name = _get_pretty_release_name(subtitle_stream, video_container)
    assert pretty_name == "foo.en.forced.srt"
def test_download_subtitle_multiple(video_multiple_languages):
with EmbeddedSubtitlesProvider() as provider:
languages = {Language.fromalpha2(code) for code in ("en", "it", "fr")} | {
@ -242,7 +268,7 @@ def test_download_subtitle_multiple(video_multiple_languages):
subs = provider.list_subtitles(video_multiple_languages, languages)
for sub in subs:
provider.download_subtitle(sub)
assert sub.content is not None
assert sub.is_valid()
def test_download_subtitle_single(video_single_language):
@ -251,23 +277,7 @@ def test_download_subtitle_single(video_single_language):
video_single_language, {Language.fromalpha2("en")}
)[0]
provider.download_subtitle(subtitle)
assert subtitle.content is not None
def test_download_invalid_subtitle(video_single_language):
with EmbeddedSubtitlesProvider() as provider:
subtitle = provider.list_subtitles(
video_single_language, {Language.fromalpha2("en")}
)[0]
provider._cached_paths[subtitle.container.path] = {
subtitle.stream.index: "dummy.srt"
}
try:
provider.download_subtitle(subtitle)
except MustGetBlacklisted as error:
assert error.id == subtitle.id
assert error.media_type == subtitle.media_type
assert subtitle.is_valid()
def test_memoized(video_single_language, mocker):

Loading…
Cancel
Save