Removed closed subscene provider

pull/2495/head
morpheus65535 6 months ago
parent f914ed0cbf
commit bb4b01f3fb

@@ -293,10 +293,6 @@ validators = [
Validator('napisy24.username', must_exist=True, default='', is_type_of=str, cast=str),
Validator('napisy24.password', must_exist=True, default='', is_type_of=str, cast=str),
# subscene section
Validator('subscene.username', must_exist=True, default='', is_type_of=str, cast=str),
Validator('subscene.password', must_exist=True, default='', is_type_of=str, cast=str),
# betaseries section
Validator('betaseries.token', must_exist=True, default='', is_type_of=str, cast=str),
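
The validators above are dynaconf-style: every settings key must exist, gets a string default, and is cast, so reads never raise. A minimal standalone sketch of the pattern behind the removed subscene entries (the Dynaconf wiring here is illustrative; Bazarr builds its own settings object around these validators):

# Minimal sketch, assuming a plain dynaconf setup outside Bazarr.
from dynaconf import Dynaconf, Validator

settings = Dynaconf(validators=[
    # Same shape as the removed entries: must exist, default to '', coerce to str.
    Validator('subscene.username', must_exist=True, default='', is_type_of=str, cast=str),
    Validator('subscene.password', must_exist=True, default='', is_type_of=str, cast=str),
])

print(settings.subscene.username)  # '' when unset, never a KeyError
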
@@ -686,15 +682,6 @@ def save_settings(settings_items):
reset_providers = True
region.delete('oscom_token')
if key == 'settings-subscene-username':
if key != settings.subscene.username:
reset_providers = True
region.delete('subscene_cookies2')
elif key == 'settings-subscene-password':
if key != settings.subscene.password:
reset_providers = True
region.delete('subscene_cookies2')
if key == 'settings-titlovi-username':
if key != settings.titlovi.username:
reset_providers = True
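
The removed branches follow the invalidation pattern used by their neighbours: when a credential setting changes, flag the provider pool for a rebuild and drop the cached login cookies so the next session starts clean. (Note that, as written, the removed code compared the settings key name rather than the submitted value against the stored username/password, so the cache was effectively dropped on every save of those fields.) A minimal sketch of the pattern with a dogpile.cache region; the names here are illustrative:

# Sketch of credential-change invalidation, assuming a dogpile.cache region.
from dogpile.cache import make_region

region = make_region().configure('dogpile.cache.memory')

def on_setting_changed(key, new_value, stored):
    reset_providers = False
    if key == 'settings-subscene-username' and new_value != stored['subscene_username']:
        reset_providers = True
        region.delete('subscene_cookies2')  # stale cookies would poison the next login
    return reset_providers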

@@ -125,7 +125,7 @@ def provider_throttle_map():
PROVIDERS_FORCED_OFF = ["addic7ed", "tvsubtitles", "legendasdivx", "napiprojekt", "shooter",
"hosszupuska", "supersubtitles", "titlovi", "assrt", "subscene"]
"hosszupuska", "supersubtitles", "titlovi", "assrt"]
throttle_count = {}
@@ -259,11 +259,6 @@ def get_providers_auth():
'also_foreign': False, # fixme
'verify_ssl': settings.podnapisi.verify_ssl
},
'subscene': {
'username': settings.subscene.username,
'password': settings.subscene.password,
'only_foreign': False, # fixme
},
'legendasdivx': {
'username': settings.legendasdivx.username,
'password': settings.legendasdivx.password,

@@ -18,7 +18,7 @@ from app.config import get_scores, settings, get_array_from
from utilities.helper import get_target_folder, force_unicode
from app.database import get_profiles_list
from .pool import update_pools, _get_pool, _init_pool
from .pool import update_pools, _get_pool
from .utils import get_video, _get_lang_obj, _get_scores, _set_forced_providers
from .processing import process_subtitle
@@ -46,21 +46,7 @@ def manual_search(path, profile_id, providers, sceneName, title, media_type):
try:
if providers:
subtitles = list_all_subtitles([video], language_set, pool)
if 'subscene' in providers:
s_pool = _init_pool("movie", profile_id, {"subscene"})
subscene_language_set = set()
for language in language_set:
if language.forced:
subscene_language_set.add(language)
if len(subscene_language_set):
s_pool.provider_configs.update({"subscene": {"only_foreign": True}})
subtitles_subscene = list_all_subtitles([video], subscene_language_set, s_pool)
s_pool.provider_configs.update({"subscene": {"only_foreign": False}})
subtitles[video] += subtitles_subscene[video]
else:
subtitles = []
logging.info("BAZARR All providers are throttled")
return 'All providers are throttled'
except Exception:
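
manual_search was the only call site that special-cased a single provider: it built a second pool containing just subscene, queried it with only the forced languages while only_foreign was toggled on, and merged those results into the main list. A condensed, self-contained sketch of that flow (the pool and search callables are stubbed parameters; the real ones come from subliminal_patch):

# Condensed sketch of the removed forced-language special case.
def merge_forced_results(video, language_set, pool, init_pool, list_all):
    subtitles = list_all([video], language_set, pool)
    forced_set = {lang for lang in language_set if lang.forced}
    if forced_set:
        s_pool = init_pool("movie", None, {"subscene"})
        # only_foreign=True restricts the provider to forced subtitles.
        s_pool.provider_configs.update({"subscene": {"only_foreign": True}})
        subtitles[video] += list_all([video], forced_set, s_pool)[video]
        s_pool.provider_configs.update({"subscene": {"only_foreign": False}})
    return subtitles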

@@ -97,7 +97,6 @@ def _set_forced_providers(pool, also_forced=False, forced_required=False):
pool.provider_configs.update(
{
"podnapisi": {'also_foreign': also_forced, "only_foreign": forced_required},
"subscene": {"only_foreign": forced_required},
"opensubtitles": {'also_foreign': also_forced, "only_foreign": forced_required}
}
)
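
provider_configs is the generic mechanism the manual_search special case duplicated: the pool hands each provider its config dict at construction time, so flipping only_foreign/also_foreign per search is just a dict update. A toy model of that contract (plain Python, not the subliminal classes):

# Toy model of the provider_configs contract.
class Pool:
    def __init__(self):
        self.provider_configs = {}

    def build(self, name, provider_cls):
        # Each provider is constructed with its config dict as kwargs.
        return provider_cls(**self.provider_configs.get(name, {}))

class DemoProvider:
    def __init__(self, only_foreign=False, also_foreign=False):
        self.only_foreign = only_foreign

pool = Pool()
pool.provider_configs.update({"demo": {"only_foreign": True}})
assert pool.build("demo", DemoProvider).only_foreign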

@@ -15,5 +15,4 @@ deathbycaptcha # unknown version, only found on gist
git+https://github.com/pannal/libfilebot#egg=libfilebot
git+https://github.com/RobinDavid/pyADS.git@28a2f6dbfb357f85b2c2f49add770b336e88840d#egg=pyads
py7zr==0.7.0 # modified to prevent importing of modules that can't be vendored
subscene-api==1.0.0 # modified specifically for Bazarr
subliminal==2.1.0 # modified specifically for Bazarr

@@ -1,92 +0,0 @@
# coding=utf-8
from __future__ import absolute_import
from babelfish import LanguageReverseConverter
from subliminal.exceptions import ConfigurationError
from subzero.language import Language
# alpha3 codes extracted from `https://en.wikipedia.org/wiki/List_of_ISO_639-1_codes`
# Subscene language list extracted from its upload form
from_subscene = {
'Farsi/Persian': 'fas', 'Greek': 'ell', 'Greenlandic': 'kal',
'Malay': 'msa', 'Pashto': 'pus', 'Punjabi': 'pan', 'Swahili': 'swa'
}
from_subscene_with_country = {
'Brazillian Portuguese': ('por', 'BR')
}
to_subscene_with_country = {val: key for key, val in from_subscene_with_country.items()}
to_subscene = {v: k for k, v in from_subscene.items()}
exact_languages_alpha3 = [
'ara', 'aze', 'bel', 'ben', 'bos', 'bul', 'cat', 'ces', 'dan', 'deu',
'eng', 'epo', 'est', 'eus', 'fin', 'fra', 'heb', 'hin', 'hrv', 'hun',
'hye', 'ind', 'isl', 'ita', 'jpn', 'kat', 'kor', 'kur', 'lav', 'lit',
'mal', 'mkd', 'mni', 'mon', 'mya', 'nld', 'nor', 'pol', 'por', 'ron',
'rus', 'sin', 'slk', 'slv', 'som', 'spa', 'sqi', 'srp', 'sun', 'swe',
'tam', 'tel', 'tgl', 'tha', 'tur', 'ukr', 'urd', 'vie', 'yor'
]
language_ids = {
'ara': 2, 'dan': 10, 'nld': 11, 'eng': 13, 'fas': 46, 'fin': 17,
'fra': 18, 'heb': 22, 'ind': 44, 'ita': 26, 'msa': 50, 'nor': 30,
'ron': 33, 'spa': 38, 'swe': 39, 'vie': 45, 'sqi': 1, 'hye': 73,
'aze': 55, 'eus': 74, 'bel': 68, 'ben': 54, 'bos': 60, 'bul': 5,
'mya': 61, 'cat': 49, 'hrv': 8, 'ces': 9, 'epo': 47, 'est': 16,
'kat': 62, 'deu': 19, 'ell': 21, 'kal': 57, 'hin': 51, 'hun': 23,
'isl': 25, 'jpn': 27, 'kor': 28, 'kur': 52, 'lav': 29, 'lit': 43,
'mkd': 48, 'mal': 64, 'mni': 65, 'mon': 72, 'pus': 67, 'pol': 31,
'por': 32, 'pan': 66, 'rus': 34, 'srp': 35, 'sin': 58, 'slk': 36,
'slv': 37, 'som': 70, 'tgl': 53, 'tam': 59, 'tel': 63, 'tha': 40,
'tur': 41, 'ukr': 56, 'urd': 42, 'yor': 71, 'pt-BR': 4
}
# TODO: specify codes for unspecified_languages
unspecified_languages = [
'Big 5 code', 'Bulgarian/ English',
'Chinese BG code', 'Dutch/ English', 'English/ German',
'Hungarian/ English', 'Rohingya'
]
supported_languages = {Language(l) for l in exact_languages_alpha3}
alpha3_of_code = {l.name: l.alpha3 for l in supported_languages}
supported_languages.update({Language(l) for l in to_subscene})
supported_languages.update({Language(lang, cr) for lang, cr in to_subscene_with_country})
class SubsceneConverter(LanguageReverseConverter):
codes = {l.name for l in supported_languages}
def convert(self, alpha3, country=None, script=None):
if alpha3 in exact_languages_alpha3:
return Language(alpha3).name
if alpha3 in to_subscene:
return to_subscene[alpha3]
if (alpha3, country) in to_subscene_with_country:
return to_subscene_with_country[(alpha3, country)]
raise ConfigurationError('Unsupported language for subscene: %s, %s, %s' % (alpha3, country, script))
def reverse(self, code):
if code in from_subscene_with_country:
return from_subscene_with_country[code]
if code in from_subscene:
return (from_subscene[code],)
if code in alpha3_of_code:
return (alpha3_of_code[code],)
if code in unspecified_languages:
raise NotImplementedError("currently this language is unspecified: %s" % code)
raise ConfigurationError('Unsupported language code for subscene: %s' % code)
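
The converter's contract is symmetric: convert() maps an alpha3 code (plus optional country) to Subscene's display name, and reverse() maps a display name back to a tuple babelfish can rebuild a Language from. A short usage sketch of the removed class (it depends, of course, on the deleted module):

conv = SubsceneConverter()
assert conv.convert('eng') == 'English'         # exact alpha3 -> babelfish name
assert conv.convert('fas') == 'Farsi/Persian'   # renamed on Subscene
assert conv.reverse('Farsi/Persian') == ('fas',)
assert conv.reverse('Brazillian Portuguese') == ('por', 'BR')
# Quirk of the lookup order: 'por' hits the exact-alpha3 branch first, so
# convert('por', 'BR') returns 'Portuguese', never 'Brazillian Portuguese'.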

@@ -1,366 +0,0 @@
# coding=utf-8
import io
import logging
import os
import time
import traceback
from urllib import parse
import requests
import inflect
import re
import json
import html
import zipfile
import rarfile
from babelfish import language_converters
from guessit import guessit
from dogpile.cache.api import NO_VALUE
from requests.exceptions import RequestException
from subliminal import Episode, ProviderError
from subliminal.video import Episode, Movie
from subliminal.exceptions import ConfigurationError, ServiceUnavailable
from subliminal.utils import sanitize_release_group
from subliminal.cache import region
from subliminal_patch.http import RetryingCFSession
from subliminal_patch.providers import Provider, reinitialize_on_error
from subliminal_patch.providers.mixins import ProviderSubtitleArchiveMixin
from subliminal_patch.subtitle import Subtitle, guess_matches
from subliminal_patch.converters.subscene import language_ids, supported_languages
from subscene_api.subscene import search, SearchTypes, Subtitle as APISubtitle, SITE_DOMAIN
from subzero.language import Language
p = inflect.engine()
language_converters.register('subscene = subliminal_patch.converters.subscene:SubsceneConverter')
logger = logging.getLogger(__name__)
class SubsceneSubtitle(Subtitle):
provider_name = 'subscene'
hearing_impaired_verifiable = True
is_pack = False
page_link = None
season = None
episode = None
releases = None
def __init__(self, language, release_info, hearing_impaired=False, page_link=None, encoding=None, mods=None,
asked_for_release_group=None, asked_for_episode=None):
super(SubsceneSubtitle, self).__init__(language, hearing_impaired=hearing_impaired, page_link=page_link,
encoding=encoding, mods=mods)
self.release_info = self.releases = release_info
self.asked_for_episode = asked_for_episode
self.asked_for_release_group = asked_for_release_group
self.season = None
self.episode = None
@classmethod
def from_api(cls, s):
return cls(Language.fromsubscene(s.language.strip()), s.title, hearing_impaired=s.hearing_impaired,
page_link=s.url)
@property
def id(self):
return self.page_link
@property
def numeric_id(self):
return self.page_link.split("/")[-1]
def get_matches(self, video):
matches = set()
if self.release_info.strip() == get_video_filename(video):
logger.debug("Using hash match as the release name is the same")
matches |= {"hash"}
# episode
if isinstance(video, Episode):
guess = guessit(self.release_info, {'type': 'episode'})
self.season = guess.get("season")
self.episode = guess.get("episode")
matches |= guess_matches(video, guess)
if "season" in matches and "episode" not in guess:
# pack
matches.add("episode")
logger.debug("%r is a pack", self)
self.is_pack = True
if "title" in guess and "year" in matches:
if video.series in guess['title']:
matches.add("series")
# movie
else:
guess = guessit(self.release_info, {'type': 'movie'})
matches |= guess_matches(video, guess)
if video.release_group and "release_group" not in matches and "release_group" in guess:
if sanitize_release_group(video.release_group) in sanitize_release_group(guess["release_group"]):
matches.add("release_group")
self.matches = matches
return matches
def get_download_link(self, session):
return APISubtitle.get_zipped_url(self.page_link, session)
def get_video_filename(video):
return os.path.splitext(os.path.basename(video.original_name))[0]
class SubsceneProvider(Provider, ProviderSubtitleArchiveMixin):
"""
This currently only searches for the filename on SubScene. It doesn't open every found subtitle page (to avoid
hammering the site), so it can't determine whether a subtitle is only-foreign or not.
"""
subtitle_class = SubsceneSubtitle
languages = supported_languages
languages.update(set(Language.rebuild(l, forced=True) for l in languages))
languages.update(set(Language.rebuild(l, hi=True) for l in languages))
video_types = (Episode, Movie)
session = None
skip_wrong_fps = False
hearing_impaired_verifiable = True
only_foreign = False
username = None
password = None
search_throttle = 8 # seconds
def __init__(self, only_foreign=False, username=None, password=None):
if not all((username, password)):
raise ConfigurationError('Username and password must be specified')
self.only_foreign = only_foreign
self.username = username
self.password = password
def initialize(self):
logger.info("Creating session")
self.session = RetryingCFSession()
prev_cookies = region.get("subscene_cookies2")
if prev_cookies != NO_VALUE:
logger.debug("Re-using old subscene cookies: %r", prev_cookies)
self.session.cookies.update(prev_cookies)
else:
logger.debug("Logging in")
self.login()
def login(self):
r = self.session.get("https://subscene.com/account/login")
if "Server Error" in r.text:
logger.error("Login unavailable; Maintenance?")
raise ServiceUnavailable("Login unavailable; Maintenance?")
match = re.search(r"<script id='modelJson' type='application/json'>\s*(.+)\s*</script>", r.text)
if match:
h = html
data = json.loads(h.unescape(match.group(1)))
login_url = parse.urljoin(data["siteUrl"], data["loginUrl"])
time.sleep(1.0)
r = self.session.post(login_url,
{
"username": self.username,
"password": self.password,
data["antiForgery"]["name"]: data["antiForgery"]["value"]
})
pep_content = re.search(r"<form method=\"post\" action=\"https://subscene\.com/\">"
r".+name=\"id_token\".+?value=\"(?P<id_token>.+?)\".*?"
r"access_token\".+?value=\"(?P<access_token>.+?)\".+?"
r"token_type.+?value=\"(?P<token_type>.+?)\".+?"
r"expires_in.+?value=\"(?P<expires_in>.+?)\".+?"
r"scope.+?value=\"(?P<scope>.+?)\".+?"
r"state.+?value=\"(?P<state>.+?)\".+?"
r"session_state.+?value=\"(?P<session_state>.+?)\"",
r.text, re.MULTILINE | re.DOTALL)
if pep_content:
r = self.session.post(SITE_DOMAIN, pep_content.groupdict())
try:
r.raise_for_status()
except Exception:
raise ProviderError("Something went wrong when trying to log in: %s", traceback.format_exc())
else:
cj = self.session.cookies.copy()
store_cks = ("scene", "idsrv", "idsrv.xsrf", "idsvr.clients", "idsvr.session", "idsvr.username")
for cn in self.session.cookies.keys():
if cn not in store_cks:
del cj[cn]
logger.debug("Storing cookies: %r", cj)
region.set("subscene_cookies2", cj)
return
raise ProviderError("Something went wrong when trying to log in #1")
def terminate(self):
logger.info("Closing session")
self.session.close()
def _create_filters(self, languages):
self.filters = dict(HearingImpaired="2")
acc_filters = self.filters.copy()
if self.only_foreign:
self.filters["ForeignOnly"] = "True"
acc_filters["ForeignOnly"] = self.filters["ForeignOnly"].lower()
logger.info("Only searching for foreign/forced subtitles")
selected_ids = []
for l in languages:
lid = language_ids.get(l.basename, language_ids.get(l.alpha3, None))
if lid:
selected_ids.append(str(lid))
acc_filters["SelectedIds"] = selected_ids
self.filters["LanguageFilter"] = ",".join(acc_filters["SelectedIds"])
last_filters = region.get("subscene_filters")
if last_filters != acc_filters:
region.set("subscene_filters", acc_filters)
logger.debug("Setting account filters to %r", acc_filters)
self.session.post("https://u.subscene.com/filter", acc_filters, allow_redirects=False)
logger.debug("Filter created: '%s'" % self.filters)
def _enable_filters(self):
self.session.cookies.update(self.filters)
logger.debug("Filters applied")
def list_subtitles(self, video, languages):
if not video.original_name:
logger.info("Skipping search because we don't know the original release name")
return []
self._create_filters(languages)
self._enable_filters()
if isinstance(video, Episode):
international_titles = list(set([video.series] + video.alternative_series[:1]))
subtitles = [s for s in self.query(video, international_titles) if s.language in languages]
if not len(subtitles):
us_titles = [x + ' (US)' for x in international_titles]
subtitles = [s for s in self.query(video, us_titles) if s.language in languages]
return subtitles
else:
titles = list(set([video.title] + video.alternative_titles[:1]))
return [s for s in self.query(video, titles) if s.language in languages]
def download_subtitle(self, subtitle):
if subtitle.pack_data:
logger.info("Using previously downloaded pack data")
if rarfile.is_rarfile(io.BytesIO(subtitle.pack_data)):
logger.debug('Identified rar archive')
archive = rarfile.RarFile(io.BytesIO(subtitle.pack_data))
elif zipfile.is_zipfile(io.BytesIO(subtitle.pack_data)):
logger.debug('Identified zip archive')
archive = zipfile.ZipFile(io.BytesIO(subtitle.pack_data))
else:
logger.error('Unsupported compressed format')
return
subtitle.pack_data = None
try:
subtitle.content = self.get_subtitle_from_archive(subtitle, archive)
return
except ProviderError:
pass
# open the archive
r = self.session.get(subtitle.get_download_link(self.session), timeout=10)
r.raise_for_status()
archive_stream = io.BytesIO(r.content)
if rarfile.is_rarfile(archive_stream):
logger.debug('Identified rar archive')
archive = rarfile.RarFile(archive_stream)
elif zipfile.is_zipfile(archive_stream):
logger.debug('Identified zip archive')
archive = zipfile.ZipFile(archive_stream)
else:
logger.error('Unsupported compressed format')
return
subtitle.content = self.get_subtitle_from_archive(subtitle, archive)
# store archive as pack_data for later caching
subtitle.pack_data = r.content
def parse_results(self, video, film):
subtitles = []
for s in film.subtitles:
try:
subtitle = SubsceneSubtitle.from_api(s)
except NotImplementedError as e:
logger.info(e)
continue
subtitle.asked_for_release_group = video.release_group
if isinstance(video, Episode):
subtitle.asked_for_episode = video.episode
if self.only_foreign:
subtitle.language = Language.rebuild(subtitle.language, forced=True)
# set subtitle language to hi if it's hearing_impaired
if subtitle.hearing_impaired:
subtitle.language = Language.rebuild(subtitle.language, hi=True)
subtitles.append(subtitle)
logger.debug('Found subtitle %r', subtitle)
return subtitles
def do_search(self, *args, **kwargs):
try:
return search(*args, **kwargs)
except requests.HTTPError:
region.delete("subscene_cookies2")
raise
@reinitialize_on_error((RequestException,), attempts=1)
def query(self, video, titles):
subtitles = []
if isinstance(video, Episode):
more_than_one = len(titles) > 1
for series in titles:
term = u"%s - %s Season" % (series, p.number_to_words("%sth" % video.season).capitalize())
logger.debug('Searching with series and season: %s', term)
film = self.do_search(term, session=self.session, release=False, throttle=self.search_throttle,
limit_to=SearchTypes.TvSerie)
if not film and video.season == 1:
logger.debug('Searching with series name: %s', series)
film = self.do_search(series, session=self.session, release=False, throttle=self.search_throttle,
limit_to=SearchTypes.TvSerie)
if film and film.subtitles:
logger.debug('Searching found: %s', len(film.subtitles))
subtitles += self.parse_results(video, film)
else:
logger.debug('No results found')
if more_than_one:
time.sleep(self.search_throttle)
else:
more_than_one = len(titles) > 1
for title in titles:
logger.debug('Searching for movie results: %r', title)
film = self.do_search(title, year=video.year, session=self.session, limit_to=None, release=False,
throttle=self.search_throttle)
if film and film.subtitles:
subtitles += self.parse_results(video, film)
if more_than_one:
time.sleep(self.search_throttle)
logger.info("%s subtitles found" % len(subtitles))
return subtitles
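
Stripped of the scraping details, the provider's session handling in initialize()/login() is a cache-or-login pattern: reuse cookies from the dogpile region when present, otherwise do the full login and persist only the identity-server cookies. A minimal sketch of that pattern (plain requests stands in for RetryingCFSession, and the login callable is a stub):

# Sketch of the cookie-reuse pattern, assuming an in-memory dogpile region.
import requests
from dogpile.cache import make_region
from dogpile.cache.api import NO_VALUE

region = make_region().configure('dogpile.cache.memory')
KEEP = ("scene", "idsrv", "idsrv.xsrf")  # subset of cookies worth persisting

def get_session(login):
    session = requests.Session()
    cookies = region.get("subscene_cookies2")
    if cookies is not NO_VALUE:
        session.cookies.update(cookies)  # warm start: skip the login dance
        return session
    login(session)                       # full login on a cold cache
    jar = session.cookies.copy()
    for name in list(jar.keys()):
        if name not in KEEP:
            del jar[name]                # don't persist transient cookies
    region.set("subscene_cookies2", jar)
    return session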

@@ -1,410 +0,0 @@
# -*- coding: utf-8 -*-
from difflib import SequenceMatcher
import functools
import logging
import re
import time
import urllib.parse
from bs4 import BeautifulSoup as bso
import cloudscraper
from guessit import guessit
from requests import Session
from requests.exceptions import HTTPError
from subliminal.exceptions import ProviderError
from subliminal_patch.core import Episode
from subliminal_patch.core import Movie
from subliminal_patch.exceptions import APIThrottled
from subliminal_patch.providers import Provider
from subliminal_patch.providers.utils import get_archive_from_bytes
from subliminal_patch.providers.utils import get_subtitle_from_archive
from subliminal_patch.providers.utils import update_matches
from subliminal_patch.subtitle import Subtitle
from subzero.language import Language
logger = logging.getLogger(__name__)
class SubsceneSubtitle(Subtitle):
provider_name = "subscene_cloudscraper"
hash_verifiable = False
def __init__(self, language, page_link, release_info, episode_number=None):
super().__init__(language, page_link=page_link)
self.release_info = release_info
self.episode_number = episode_number
self.episode_title = None
self._matches = set(
("title", "year")
if episode_number is None
else ("title", "series", "year", "season", "episode")
)
def get_matches(self, video):
update_matches(self._matches, video, self.release_info)
return self._matches
@property
def id(self):
return self.page_link
_BASE_URL = "https://subscene.com"
# TODO: add more seasons and languages
_SEASONS = (
"First",
"Second",
"Third",
"Fourth",
"Fifth",
"Sixth",
"Seventh",
"Eighth",
"Ninth",
"Tenth",
"Eleventh",
"Twelfth",
"Thirdteenth",
"Fourthteenth",
"Fifteenth",
"Sixteenth",
"Seventeenth",
"Eightheenth",
"Nineteenth",
"Tweentieth",
)
_LANGUAGE_MAP = {
"english": "eng",
"farsi_persian": "per",
"arabic": "ara",
"spanish": "spa",
"portuguese": "por",
"italian": "ita",
"dutch": "dut",
"hebrew": "heb",
"indonesian": "ind",
"danish": "dan",
"norwegian": "nor",
"bengali": "ben",
"bulgarian": "bul",
"croatian": "hrv",
"swedish": "swe",
"vietnamese": "vie",
"czech": "cze",
"finnish": "fin",
"french": "fre",
"german": "ger",
"greek": "gre",
"hungarian": "hun",
"icelandic": "ice",
"japanese": "jpn",
"macedonian": "mac",
"malay": "may",
"polish": "pol",
"romanian": "rum",
"russian": "rus",
"serbian": "srp",
"thai": "tha",
"turkish": "tur",
}
class SubsceneProvider(Provider):
provider_name = "subscene_cloudscraper"
_movie_title_regex = re.compile(r"^(.+?)( \((\d{4})\))?$")
_tv_show_title_regex = re.compile(
r"^(.+?) [-\(]\s?(.*?) (season|series)\)?( \((\d{4})\))?$"
)
_supported_languages = {}
_supported_languages["brazillian-portuguese"] = Language("por", "BR")
for key, val in _LANGUAGE_MAP.items():
_supported_languages[key] = Language.fromalpha3b(val)
_supported_languages_reversed = {
val: key for key, val in _supported_languages.items()
}
languages = set(_supported_languages.values())
video_types = (Episode, Movie)
subtitle_class = SubsceneSubtitle
def initialize(self):
pass
def terminate(self):
pass
def _scraper_call(self, url, retry=7, method="GET", sleep=5, **kwargs):
last_exc = None
for n in range(retry):
# Creating an instance for every try in order to avoid dropped connections.
# This could probably be improved!
scraper = cloudscraper.create_scraper()
if method == "GET":
req = scraper.get(url, **kwargs)
elif method == "POST":
req = scraper.post(url, **kwargs)
else:
raise NotImplementedError(f"{method} not allowed")
try:
req.raise_for_status()
except HTTPError as error:
logger.debug(
"'%s' returned. Trying again [%d] in %s", error, n + 1, sleep
)
last_exc = error
time.sleep(sleep)
else:
return req
raise ProviderError("403 Retry count exceeded") from last_exc
def _gen_results(self, query):
url = (
f"{_BASE_URL}/subtitles/searchbytitle?query={urllib.parse.quote(query)}&l="
)
result = self._scraper_call(url, method="POST")
soup = bso(result.content, "html.parser")
for title in soup.select("li div[class='title'] a"):
yield title
def _search_movie(self, title, year):
title = title.lower()
year = str(year)
found_movie = None
results = []
for result in self._gen_results(title):
text = result.text.lower()
match = self._movie_title_regex.match(text)
if not match:
continue
match_title = match.group(1)
match_year = match.group(3)
if year == match_year:
results.append(
{
"href": result.get("href"),
"similarity": SequenceMatcher(None, title, match_title).ratio(),
}
)
if results:
results.sort(key=lambda x: x["similarity"], reverse=True)
found_movie = results[0]["href"]
logger.debug("Movie found: %s", results[0])
return found_movie
def _search_tv_show_season(self, title, season, year=None):
try:
season_str = _SEASONS[season - 1].lower()
except IndexError:
logger.debug("Season number not supported: %s", season)
return None
found_tv_show_season = None
results = []
for result in self._gen_results(title):
text = result.text.lower()
match = self._tv_show_title_regex.match(text)
if not match:
logger.debug("Series title not matched: %s", text)
continue
else:
logger.debug("Series title matched: %s", text)
match_title = match.group(1)
match_season = match.group(2)
# Match "complete series" titles as they usually contain season packs
if season_str == match_season or "complete" in match_season:
plus = 0.1 if year and str(year) in text else 0
results.append(
{
"href": result.get("href"),
"similarity": SequenceMatcher(None, title, match_title).ratio()
+ plus,
}
)
if results:
results.sort(key=lambda x: x["similarity"], reverse=True)
found_tv_show_season = results[0]["href"]
logger.debug("TV Show season found: %s", results[0])
return found_tv_show_season
def _find_movie_subtitles(self, path, language):
soup = self._get_subtitle_page_soup(path, language)
subtitles = []
for item in soup.select("tr"):
subtitle = _get_subtitle_from_item(item, language)
if subtitle is None:
continue
logger.debug("Found subtitle: %s", subtitle)
subtitles.append(subtitle)
return subtitles
def _find_episode_subtitles(
self, path, season, episode, language, episode_title=None
):
soup = self._get_subtitle_page_soup(path, language)
subtitles = []
for item in soup.select("tr"):
valid_item = None
clean_text = " ".join(item.text.split())
if not clean_text:
continue
# It will return list values
guess = _memoized_episode_guess(clean_text)
if "season" not in guess:
if "complete series" in clean_text.lower():
logger.debug("Complete series pack found: %s", clean_text)
guess["season"] = [season]
else:
logger.debug("Nothing guessed from release: %s", clean_text)
continue
if season in guess["season"] and episode in guess.get("episode", []):
logger.debug("Episode match found: %s - %s", guess, clean_text)
valid_item = item
elif season in guess["season"] and not "episode" in guess:
logger.debug("Season pack found: %s", clean_text)
valid_item = item
if valid_item is None:
continue
subtitle = _get_subtitle_from_item(item, language, episode)
if subtitle is None:
continue
subtitle.episode_title = episode_title
logger.debug("Found subtitle: %s", subtitle)
subtitles.append(subtitle)
return subtitles
def _get_subtitle_page_soup(self, path, language):
language_path = self._supported_languages_reversed[language]
result = self._scraper_call(f"{_BASE_URL}{path}/{language_path}")
return bso(result.content, "html.parser")
def list_subtitles(self, video, languages):
is_episode = isinstance(video, Episode)
if is_episode:
result = self._search_tv_show_season(video.series, video.season, video.year)
else:
result = self._search_movie(video.title, video.year)
if result is None:
logger.debug("No results")
return []
subtitles = []
for language in languages:
if is_episode:
subtitles.extend(
self._find_episode_subtitles(
result, video.season, video.episode, language, video.title
)
)
else:
subtitles.extend(self._find_movie_subtitles(result, language))
return subtitles
def download_subtitle(self, subtitle):
# TODO: add MustGetBlacklisted support
result = self._scraper_call(subtitle.page_link)
soup = bso(result.content, "html.parser")
try:
download_url = _BASE_URL + str(
soup.select_one("a[id='downloadButton']")["href"] # type: ignore
)
except (AttributeError, KeyError, TypeError):
raise APIThrottled(f"Couldn't get download url from {subtitle.page_link}")
downloaded = self._scraper_call(download_url)
archive = get_archive_from_bytes(downloaded.content)
if archive is None:
raise APIThrottled(f"Invalid archive: {subtitle.page_link}")
subtitle.content = get_subtitle_from_archive(
archive,
episode=subtitle.episode_number,
episode_title=subtitle.episode_title,
)
@functools.lru_cache(2048)
def _memoized_episode_guess(content):
# Use include to save time from unnecessary checks
return guessit(
content,
{
"type": "episode",
# Add codec keys to avoid matching x264, 5.1, etc as episode info
"includes": ["season", "episode", "video_codec", "audio_codec"],
"enforce_list": True,
},
)
def _get_subtitle_from_item(item, language, episode_number=None):
release_infos = []
try:
release_infos.append(item.find("td", {"class": "a6"}).text.strip())
except (AttributeError, KeyError):
pass
try:
release_infos.append(
item.find("td", {"class": "a1"}).find_all("span")[-1].text.strip()
)
except (AttributeError, KeyError):
pass
release_info = "".join(r_info for r_info in release_infos if r_info)
try:
path = item.find("td", {"class": "a1"}).find("a")["href"]
except (AttributeError, KeyError):
logger.debug("Couldn't get path: %s", item)
return None
return SubsceneSubtitle(language, _BASE_URL + path, release_info, episode_number)
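
_scraper_call above is a plain retry wrapper with one twist: it creates a fresh cloudscraper instance for every attempt so a dropped or poisoned connection never carries over. A generic sketch of the same pattern:

# Generic sketch of the retry-with-fresh-scraper pattern.
import time
import cloudscraper
from requests.exceptions import HTTPError

def fetch_with_retries(url, retries=7, sleep=5, **kwargs):
    last_exc = None
    for _ in range(retries):
        scraper = cloudscraper.create_scraper()  # fresh instance per try
        response = scraper.get(url, **kwargs)
        try:
            response.raise_for_status()
        except HTTPError as exc:  # typically a Cloudflare 403
            last_exc = exc
            time.sleep(sleep)
        else:
            return response
    raise RuntimeError("retry count exceeded") from last_exc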

@@ -1,299 +0,0 @@
# -*- coding: utf-8 -*-
# vim: fenc=utf-8 ts=4 et sw=4 sts=4
# This file is part of Subscene-API.
#
# Subscene-API is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Subscene-API is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Python wrapper for Subscene subtitle database.
Since Subscene doesn't provide an official API, I wrote
this script that does the job by parsing the website's pages.
"""
# imports
import re
import enum
import sys
import requests
import time
import logging
is_PY2 = sys.version_info[0] < 3
if is_PY2:
from contextlib2 import suppress
from urllib2 import Request, urlopen
else:
from contextlib import suppress
from urllib.request import Request, urlopen
from dogpile.cache.api import NO_VALUE
from subliminal.cache import region
from bs4 import BeautifulSoup, NavigableString
logger = logging.getLogger(__name__)
# constants
HEADERS = {
}
SITE_DOMAIN = "https://subscene.com"
DEFAULT_USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWeb"\
"Kit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36"
ENDPOINT_RE = re.compile(r'(?uis)<form.+?action="/subtitles/(.+)">.*?<input type="text"')
class NewEndpoint(Exception):
pass
# utils
def soup_for(url, data=None, session=None, user_agent=DEFAULT_USER_AGENT):
url = re.sub(r"\s", "+", url)
if not session:
r = Request(url, data=None, headers=dict(HEADERS, **{"User-Agent": user_agent}))
html = urlopen(r).read().decode("utf-8")
else:
ret = session.post(url, data=data)
ret.raise_for_status()
html = ret.text
return BeautifulSoup(html, "html.parser")
class AttrDict(object):
def __init__(self, *attrs):
self._attrs = attrs
for attr in attrs:
setattr(self, attr, "")
def to_dict(self):
return {k: getattr(self, k) for k in self._attrs}
# models
@enum.unique
class SearchTypes(enum.Enum):
Exact = 1
TvSerie = 2
Popular = 3
Close = 4
SectionsParts = {
SearchTypes.Exact: "Exact",
SearchTypes.TvSerie: "TV-Series",
SearchTypes.Popular: "Popular",
SearchTypes.Close: "Close"
}
class Subtitle(object):
def __init__(self, title, url, language, owner_username, owner_url,
description, hearing_impaired):
self.title = title
self.url = url
self.language = language
self.owner_username = owner_username
self.owner_url = owner_url
self.description = description
self.hearing_impaired = hearing_impaired
self._zipped_url = None
def __str__(self):
return self.title
@classmethod
def from_rows(cls, rows):
subtitles = []
for row in rows:
if row.td.a is not None and row.td.get("class", ["lazy"])[0] != "empty":
subtitles.append(cls.from_row(row))
return subtitles
@classmethod
def from_row(cls, row):
attrs = AttrDict("title", "url", "language", "owner_username",
"owner_url", "description", "hearing_impaired")
with suppress(Exception):
attrs.title = row.find("td", "a1").a.find_all("span")[1].text \
.strip()
with suppress(Exception):
attrs.url = SITE_DOMAIN + row.find("td", "a1").a.get("href")
with suppress(Exception):
attrs.language = row.find("td", "a1").a.find_all("span")[0].text \
.strip()
with suppress(Exception):
attrs.owner_username = row.find("td", "a5").a.text.strip()
with suppress(Exception):
attrs.owner_url = SITE_DOMAIN + row.find("td", "a5").a \
.get("href").strip()
with suppress(Exception):
attrs.description = row.find("td", "a6").div.text.strip()
with suppress(Exception):
attrs.hearing_impaired = bool(row.find("td", "a41"))
return cls(**attrs.to_dict())
@classmethod
def get_zipped_url(cls, url, session=None):
soup = soup_for(url, session=session)
return SITE_DOMAIN + soup.find("div", "download").a.get("href")
@property
def zipped_url(self):
if self._zipped_url:
return self._zipped_url
self._zipped_url = Subtitle.get_zipped_url(self.url)
return self._zipped_url
class Film(object):
def __init__(self, title, year=None, imdb=None, cover=None,
subtitles=None):
self.title = title
self.year = year
self.imdb = imdb
self.cover = cover
self.subtitles = subtitles
def __str__(self):
return self.title
@classmethod
def from_url(cls, url, session=None):
soup = soup_for(url, session=session)
content = soup.find("div", "subtitles")
header = content.find("div", "box clearfix")
cover = None
try:
cover = header.find("div", "poster").img.get("src")
except AttributeError:
pass
title = header.find("div", "header").h2.text[:-12].strip()
imdb = header.find("div", "header").h2.find("a", "imdb").get("href")
year = header.find("div", "header").ul.li.text
year = int(re.findall(r"[0-9]+", year)[0])
rows = content.find("table").tbody.find_all("tr")
subtitles = Subtitle.from_rows(rows)
return cls(title, year, imdb, cover, subtitles)
# functions
def section_exists(soup, section):
tag_part = SectionsParts[section]
try:
headers = soup.find("div", "search-result").find_all("h2")
except AttributeError:
return False
for header in headers:
if tag_part in header.text:
return True
return False
def get_first_film(soup, section, year=None, session=None):
tag_part = SectionsParts[section]
tag = None
headers = soup.find("div", "search-result").find_all("h2")
for header in headers:
if tag_part in header.text:
tag = header
break
if not tag:
return
url = None
url = SITE_DOMAIN + tag.findNext("ul").find("li").div.a.get("href")
for t in tag.findNext("ul").findAll("li"):
if isinstance(t, NavigableString) or not t.div:
continue
if str(year) in t.div.a.string:
url = SITE_DOMAIN + t.div.a.get("href")
break
return Film.from_url(url, session=session)
def find_endpoint(session, content=None):
endpoint = region.get("subscene_endpoint2")
if endpoint is NO_VALUE:
if not content:
content = session.get(SITE_DOMAIN).text
m = ENDPOINT_RE.search(content)
if m:
endpoint = m.group(1).strip()
logger.debug("Switching main endpoint to %s", endpoint)
region.set("subscene_endpoint2", endpoint)
return endpoint
def search(term, release=True, session=None, year=None, limit_to=SearchTypes.Exact, throttle=0):
# note to subscene: if you actually start to randomize the endpoint, we'll have to query your server even more
if release:
endpoint = "release"
else:
endpoint = find_endpoint(session)
time.sleep(throttle)
if not endpoint:
logger.error("Couldn't find endpoint, exiting")
return
soup = soup_for("%s/subtitles/%s" % (SITE_DOMAIN, endpoint), data={"query": term},
session=session)
if soup:
if "Subtitle search by" in str(soup):
rows = soup.find("table").tbody.find_all("tr")
subtitles = Subtitle.from_rows(rows)
return Film(term, subtitles=subtitles)
for junk, search_type in SearchTypes.__members__.items():
if section_exists(soup, search_type):
return get_first_film(soup, search_type, year=year, session=session)
if limit_to == search_type:
return
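
search() is the wrapper's single entry point: it posts the query either to the static release endpoint or to the endpoint scraped from the landing page, then walks the result sections in declaration order (Exact, TvSerie, Popular, Close) and returns the first matching Film, stopping at limit_to. A usage sketch against the removed module (the query string is an arbitrary example):

# Usage sketch; `search` and `SearchTypes` came from the deleted
# subscene_api.subscene module.
import requests

session = requests.Session()
film = search("Dune Part Two", session=session, release=False,
              limit_to=SearchTypes.Exact, throttle=8)  # throttle in seconds
if film:
    for sub in film.subtitles:
        print(sub.language, sub.title, sub.url)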