diff --git a/libs/subliminal_patch/providers/argenteam.py b/libs/subliminal_patch/providers/argenteam.py index 020bbc9be..2945104fc 100644 --- a/libs/subliminal_patch/providers/argenteam.py +++ b/libs/subliminal_patch/providers/argenteam.py @@ -9,141 +9,45 @@ from zipfile import ZipFile from guessit import guessit from requests import Session from subliminal import Episode, Movie -from subliminal.score import get_equivalent_release_groups -from subliminal.utils import sanitize_release_group, sanitize +from subliminal.utils import sanitize from subliminal_patch.providers import Provider from subliminal_patch.subtitle import Subtitle, guess_matches from subliminal_patch.providers.mixins import ProviderSubtitleArchiveMixin from subzero.language import Language +BASE_URL = "https://argenteam.net/" +API_URL = BASE_URL + "api/v1/" + logger = logging.getLogger(__name__) class ArgenteamSubtitle(Subtitle): - provider_name = 'argenteam' + provider_name = "argenteam" hearing_impaired_verifiable = False - _release_info = None - def __init__(self, language, page_link, download_link, movie_kind, title, season, episode, year, release, version, source, - video_codec, tvdb_id, imdb_id, asked_for_episode=None, asked_for_release_group=None, *args, **kwargs): - super(ArgenteamSubtitle, self).__init__(language, page_link=page_link, *args, **kwargs) + def __init__(self, language, page_link, download_link, release_info, matches): + super(ArgenteamSubtitle, self).__init__(language, page_link=page_link) self.page_link = page_link self.download_link = download_link - self.movie_kind = movie_kind - self.title = title - self.year = year - self.season = season - self.episode = episode - self.release = release - self.version = version - self.asked_for_release_group = asked_for_release_group - self.asked_for_episode = asked_for_episode - self.matches = None - self.source = source - self.video_codec = video_codec - self.tvdb_id = tvdb_id - self.imdb_id = "tt" + imdb_id if imdb_id else None - self.releases = self.release_info + self.found_matches = matches + self.release_info = release_info @property def id(self): return self.download_link - @property - def release_info(self): - if self._release_info: - return self._release_info - - combine = [] - for attr in ("source", "version"): - value = getattr(self, attr) - if value: - combine.append(value) - - self._release_info = u".".join(combine) + (u"-"+self.release if self.release else "") - return self._release_info - - def __repr__(self): - ep_addon = (" S%02dE%02d" % (self.season, self.episode)) if self.episode else "" - return '<%s %r [%s]>' % ( - self.__class__.__name__, u"%s%s%s." % (self.title, " (%s)" % self.year if self.year else "", ep_addon) + - self.release_info, self.language) - def get_matches(self, video): - matches = set() - # series - if isinstance(video, Episode) and self.movie_kind == 'episode': - if video.series and (sanitize(self.title) in ( - sanitize(name) for name in [video.series] + video.alternative_series)): - matches.add('series') - - # season - if video.season and self.season == video.season: - matches.add('season') - - # episode - if video.episode and self.episode == video.episode: - matches.add('episode') - - # tvdb_id - if video.tvdb_id and str(self.tvdb_id) == str(video.tvdb_id): - matches.add('tvdb_id') - - # year (year is not available for series, but we assume it matches) - matches.add('year') - - elif isinstance(video, Movie) and self.movie_kind == 'movie': - # title - if video.title and (sanitize(self.title) in ( - sanitize(name) for name in [video.title] + video.alternative_titles)): - matches.add('title') - - # imdb_id - if video.imdb_id and self.imdb_id and str(self.imdb_id) == str(video.imdb_id): - matches.add('imdb_id') - - # year - if video.year and self.year == video.year: - matches.add('year') - else: - logger.info('%r is not a valid movie_kind', self.movie_kind) - return matches - - # release_group - if video.release_group and self.release: - rg = sanitize_release_group(video.release_group) - if any(r in sanitize_release_group(self.release) for r in get_equivalent_release_groups(rg)): - matches.add('release_group') - - # blatantly assume we've got a matching source if the release group matches - # fixme: smart? - #matches.add('source') - - # resolution - if video.resolution and self.version and str(video.resolution) in self.version.lower(): - matches.add('resolution') - # source - if video.source and self.source: - formats = [video.source] - if video.source == "Web": - formats.append("WEB") - - for fmt in formats: - if fmt.lower() in self.source.lower(): - matches.add('source') - break - - matches |= guess_matches(video, guessit(self.release_info), partial=True) - self.matches = matches - return matches + # Download links always have the srt filename with the release info. + # We combine it with the release info as guessit will return the first key match. + new_file = self.download_link.split("/")[-1] + self.release_info + self.found_matches |= guess_matches(video, guessit(new_file)) + return self.found_matches class ArgenteamProvider(Provider, ProviderSubtitleArchiveMixin): - provider_name = 'argenteam' - languages = {Language.fromalpha2(l) for l in ['es']} + provider_name = "argenteam" + languages = {Language.fromalpha2(l) for l in ["es"]} video_types = (Episode, Movie) - BASE_URL = "https://argenteam.net/" - API_URL = BASE_URL + "api/v1/" subtitle_class = ArgenteamSubtitle hearing_impaired_verifiable = False language_list = list(languages) @@ -155,77 +59,130 @@ class ArgenteamProvider(Provider, ProviderSubtitleArchiveMixin): def initialize(self): self.session = Session() - self.session.headers = {'User-Agent': os.environ.get("SZ_USER_AGENT", "Sub-Zero/2")} + self.session.headers = { + "User-Agent": os.environ.get("SZ_USER_AGENT", "Sub-Zero/2") + } def terminate(self): self.session.close() - def search_ids(self, title, year=None, imdb_id=None, season=None, episode=None, titles=None): - """Search movie or episode id from the `title`, `season` and `episode`. - - :param imdb_id: imdb id of the given movie - :param titles: all titles of the given series or movie - :param year: release year of the given movie - :param str title: series of the episode or movie name - :param int season: season of the episode. - :param int episode: episode number. - :return: list of ids - :rtype: list - - """ - # make the search + def search_ids(self, title, **kwargs): query = title - titles = titles or [] + titles = kwargs.get("titles") or [] is_episode = False - if season and episode: + if kwargs.get("season") and kwargs.get("episode"): is_episode = True - query = '%s S%#02dE%#02d' % (title, season, episode) + query = f"{title} S{kwargs['season']:02}E{kwargs['episode']:02}" + + logger.info(f"Searching ID (episode: {is_episode}) for {query}") - logger.info(u'Searching %s ID for %r', "episode" if is_episode else "movie", query) - r = self.session.get(self.API_URL + 'search', params={'q': query}, timeout=10) + r = self.session.get(API_URL + "search", params={"q": query}, timeout=10) r.raise_for_status() + results = r.json() match_ids = [] - if results['total'] >= 1: + if results["total"] >= 1: for result in results["results"]: - if (result['type'] == "episode" and not is_episode) or (result['type'] == "movie" and is_episode): + if (result["type"] == "episode" and not is_episode) or ( + result["type"] == "movie" and is_episode + ): continue - # shortcut in case of matching imdb id - if not is_episode and imdb_id and "imdb" in result and "tt%s" % result["imdb"] == str(imdb_id): - logger.debug("Movie matched by IMDB ID %s, taking shortcut", imdb_id) - match_ids = [result['id']] + # shortcut in case of matching imdb id (don't match NoneType) + if not is_episode and f"tt{result.get('imdb', 'n/a')}" == kwargs.get( + "imdb_id" + ): + logger.debug(f"Movie matched by IMDB ID, taking shortcut") + match_ids = [result["id"]] break # advanced title check in case of multiple movie results - if results['total'] > 1: - if not is_episode and year: - if result["title"] and not (sanitize(result["title"]) in (u"%s %s" % (sanitize(name), year) - for name in titles)): + if results["total"] > 1: + if not is_episode and kwargs.get("year"): + if result["title"] and not ( + sanitize(result["title"]) + in ( + "%s %s" % (sanitize(name), kwargs.get("year")) + for name in titles + ) + ): continue - match_ids.append(result['id']) + match_ids.append(result["id"]) else: - logger.error(u'No episode ID found for %r', query) + logger.error(f"No episode ID found for {query}") if match_ids: - logger.debug(u"Found matching IDs: %s", ", ".join(str(id) for id in match_ids)) + logger.debug( + f"Found matching IDs: {', '.join(str(id) for id in match_ids)}" + ) return match_ids + def get_query_matches(self, video, **kwargs): + matches = set() + if isinstance(video, Episode) and kwargs.get("movie_kind") == "episode": + if video.series and ( + sanitize(kwargs.get("title")) + in ( + sanitize(name) for name in [video.series] + video.alternative_series + ) + ): + matches.add("series") + + if video.season and kwargs.get("season") == video.season: + matches.add("season") + + if video.episode and kwargs.get("episode") == video.episode: + matches.add("episode") + + if video.tvdb_id and kwargs.get("tvdb_id") == str(video.tvdb_id): + matches.add("tvdb_id") + + # year (year is not available for series, but we assume it matches) + matches.add("year") + + elif isinstance(video, Movie) and kwargs.get("movie_kind") == "movie": + if video.title and ( + sanitize(kwargs.get("title")) + in (sanitize(name) for name in [video.title] + video.alternative_titles) + ): + matches.add("title") + + if video.imdb_id and f"tt{kwargs.get('imdb_id')}" == str(video.imdb_id): + matches.add("imdb_id") + + if video.year and kwargs.get("year") == video.year: + matches.add("year") + else: + logger.info(f"{kwargs.get('movie_kind')} is not a valid movie_kind") + + return matches + + def combine_release_info(self, release_dict): + keys = ("source", "codec", "tags", "team") + combine = [release_dict.get(key) for key in keys if release_dict.get(key)] + if combine: + return ".".join(combine) + return "Unknown" + def query(self, title, video, titles=None): is_episode = isinstance(video, Episode) season = episode = None - url = self.API_URL + 'movie' + url = API_URL + "movie" if is_episode: season = video.season episode = video.episode - url = self.API_URL + 'episode' - argenteam_ids = self.search_ids(title, season=season, episode=episode, titles=titles) + url = API_URL + "episode" + argenteam_ids = self.search_ids( + title, season=season, episode=episode, titles=titles + ) else: - argenteam_ids = self.search_ids(title, year=video.year, imdb_id=video.imdb_id, titles=titles) + argenteam_ids = self.search_ids( + title, year=video.year, imdb_id=video.imdb_id, titles=titles + ) if not argenteam_ids: return [] @@ -234,30 +191,45 @@ class ArgenteamProvider(Provider, ProviderSubtitleArchiveMixin): subtitles = [] has_multiple_ids = len(argenteam_ids) > 1 for aid in argenteam_ids: - response = self.session.get(url, params={'id': aid}, timeout=10) + response = self.session.get(url, params={"id": aid}, timeout=10) response.raise_for_status() content = response.json() - - if content is not None: # eg https://argenteam.net/api/v1/episode?id=11534 - imdb_id = year = None - returned_title = title - if not is_episode and "info" in content: - imdb_id = content["info"].get("imdb") - year = content["info"].get("year") - returned_title = content["info"].get("title", title) - - for r in content['releases']: - for s in r['subtitles']: - movie_kind = "episode" if is_episode else "movie" - page_link = self.BASE_URL + movie_kind + "/" + str(aid) - # use https and new domain - download_link = s['uri'].replace('http://www.argenteam.net/', self.BASE_URL) - sub = ArgenteamSubtitle(language, page_link, download_link, movie_kind, returned_title, - season, episode, year, r.get('team'), r.get('tags'), - r.get('source'), r.get('codec'), content.get("tvdb"), imdb_id, - asked_for_release_group=video.release_group, - asked_for_episode=episode) - subtitles.append(sub) + if not content: + continue + + imdb_id = year = None + returned_title = title + if not is_episode and "info" in content: + imdb_id = content["info"].get("imdb") + year = content["info"].get("year") + returned_title = content["info"].get("title", title) + + for r in content["releases"]: + for s in r["subtitles"]: + movie_kind = "episode" if is_episode else "movie" + page_link = f"{BASE_URL}{movie_kind}/{aid}" + release_info = self.combine_release_info(r) + download_link = s["uri"].replace("http", "https") + + matches_ = self.get_query_matches( + video, + movie_kind=movie_kind, + season=season, + episode=episode, + title=returned_title, + year=year, + imdb_id=imdb_id, + tvdb_id=content.get("tvdb"), + ) + subtitles.append( + ArgenteamSubtitle( + language, + page_link, + download_link, + release_info, + matches_, + ) + ) if has_multiple_ids: time.sleep(self.multi_result_throttle) @@ -280,7 +252,7 @@ class ArgenteamProvider(Provider, ProviderSubtitleArchiveMixin): def download_subtitle(self, subtitle): # download as a zip - logger.info('Downloading subtitle %r', subtitle) + logger.info("Downloading subtitle %r", subtitle) r = self.session.get(subtitle.download_link, timeout=10) r.raise_for_status() diff --git a/libs/subliminal_patch/providers/subdivx.py b/libs/subliminal_patch/providers/subdivx.py index e6cc4039c..dbe3a6913 100644 --- a/libs/subliminal_patch/providers/subdivx.py +++ b/libs/subliminal_patch/providers/subdivx.py @@ -13,7 +13,7 @@ from requests import Session from subliminal import __short_version__ from subliminal.exceptions import ServiceUnavailable from subliminal.providers import ParserBeautifulSoup -from subliminal.subtitle import SUBTITLE_EXTENSIONS, fix_line_ending,guess_matches +from subliminal.subtitle import SUBTITLE_EXTENSIONS, fix_line_ending, guess_matches from subliminal.video import Episode, Movie from subliminal_patch.exceptions import APIThrottled from six.moves import range @@ -26,18 +26,20 @@ logger = logging.getLogger(__name__) class SubdivxSubtitle(Subtitle): - provider_name = 'subdivx' + provider_name = "subdivx" hash_verifiable = False def __init__(self, language, video, page_link, title, description, uploader): - super(SubdivxSubtitle, self).__init__(language, hearing_impaired=False, page_link=page_link) + super(SubdivxSubtitle, self).__init__( + language, hearing_impaired=False, page_link=page_link + ) self.video = video self.title = title self.description = description self.uploader = uploader self.release_info = self.title if self.description and self.description.strip(): - self.release_info += ' | ' + self.description + self.release_info += " | " + self.description @property def id(self): @@ -49,60 +51,33 @@ class SubdivxSubtitle(Subtitle): # episode if isinstance(video, Episode): # already matched in search query - matches.update(['title', 'series', 'season', 'episode', 'year']) + matches.update(["title", "series", "season", "episode", "year"]) # movie elif isinstance(video, Movie): # already matched in search query - matches.update(['title', 'year']) - - # release_group - if video.release_group and video.release_group.lower() in self.description: - matches.add('release_group') - - # resolution - if video.resolution and video.resolution.lower() in self.description: - matches.add('resolution') - - # source - if video.source: - formats = [video.source.lower()] - if formats[0] == "web": - formats.append("webdl") - formats.append("web-dl") - formats.append("webrip") - formats.append("web ") - for frmt in formats: - if frmt in self.description: - matches.add('source') - break - - # video_codec - if video.video_codec: - video_codecs = [video.video_codec.lower()] - if video_codecs[0] == "h.264": - video_codecs.append("h264") - video_codecs.append("x264") - elif video_codecs[0] == "h.265": - video_codecs.append("h265") - video_codecs.append("x265") - elif video_codecs[0] == "divx": - video_codecs.append("divx") - for vc in video_codecs: - if vc in self.description: - matches.add('video_codec') - break + matches.update(["title", "year"]) + + # Special string comparisons are unnecessary. Guessit can match keys + # from any string and find even more keywords. + matches |= guess_matches( + video, + guessit( + self.description, + {"type": "episode" if isinstance(video, Episode) else "movie"}, + ), + ) return matches class SubdivxSubtitlesProvider(Provider): - provider_name = 'subdivx' + provider_name = "subdivx" hash_verifiable = False - languages = {Language.fromalpha2(lang) for lang in ['es']} + languages = {Language.fromalpha2(lang) for lang in ["es"]} subtitle_class = SubdivxSubtitle - server_url = 'https://www.subdivx.com/' + server_url = "https://www.subdivx.com/" multi_result_throttle = 2 language_list = list(languages) @@ -111,36 +86,31 @@ class SubdivxSubtitlesProvider(Provider): def initialize(self): self.session = Session() - self.session.headers['User-Agent'] = 'Subliminal/{}'.format(__short_version__) + self.session.headers["User-Agent"] = f"Subliminal/{__short_version__}" def terminate(self): self.session.close() def query(self, video, languages): if isinstance(video, Episode): - query = "{} S{:02d}E{:02d}".format(video.series, video.season, video.episode) + query = f"{video.series} S{video.season:02}E{video.episode:02}" else: # Subdvix has problems searching foreign movies if the year is - # appended. For example: if we search "Memories of Murder 2003", - # Subdix won't return any results; but if we search "Memories of - # Murder", it will. That's because in Subdvix foreign titles have - # the year after the original title ("Salinui chueok (2003) aka - # Memories of Murder"). - # A proper solution would be filtering results with the year in - # _parse_subtitles_page. + # appended. A proper solution would be filtering results with the + # year in self._parse_subtitles_page. query = video.title params = { - 'q': query, # search string - 'accion': 5, # action search - 'oxdown': 1, # order by downloads descending - 'pg': 1 # page 1 + "q": query, # search string + "accion": 5, # action search + "oxdown": 1, # order by downloads descending + "pg": 1, # page 1 } - logger.debug('Searching subtitles %r', query) + logger.debug(f"Searching subtitles: {query}") subtitles = [] language = self.language_list[0] - search_link = self.server_url + 'index.php' + search_link = self.server_url + "index.php" while True: response = self.session.get(search_link, params=params, timeout=20) self._check_response(response) @@ -148,7 +118,7 @@ class SubdivxSubtitlesProvider(Provider): try: page_subtitles = self._parse_subtitles_page(video, response, language) except Exception as e: - logger.error('Error parsing subtitles list: ' + str(e)) + logger.error(f"Error parsing subtitles list: {e}") break subtitles += page_subtitles @@ -156,7 +126,7 @@ class SubdivxSubtitlesProvider(Provider): if len(page_subtitles) < 100: break # this is the last page - params['pg'] += 1 # search next page + params["pg"] += 1 # search next page time.sleep(self.multi_result_throttle) return subtitles @@ -167,14 +137,17 @@ class SubdivxSubtitlesProvider(Provider): def download_subtitle(self, subtitle): if isinstance(subtitle, SubdivxSubtitle): # download the subtitle - logger.info('Downloading subtitle %r', subtitle) + logger.info("Downloading subtitle %r", subtitle) # get download link download_link = self._get_download_link(subtitle) # download zip / rar file with the subtitle - response = self.session.get(self.server_url + download_link, headers={'Referer': subtitle.page_link}, - timeout=30) + response = self.session.get( + self.server_url + download_link, + headers={"Referer": subtitle.page_link}, + timeout=30, + ) self._check_response(response) # open the compressed archive @@ -187,9 +160,11 @@ class SubdivxSubtitlesProvider(Provider): def _parse_subtitles_page(self, video, response, language): subtitles = [] - page_soup = ParserBeautifulSoup(response.content.decode('utf-8', 'ignore'), ['lxml', 'html.parser']) - title_soups = page_soup.find_all("div", {'id': 'menu_detalle_buscador'}) - body_soups = page_soup.find_all("div", {'id': 'buscador_detalle'}) + page_soup = ParserBeautifulSoup( + response.content.decode("utf-8", "ignore"), ["lxml", "html.parser"] + ) + title_soups = page_soup.find_all("div", {"id": "menu_detalle_buscador"}) + body_soups = page_soup.find_all("div", {"id": "buscador_detalle"}) for subtitle in range(0, len(title_soups)): title_soup, body_soup = title_soups[subtitle], body_soups[subtitle] @@ -204,15 +179,17 @@ class SubdivxSubtitlesProvider(Provider): page_link = title_soup.find("a")["href"] # description - description = body_soup.find("div", {'id': 'buscador_detalle_sub'}).text + description = body_soup.find("div", {"id": "buscador_detalle_sub"}).text description = description.replace(",", " ").lower() # uploader - uploader = body_soup.find("a", {'class': 'link1'}).text + uploader = body_soup.find("a", {"class": "link1"}).text - subtitle = self.subtitle_class(language, video, page_link, title, description, uploader) + subtitle = self.subtitle_class( + language, video, page_link, title, description, uploader + ) - logger.debug('Found subtitle %r', subtitle) + logger.debug("Found subtitle %r", subtitle) subtitles.append(subtitle) return subtitles @@ -221,37 +198,39 @@ class SubdivxSubtitlesProvider(Provider): response = self.session.get(subtitle.page_link, timeout=20) self._check_response(response) try: - page_soup = ParserBeautifulSoup(response.content.decode('utf-8', 'ignore'), ['lxml', 'html.parser']) - links_soup = page_soup.find_all("a", {'class': 'detalle_link'}) + page_soup = ParserBeautifulSoup( + response.content.decode("utf-8", "ignore"), ["lxml", "html.parser"] + ) + links_soup = page_soup.find_all("a", {"class": "detalle_link"}) for link_soup in links_soup: - if link_soup['href'].startswith('bajar'): - return self.server_url + link_soup['href'] - links_soup = page_soup.find_all("a", {'class': 'link1'}) + if link_soup["href"].startswith("bajar"): + return self.server_url + link_soup["href"] + links_soup = page_soup.find_all("a", {"class": "link1"}) for link_soup in links_soup: - if "bajar.php" in link_soup['href']: - return link_soup['href'] + if "bajar.php" in link_soup["href"]: + return link_soup["href"] except Exception as e: - raise APIThrottled('Error parsing download link: ' + str(e)) + raise APIThrottled(f"Error parsing download link: {e}") - raise APIThrottled('Download link not found') + raise APIThrottled("Download link not found") @staticmethod def _check_response(response): if response.status_code != 200: - raise ServiceUnavailable('Bad status code: ' + str(response.status_code)) + raise ServiceUnavailable(f"Bad status code: {response.status_code}") @staticmethod def _get_archive(content): # open the archive archive_stream = io.BytesIO(content) if rarfile.is_rarfile(archive_stream): - logger.debug('Identified rar archive') + logger.debug("Identified rar archive") archive = rarfile.RarFile(archive_stream) elif zipfile.is_zipfile(archive_stream): - logger.debug('Identified zip archive') + logger.debug("Identified zip archive") archive = zipfile.ZipFile(archive_stream) else: - raise APIThrottled('Unsupported compressed format') + raise APIThrottled("Unsupported compressed format") return archive @@ -261,12 +240,16 @@ class SubdivxSubtitlesProvider(Provider): for name in archive.namelist(): # discard hidden files # discard non-subtitle files - if not os.path.split(name)[-1].startswith('.') and name.lower().endswith(SUBTITLE_EXTENSIONS): + if not os.path.split(name)[-1].startswith(".") and name.lower().endswith( + SUBTITLE_EXTENSIONS + ): _valid_names.append(name) # archive with only 1 subtitle if len(_valid_names) == 1: - logger.debug("returning from archive: {} (single subtitle file)".format(_valid_names[0])) + logger.debug( + f"returning from archive: {_valid_names[0]} (single subtitle file)" + ) return archive.read(_valid_names[0]) # in archives with more than 1 subtitle (season pack) we try to guess the best subtitle file @@ -275,31 +258,36 @@ class SubdivxSubtitlesProvider(Provider): _max_name = "" for name in _valid_names: _guess = guessit(name) - if 'season' not in _guess: - _guess['season'] = -1 - if 'episode' not in _guess: - _guess['episode'] = -1 + if "season" not in _guess: + _guess["season"] = -1 + if "episode" not in _guess: + _guess["episode"] = -1 if isinstance(subtitle.video, Episode): logger.debug("guessing %s" % name) - logger.debug("subtitle S{}E{} video S{}E{}".format( - _guess['season'], _guess['episode'], subtitle.video.season, subtitle.video.episode)) - - if subtitle.video.episode != _guess['episode'] or subtitle.video.season != _guess['season']: - logger.debug('subtitle does not match video, skipping') + logger.debug( + f"subtitle S{_guess['season']}E{_guess['episode']} video " + f"S{subtitle.video.season}E{subtitle.video.episode}" + ) + + if ( + subtitle.video.episode != _guess["episode"] + or subtitle.video.season != _guess["season"] + ): + logger.debug("subtitle does not match video, skipping") continue matches = set() matches |= guess_matches(subtitle.video, _guess) _score = sum((_scores.get(match, 0) for match in matches)) - logger.debug('srt matches: %s, score %d' % (matches, _score)) + logger.debug("srt matches: %s, score %d" % (matches, _score)) if _score > _max_score: _max_score = _score _max_name = name - logger.debug("new max: {} {}".format(name, _score)) + logger.debug(f"new max: {name} {_score}") if _max_score > 0: - logger.debug("returning from archive: {} scored {}".format(_max_name, _max_score)) + logger.debug(f"returning from archive: {_max_name} scored {_max_score}") return archive.read(_max_name) - raise APIThrottled('Can not find the subtitle in the compressed file') + raise APIThrottled("Can not find the subtitle in the compressed file") diff --git a/libs/subliminal_patch/providers/sucha.py b/libs/subliminal_patch/providers/sucha.py index 463e5157d..073935105 100644 --- a/libs/subliminal_patch/providers/sucha.py +++ b/libs/subliminal_patch/providers/sucha.py @@ -17,8 +17,9 @@ from subzero.language import Language logger = logging.getLogger(__name__) -server_url = "http://sapidb.caretas.club/" -page_url = "https://sucha.caretas.club/" +SERVER_URL = "http://sapidb.caretas.club/" +PAGE_URL = "https://sucha.caretas.club/" +UNDESIRED_FILES = ("[eng]", ".en.", ".eng.", ".fr.", ".pt.") class SuchaSubtitle(Subtitle): @@ -35,7 +36,7 @@ class SuchaSubtitle(Subtitle): matches, ): super(SuchaSubtitle, self).__init__( - language, hearing_impaired=False, page_link=page_url + language, hearing_impaired=False, page_link=PAGE_URL ) self.download_id = download_id self.download_type = download_type @@ -71,7 +72,6 @@ class SuchaSubtitle(Subtitle): class SuchaProvider(Provider): """Sucha Provider""" - languages = {Language.fromalpha2(l) for l in ["es"]} language_list = list(languages) video_types = (Episode, Movie) @@ -89,22 +89,21 @@ class SuchaProvider(Provider): movie_year = video.year if video.year else "0" is_episode = isinstance(video, Episode) language = self.language_list[0] + if is_episode: - q = { - "query": "{} S{:02}E{:02}".format( - video.series, video.season, video.episode - ) - } + q = {"query": f"{video.series} S{video.season:02}E{video.episode:02}"} else: q = {"query": video.title, "year": movie_year} - logger.debug("Searching subtitles: {}".format(q["query"])) - res = self.session.get( - server_url + ("episode" if is_episode else "movie"), params=q, timeout=10 + + logger.debug(f"Searching subtitles: {q}") + result = self.session.get( + SERVER_URL + ("episode" if is_episode else "movie"), params=q, timeout=10 ) - res.raise_for_status() - result = res.json() + result.raise_for_status() + + result_ = result.json() subtitles = [] - for i in result: + for i in result_: matches = set() try: if ( @@ -115,18 +114,18 @@ class SuchaProvider(Provider): except TypeError: logger.debug("No subtitles found") return [] + if is_episode: if ( q["query"].lower() in i["title"].lower() or q["query"].lower() in i["alt_title"].lower() ): - matches.add("title") - matches.add("series") - matches.add("season") - matches.add("episode") - matches.add("year") + matches_ = ("title", "series", "season", "episode", "year") + [matches.add(match) for match in matches_] + if str(i["year"]) == video.year: matches.add("year") + subtitles.append( SuchaSubtitle( language, @@ -144,40 +143,41 @@ class SuchaProvider(Provider): def _check_response(self, response): if response.status_code != 200: - raise ServiceUnavailable("Bad status code: " + str(response.status_code)) + raise ServiceUnavailable(f"Bad status code: {response.status_code}") def _get_archive(self, content): archive_stream = io.BytesIO(content) + if rarfile.is_rarfile(archive_stream): logger.debug("Identified rar archive") - archive = rarfile.RarFile(archive_stream) - elif zipfile.is_zipfile(archive_stream): + return rarfile.RarFile(archive_stream) + + if zipfile.is_zipfile(archive_stream): logger.debug("Identified zip archive") - archive = zipfile.ZipFile(archive_stream) - else: - raise APIThrottled("Unsupported compressed format") - return archive + return zipfile.ZipFile(archive_stream) + + raise APIThrottled("Unsupported compressed format") def get_file(self, archive): for name in archive.namelist(): if os.path.split(name)[-1].startswith("."): continue + if not name.lower().endswith(SUBTITLE_EXTENSIONS): continue - if ( - "[eng]" in name.lower() - or ".en." in name.lower() - or ".eng." in name.lower() - ): + + if any(undesired in name.lower() for undesired in UNDESIRED_FILES): continue - logger.debug("Returning from archive: {}".format(name)) + + logger.debug(f"Returning from archive: {name}") return archive.read(name) + raise APIThrottled("Can not find the subtitle in the compressed file") def download_subtitle(self, subtitle): logger.info("Downloading subtitle %r", subtitle) response = self.session.get( - server_url + "download", + SERVER_URL + "download", params={"id": subtitle.download_id, "type": subtitle.download_type}, timeout=10, ) diff --git a/libs/subliminal_patch/providers/tusubtitulo.py b/libs/subliminal_patch/providers/tusubtitulo.py index e764ebcde..7dd5a9ca1 100644 --- a/libs/subliminal_patch/providers/tusubtitulo.py +++ b/libs/subliminal_patch/providers/tusubtitulo.py @@ -1,75 +1,64 @@ # -*- coding: utf-8 -*- import logging -from urllib import parse + +import random import re -from bs4 import BeautifulSoup as bso +from urllib import parse + +from bs4 import BeautifulSoup as bso from requests import Session from subzero.language import Language +from guessit import guessit from subliminal import Episode from subliminal.exceptions import ServiceUnavailable -from subliminal_patch.subtitle import Subtitle -from subliminal.subtitle import fix_line_ending +from subliminal_patch.exceptions import APIThrottled from subliminal_patch.providers import Provider +from subliminal_patch.subtitle import Subtitle +from subliminal.subtitle import fix_line_ending, guess_matches + +from .utils import FIRST_THOUSAND_OR_SO_USER_AGENTS as AGENT_LIST logger = logging.getLogger(__name__) -BASE = "https://www.tusubtitulo.com/series.php?/" +BASE_URL = "https://www.tusubtitulo.com" +CSS1 = "span.iconos-subtitulos" +CSS2 = "ul > li.rng.download.green > a.fas.fa-bullhorn.notifi_icon" class TuSubtituloSubtitle(Subtitle): provider_name = "tusubtitulo" hash_verifiable = False - def __init__(self, language, filename, download_link, page_link, matches): + def __init__(self, language, sub_dict, matches): super(TuSubtituloSubtitle, self).__init__( - language, hearing_impaired=False, page_link=page_link + language, hearing_impaired=False, page_link=sub_dict["download_url"] ) - self.download_link = download_link - self.page_link = page_link self.language = language - self.release_info = filename - self.filename = filename + self.sub_dict = sub_dict + self.release_info = sub_dict["metadata"] self.found_matches = matches @property def id(self): - return self.download_link + return self.sub_dict["download_url"] def get_matches(self, video): - if video.resolution and video.resolution.lower() in self.release_info.lower(): - self.found_matches.add("resolution") - - if video.source and video.source.lower() in self.release_info.lower(): - self.found_matches.add("source") - - if video.video_codec: - if video.video_codec == "H.264" and "x264" in self.release_info.lower(): - self.found_matches.add("video_codec") - elif video.video_codec == "H.265" and "x265" in self.release_info.lower(): - self.found_matches.add("video_codec") - elif video.video_codec.lower() in self.release_info.lower(): - self.found_matches.add("video_codec") - - if ( - video.release_group - and video.release_group.lower() in self.release_info.lower() - ): - self.found_matches.add("release_group") - - if video.audio_codec: - if video.audio_codec.lower().replace(" ", ".") in self.release_info.lower(): - self.found_matches.add("audio_codec") - + self.found_matches |= guess_matches( + video, + guessit( + self.release_info, + {"type": "episode"}, + ), + ) return self.found_matches class TuSubtituloProvider(Provider): """TuSubtitulo.com Provider""" - BASE = "https://www.tusubtitulo.com/series.php?/" languages = {Language.fromietf(lang) for lang in ["en", "es"]} logger.debug(languages) video_types = (Episode,) @@ -77,41 +66,29 @@ class TuSubtituloProvider(Provider): def initialize(self): self.session = Session() self.session.headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36", - "referer": "https://www.tusubtitulo.com", + "User-Agent": random.choice(AGENT_LIST), + "referer": BASE_URL, } def terminate(self): self.session.close() def index_titles(self): - r = self.session.get(BASE) + r = self.session.get(f"{BASE_URL}/series.php?/") r.raise_for_status() soup = bso(r.content, "html.parser") - titles = [] + for a in soup.find_all("a"): href_url = a.get("href") if "show" in href_url: - titles.append({"title": a.text, "url": href_url}) - return titles - - def is_season_available(self, seasons, season): - for i in seasons: - if i == season: - return True + yield {"title": a.text, "url": href_url} def title_available(self, item): try: - title_content = item[2].find_all("a")[0] - episode_number = re.search( - r".*\d+x(0+)?(\d+) - .*?", title_content.text - ).group(2) - episode_id = title_content.get("href").split("/")[4] - return { - "episode_number": episode_number, - "episode_id": episode_id, - "episode_url": title_content.get("href"), - } + title = item[2].find_all("a")[0] + episode_number = re.search(r".*\d+x(0+)?(\d+) - .*?", title.text).group(2) + episode_id = title.get("href").split("/")[4] + return {"episode_number": episode_number, "episode_id": episode_id} except IndexError: return @@ -121,86 +98,84 @@ class TuSubtituloProvider(Provider): if "Vers" in text: source = text.replace("VersiĆ³n ", "") if not source: - source = "Unknown" + return "Unknown" return source except IndexError: return + def get_episode_dicts(self, episodes, season_subs, season_number): + for i in episodes: + for t in season_subs: + if i["episode_id"] == t["episode_id"]: + yield { + "episode": i["episode_number"], + "season": season_number, + "metadata": t["metadata"], + "download_url": t["download_url"], + "language": t["language"], + } + + def scrape_episode_info(self, source_var, tables, tr): + inc = 1 + while True: + try: + content = tables[tr + inc].find_all("td") + + language = content[4].text + if "eng" in language.lower(): + language = "en" + elif "esp" in language.lower(): + language = "es" + else: + language = None + + completed = "%" not in content[5].text + download_url = ( + content[6].find_all("a")[1].get("href").split("?sub=")[-1] + ) + episode_id = download_url.split("/")[4] + + if language and completed: + yield { + "episode_id": episode_id, + "metadata": source_var, + "download_url": download_url, + "language": language, + } + inc += 1 + except IndexError: + break + def get_episodes(self, show_id, season): - logger.debug("https://www.tusubtitulo.com/show/{}/{}".format(show_id, season)) - r2 = self.session.get( - "https://www.tusubtitulo.com/show/{}/{}".format(show_id, season), - ) - r2.raise_for_status() - sopa = bso(r2.content, "lxml") + r = self.session.get(f"{BASE_URL}/show/{show_id}/{season}") + r.raise_for_status() + sopa = bso(r.content, "lxml") tables = sopa.find_all("tr") seasons = [i.text for i in tables[1].find_all("a")] - if not self.is_season_available(seasons, season): - logger.debug("Season not found") + + if not any(season == season_ for season_ in seasons): return + season_subs = [] episodes = [] for tr in range(len(tables)): data = tables[tr].find_all("td") + title = self.title_available(data) if title: episodes.append(title) + source_var = self.source_separator(data) - if source_var: - inc = 1 - while True: - try: - content = tables[tr + inc].find_all("td") - language = content[4].text - if "eng" in language.lower(): - language = "en" - elif "esp" in language.lower(): - language = "es" - else: - language = None - completed = True if not "%" in content[5].text else False - url = content[6].find_all("a")[0].get("href") - sub_id = parse.parse_qs(parse.urlparse(url).query)["id"][0] - lang_id = parse.parse_qs(parse.urlparse(url).query)["lang"][0] - version_ = parse.parse_qs(parse.urlparse(url).query)["version"][ - 0 - ] - download_url = ( - "https://www.tusubtitulo.com/updated/{}/{}/{}".format( - lang_id, sub_id, version_ - ) - ) - if language and completed: - season_subs.append( - { - "episode_id": sub_id, - "metadata": source_var, - "download_url": download_url, - "language": language, - } - ) - inc += 1 - except IndexError: - break - - final_list = [] - for i in episodes: - for t in season_subs: - if i["episode_id"] == t["episode_id"]: - final_list.append( - { - "episode_number": i["episode_number"], - "episode_url": i["episode_url"], - "metadata": t["metadata"], - "download_url": t["download_url"], - "language": t["language"], - } - ) - return final_list + if not source_var: + continue + + season_subs += list(self.scrape_episode_info(source_var, tables, tr)) + + return list(self.get_episode_dicts(episodes, season_subs, season)) def search(self, title, season, episode): - titles = self.index_titles() + titles = list(self.index_titles()) found_tv_show = None for i in titles: if title.lower() == i["title"].lower(): @@ -209,58 +184,81 @@ class TuSubtituloProvider(Provider): if not found_tv_show: logger.debug("Show not found") return + tv_show_id = found_tv_show["url"].split("/")[2].replace(" ", "") results = self.get_episodes(tv_show_id, season) episode_list = [] if results: for i in results: - if i["episode_number"] == episode: + if i["episode"] == episode: episode_list.append(i) if episode_list: return episode_list - logger.debug("Episode not found") + else: + logger.debug("No results") + + logger.debug("No results") + + def scrape_download_url(self, episode_dict): + logger.debug("Scrapping download URL") + r = self.session.get(episode_dict["download_url"]) + r.raise_for_status() + + discriminator = f".{episode_dict['season']}.{episode_dict['episode']}." + soup = bso(r.content, "lxml") + + for url, selected in zip(soup.select(CSS1), soup.select(CSS2)): + meta = ".".join( + selected.get("href").split(discriminator)[-1].split(".")[:-1] + ) + if meta in episode_dict["download_url"]: + + id_url = url.find_all("a")[0].get("href") + sub_id = parse.parse_qs(parse.urlparse(id_url).query)["id"][0] + lang_id = parse.parse_qs(parse.urlparse(id_url).query)["lang"][0] + version_ = parse.parse_qs(parse.urlparse(id_url).query)["fversion"][0] + + return f"{BASE_URL}/updated/{lang_id}/{sub_id}/{version_}" def query(self, languages, video): - query = "{} {} {}".format(video.series, video.season, video.episode) - logger.debug("Searching subtitles: {}".format(query)) + query = f"{video.series} {video.season} {video.episode}" + logger.debug(f"Searching subtitles: {query}") results = self.search(video.series, str(video.season), str(video.episode)) if results: subtitles = [] - for i in results: + for sub in results: matches = set() # self.search only returns results for the specific episode - matches.add("title") - matches.add("series") - matches.add("season") - matches.add("episode") - matches.add("year") + matches_ = ("title", "series", "season", "episode", "year") + [matches.add(match) for match in matches_] subtitles.append( TuSubtituloSubtitle( - Language.fromietf(i["language"]), - i["metadata"], - i["download_url"], - i["episode_url"], + Language.fromietf(sub["language"]), + sub, matches, ) ) return subtitles - else: - logger.debug("No subtitles found") - return [] + + logger.debug("No subtitles found") + return [] def list_subtitles(self, video, languages): return self.query(languages, video) def _check_response(self, response): if response.status_code != 200: - raise ServiceUnavailable("Bad status code: " + str(response.status_code)) + raise ServiceUnavailable(f"Bad status code: {response.status_code}") def download_subtitle(self, subtitle): logger.info("Downloading subtitle %r", subtitle) - response = self.session.get( - subtitle.download_link, headers={"Referer": subtitle.page_link}, timeout=10 - ) + download_url_ = self.scrape_download_url(subtitle.sub_dict) + + if not download_url_: + raise APIThrottled("Can't scrape download url") + + response = self.session.get(download_url_, timeout=10, allow_redirects=True) response.raise_for_status() self._check_response(response) subtitle.content = fix_line_ending(response.content)