You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
bazarr/libs/subliminal_patch/providers/argenteam.py

284 lines
9.4 KiB

# coding=utf-8
from __future__ import absolute_import
import logging
import os
import io
import time
import urllib.parse
from json import JSONDecodeError
from zipfile import ZipFile
from guessit import guessit
from requests import Session
from subliminal import Episode, Movie
from subliminal.utils import sanitize
from subliminal_patch.providers import Provider
from subliminal_patch.subtitle import Subtitle, guess_matches
from subliminal_patch.providers.mixins import ProviderSubtitleArchiveMixin
from subzero.language import Language
BASE_URL = "https://argenteam.net"
API_URL = f"{BASE_URL}/api/v1"
logger = logging.getLogger(__name__)
class ArgenteamSubtitle(Subtitle):
provider_name = "argenteam"
hearing_impaired_verifiable = False
def __init__(self, language, page_link, download_link, release_info, matches):
super(ArgenteamSubtitle, self).__init__(language, page_link=page_link)
self.page_link = page_link
self.download_link = download_link
self.found_matches = matches
self._release_info = release_info
# Original subtitle filename guessed from the URL
self.release_info = urllib.parse.unquote(self.download_link.split("/")[-1])
@property
def id(self):
return self.download_link
def get_matches(self, video):
type_ = "episode" if isinstance(video, Episode) else "movie"
self.found_matches |= guess_matches(
video,
guessit(self.release_info, {"type": type_}),
)
self.found_matches |= guess_matches(
video,
guessit(self._release_info, {"type": type_}),
)
return self.found_matches
class ArgenteamProvider(Provider, ProviderSubtitleArchiveMixin):
provider_name = "argenteam"
# Safe to assume every subtitle from Argenteam is Latam Spanish
languages = {Language("spa", "MX")}
video_types = (Episode, Movie)
subtitle_class = ArgenteamSubtitle
hearing_impaired_verifiable = False
language_list = list(languages)
multi_result_throttle = 2 # seconds
def __init__(self):
self.session = Session()
def initialize(self):
self.session.headers.update(
{"User-Agent": os.environ.get("SZ_USER_AGENT", "Sub-Zero/2")}
)
def terminate(self):
self.session.close()
def query(self, title, video, titles=None):
is_episode = isinstance(video, Episode)
season = episode = None
url = f"{API_URL}/movie"
if is_episode:
season = video.season
episode = video.episode
url = f"{API_URL}/episode"
argenteam_ids = self._search_ids(
title, season=season, episode=episode, titles=titles
)
else:
argenteam_ids = self._search_ids(
title, year=video.year, imdb_id=video.imdb_id, titles=titles
)
if not argenteam_ids:
return []
language = self.language_list[0]
subtitles = []
has_multiple_ids = len(argenteam_ids) > 1
for aid in argenteam_ids:
response = self.session.get(url, params={"id": aid}, timeout=10)
response.raise_for_status()
try:
content = response.json()
except JSONDecodeError:
continue
if not content or not content.get("releases"):
continue
imdb_id = year = None
returned_title = title
if not is_episode and "info" in content:
imdb_id = content["info"].get("imdb")
year = content["info"].get("year")
returned_title = content["info"].get("title", title)
for r in content["releases"]:
for s in r["subtitles"]:
movie_kind = "episode" if is_episode else "movie"
page_link = f"{BASE_URL}/{movie_kind}/{aid}"
release_info = self._combine_release_info(r)
download_link = s["uri"].replace("http://", "https://")
matches_ = self._get_query_matches(
video,
movie_kind=movie_kind,
season=season,
episode=episode,
title=returned_title,
year=year,
imdb_id=imdb_id,
tvdb_id=content.get("tvdb"),
)
if matches_ is not None:
subtitles.append(
ArgenteamSubtitle(
language,
page_link,
download_link,
release_info,
matches_,
)
)
if has_multiple_ids:
time.sleep(self.multi_result_throttle)
return subtitles
def list_subtitles(self, video, languages):
if isinstance(video, Episode):
titles = [video.series] + video.alternative_series[:2]
else:
titles = [video.title] + video.alternative_titles[:2]
for title in titles:
subs = self.query(title, video, titles=titles)
if subs:
return subs
time.sleep(self.multi_result_throttle)
return []
def download_subtitle(self, subtitle):
# download as a zip
logger.info("Downloading subtitle %r", subtitle)
r = self.session.get(subtitle.download_link, timeout=10)
r.raise_for_status()
# open the zip
with ZipFile(io.BytesIO(r.content)) as zf:
subtitle.content = self.get_subtitle_from_archive(subtitle, zf)
def _search_ids(self, title, **kwargs):
query = title
titles = kwargs.get("titles") or []
is_episode = False
if kwargs.get("season") and kwargs.get("episode"):
is_episode = True
query = f"{title} S{kwargs['season']:02}E{kwargs['episode']:02}"
logger.debug(f"Searching ID (episode: {is_episode}) for {query}")
r = self.session.get(f"{API_URL}/search", params={"q": query}, timeout=10)
r.raise_for_status()
try:
results = r.json()
except JSONDecodeError:
return []
if not results.get("results"):
return []
match_ids = []
for result in results["results"]:
if result["type"] == "movie" and is_episode:
continue
imdb = f"tt{result.get('imdb', 'n/a')}"
if not is_episode and imdb == kwargs.get("imdb_id"):
logger.debug("Movie matched by IMDB ID, taking shortcut")
match_ids = [result["id"]]
break
# advanced title check in case of multiple movie results
title_year = kwargs.get("year") and kwargs.get("title")
if results["total"] > 1 and not is_episode and title_year:
sanitized = sanitize(result["title"])
titles = [f"{sanitize(name)} {kwargs['year']}" for name in titles]
if sanitized not in titles:
continue
match_ids.append(result["id"])
if match_ids:
ids = ", ".join(str(id) for id in match_ids)
logger.debug("Found matching IDs: %s", ids)
else:
logger.debug("Nothing found from %s query", query)
return match_ids
def _get_query_matches(self, video, **kwargs):
matches = set()
if isinstance(video, Episode) and kwargs.get("movie_kind") == "episode":
if (kwargs.get("tvdb_id") and video.series_tvdb_id) and str(
video.series_tvdb_id
) != str(kwargs.get("tvdb_id")):
logger.debug(
"TVDB ID not matched: %s - %s", kwargs, video.series_tvdb_id
)
return None
if video.series and (
sanitize(kwargs.get("title"))
in (
sanitize(name) for name in [video.series] + video.alternative_series
)
):
matches.add("series")
if video.season and kwargs.get("season") == video.season:
matches.add("season")
if video.episode and kwargs.get("episode") == video.episode:
matches.add("episode")
# year (year is not available for series, but we assume it matches)
matches.add("year")
elif isinstance(video, Movie) and kwargs.get("movie_kind") == "movie":
if video.title and (
sanitize(kwargs.get("title"))
in (sanitize(name) for name in [video.title] + video.alternative_titles)
):
matches.add("title")
if video.imdb_id and f"tt{kwargs.get('imdb_id')}" == str(video.imdb_id):
matches.add("imdb_id")
if video.year and kwargs.get("year") == video.year:
matches.add("year")
else:
logger.info(f"{kwargs.get('movie_kind')} is not a valid movie_kind")
return matches
def _combine_release_info(self, release_dict):
keys = ("source", "codec", "tags", "team")
combine = [release_dict.get(key) for key in keys if release_dict.get(key)]
if combine:
return ".".join(combine)
return "Unknown"