|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
from difflib import SequenceMatcher
|
|
|
import functools
|
|
|
import logging
|
|
|
import re
|
|
|
import time
|
|
|
import urllib.parse
|
|
|
|
|
|
from bs4 import BeautifulSoup as bso
|
|
|
from guessit import guessit
|
|
|
from requests import Session
|
|
|
from subliminal.exceptions import ConfigurationError
|
|
|
from subliminal_patch.core import Episode
|
|
|
from subliminal_patch.core import Movie
|
|
|
from subliminal_patch.exceptions import APIThrottled
|
|
|
from subliminal_patch.providers import Provider
|
|
|
from subliminal_patch.providers.utils import get_archive_from_bytes
|
|
|
from subliminal_patch.providers.utils import get_subtitle_from_archive
|
|
|
from subliminal_patch.providers.utils import update_matches
|
|
|
from subliminal_patch.subtitle import Subtitle
|
|
|
from subzero.language import Language
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class Subf2mSubtitle(Subtitle):
|
|
|
provider_name = "subf2m"
|
|
|
hash_verifiable = False
|
|
|
|
|
|
def __init__(self, language, page_link, release_info, episode_number=None):
|
|
|
super().__init__(language, page_link=page_link)
|
|
|
|
|
|
self.release_info = release_info
|
|
|
self.episode_number = episode_number
|
|
|
self.episode_title = None
|
|
|
|
|
|
self._matches = set(
|
|
|
("title", "year", "imdb_id")
|
|
|
if episode_number is None
|
|
|
else ("title", "series", "year", "season", "episode", "imdb_id")
|
|
|
)
|
|
|
|
|
|
def get_matches(self, video):
|
|
|
update_matches(self._matches, video, self.release_info)
|
|
|
|
|
|
return self._matches
|
|
|
|
|
|
@property
|
|
|
def id(self):
|
|
|
return self.page_link
|
|
|
|
|
|
|
|
|
_BASE_URL = "https://subf2m.co"
|
|
|
|
|
|
# TODO: add more seasons and languages
|
|
|
|
|
|
_SEASONS = (
|
|
|
"First",
|
|
|
"Second",
|
|
|
"Third",
|
|
|
"Fourth",
|
|
|
"Fifth",
|
|
|
"Sixth",
|
|
|
"Seventh",
|
|
|
"Eighth",
|
|
|
"Ninth",
|
|
|
"Tenth",
|
|
|
"Eleventh",
|
|
|
"Twelfth",
|
|
|
"Thirdteenth",
|
|
|
"Fourthteenth",
|
|
|
"Fifteenth",
|
|
|
"Sixteenth",
|
|
|
"Seventeenth",
|
|
|
"Eightheenth",
|
|
|
"Nineteenth",
|
|
|
"Tweentieth",
|
|
|
)
|
|
|
|
|
|
_LANGUAGE_MAP = {
|
|
|
"english": "eng",
|
|
|
"farsi_persian": "per",
|
|
|
"arabic": "ara",
|
|
|
"spanish": "spa",
|
|
|
"portuguese": "por",
|
|
|
"italian": "ita",
|
|
|
"dutch": "dut",
|
|
|
"hebrew": "heb",
|
|
|
"indonesian": "ind",
|
|
|
"danish": "dan",
|
|
|
"norwegian": "nor",
|
|
|
"bengali": "ben",
|
|
|
"bulgarian": "bul",
|
|
|
"croatian": "hrv",
|
|
|
"swedish": "swe",
|
|
|
"vietnamese": "vie",
|
|
|
"czech": "cze",
|
|
|
"finnish": "fin",
|
|
|
"french": "fre",
|
|
|
"german": "ger",
|
|
|
"greek": "gre",
|
|
|
"hungarian": "hun",
|
|
|
"icelandic": "ice",
|
|
|
"japanese": "jpn",
|
|
|
"macedonian": "mac",
|
|
|
"malay": "may",
|
|
|
"polish": "pol",
|
|
|
"romanian": "rum",
|
|
|
"russian": "rus",
|
|
|
"serbian": "srp",
|
|
|
"thai": "tha",
|
|
|
"turkish": "tur",
|
|
|
}
|
|
|
|
|
|
_DEFAULT_HEADERS = {
|
|
|
"authority": "subf2m.co",
|
|
|
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
|
|
|
"accept-language": "en-US,en;q=0.9",
|
|
|
"referer": "https://subf2m.co",
|
|
|
"sec-ch-ua": '"Chromium";v="111", "Not(A:Brand";v="8"',
|
|
|
"sec-ch-ua-mobile": "?0",
|
|
|
"sec-ch-ua-platform": '"Unknown"',
|
|
|
"sec-fetch-dest": "document",
|
|
|
"sec-fetch-mode": "navigate",
|
|
|
"sec-fetch-site": "same-origin",
|
|
|
"sec-fetch-user": "?1",
|
|
|
"upgrade-insecure-requests": "1",
|
|
|
}
|
|
|
|
|
|
|
|
|
class Subf2mProvider(Provider):
|
|
|
provider_name = "subf2m"
|
|
|
|
|
|
_movie_title_regex = re.compile(r"^(.+?)( \((\d{4})\))?$")
|
|
|
_tv_show_title_regex = re.compile(
|
|
|
r"^(.+?) [-\(]\s?(.*?) (season|series)\)?( \((\d{4})\))?$"
|
|
|
)
|
|
|
_tv_show_title_alt_regex = re.compile(r"(.+)\s(\d{1,2})(?:\s|$)")
|
|
|
_supported_languages = {}
|
|
|
_supported_languages["brazillian-portuguese"] = Language("por", "BR")
|
|
|
|
|
|
for key, val in _LANGUAGE_MAP.items():
|
|
|
_supported_languages[key] = Language.fromalpha3b(val)
|
|
|
|
|
|
_supported_languages_reversed = {
|
|
|
val: key for key, val in _supported_languages.items()
|
|
|
}
|
|
|
|
|
|
languages = set(_supported_languages.values())
|
|
|
|
|
|
video_types = (Episode, Movie)
|
|
|
subtitle_class = Subf2mSubtitle
|
|
|
|
|
|
def __init__(self, user_agent, verify_ssl=True, session_factory=None):
|
|
|
super().__init__()
|
|
|
|
|
|
if not (user_agent or "").strip():
|
|
|
raise ConfigurationError("User-agent config missing")
|
|
|
|
|
|
self._user_agent = user_agent
|
|
|
self._verify_ssl = verify_ssl
|
|
|
self._session_factory = session_factory
|
|
|
|
|
|
def initialize(self):
|
|
|
if self._session_factory is not None:
|
|
|
self._session = self._session_factory()
|
|
|
else:
|
|
|
logger.debug("No session factory set. Using default requests.Session.")
|
|
|
self._session = Session()
|
|
|
|
|
|
self._session.verify = self._verify_ssl
|
|
|
self._session.headers.update(_DEFAULT_HEADERS)
|
|
|
self._session.headers.update({"user-agent": self._user_agent})
|
|
|
|
|
|
def terminate(self):
|
|
|
self._session.close()
|
|
|
|
|
|
def _safe_get_text(self, url, retry=3, default_return=""):
|
|
|
req = None
|
|
|
|
|
|
for n in range(retry):
|
|
|
req = self._session.get(url, stream=True)
|
|
|
|
|
|
if req.status_code == 403:
|
|
|
logger.debug("Access to this resource is forbidden: %s", url)
|
|
|
break
|
|
|
|
|
|
# Sometimes subf2m will return 404 or 503. This error usually disappears
|
|
|
# retrying the query
|
|
|
if req.status_code in (404, 503):
|
|
|
logger.debug("503/404 returned. Trying again [%d] in 3 seconds", n + 1)
|
|
|
time.sleep(3)
|
|
|
continue
|
|
|
else:
|
|
|
req.raise_for_status()
|
|
|
break
|
|
|
|
|
|
if req is not None:
|
|
|
return "\n".join(
|
|
|
line for line in req.iter_lines(decode_unicode=True) if line
|
|
|
)
|
|
|
|
|
|
return default_return
|
|
|
|
|
|
def _gen_results(self, query):
|
|
|
query = urllib.parse.quote(query)
|
|
|
|
|
|
url = f"{_BASE_URL}/subtitles/searchbytitle?query={query}&l="
|
|
|
|
|
|
text = self._safe_get_text(url)
|
|
|
soup = bso(text, "html.parser")
|
|
|
|
|
|
for title in soup.select("li div[class='title'] a"):
|
|
|
yield title
|
|
|
|
|
|
def _search_movie(self, title, year, return_len=3):
|
|
|
title = title.lower()
|
|
|
year = str(year)
|
|
|
|
|
|
results = []
|
|
|
for result in self._gen_results(title):
|
|
|
text = result.text.lower()
|
|
|
match = self._movie_title_regex.match(text)
|
|
|
if not match:
|
|
|
continue
|
|
|
|
|
|
match_title = match.group(1)
|
|
|
match_year = match.group(3)
|
|
|
if year == match_year:
|
|
|
results.append(
|
|
|
{
|
|
|
"href": result.get("href"),
|
|
|
"similarity": SequenceMatcher(None, title, match_title).ratio(),
|
|
|
}
|
|
|
)
|
|
|
|
|
|
if results:
|
|
|
results.sort(key=lambda x: x["similarity"], reverse=True)
|
|
|
results = [result["href"] for result in results]
|
|
|
if results:
|
|
|
results = set(results[:return_len])
|
|
|
logger.debug("Results: %s", results)
|
|
|
return results
|
|
|
|
|
|
return []
|
|
|
|
|
|
def _search_tv_show_season(self, title, season, year=None, return_len=3):
|
|
|
try:
|
|
|
season_strs = (_SEASONS[season - 1].lower(), str(season))
|
|
|
except IndexError:
|
|
|
logger.debug("Season number not supported: %s", season)
|
|
|
return None
|
|
|
|
|
|
results = []
|
|
|
for result in self._gen_results(title):
|
|
|
text = result.text.lower()
|
|
|
|
|
|
match = self._tv_show_title_regex.match(text)
|
|
|
if not match:
|
|
|
match = self._tv_show_title_alt_regex.match(text)
|
|
|
|
|
|
if not match:
|
|
|
logger.debug("Series title not matched: %s", text)
|
|
|
continue
|
|
|
|
|
|
match_title = match.group(1).strip()
|
|
|
match_season = match.group(2).strip().lower()
|
|
|
|
|
|
if match_season in season_strs or "complete" in match_season:
|
|
|
logger.debug("OK: '%s' IN %s|complete", match_season, season_strs)
|
|
|
plus = 0.1 if year and str(year) in text else 0
|
|
|
results.append(
|
|
|
{
|
|
|
"href": result.get("href"),
|
|
|
"similarity": SequenceMatcher(None, title, match_title).ratio()
|
|
|
+ plus,
|
|
|
}
|
|
|
)
|
|
|
else:
|
|
|
logger.debug("Invalid: '%s' IN %s|complete", match_season, season_strs)
|
|
|
|
|
|
if results:
|
|
|
results.sort(key=lambda x: x["similarity"], reverse=True)
|
|
|
results = [result["href"] for result in results]
|
|
|
if results:
|
|
|
results = set(results[:return_len])
|
|
|
logger.debug("Results: %s", results)
|
|
|
return results
|
|
|
|
|
|
return []
|
|
|
|
|
|
def _find_movie_subtitles(self, path, language, imdb_id):
|
|
|
soup = self._get_subtitle_page_soup(path, language)
|
|
|
imdb_matched = _match_imdb(soup, imdb_id)
|
|
|
if not imdb_matched:
|
|
|
return []
|
|
|
|
|
|
subtitles = []
|
|
|
|
|
|
for item in soup.select("li.item"):
|
|
|
subtitle = _get_subtitle_from_item(item, language)
|
|
|
if subtitle is None:
|
|
|
continue
|
|
|
|
|
|
logger.debug("Found subtitle: %s", subtitle)
|
|
|
subtitles.append(subtitle)
|
|
|
|
|
|
return subtitles
|
|
|
|
|
|
def _find_episode_subtitles(
|
|
|
self, path, season, episode, language, episode_title=None, imdb_id=None
|
|
|
):
|
|
|
soup = self._get_subtitle_page_soup(path, language)
|
|
|
imdb_matched = _match_imdb(soup, imdb_id)
|
|
|
if not imdb_matched:
|
|
|
return []
|
|
|
|
|
|
subtitles = []
|
|
|
|
|
|
for item in soup.select("li.item"):
|
|
|
valid_item = None
|
|
|
clean_text = " ".join(item.text.split())
|
|
|
|
|
|
if not clean_text:
|
|
|
continue
|
|
|
|
|
|
# First try with the special episode matches for subf2m
|
|
|
guess = _get_episode_from_release(clean_text)
|
|
|
|
|
|
if guess is None:
|
|
|
guess = _memoized_episode_guess(clean_text)
|
|
|
|
|
|
if "season" not in guess:
|
|
|
if "complete series" in clean_text.lower():
|
|
|
logger.debug("Complete series pack found: %s", clean_text)
|
|
|
guess["season"] = [season]
|
|
|
else:
|
|
|
logger.debug("Nothing guessed from release: %s", clean_text)
|
|
|
continue
|
|
|
|
|
|
if season in guess["season"] and episode in guess.get("episode", []):
|
|
|
logger.debug("Episode match found: %s - %s", guess, clean_text)
|
|
|
valid_item = item
|
|
|
|
|
|
elif season in guess["season"] and not "episode" in guess:
|
|
|
logger.debug("Season pack found: %s", clean_text)
|
|
|
valid_item = item
|
|
|
|
|
|
if valid_item is None:
|
|
|
continue
|
|
|
|
|
|
subtitle = _get_subtitle_from_item(item, language, episode)
|
|
|
|
|
|
if subtitle is None:
|
|
|
continue
|
|
|
|
|
|
subtitle.episode_title = episode_title
|
|
|
|
|
|
logger.debug("Found subtitle: %s", subtitle)
|
|
|
subtitles.append(subtitle)
|
|
|
|
|
|
return subtitles
|
|
|
|
|
|
def _get_subtitle_page_soup(self, path, language):
|
|
|
language_path = self._supported_languages_reversed[language]
|
|
|
|
|
|
text = self._safe_get_text(f"{_BASE_URL}{path}/{language_path}")
|
|
|
|
|
|
return bso(text, "html.parser")
|
|
|
|
|
|
def list_subtitles(self, video, languages):
|
|
|
is_episode = isinstance(video, Episode)
|
|
|
|
|
|
if is_episode:
|
|
|
paths = self._search_tv_show_season(video.series, video.season, video.year)
|
|
|
else:
|
|
|
paths = self._search_movie(video.title, video.year)
|
|
|
|
|
|
if not paths:
|
|
|
logger.debug("No results")
|
|
|
return []
|
|
|
|
|
|
languages = set([lang for lang in languages if lang in self.languages])
|
|
|
|
|
|
subs = []
|
|
|
for path in paths:
|
|
|
must_break = False
|
|
|
|
|
|
logger.debug("Looking for subs from %s", path)
|
|
|
|
|
|
for language in languages:
|
|
|
if is_episode:
|
|
|
subs.extend(
|
|
|
self._find_episode_subtitles(
|
|
|
path,
|
|
|
video.season,
|
|
|
video.episode,
|
|
|
language,
|
|
|
video.title,
|
|
|
video.series_imdb_id,
|
|
|
)
|
|
|
)
|
|
|
|
|
|
else:
|
|
|
subs.extend(
|
|
|
self._find_movie_subtitles(path, language, video.imdb_id)
|
|
|
)
|
|
|
|
|
|
must_break = subs != []
|
|
|
|
|
|
if must_break:
|
|
|
logger.debug("Good path found: %s. Not running over others.", path)
|
|
|
break
|
|
|
|
|
|
return subs
|
|
|
|
|
|
def download_subtitle(self, subtitle):
|
|
|
# TODO: add MustGetBlacklisted support
|
|
|
|
|
|
text = self._safe_get_text(subtitle.page_link)
|
|
|
soup = bso(text, "html.parser")
|
|
|
try:
|
|
|
download_url = _BASE_URL + str(
|
|
|
soup.select_one("a[id='downloadButton']")["href"] # type: ignore
|
|
|
)
|
|
|
except (AttributeError, KeyError, TypeError):
|
|
|
raise APIThrottled(f"Couldn't get download url from {subtitle.page_link}")
|
|
|
|
|
|
downloaded = self._session.get(download_url, allow_redirects=True)
|
|
|
|
|
|
archive = get_archive_from_bytes(downloaded.content)
|
|
|
|
|
|
if archive is None:
|
|
|
raise APIThrottled(f"Invalid archive: {subtitle.page_link}")
|
|
|
|
|
|
subtitle.content = get_subtitle_from_archive(
|
|
|
archive,
|
|
|
episode=subtitle.episode_number,
|
|
|
episode_title=subtitle.episode_title,
|
|
|
)
|
|
|
|
|
|
|
|
|
@functools.lru_cache(2048)
|
|
|
def _memoized_episode_guess(content):
|
|
|
# Use include to save time from unnecessary checks
|
|
|
return guessit(
|
|
|
content,
|
|
|
{
|
|
|
"type": "episode",
|
|
|
# Add codec keys to avoid matching x264, 5.1, etc as episode info
|
|
|
"includes": ["season", "episode", "video_codec", "audio_codec"],
|
|
|
"enforce_list": True,
|
|
|
},
|
|
|
)
|
|
|
|
|
|
|
|
|
_EPISODE_SPECIAL_RE = re.compile(
|
|
|
r"(season|s)\s*?(?P<x>\d{,2})\s?[-−]\s?(?P<y>\d{,2})", flags=re.IGNORECASE
|
|
|
)
|
|
|
|
|
|
|
|
|
def _match_imdb(soup, imdb_id):
|
|
|
try:
|
|
|
parsed_imdb_id = (
|
|
|
soup.select_one(
|
|
|
"#content > div.subtitles.byFilm > div.box.clearfix > div.top.left > div.header > h2 > a"
|
|
|
)
|
|
|
.get("href") # type: ignore
|
|
|
.split("/")[-1] # type: ignore
|
|
|
.strip()
|
|
|
)
|
|
|
except AttributeError:
|
|
|
logger.debug("Couldn't get IMDB ID")
|
|
|
parsed_imdb_id = None
|
|
|
|
|
|
if parsed_imdb_id is not None and parsed_imdb_id != imdb_id:
|
|
|
logger.debug("Wrong IMDB ID: '%s' != '%s'", parsed_imdb_id, imdb_id)
|
|
|
return False
|
|
|
|
|
|
if parsed_imdb_id is None:
|
|
|
logger.debug("Matching subtitles as IMDB ID was not parsed.")
|
|
|
else:
|
|
|
logger.debug("Good IMDB ID: '%s' == '%s'", parsed_imdb_id, imdb_id)
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
def _get_episode_from_release(release: str):
|
|
|
match = _EPISODE_SPECIAL_RE.search(release)
|
|
|
if match is None:
|
|
|
return None
|
|
|
|
|
|
try:
|
|
|
season, episode = [int(item) for item in match.group("x", "y")]
|
|
|
except (IndexError, ValueError):
|
|
|
return None
|
|
|
|
|
|
return {"season": [season], "episode": [episode]}
|
|
|
|
|
|
|
|
|
def _get_subtitle_from_item(item, language, episode_number=None):
|
|
|
release_info = [
|
|
|
rel.text.strip() for rel in item.find("ul", {"class": "scrolllist"})
|
|
|
]
|
|
|
|
|
|
try:
|
|
|
text = item.find("div", {"class": "comment-col"}).find("p").text
|
|
|
release_info.append(text.replace("\n", " ").strip())
|
|
|
except AttributeError:
|
|
|
pass
|
|
|
|
|
|
release_info = "\n".join([item for item in release_info if item])
|
|
|
|
|
|
try:
|
|
|
path = item.find("a", {"class": "download icon-download"})["href"] # type: ignore
|
|
|
except (AttributeError, KeyError):
|
|
|
logger.debug("Couldn't get path: %s", item)
|
|
|
return None
|
|
|
|
|
|
return Subf2mSubtitle(language, _BASE_URL + path, release_info, episode_number)
|