bazarr/libs/subliminal_patch/providers/subscene_cloudscraper.py

# -*- coding: utf-8 -*-
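"""Subscene subtitle provider for subliminal_patch.

Scrapes subscene.com with cloudscraper to get past Cloudflare's
anti-bot challenge.
"""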
from difflib import SequenceMatcher
import functools
import logging
import re
import time
import urllib.parse

from bs4 import BeautifulSoup as bso
import cloudscraper
from guessit import guessit
from requests import Session
from requests.exceptions import HTTPError
from subliminal.exceptions import ProviderError
from subliminal_patch.core import Episode
from subliminal_patch.core import Movie
from subliminal_patch.exceptions import APIThrottled
from subliminal_patch.providers import Provider
from subliminal_patch.providers.utils import get_archive_from_bytes
from subliminal_patch.providers.utils import get_subtitle_from_archive
from subliminal_patch.providers.utils import update_matches
from subliminal_patch.subtitle import Subtitle
from subzero.language import Language

logger = logging.getLogger(__name__)


class SubsceneSubtitle(Subtitle):
provider_name = "subscene_cloudscraper"
hash_verifiable = False
def __init__(self, language, page_link, release_info, episode_number=None):
super().__init__(language, page_link=page_link)
self.release_info = release_info
self.episode_number = episode_number
self.episode_title = None
self._matches = set(
("title", "year")
if episode_number is None
else ("title", "series", "year", "season", "episode")
)
def get_matches(self, video):
update_matches(self._matches, video, self.release_info)
return self._matches
@property
def id(self):
return self.page_link
_BASE_URL = "https://subscene.com"
# TODO: add more seasons and languages

_SEASONS = (
    "First",
    "Second",
    "Third",
    "Fourth",
    "Fifth",
    "Sixth",
    "Seventh",
    "Eighth",
    "Ninth",
    "Tenth",
    "Eleventh",
    "Twelfth",
    "Thirteenth",
    "Fourteenth",
    "Fifteenth",
    "Sixteenth",
    "Seventeenth",
    "Eighteenth",
    "Nineteenth",
    "Twentieth",
)
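
# Subscene language page slugs mapped to ISO 639-2/B codes; SubsceneProvider
# converts these to Language objects and reverses the map to build listing URLs.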
_LANGUAGE_MAP = {
    "english": "eng",
    "farsi_persian": "per",
    "arabic": "ara",
    "spanish": "spa",
    "portuguese": "por",
    "italian": "ita",
    "dutch": "dut",
    "hebrew": "heb",
    "indonesian": "ind",
    "danish": "dan",
    "norwegian": "nor",
    "bengali": "ben",
    "bulgarian": "bul",
    "croatian": "hrv",
    "swedish": "swe",
    "vietnamese": "vie",
    "czech": "cze",
    "finnish": "fin",
    "french": "fre",
    "german": "ger",
    "greek": "gre",
    "hungarian": "hun",
    "icelandic": "ice",
    "japanese": "jpn",
    "macedonian": "mac",
    "malay": "may",
    "polish": "pol",
    "romanian": "rum",
    "russian": "rus",
    "serbian": "srp",
    "thai": "tha",
    "turkish": "tur",
}


class SubsceneProvider(Provider):
    provider_name = "subscene_cloudscraper"

    _movie_title_regex = re.compile(r"^(.+?)( \((\d{4})\))?$")
    _tv_show_title_regex = re.compile(
        r"^(.+?) [-\(]\s?(.*?) (season|series)\)?( \((\d{4})\))?$"
    )
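    # e.g. "inception (2010)" -> title "inception", year "2010"
    # e.g. "breaking bad - first season (2008)" -> title "breaking bad",
    #      season word "first", year "2008" (search results are lowercased first)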
    _supported_languages = {}
    _supported_languages["brazillian-portuguese"] = Language("por", "BR")

    for key, val in _LANGUAGE_MAP.items():
        _supported_languages[key] = Language.fromalpha3b(val)

    _supported_languages_reversed = {
        val: key for key, val in _supported_languages.items()
    }

    languages = set(_supported_languages.values())
    video_types = (Episode, Movie)
    subtitle_class = SubsceneSubtitle

    def initialize(self):
        pass

    def terminate(self):
        pass

    def _scraper_call(self, url, retry=7, method="GET", sleep=5, **kwargs):
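        """GET/POST `url` with a fresh cloudscraper instance per attempt,
        retrying up to `retry` times (sleeping `sleep` seconds between tries)
        before giving up with ProviderError."""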
        last_exc = None

        for n in range(retry):
            # Creating an instance for every try in order to avoid dropped connections.
            # This could probably be improved!
            scraper = cloudscraper.create_scraper()

            if method == "GET":
                req = scraper.get(url, **kwargs)
            elif method == "POST":
                req = scraper.post(url, **kwargs)
            else:
                raise NotImplementedError(f"{method} not allowed")

            try:
                req.raise_for_status()
            except HTTPError as error:
                logger.debug(
                    "'%s' returned. Trying again [%d] in %s", error, n + 1, sleep
                )
                last_exc = error
                time.sleep(sleep)
            else:
                return req

        raise ProviderError("403 Retry count exceeded") from last_exc

    def _gen_results(self, query):
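        """POST the search query to subscene and yield each result's anchor tag."""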
        url = (
            f"{_BASE_URL}/subtitles/searchbytitle?query={urllib.parse.quote(query)}&l="
        )
        result = self._scraper_call(url, method="POST")
        soup = bso(result.content, "html.parser")

        for title in soup.select("li div[class='title'] a"):
            yield title

    def _search_movie(self, title, year):
        title = title.lower()
        year = str(year)

        found_movie = None

        results = []
        for result in self._gen_results(title):
            text = result.text.lower()
            match = self._movie_title_regex.match(text)
            if not match:
                continue

            match_title = match.group(1)
            match_year = match.group(3)
            if year == match_year:
                results.append(
                    {
                        "href": result.get("href"),
                        "similarity": SequenceMatcher(None, title, match_title).ratio(),
                    }
                )

        if results:
            results.sort(key=lambda x: x["similarity"], reverse=True)
            found_movie = results[0]["href"]
            logger.debug("Movie found: %s", results[0])

        return found_movie

    def _search_tv_show_season(self, title, season, year=None):
        try:
            season_str = _SEASONS[season - 1].lower()
        except IndexError:
            logger.debug("Season number not supported: %s", season)
            return None

        found_tv_show_season = None

        results = []
        for result in self._gen_results(title):
            text = result.text.lower()

            match = self._tv_show_title_regex.match(text)
            if not match:
                logger.debug("Series title not matched: %s", text)
                continue

            logger.debug("Series title matched: %s", text)

            match_title = match.group(1)
            match_season = match.group(2)

            # Match "complete series" titles as they usually contain season packs
            if season_str == match_season or "complete" in match_season:
                plus = 0.1 if year and str(year) in text else 0
                results.append(
                    {
                        "href": result.get("href"),
                        "similarity": SequenceMatcher(None, title, match_title).ratio()
                        + plus,
                    }
                )

        if results:
            results.sort(key=lambda x: x["similarity"], reverse=True)
            found_tv_show_season = results[0]["href"]
            logger.debug("TV Show season found: %s", results[0])

        return found_tv_show_season

    def _find_movie_subtitles(self, path, language):
        soup = self._get_subtitle_page_soup(path, language)

        subtitles = []
        for item in soup.select("tr"):
            subtitle = _get_subtitle_from_item(item, language)
            if subtitle is None:
                continue

            logger.debug("Found subtitle: %s", subtitle)
            subtitles.append(subtitle)

        return subtitles

    def _find_episode_subtitles(
        self, path, season, episode, language, episode_title=None
    ):
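        """Collect subtitles whose guessed season/episode match the request,
        also accepting season packs and "complete series" uploads."""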
        soup = self._get_subtitle_page_soup(path, language)

        subtitles = []
        for item in soup.select("tr"):
            valid_item = None

            clean_text = " ".join(item.text.split())
            if not clean_text:
                continue

            # guessit returns list values here because enforce_list is enabled
            guess = _memoized_episode_guess(clean_text)

            if "season" not in guess:
                if "complete series" in clean_text.lower():
                    logger.debug("Complete series pack found: %s", clean_text)
                    guess["season"] = [season]
                else:
                    logger.debug("Nothing guessed from release: %s", clean_text)
                    continue

            if season in guess["season"] and episode in guess.get("episode", []):
                logger.debug("Episode match found: %s - %s", guess, clean_text)
                valid_item = item
            elif season in guess["season"] and "episode" not in guess:
                logger.debug("Season pack found: %s", clean_text)
                valid_item = item

            if valid_item is None:
                continue

            subtitle = _get_subtitle_from_item(item, language, episode)
            if subtitle is None:
                continue

            subtitle.episode_title = episode_title

            logger.debug("Found subtitle: %s", subtitle)
            subtitles.append(subtitle)

        return subtitles

    def _get_subtitle_page_soup(self, path, language):
        language_path = self._supported_languages_reversed[language]
        result = self._scraper_call(f"{_BASE_URL}{path}/{language_path}")
        return bso(result.content, "html.parser")

    def list_subtitles(self, video, languages):
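        """Search subscene for `video`'s title page once, then collect
        subtitles for every requested language."""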
        is_episode = isinstance(video, Episode)

        if is_episode:
            result = self._search_tv_show_season(video.series, video.season, video.year)
        else:
            result = self._search_movie(video.title, video.year)

        if result is None:
            logger.debug("No results")
            return []

        subtitles = []

        for language in languages:
            if is_episode:
                subtitles.extend(
                    self._find_episode_subtitles(
                        result, video.season, video.episode, language, video.title
                    )
                )
            else:
                subtitles.extend(self._find_movie_subtitles(result, language))

        return subtitles

    def download_subtitle(self, subtitle):
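        """Download `subtitle` by following the page's download button and
        extracting the subtitle file from the returned archive."""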
        # TODO: add MustGetBlacklisted support

        result = self._scraper_call(subtitle.page_link)
        soup = bso(result.content, "html.parser")

        try:
            download_url = _BASE_URL + str(
                soup.select_one("a[id='downloadButton']")["href"]  # type: ignore
            )
        except (AttributeError, KeyError, TypeError):
            raise APIThrottled(f"Couldn't get download url from {subtitle.page_link}")

        downloaded = self._scraper_call(download_url)
        archive = get_archive_from_bytes(downloaded.content)

        if archive is None:
            raise APIThrottled(f"Invalid archive: {subtitle.page_link}")

        subtitle.content = get_subtitle_from_archive(
            archive,
            episode=subtitle.episode_number,
            episode_title=subtitle.episode_title,
        )

@functools.lru_cache(2048)
def _memoized_episode_guess(content):
    # Use include to save time from unnecessary checks
    return guessit(
        content,
        {
            "type": "episode",
            # Add codec keys to avoid matching x264, 5.1, etc as episode info
            "includes": ["season", "episode", "video_codec", "audio_codec"],
            "enforce_list": True,
        },
    )
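
# For example, _memoized_episode_guess("show name s02e04 x264") would typically
# yield list values such as {"season": [2], "episode": [4], ...} because
# "enforce_list" is enabled; exact keys depend on the guessit version.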

def _get_subtitle_from_item(item, language, episode_number=None):
    release_infos = []

    try:
        release_infos.append(item.find("td", {"class": "a6"}).text.strip())
    except (AttributeError, KeyError):
        pass

    try:
        release_infos.append(
            item.find("td", {"class": "a1"}).find_all("span")[-1].text.strip()
        )
    except (AttributeError, KeyError):
        pass

    release_info = "".join(r_info for r_info in release_infos if r_info)

    try:
        path = item.find("td", {"class": "a1"}).find("a")["href"]
    except (AttributeError, KeyError):
        logger.debug("Couldn't get path: %s", item)
        return None

    return SubsceneSubtitle(language, _BASE_URL + path, release_info, episode_number)
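
# A minimal usage sketch (hypothetical; in Bazarr this provider is driven by
# the subliminal_patch pipeline, which builds the real Movie/Episode objects,
# so the constructor arguments below are illustrative stand-ins):
#
#   from subliminal_patch.core import Movie
#   from subzero.language import Language
#
#   provider = SubsceneProvider()
#   provider.initialize()
#   video = Movie("/movies/Inception.2010.mkv", "Inception", year=2010)
#   found = provider.list_subtitles(video, {Language("eng")})
#   if found:
#       provider.download_subtitle(found[0])
#   provider.terminate()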