Merge remote-tracking branch 'origin/development' into development

pull/1979/head v1.1.3-beta.5
morpheus65535 2 years ago
commit 70fe14562f

@ -25,9 +25,20 @@ _CLEAN_TITLE_RES = [
(r"´|`", "'"), (r"´|`", "'"),
(r" {2,}", " "), (r" {2,}", " "),
] ]
_SPANISH_RE = re.compile(r"españa|ib[eé]rico|castellano|gallego|castilla")
_SPANISH_RE = re.compile(r"españa|ib[eé]rico|castellano|gallego|castilla")
_YEAR_RE = re.compile(r"(\(\d{4}\))") _YEAR_RE = re.compile(r"(\(\d{4}\))")
_SERIES_RE = re.compile(
r"\(?\d{4}\)?|(s\d{1,2}(e\d{1,2})?|(season|temporada)\s\d{1,2}).*?$",
flags=re.IGNORECASE,
)
_EPISODE_NUM_RE = re.compile(r"[eE](?P<x>\d{1,2})")
_SEASON_NUM_RE = re.compile(
r"(s|(season|temporada)\s)(?P<x>\d{1,2})", flags=re.IGNORECASE
)
_UNSUPPORTED_RE = re.compile(
r"(\)?\d{4}\)?|[sS]\d{1,2})\s.{,3}(extras|forzado(s)?|forced)", flags=re.IGNORECASE
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -47,11 +58,13 @@ class SubdivxSubtitle(Subtitle):
self.download_url = download_url self.download_url = download_url
self.uploader = uploader self.uploader = uploader
self.release_info = str(title) self._title = str(title).strip()
self.description = str(description).strip() self._description = str(description).strip()
if self.description: self.release_info = self._title
self.release_info += " | " + self.description
if self._description:
self.release_info += " | " + self._description
@property @property
def id(self): def id(self):
@ -62,18 +75,18 @@ class SubdivxSubtitle(Subtitle):
# episode # episode
if isinstance(video, Episode): if isinstance(video, Episode):
# already matched in search query # already matched within provider
matches.update(["title", "series", "season", "episode", "year"]) matches.update(["title", "series", "season", "episode", "year"])
# movie # movie
elif isinstance(video, Movie): elif isinstance(video, Movie):
# already matched in search query # already matched within provider
matches.update(["title", "year"]) matches.update(["title", "year"])
update_matches(matches, video, self.description) update_matches(matches, video, self._description)
# Don't lowercase; otherwise it will match a lot of false positives # Don't lowercase; otherwise it will match a lot of false positives
if video.release_group and video.release_group in self.description: if video.release_group and video.release_group in self._description:
matches.add("release_group") matches.add("release_group")
return matches return matches
@ -106,11 +119,19 @@ class SubdivxSubtitlesProvider(Provider):
subtitles = [] subtitles = []
if isinstance(video, Episode): if isinstance(video, Episode):
# TODO: cache pack queries (TV SHOW S01).
# Too many redundant server calls.
for query in ( for query in (
f"{video.series} S{video.season:02}E{video.episode:02}", f"{video.series} S{video.season:02}E{video.episode:02}",
f"{video.series} S{video.season:02}", f"{video.series} S{video.season:02}",
): ):
subtitles += self._handle_multi_page_search(query, video) subtitles += self._handle_multi_page_search(query, video)
# Try only with series title
if len(subtitles) <= 5:
subtitles += self._handle_multi_page_search(video.series, video, 1)
else: else:
for query in (video.title, f"{video.title} ({video.year})"): for query in (video.title, f"{video.title} ({video.year})"):
subtitles += self._handle_multi_page_search(query, video) subtitles += self._handle_multi_page_search(query, video)
@ -120,7 +141,7 @@ class SubdivxSubtitlesProvider(Provider):
return subtitles return subtitles
def _handle_multi_page_search(self, query, video, max_loops=3): def _handle_multi_page_search(self, query, video, max_loops=2):
params = { params = {
"buscar2": query, "buscar2": query,
"accion": "5", "accion": "5",
@ -135,20 +156,25 @@ class SubdivxSubtitlesProvider(Provider):
max_loops_not_met = True max_loops_not_met = True
while max_loops_not_met: while max_loops_not_met:
loops += 1
max_loops_not_met = loops < max_loops max_loops_not_met = loops < max_loops
page_subtitles = self._get_page_subtitles(params, video) page_subtitles, last_page = self._get_page_subtitles(params, video)
logger.debug("Yielding %d subtitles", len(page_subtitles)) logger.debug("Yielding %d subtitles [loop #%d]", len(page_subtitles), loops)
yield from page_subtitles yield from page_subtitles
if len(page_subtitles) < 100: if last_page:
break # this is the last page logger.debug("Last page for '%s' query. Breaking loop", query)
break
loops += 1
params["pg"] += 1 # search next page params["pg"] += 1 # search next page
time.sleep(self.multi_result_throttle) time.sleep(self.multi_result_throttle)
if not max_loops_not_met:
logger.debug("Max loops limit exceeded (%d)", max_loops)
def _get_page_subtitles(self, params, video): def _get_page_subtitles(self, params, video):
search_link = f"{_SERVER_URL}/index.php" search_link = f"{_SERVER_URL}/index.php"
response = self.session.get( response = self.session.get(
@ -156,19 +182,19 @@ class SubdivxSubtitlesProvider(Provider):
) )
try: try:
page_subtitles = self._parse_subtitles_page(video, response) page_subtitles, last_page = self._parse_subtitles_page(video, response)
except Exception as error: except Exception as error:
logger.error(f"Error parsing subtitles list: {error}") logger.error(f"Error parsing subtitles list: {error}")
return [] return []
return page_subtitles return page_subtitles, last_page
def list_subtitles(self, video, languages): def list_subtitles(self, video, languages):
return self.query(video, languages) return self.query(video, languages)
def download_subtitle(self, subtitle): def download_subtitle(self, subtitle):
# download the subtitle # download the subtitle
logger.info("Downloading subtitle %r", subtitle) logger.debug("Downloading subtitle %r", subtitle)
# download zip / rar file with the subtitle # download zip / rar file with the subtitle
response = self.session.get( response = self.session.get(
@ -198,20 +224,19 @@ class SubdivxSubtitlesProvider(Provider):
) )
title_soups = page_soup.find_all("div", {"id": "menu_detalle_buscador"}) title_soups = page_soup.find_all("div", {"id": "menu_detalle_buscador"})
body_soups = page_soup.find_all("div", {"id": "buscador_detalle"}) body_soups = page_soup.find_all("div", {"id": "buscador_detalle"})
episode = isinstance(video, Episode)
title_checker = _check_episode if isinstance(video, Episode) else _check_movie
for subtitle in range(0, len(title_soups)): for subtitle in range(0, len(title_soups)):
title_soup, body_soup = title_soups[subtitle], body_soups[subtitle] title_soup, body_soup = title_soups[subtitle], body_soups[subtitle]
# title # title
title = _clean_title(title_soup.find("a").text) title = _clean_title(title_soup.find("a").text)
# Forced subtitles are not supported if _UNSUPPORTED_RE.search(title):
if title.lower().rstrip().endswith(("forzado", "forzados")): logger.debug("Skipping unsupported subtitles: %s", title)
logger.debug("Skipping forced subtitles: %s", title)
continue continue
# Check movie title (if the video is a movie) if not title_checker(video, title):
if not episode and not _check_movie(video, title):
continue continue
# Data # Data
@ -243,7 +268,7 @@ class SubdivxSubtitlesProvider(Provider):
logger.debug("Found subtitle %r", subtitle) logger.debug("Found subtitle %r", subtitle)
subtitles.append(subtitle) subtitles.append(subtitle)
return subtitles return subtitles, len(title_soups) < 100
def _clean_title(title): def _clean_title(title):
@ -268,6 +293,40 @@ def _get_download_url(data):
return None return None
def _check_episode(video, title):
    """Decide whether a search-result title plausibly refers to *video*.

    The title must carry a season marker (see ``_SEASON_NUM_RE``); titles
    without one are rejected outright. A title that carries no episode
    number is accepted on the season alone, otherwise the episode number
    must equal ``video.episode``. Finally, the series name left after
    stripping year/season information must be within 3 characters of
    ``video.series`` *in length* (only lengths are compared, not content).

    :param video: object exposing ``series``, ``season`` and ``episode``.
    :param title: cleaned title of a search result.
    :return: ``True`` when season, episode and title length all agree.
    """
    season_found = _SEASON_NUM_RE.search(title)
    if season_found is None:
        logger.debug("Not a season/episode: %s", title)
        return False

    episode_found = _EPISODE_NUM_RE.search(title)
    title_season = int(season_found.group("x"))
    title_episode = (
        None if episode_found is None else int(episode_found.group("x"))
    )

    # No episode number in the title: only the season has to agree.
    episode_ok = title_episode is None or video.episode == title_episode
    ep_matches = episode_ok and title_season == video.season

    # Cheap similarity heuristic: compare name lengths only.
    stripped_title = _SERIES_RE.sub("", title).strip()
    distance = abs(len(stripped_title) - len(video.series))
    series_matched = distance < 4 and ep_matches

    logger.debug(
        "Series matched? %s [%s -> %s] [title distance: %d]",
        series_matched,
        video,
        title,
        distance,
    )
    return series_matched
def _check_movie(video, title): def _check_movie(video, title):
if str(video.year) not in title: if str(video.year) not in title:
return False return False

@ -2,10 +2,13 @@
import functools import functools
import logging import logging
import urllib.parse
import re
from bs4 import BeautifulSoup as bso from bs4 import BeautifulSoup as bso
from guessit import guessit from guessit import guessit
from requests import Session from requests import Session
from difflib import SequenceMatcher
from subliminal_patch.core import Episode from subliminal_patch.core import Episode
from subliminal_patch.core import Movie from subliminal_patch.core import Movie
from subliminal_patch.exceptions import APIThrottled from subliminal_patch.exceptions import APIThrottled
@ -82,12 +85,37 @@ _LANGUAGE_MAP = {
"dutch": "dut", "dutch": "dut",
"hebrew": "heb", "hebrew": "heb",
"indonesian": "ind", "indonesian": "ind",
"danish": "dan",
"norwegian": "nor",
"bengali": "ben",
"bulgarian": "bul",
"croatian": "hrv",
"swedish": "swe",
"vietnamese": "vie",
"czech": "cze",
"finnish": "fin",
"french": "fre",
"german": "ger",
"greek": "gre",
"hungarian": "hun",
"icelandic": "ice",
"japanese": "jpn",
"macedonian": "mac",
"malay": "may",
"polish": "pol",
"romanian": "rum",
"russian": "rus",
"serbian": "srp",
"thai": "tha",
"turkish": "tur",
} }
class Subf2mProvider(Provider): class Subf2mProvider(Provider):
provider_name = "subf2m" provider_name = "subf2m"
_movie_title_regex = re.compile(r"^(.+?)( \((\d{4})\))?$")
_tv_show_title_regex = re.compile(r"^(.+?) - (.*?) season( \((\d{4})\))?$")
_supported_languages = {} _supported_languages = {}
_supported_languages["brazillian-portuguese"] = Language("por", "BR") _supported_languages["brazillian-portuguese"] = Language("por", "BR")
@ -112,7 +140,7 @@ class Subf2mProvider(Provider):
def _gen_results(self, query): def _gen_results(self, query):
req = self._session.get( req = self._session.get(
f"{_BASE_URL}/subtitles/searchbytitle?query={query.replace(' ', '+')}&l=", f"{_BASE_URL}/subtitles/searchbytitle?query={urllib.parse.quote(query)}&l=",
stream=True, stream=True,
) )
text = "\n".join(line for line in req.iter_lines(decode_unicode=True) if line) text = "\n".join(line for line in req.iter_lines(decode_unicode=True) if line)
@ -123,35 +151,61 @@ class Subf2mProvider(Provider):
def _search_movie(self, title, year): def _search_movie(self, title, year):
title = title.lower() title = title.lower()
year = f"({year})" year = str(year)
found_movie = None found_movie = None
results = []
for result in self._gen_results(title): for result in self._gen_results(title):
text = result.text.lower() text = result.text.lower()
if title.lower() in text and year in text: match = self._movie_title_regex.match(text)
found_movie = result.get("href") if not match:
logger.debug("Movie found: %s", found_movie) continue
break match_title = match.group(1)
match_year = match.group(3)
if year == match_year:
results.append(
{
"href": result.get("href"),
"similarity": SequenceMatcher(None, title, match_title).ratio(),
}
)
if results:
results.sort(key=lambda x: x["similarity"], reverse=True)
found_movie = results[0]["href"]
logger.debug("Movie found: %s", results[0])
return found_movie return found_movie
def _search_tv_show_season(self, title, season): def _search_tv_show_season(self, title, season):
try: try:
season_str = f"{_SEASONS[season - 1]} Season" season_str = _SEASONS[season - 1].lower()
except IndexError: except IndexError:
logger.debug("Season number not supported: %s", season) logger.debug("Season number not supported: %s", season)
return None return None
expected_result = f"{title} - {season_str}".lower()
found_tv_show_season = None found_tv_show_season = None
results = []
for result in self._gen_results(title): for result in self._gen_results(title):
if expected_result in result.text.lower(): text = result.text.lower()
found_tv_show_season = result.get("href") match = self._tv_show_title_regex.match(text)
logger.debug("TV Show season found: %s", found_tv_show_season) if not match:
break continue
match_title = match.group(1)
match_season = match.group(2)
if season_str == match_season:
results.append(
{
"href": result.get("href"),
"similarity": SequenceMatcher(None, title, match_title).ratio(),
}
)
if results:
results.sort(key=lambda x: x["similarity"], reverse=True)
found_tv_show_season = results[0]["href"]
logger.debug("TV Show season found: %s", results[0])
return found_tv_show_season return found_tv_show_season

@ -28,12 +28,10 @@ def test_list_subtitles_movie_with_year_fallback(movies):
def test_handle_multi_page_search(episodes): def test_handle_multi_page_search(episodes):
with SubdivxSubtitlesProvider() as provider: with SubdivxSubtitlesProvider() as provider:
subs = list( for _ in provider._handle_multi_page_search(
provider._handle_multi_page_search(
"Game Of Thrones", episodes["got_s03e10"] "Game Of Thrones", episodes["got_s03e10"]
) ):
) pass
assert len(subs) > 100
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -52,6 +50,18 @@ def test_list_subtitles_castillian_spanish(episodes):
assert provider.list_subtitles(item, {Language.fromietf("es")}) assert provider.list_subtitles(item, {Language.fromietf("es")})
def test_list_subtitles_episode_with_title_only_fallback(episodes):
    """List subtitles for "The Bear" S01E01 and expect more than two results.

    The first episode fixture is repurposed by overwriting its identifying
    attributes. Judging by the test name, this is meant to exercise the
    provider's series-title-only fallback query.
    """
    item = list(episodes.values())[0]
    overrides = {
        "series": "The Bear",
        "name": "The Bear",
        "season": 1,
        "episode": 1,
    }
    for attribute, value in overrides.items():
        setattr(item, attribute, value)

    with SubdivxSubtitlesProvider() as provider:
        subtitles = provider.list_subtitles(item, {Language("spa", "MX")})
        assert len(subtitles) > 2
def test_download_subtitle(movies): def test_download_subtitle(movies):
subtitle = SubdivxSubtitle( subtitle = SubdivxSubtitle(
Language("spa", "MX"), Language("spa", "MX"),
@ -107,7 +117,7 @@ def test_subtitle_description_not_lowercase(video):
with SubdivxSubtitlesProvider() as provider: with SubdivxSubtitlesProvider() as provider:
subtitles = provider.list_subtitles(video, {Language("spa", "MX")}) subtitles = provider.list_subtitles(video, {Language("spa", "MX")})
assert subtitles assert subtitles
assert not subtitles[0].description.islower() assert not subtitles[0]._description.islower()
def test_subtitle_matches(video): def test_subtitle_matches(video):

@ -5,20 +5,45 @@ from subliminal_patch.providers.subf2m import Subf2mSubtitle
from subzero.language import Language from subzero.language import Language
def test_search_movie(movies): @pytest.mark.parametrize(
movie = movies["dune"] "title,year,expected_url",
[
(
"Dead Man's Chest",
2006,
"/subtitles/pirates-of-the-caribbean-2-dead-mans-chest",
),
("Dune", 2021, "/subtitles/dune-2021"),
("Cure", 1997, "/subtitles/cure-kyua"),
],
)
def test_search_movie(movies, title, year, expected_url):
movie = list(movies.values())[0]
movie.title = title
movie.year = year
with Subf2mProvider() as provider: with Subf2mProvider() as provider:
result = provider._search_movie(movie.title, movie.year) result = provider._search_movie(movie.title, movie.year)
assert result == "/subtitles/dune-2021" assert result == expected_url
def test_search_tv_show_season(episodes): @pytest.mark.parametrize(
episode = episodes["breaking_bad_s01e01"] "title,season,expected_url",
[
("Breaking Bad", 1, "/subtitles/breaking-bad-first-season"),
("House Of The Dragon", 1, "/subtitles/house-of-the-dragon-first-season"),
("The Bear", 1, "/subtitles/the-bear-first-season"),
],
)
def test_search_tv_show_season(episodes, title, season, expected_url):
episode = list(episodes.values())[0]
episode.name = title
episode.series = title
episode.season = season
with Subf2mProvider() as provider: with Subf2mProvider() as provider:
result = provider._search_tv_show_season(episode.series, episode.season) result = provider._search_tv_show_season(episode.series, episode.season)
assert result == "/subtitles/breaking-bad-first-season" assert result == expected_url
@pytest.mark.parametrize("language", [Language.fromalpha2("en"), Language("por", "BR")]) @pytest.mark.parametrize("language", [Language.fromalpha2("en"), Language("por", "BR")])

Loading…
Cancel
Save