You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
bazarr/libs/subliminal_patch/providers/addic7ed.py

378 lines
15 KiB

# coding=utf-8
from __future__ import absolute_import
import logging
import re
import datetime
import subliminal
import time
from random import randint
from requests import Session
from subliminal.cache import region
from subliminal.exceptions import DownloadLimitExceeded, AuthenticationError
from subliminal.providers.addic7ed import Addic7edProvider as _Addic7edProvider, \
Addic7edSubtitle as _Addic7edSubtitle, ParserBeautifulSoup, show_cells_re
from subliminal.subtitle import fix_line_ending
from subliminal_patch.utils import sanitize
from subliminal_patch.exceptions import TooManyRequests
from subliminal_patch.pitcher import pitchers, load_verification, store_verification
from subzero.language import Language
logger = logging.getLogger(__name__)
#: Series header parsing regex
series_year_re = re.compile(r'^(?P<series>[ \w\'.:(),*&!?-]+?)(?: \((?P<year>\d{4})\))?$')
SHOW_EXPIRATION_TIME = datetime.timedelta(weeks=1).total_seconds()
class Addic7edSubtitle(_Addic7edSubtitle):
hearing_impaired_verifiable = True
def __init__(self, language, hearing_impaired, page_link, series, season, episode, title, year, version,
download_link):
super(Addic7edSubtitle, self).__init__(language, hearing_impaired, page_link, series, season, episode,
title, year, version, download_link)
self.release_info = version
def get_matches(self, video):
matches = super(Addic7edSubtitle, self).get_matches(video)
if not subliminal.score.episode_scores.get("addic7ed_boost"):
return matches
# if the release group matches, the format is most likely correct, as well
if "release_group" in matches:
matches.add("format")
if {"series", "season", "episode", "year"}.issubset(matches) and "format" in matches:
matches.add("addic7ed_boost")
logger.info("Boosting Addic7ed subtitle by %s" % subliminal.score.episode_scores.get("addic7ed_boost"))
return matches
def __repr__(self):
return '<%s %r [%s]>' % (
self.__class__.__name__, u"http://www.addic7ed.com/%s" % self.download_link, self.language)
class Addic7edProvider(_Addic7edProvider):
languages = {Language('por', 'BR')} | {Language(l) for l in [
'ara', 'aze', 'ben', 'bos', 'bul', 'cat', 'ces', 'dan', 'deu', 'ell', 'eng', 'eus', 'fas', 'fin', 'fra', 'glg',
'heb', 'hrv', 'hun', 'hye', 'ind', 'ita', 'jpn', 'kor', 'mkd', 'msa', 'nld', 'nor', 'pol', 'por', 'ron', 'rus',
'slk', 'slv', 'spa', 'sqi', 'srp', 'swe', 'tha', 'tur', 'ukr', 'vie', 'zho'
]} | {Language.fromietf(l) for l in ["sr-Latn", "sr-Cyrl"]}
USE_ADDICTED_RANDOM_AGENTS = False
hearing_impaired_verifiable = True
subtitle_class = Addic7edSubtitle
server_url = 'https://www.addic7ed.com/'
sanitize_characters = {'-', ':', '(', ')', '.', '/'}
def __init__(self, username=None, password=None, use_random_agents=False):
super(Addic7edProvider, self).__init__(username=username, password=password)
self.USE_ADDICTED_RANDOM_AGENTS = use_random_agents
def initialize(self):
self.session = Session()
self.session.headers['User-Agent'] = 'Subliminal/%s' % subliminal.__short_version__
from .utils import FIRST_THOUSAND_OR_SO_USER_AGENTS as AGENT_LIST
logger.debug("Addic7ed: using random user agents")
self.session.headers['User-Agent'] = AGENT_LIST[randint(0, len(AGENT_LIST) - 1)]
self.session.headers['Referer'] = self.server_url
# login
if self.username and self.password:
def check_verification(cache_region):
rr = self.session.get(self.server_url + 'panel.php', allow_redirects=False, timeout=10,
headers={"Referer": self.server_url})
if rr.status_code == 302:
logger.info('Addic7ed: Login expired')
cache_region.delete("addic7ed_data")
else:
logger.info('Addic7ed: Re-using old login')
self.logged_in = True
return True
if load_verification("addic7ed", self.session, callback=check_verification):
return
logger.info('Addic7ed: Logging in')
data = {'username': self.username, 'password': self.password, 'Submit': 'Log in', 'url': '',
'remember': 'true'}
tries = 0
while tries < 3:
r = self.session.get(self.server_url + 'login.php', timeout=10, headers={"Referer": self.server_url})
if "grecaptcha" in r.text:
logger.info('Addic7ed: Solving captcha. This might take a couple of minutes, but should only '
'happen once every so often')
site_key = re.search(r'grecaptcha.execute\(\'(.+?)\',', r.text).group(1)
if not site_key:
logger.error("Addic7ed: Captcha site-key not found!")
return
pitcher = pitchers.get_pitcher()("Addic7ed", self.server_url + 'login.php', site_key,
user_agent=self.session.headers["User-Agent"],
cookies=self.session.cookies.get_dict(),
is_invisible=True)
result = pitcher.throw()
if not result:
raise Exception("Addic7ed: Couldn't solve captcha!")
data["recaptcha_response"] = result
r = self.session.post(self.server_url + 'dologin.php', data, allow_redirects=False, timeout=10,
headers={"Referer": self.server_url + "login.php"})
if "relax, slow down" in r.text:
raise TooManyRequests(self.username)
if r.status_code != 302:
if "User <b></b> doesn't exist" in r.text and tries <= 2:
logger.info("Addic7ed: Error, trying again. (%s/%s)", tries+1, 3)
tries += 1
continue
raise AuthenticationError(self.username)
break
store_verification("addic7ed", self.session)
logger.debug('Addic7ed: Logged in')
self.logged_in = True
def terminate(self):
self.session.close()
def get_show_id(self, series, year=None, country_code=None):
"""Get the best matching show id for `series`, `year` and `country_code`.
First search in the result of :meth:`_get_show_ids` and fallback on a search with :meth:`_search_show_id`.
:param str series: series of the episode.
:param year: year of the series, if any.
:type year: int
:param country_code: country code of the series, if any.
:type country_code: str
:return: the show id, if found.
:rtype: int
"""
series_sanitized = sanitize(series).lower()
show_ids = self._get_show_ids()
show_id = None
# attempt with country
if not show_id and country_code:
logger.debug('Getting show id with country')
show_id = show_ids.get('%s %s' % (series_sanitized, country_code.lower()))
# attempt with year
if not show_id and year:
logger.debug('Getting show id with year')
show_id = show_ids.get('%s %d' % (series_sanitized, year))
# attempt clean
if not show_id:
logger.debug('Getting show id')
show_id = show_ids.get(series_sanitized)
# search as last resort
# broken right now
# if not show_id:
# logger.warning('Series %s not found in show ids', series)
# show_id = self._search_show_id(series)
return show_id
@region.cache_on_arguments(expiration_time=SHOW_EXPIRATION_TIME)
def _get_show_ids(self):
"""Get the ``dict`` of show ids per series by querying the `shows.php` page.
:return: show id per series, lower case and without quotes.
:rtype: dict
# patch: add punctuation cleaning
"""
# get the show page
logger.info('Getting show ids')
r = self.session.get(self.server_url + 'shows.php', timeout=10)
r.raise_for_status()
# LXML parser seems to fail when parsing Addic7ed.com HTML markup.
# Last known version to work properly is 3.6.4 (next version, 3.7.0, fails)
# Assuming the site's markup is bad, and stripping it down to only contain what's needed.
show_cells = re.findall(show_cells_re, r.content)
if show_cells:
soup = ParserBeautifulSoup(b''.join(show_cells), ['lxml', 'html.parser'])
else:
# If RegEx fails, fall back to original r.text and use 'html.parser'
soup = ParserBeautifulSoup(r.text, ['html.parser'])
# populate the show ids
show_ids = {}
for show in soup.select('td > h3 > a[href^="/show/"]'):
show_clean = sanitize(show.text, default_characters=self.sanitize_characters)
try:
show_id = int(show['href'][6:])
except ValueError:
continue
show_ids[show_clean] = show_id
match = series_year_re.match(show_clean)
if match and match.group(2) and match.group(1) not in show_ids:
# year found, also add it without year
show_ids[match.group(1)] = show_id
soup.decompose()
soup = None
logger.debug('Found %d show ids', len(show_ids))
return show_ids
@region.cache_on_arguments(expiration_time=SHOW_EXPIRATION_TIME)
def _search_show_id(self, series, year=None):
"""Search the show id from the `series` and `year`.
:param str series: series of the episode.
:param year: year of the series, if any.
:type year: int
:return: the show id, if found.
:rtype: int
"""
# addic7ed doesn't support search with quotes
series = series.replace('\'', ' ')
# build the params
series_year = '%s %d' % (series, year) if year is not None else series
params = {'search': series_year, 'Submit': 'Search'}
# make the search
logger.info('Searching show ids with %r', params)
# currently addic7ed searches via srch.php from the front page, then a re-search is needed which calls
# search.php
for endpoint in ("srch.php", "search.php",):
headers = None
if endpoint == "search.php":
headers = {
"referer": self.server_url + "srch.php"
}
r = self.session.get(self.server_url + endpoint, params=params, timeout=10, headers=headers)
r.raise_for_status()
if r.text and "Sorry, your search" not in r.text:
break
time.sleep(4)
if r.status_code == 304:
raise TooManyRequests()
soup = ParserBeautifulSoup(r.text, ['lxml', 'html.parser'])
suggestion = None
# get the suggestion
try:
suggestion = soup.select('span.titulo > a[href^="/show/"]')
if not suggestion:
logger.warning('Show id not found: no suggestion')
return None
if not sanitize(suggestion[0].i.text.replace('\'', ' '),
default_characters=self.sanitize_characters) == \
sanitize(series_year, default_characters=self.sanitize_characters):
logger.warning('Show id not found: suggestion does not match')
return None
show_id = int(suggestion[0]['href'][6:])
logger.debug('Found show id %d', show_id)
return show_id
finally:
soup.decompose()
soup = None
def query(self, show_id, series, season, year=None, country=None):
# patch: fix logging
# get the page of the season of the show
logger.info('Getting the page of show id %d, season %d', show_id, season)
r = self.session.get(self.server_url + 'ajax_loadShow.php',
params={'show': show_id, 'season': season},
timeout=10,
headers={
"referer": "%sshow/%s" % (self.server_url, show_id),
"X-Requested-With": "XMLHttpRequest"
}
)
r.raise_for_status()
if r.status_code == 304:
raise TooManyRequests()
if not r.text:
# Provider wrongful return a status of 304 Not Modified with an empty content
# raise_for_status won't raise exception for that status code
logger.error('No data returned from provider')
return []
soup = ParserBeautifulSoup(r.text, ['lxml', 'html.parser'])
# loop over subtitle rows
subtitles = []
for row in soup.select('tr.epeven'):
cells = row('td')
# ignore incomplete subtitles
status = cells[5].text
if status != 'Completed':
logger.debug('Ignoring subtitle with status %s', status)
continue
# read the item
language = Language.fromaddic7ed(cells[3].text)
hearing_impaired = bool(cells[6].text)
page_link = self.server_url + cells[2].a['href'][1:]
season = int(cells[0].text)
episode = int(cells[1].text)
title = cells[2].text
version = cells[4].text
download_link = cells[9].a['href'][1:]
subtitle = self.subtitle_class(language, hearing_impaired, page_link, series, season, episode, title,
year,
version, download_link)
logger.debug('Found subtitle %r', subtitle)
subtitles.append(subtitle)
soup.decompose()
soup = None
return subtitles
def download_subtitle(self, subtitle):
# download the subtitle
r = self.session.get(self.server_url + subtitle.download_link, headers={'Referer': subtitle.page_link},
timeout=10)
r.raise_for_status()
if r.status_code == 304:
raise TooManyRequests()
if not r.text:
# Provider wrongful return a status of 304 Not Modified with an empty content
# raise_for_status won't raise exception for that status code
logger.error('Unable to download subtitle. No data returned from provider')
return
# detect download limit exceeded
if r.headers['Content-Type'] == 'text/html':
raise DownloadLimitExceeded
subtitle.content = fix_line_ending(r.content)