diff --git a/libs/subliminal_patch/providers/legendastv.py b/libs/subliminal_patch/providers/legendastv.py deleted file mode 100644 index 3c3476b53..000000000 --- a/libs/subliminal_patch/providers/legendastv.py +++ /dev/null @@ -1,289 +0,0 @@ -# coding=utf-8 -from __future__ import absolute_import -import logging -import rarfile -import os -from subliminal.exceptions import ConfigurationError - -from subliminal.providers.legendastv import LegendasTVSubtitle as _LegendasTVSubtitle, \ - LegendasTVProvider as _LegendasTVProvider, Episode, Movie, guessit, sanitize, region, type_map, \ - raise_for_status, json, SHOW_EXPIRATION_TIME, title_re, season_re, datetime, pytz, NO_VALUE, releases_key, \ - SUBTITLE_EXTENSIONS, language_converters, ServiceUnavailable - -from requests.exceptions import RequestException -from subliminal_patch.providers import reinitialize_on_error -from subliminal_patch.subtitle import guess_matches -from subzero.language import Language - -logger = logging.getLogger(__name__) - - -class LegendasTVSubtitle(_LegendasTVSubtitle): - def __init__(self, language, type, title, year, imdb_id, season, archive, name): - super(LegendasTVSubtitle, self).__init__(language, type, title, year, imdb_id, season, archive, name) - self.archive.content = None - self.release_info = name.rstrip('.srt').split('/')[-1] - self.page_link = archive.link - - def make_picklable(self): - self.archive.content = None - return self - - def get_matches(self, video, hearing_impaired=False): - matches = set() - - # episode - if isinstance(video, Episode) and self.type == 'episode': - # series - if video.series and (sanitize(self.title) in ( - sanitize(name) for name in [video.series] + video.alternative_series)): - matches.add('series') - - # year - if video.original_series and self.year is None or video.year and video.year == self.year: - matches.add('year') - - # imdb_id - if video.series_imdb_id and self.imdb_id == video.series_imdb_id: - matches.add('series_imdb_id') - - # movie - elif isinstance(video, Movie) and self.type == 'movie': - # title - if video.title and (sanitize(self.title) in ( - sanitize(name) for name in [video.title] + video.alternative_titles)): - matches.add('title') - - # year - if video.year and self.year == video.year: - matches.add('year') - - # imdb_id - if video.imdb_id and self.imdb_id == video.imdb_id: - matches.add('imdb_id') - - # name - matches |= guess_matches(video, guessit(self.name, {'type': self.type})) - - return matches - - -class LegendasTVProvider(_LegendasTVProvider): - languages = {Language(*l) for l in language_converters['legendastv'].to_legendastv.keys()} - video_types = (Episode, Movie) - subtitle_class = LegendasTVSubtitle - - def __init__(self, username=None, password=None, featured_only=False): - - # Provider needs UNRAR installed. If not available raise ConfigurationError - try: - rarfile.tool_setup() - except rarfile.RarCannotExec: - raise ConfigurationError('RAR extraction tool not available') - - if any((username, password)) and not all((username, password)): - raise ConfigurationError('Username and password must be specified') - - self.username = username - self.password = password - self.logged_in = False - self.session = None - self.featured_only = featured_only - - @staticmethod - def is_valid_title(title, title_id, sanitized_title, season, year, imdb_id): - """Check if is a valid title.""" - if title["imdb_id"] and title["imdb_id"] == imdb_id: - logger.debug(u'Matched title "%s" as IMDB ID %s', sanitized_title, title["imdb_id"]) - return True - - if title["title2"] and sanitize(title['title2']) == sanitized_title: - logger.debug(u'Matched title "%s" as "%s"', sanitized_title, title["title2"]) - return True - - return _LegendasTVProvider.is_valid_title(title, title_id, sanitized_title, season, year) - - @region.cache_on_arguments(expiration_time=SHOW_EXPIRATION_TIME, should_cache_fn=lambda value: value) - def search_titles(self, titles, season, title_year, imdb_id): - """Search for titles matching the `title`. - - For episodes, each season has it own title - :param str titles: the titles to search for. - :param int season: season of the title - :param int title_year: year of the title - :return: found titles. - :rtype: dict - """ - titles_found = {} - - for title in titles: - sanitized_titles = [sanitize(title)] - ignore_characters = {'\'', '.'} - if any(c in title for c in ignore_characters): - sanitized_titles.append(sanitize(title, ignore_characters=ignore_characters)) - - for sanitized_title in sanitized_titles: - # make the query - if season: - logger.info('Searching episode title %r for season %r', sanitized_title, season) - else: - logger.info('Searching movie title %r', sanitized_title) - - r = self.session.get(self.server_url + 'legenda/sugestao/{}'.format(sanitized_title), timeout=10) - raise_for_status(r) - results = json.loads(r.text) - - # loop over results - for result in results: - source = result['_source'] - - # extract id - title_id = int(source['id_filme']) - - # extract type - title = {'type': type_map[source['tipo']], 'title2': None, 'imdb_id': None} - - # extract title, year and country - name, year, country = title_re.match(source['dsc_nome']).groups() - title['title'] = name - - if "dsc_nome_br" in source: - name2, ommit1, ommit2 = title_re.match(source['dsc_nome_br']).groups() - title['title2'] = name2 - - # extract imdb_id - if source['id_imdb'] != '0': - if not source['id_imdb'].startswith('tt'): - title['imdb_id'] = 'tt' + source['id_imdb'].zfill(7) - else: - title['imdb_id'] = source['id_imdb'] - - # extract season - if title['type'] == 'episode': - if source['temporada'] and source['temporada'].isdigit(): - title['season'] = int(source['temporada']) - else: - match = season_re.search(source['dsc_nome_br']) - if match: - title['season'] = int(match.group('season')) - else: - logger.debug('No season detected for title %d (%s)', title_id, name) - - # extract year - if year: - title['year'] = int(year) - elif source['dsc_data_lancamento'] and source['dsc_data_lancamento'].isdigit(): - # year is based on season air date hence the adjustment - title['year'] = int(source['dsc_data_lancamento']) - title.get('season', 1) + 1 - - # add title only if is valid - # Check against title without ignored chars - if self.is_valid_title(title, title_id, sanitized_titles[0], season, title_year, imdb_id): - logger.debug(u'Found title: %s', title) - titles_found[title_id] = title - - logger.debug('Found %d titles', len(titles_found)) - - return titles_found - - @reinitialize_on_error((RequestException, ServiceUnavailable), attempts=1) - def query(self, language, titles, season=None, episode=None, year=None, imdb_id=None): - # search for titles - titles_found = self.search_titles(titles, season, year, imdb_id) - - subtitles = [] - # iterate over titles - for title_id, t in titles_found.items(): - # Skip episodes or movies if it's not what was requested - if (season and t['type'] == 'movie') or (not season and t['type'] == 'episode'): - continue - - # Skip if season isn't matching - if season and season != t.get('season'): - continue - - # Skip if season wasn't provided (not an episode) but one is returned by provider (wrong type) - if not season and t.get('season'): - continue - - logger.info('Getting archives for title %d and language %d', title_id, language.legendastv) - archives = self.get_archives(title_id, language.legendastv, t['type'], season, episode) - if not archives: - logger.info('No archives found for title %d and language %d', title_id, language.legendastv) - - # iterate over title's archives - for a in archives: - - # Check if featured - if self.featured_only and a.featured == False: - logger.info('Subtitle is not featured, skipping') - continue - - # compute an expiration time based on the archive timestamp - expiration_time = (datetime.utcnow().replace(tzinfo=pytz.utc) - a.timestamp).total_seconds() - - # attempt to get the releases from the cache - cache_key = str(a.id + "|" + a.name) - releases = region.get(cache_key, expiration_time=expiration_time) - - # the releases are not in cache or cache is expired - if releases == NO_VALUE: - logger.info('Releases not found in cache') - - # download archive - self.download_archive(a) - - # extract the releases - releases = [] - for name in a.content.namelist(): - # discard the legendastv file - if name.startswith('Legendas.tv'): - continue - - # discard hidden files - if os.path.split(name)[-1].startswith('.'): - continue - - # discard non-subtitle files - if not name.lower().endswith(SUBTITLE_EXTENSIONS): - continue - - releases.append(name) - - # cache the releases - region.set(cache_key, releases) - - # iterate over releases - for r in releases: - subtitle = self.subtitle_class(language, t['type'], t['title'], t.get('year'), t.get('imdb_id'), - t.get('season'), a, r) - logger.debug('Found subtitle %r', subtitle) - subtitles.append(subtitle) - - return subtitles - - def list_subtitles(self, video, languages): - season = episode = None - if isinstance(video, Episode): - titles = [video.series] + video.alternative_series - season = video.season - episode = video.episode - imdb = video.series_imdb_id - else: - titles = [video.title] + video.alternative_titles - imdb = video.imdb_id - - subtitles = [s for l in languages for s in - self.query(l, titles, season=season, episode=episode, year=video.year, imdb_id=imdb)] - if subtitles: - return subtitles - else: - return [] - - def download_subtitle(self, subtitle): - super(LegendasTVProvider, self).download_subtitle(subtitle) - subtitle.archive.content = None - - def get_archives(self, title_id, language_code, title_type, season, episode): - return super(LegendasTVProvider, self).get_archives.original(self, title_id, language_code, title_type, - season, episode)