diff --git a/libs/subliminal_patch/providers/tusubtitulo.py b/libs/subliminal_patch/providers/tusubtitulo.py
index e764ebcde..7dd5a9ca1 100644
--- a/libs/subliminal_patch/providers/tusubtitulo.py
+++ b/libs/subliminal_patch/providers/tusubtitulo.py
@@ -1,75 +1,64 @@
 # -*- coding: utf-8 -*-
 import logging
-from urllib import parse
+
+import random
 import re
-from bs4 import BeautifulSoup as bso
+from urllib import parse
+
+from bs4 import BeautifulSoup as bso
 from requests import Session
 from subzero.language import Language
+from guessit import guessit
 from subliminal import Episode
 from subliminal.exceptions import ServiceUnavailable
-from subliminal_patch.subtitle import Subtitle
-from subliminal.subtitle import fix_line_ending
+from subliminal_patch.exceptions import APIThrottled
 from subliminal_patch.providers import Provider
+from subliminal_patch.subtitle import Subtitle
+from subliminal.subtitle import fix_line_ending, guess_matches
+
+from .utils import FIRST_THOUSAND_OR_SO_USER_AGENTS as AGENT_LIST
 
 logger = logging.getLogger(__name__)
 
-BASE = "https://www.tusubtitulo.com/series.php?/"
+BASE_URL = "https://www.tusubtitulo.com"
+CSS1 = "span.iconos-subtitulos"
+CSS2 = "ul > li.rng.download.green > a.fas.fa-bullhorn.notifi_icon"
 
 
 class TuSubtituloSubtitle(Subtitle):
     provider_name = "tusubtitulo"
     hash_verifiable = False
 
-    def __init__(self, language, filename, download_link, page_link, matches):
+    def __init__(self, language, sub_dict, matches):
         super(TuSubtituloSubtitle, self).__init__(
-            language, hearing_impaired=False, page_link=page_link
+            language, hearing_impaired=False, page_link=sub_dict["download_url"]
         )
-        self.download_link = download_link
-        self.page_link = page_link
         self.language = language
-        self.release_info = filename
-        self.filename = filename
+        self.sub_dict = sub_dict
+        self.release_info = sub_dict["metadata"]
         self.found_matches = matches
 
     @property
     def id(self):
-        return self.download_link
+        return self.sub_dict["download_url"]
 
     def get_matches(self, video):
-        if video.resolution and video.resolution.lower() in self.release_info.lower():
-            self.found_matches.add("resolution")
-
-        if video.source and video.source.lower() in self.release_info.lower():
-            self.found_matches.add("source")
-
-        if video.video_codec:
-            if video.video_codec == "H.264" and "x264" in self.release_info.lower():
-                self.found_matches.add("video_codec")
-            elif video.video_codec == "H.265" and "x265" in self.release_info.lower():
-                self.found_matches.add("video_codec")
-            elif video.video_codec.lower() in self.release_info.lower():
-                self.found_matches.add("video_codec")
-
-        if (
-            video.release_group
-            and video.release_group.lower() in self.release_info.lower()
-        ):
-            self.found_matches.add("release_group")
-
-        if video.audio_codec:
-            if video.audio_codec.lower().replace(" ", ".") in self.release_info.lower():
-                self.found_matches.add("audio_codec")
-
+        self.found_matches |= guess_matches(
+            video,
+            guessit(
+                self.release_info,
+                {"type": "episode"},
+            ),
+        )
         return self.found_matches
 
 
 class TuSubtituloProvider(Provider):
     """TuSubtitulo.com Provider"""
 
-    BASE = "https://www.tusubtitulo.com/series.php?/"
     languages = {Language.fromietf(lang) for lang in ["en", "es"]}
     logger.debug(languages)
     video_types = (Episode,)
@@ -77,41 +66,29 @@ class TuSubtituloProvider(Provider):
     def initialize(self):
         self.session = Session()
         self.session.headers = {
-            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36",
-            "referer": "https://www.tusubtitulo.com",
+            "User-Agent": random.choice(AGENT_LIST),
+            "referer": BASE_URL,
         }
 
     def terminate(self):
         self.session.close()
 
     def index_titles(self):
-        r = self.session.get(BASE)
+        r = self.session.get(f"{BASE_URL}/series.php?/")
         r.raise_for_status()
         soup = bso(r.content, "html.parser")
-        titles = []
+
         for a in soup.find_all("a"):
             href_url = a.get("href")
             if "show" in href_url:
-                titles.append({"title": a.text, "url": href_url})
-        return titles
-
-    def is_season_available(self, seasons, season):
-        for i in seasons:
-            if i == season:
-                return True
+                yield {"title": a.text, "url": href_url}
 
     def title_available(self, item):
        try:
-            title_content = item[2].find_all("a")[0]
-            episode_number = re.search(
-                r".*\d+x(0+)?(\d+) - .*?", title_content.text
-            ).group(2)
-            episode_id = title_content.get("href").split("/")[4]
-            return {
-                "episode_number": episode_number,
-                "episode_id": episode_id,
-                "episode_url": title_content.get("href"),
-            }
+            title = item[2].find_all("a")[0]
+            episode_number = re.search(r".*\d+x(0+)?(\d+) - .*?", title.text).group(2)
+            episode_id = title.get("href").split("/")[4]
+            return {"episode_number": episode_number, "episode_id": episode_id}
         except IndexError:
             return
 
@@ -121,86 +98,84 @@ class TuSubtituloProvider(Provider):
             if "Vers" in text:
                 source = text.replace("Versión ", "")
                 if not source:
-                    source = "Unknown"
+                    return "Unknown"
                 return source
         except IndexError:
             return
 
+    def get_episode_dicts(self, episodes, season_subs, season_number):
+        for i in episodes:
+            for t in season_subs:
+                if i["episode_id"] == t["episode_id"]:
+                    yield {
+                        "episode": i["episode_number"],
+                        "season": season_number,
+                        "metadata": t["metadata"],
+                        "download_url": t["download_url"],
+                        "language": t["language"],
+                    }
+
+    def scrape_episode_info(self, source_var, tables, tr):
+        inc = 1
+        while True:
+            try:
+                content = tables[tr + inc].find_all("td")
+
+                language = content[4].text
+                if "eng" in language.lower():
+                    language = "en"
+                elif "esp" in language.lower():
+                    language = "es"
+                else:
+                    language = None
+
+                completed = "%" not in content[5].text
+                download_url = (
+                    content[6].find_all("a")[1].get("href").split("?sub=")[-1]
+                )
+                episode_id = download_url.split("/")[4]
+
+                if language and completed:
+                    yield {
+                        "episode_id": episode_id,
+                        "metadata": source_var,
+                        "download_url": download_url,
+                        "language": language,
+                    }
+                inc += 1
+            except IndexError:
+                break
+
     def get_episodes(self, show_id, season):
-        logger.debug("https://www.tusubtitulo.com/show/{}/{}".format(show_id, season))
-        r2 = self.session.get(
-            "https://www.tusubtitulo.com/show/{}/{}".format(show_id, season),
-        )
-        r2.raise_for_status()
-        sopa = bso(r2.content, "lxml")
+        r = self.session.get(f"{BASE_URL}/show/{show_id}/{season}")
+        r.raise_for_status()
+        sopa = bso(r.content, "lxml")
 
         tables = sopa.find_all("tr")
         seasons = [i.text for i in tables[1].find_all("a")]
-        if not self.is_season_available(seasons, season):
-            logger.debug("Season not found")
+
+        if not any(season == season_ for season_ in seasons):
             return
+
         season_subs = []
         episodes = []
 
         for tr in range(len(tables)):
             data = tables[tr].find_all("td")
+
             title = self.title_available(data)
             if title:
                 episodes.append(title)
+
             source_var = self.source_separator(data)
-            if source_var:
-                inc = 1
-                while True:
-                    try:
-                        content = tables[tr + inc].find_all("td")
-                        language = content[4].text
-                        if "eng" in language.lower():
-                            language = "en"
-                        elif "esp" in language.lower():
-                            language = "es"
-                        else:
-                            language = None
-                        completed = True if not "%" in content[5].text else False
-                        url = content[6].find_all("a")[0].get("href")
-                        sub_id = parse.parse_qs(parse.urlparse(url).query)["id"][0]
-                        lang_id = parse.parse_qs(parse.urlparse(url).query)["lang"][0]
-                        version_ = parse.parse_qs(parse.urlparse(url).query)["version"][
-                            0
-                        ]
-                        download_url = (
-                            "https://www.tusubtitulo.com/updated/{}/{}/{}".format(
-                                lang_id, sub_id, version_
-                            )
-                        )
-                        if language and completed:
-                            season_subs.append(
-                                {
-                                    "episode_id": sub_id,
-                                    "metadata": source_var,
-                                    "download_url": download_url,
-                                    "language": language,
-                                }
-                            )
-                        inc += 1
-                    except IndexError:
-                        break
-
-        final_list = []
-        for i in episodes:
-            for t in season_subs:
-                if i["episode_id"] == t["episode_id"]:
-                    final_list.append(
-                        {
-                            "episode_number": i["episode_number"],
-                            "episode_url": i["episode_url"],
-                            "metadata": t["metadata"],
-                            "download_url": t["download_url"],
-                            "language": t["language"],
-                        }
-                    )
-        return final_list
+            if not source_var:
+                continue
+
+            season_subs += list(self.scrape_episode_info(source_var, tables, tr))
+
+        return list(self.get_episode_dicts(episodes, season_subs, season))
 
     def search(self, title, season, episode):
-        titles = self.index_titles()
+        titles = list(self.index_titles())
         found_tv_show = None
         for i in titles:
             if title.lower() == i["title"].lower():
@@ -209,58 +184,81 @@ class TuSubtituloProvider(Provider):
         if not found_tv_show:
             logger.debug("Show not found")
             return
+
         tv_show_id = found_tv_show["url"].split("/")[2].replace(" ", "")
         results = self.get_episodes(tv_show_id, season)
         episode_list = []
         if results:
             for i in results:
-                if i["episode_number"] == episode:
+                if i["episode"] == episode:
                     episode_list.append(i)
             if episode_list:
                 return episode_list
-            logger.debug("Episode not found")
+            else:
+                logger.debug("No results")
+
+        logger.debug("No results")
+
+    def scrape_download_url(self, episode_dict):
+        logger.debug("Scraping download URL")
+        r = self.session.get(episode_dict["download_url"])
+        r.raise_for_status()
+
+        discriminator = f".{episode_dict['season']}.{episode_dict['episode']}."
+        soup = bso(r.content, "lxml")
+
+        for url, selected in zip(soup.select(CSS1), soup.select(CSS2)):
+            meta = ".".join(
+                selected.get("href").split(discriminator)[-1].split(".")[:-1]
+            )
+            if meta in episode_dict["download_url"]:
+
+                id_url = url.find_all("a")[0].get("href")
+                sub_id = parse.parse_qs(parse.urlparse(id_url).query)["id"][0]
+                lang_id = parse.parse_qs(parse.urlparse(id_url).query)["lang"][0]
+                version_ = parse.parse_qs(parse.urlparse(id_url).query)["fversion"][0]
+
+                return f"{BASE_URL}/updated/{lang_id}/{sub_id}/{version_}"
 
     def query(self, languages, video):
-        query = "{} {} {}".format(video.series, video.season, video.episode)
-        logger.debug("Searching subtitles: {}".format(query))
+        query = f"{video.series} {video.season} {video.episode}"
+        logger.debug(f"Searching subtitles: {query}")
         results = self.search(video.series, str(video.season), str(video.episode))
         if results:
             subtitles = []
-            for i in results:
+            for sub in results:
                 matches = set()
                 # self.search only returns results for the specific episode
-                matches.add("title")
-                matches.add("series")
-                matches.add("season")
-                matches.add("episode")
-                matches.add("year")
+                matches_ = ("title", "series", "season", "episode", "year")
+                matches.update(matches_)
                 subtitles.append(
                     TuSubtituloSubtitle(
-                        Language.fromietf(i["language"]),
-                        i["metadata"],
-                        i["download_url"],
-                        i["episode_url"],
+                        Language.fromietf(sub["language"]),
+                        sub,
                         matches,
                     )
                 )
             return subtitles
-        else:
-            logger.debug("No subtitles found")
-            return []
+
+        logger.debug("No subtitles found")
+        return []
 
     def list_subtitles(self, video, languages):
         return self.query(languages, video)
 
     def _check_response(self, response):
         if response.status_code != 200:
-            raise ServiceUnavailable("Bad status code: " + str(response.status_code))
+            raise ServiceUnavailable(f"Bad status code: {response.status_code}")
 
     def download_subtitle(self, subtitle):
         logger.info("Downloading subtitle %r", subtitle)
-        response = self.session.get(
-            subtitle.download_link, headers={"Referer": subtitle.page_link}, timeout=10
-        )
+        download_url_ = self.scrape_download_url(subtitle.sub_dict)
+
+        if not download_url_:
+            raise APIThrottled("Can't scrape download URL")
+
+        response = self.session.get(download_url_, timeout=10, allow_redirects=True)
+        response.raise_for_status()
         self._check_response(response)
         subtitle.content = fix_line_ending(response.content)
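
Reviewer note: the main behavioral change is in `get_matches`, which now delegates release-name parsing to `guessit` and scoring to subliminal's `guess_matches` instead of the old hand-rolled substring checks. A minimal, self-contained sketch of that pipeline — the filename, release string, and printed result are invented for illustration:

```python
# Sketch of the new matching path, assuming guessit and subliminal are
# importable. guessit parses the release string into structured fields;
# guess_matches compares those fields against the video's attributes.
from guessit import guessit
from subliminal.subtitle import guess_matches
from subliminal.video import Episode

# Invented video and release string, purely for illustration.
video = Episode("Some.Show.1x05.720p.HDTV.x264-GRP.mkv", "Some Show", 1, 5)
guess = guessit("Some.Show.1x05.720p.HDTV.x264-GRP", {"type": "episode"})

# guess behaves like a dict with keys such as 'title', 'season', 'episode',
# 'screen_size', 'source', 'video_codec' and 'release_group'.
print(guess_matches(video, guess))  # e.g. {'series', 'season', 'episode'}
```

This also picks up fields the old code never checked (e.g. streaming service, episode title) without maintaining codec-name special cases like the x264/H.264 mapping by hand.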
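The other structural change is that download URLs are no longer resolved during search: `download_subtitle` now calls `scrape_download_url` lazily, costing one extra page fetch per actual download instead of parsing `id`/`lang`/`version` query params for every listed subtitle. A hypothetical end-to-end exercise of the refactored provider (assumes network access to tusubtitulo.com and bazarr's bundled libs on `sys.path`; the video is made up):

```python
# Hypothetical manual run of the refactored provider, outside bazarr.
from subliminal.video import Episode
from subzero.language import Language

from subliminal_patch.providers.tusubtitulo import TuSubtituloProvider

video = Episode("Some.Show.1x05.720p.HDTV.x264-GRP.mkv", "Some Show", 1, 5)

provider = TuSubtituloProvider()
provider.initialize()
try:
    subs = provider.list_subtitles(video, {Language.fromietf("es")})
    if subs:
        # The real download URL is only scraped here, via scrape_download_url().
        provider.download_subtitle(subs[0])
finally:
    provider.terminate()
```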