added more improvement to Titulki provider

pull/1600/head v1.0.1-beta.8
Samuel Bartík 3 years ago committed by GitHub
parent 006e17bdc2
commit 618bddebf9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -4,7 +4,6 @@ from __future__ import absolute_import
import io
import logging
import math
import os
import re
import zipfile
from random import randint
@ -15,13 +14,19 @@ import rarfile
from guessit import guessit
from requests import Session
from requests.adapters import HTTPAdapter
from subliminal import __short_version__
from requests.exceptions import HTTPError
from subliminal.exceptions import AuthenticationError, ConfigurationError, DownloadLimitExceeded, Error, ProviderError
from subliminal.providers import ParserBeautifulSoup, Provider
from subliminal.subtitle import SUBTITLE_EXTENSIONS, Subtitle, fix_line_ending
from subliminal.providers import ParserBeautifulSoup
from subliminal.subtitle import fix_line_ending
from subliminal.video import Episode, Movie
from subliminal_patch.exceptions import ParseResponseError
from subliminal_patch.providers import Provider
from subliminal_patch.providers.mixins import ProviderSubtitleArchiveMixin
from subliminal_patch.score import framerate_equal
from subliminal_patch.subtitle import guess_matches, sanitize
from subliminal_patch.subtitle import Subtitle, guess_matches, sanitize
from subzero.language import Language
from .utils import FIRST_THOUSAND_OR_SO_USER_AGENTS as AGENT_LIST
@ -36,26 +41,29 @@ class TitulkySubtitle(Subtitle):
hash_verifiable = False
hearing_impaired_verifiable = False
def __init__(self, sub_id, language, names, season, episode, year, release_info, fps, uploader, approved, page_link, download_link, skip_wrong_fps=False):
def __init__(self, sub_id, imdb_id, language, names, season, episode, year, releases, fps, uploader, approved, page_link, download_link, skip_wrong_fps=False, asked_for_episode=None):
super().__init__(language, page_link=page_link)
self.names = names
self.year = year
self.sub_id = sub_id
self.imdb_id = imdb_id
self.fps = fps
self.season = season
self.episode = episode
self.release_info = release_info
self.releases = releases
self.release_info = ', '.join(releases)
self.language = language
self.approved = approved
self.page_link = page_link
self.uploader = uploader
self.download_link = download_link
self.skip_wrong_fps = skip_wrong_fps
self.asked_for_episode = asked_for_episode
self.matches = None
# Try to parse S00E00 string from the main subtitle name
season_episode_string = re.findall('S(\d+)E(\d+)', self.names[0], re.IGNORECASE)
season_episode_string = re.findall(r'S(\d+)E(\d+)', self.names[0], re.IGNORECASE)
# If we did not search for subtitles with season and episode numbers in search query,
# try to parse it from the main subtitle name that most likely contains it
@ -79,7 +87,11 @@ class TitulkySubtitle(Subtitle):
if _type == 'episode':
## EPISODE
# match imdb_id of a series
if video.series_imdb_id and video.series_imdb_id == self.imdb_id:
matches.add('series_imdb_id')
# match season/episode
if self.season and self.season == video.season:
matches.add('season')
@ -94,11 +106,15 @@ class TitulkySubtitle(Subtitle):
# match episode title
episode_titles = [video.title]
if _contains_element(_from=episode_titles, _in=self.names):
matches.add('title')
matches.add('episode_title')
elif _type == 'movie':
## MOVIE
# match imdb_id of a movie
if video.imdb_id and video.imdb_id == self.imdb_id:
matches.add('imdb_id')
# match movie title
video_titles = [video.title] + video.alternative_titles
if _contains_element(_from=video_titles, _in=self.names):
@ -110,8 +126,9 @@ class TitulkySubtitle(Subtitle):
if video.year and video.year == self.year:
matches.add('year')
# match other properties based on release info
matches |= guess_matches(video, guessit(self.release_info, {"type": _type}))
# match other properties based on release infos
for release in self.releases:
matches |= guess_matches(video, guessit(release, {"type": _type}))
# If turned on in settings, then do not match if video FPS is not equal to subtitle FPS
if self.skip_wrong_fps and video.fps and self.fps and not framerate_equal(video.fps, self.fps):
@ -123,7 +140,7 @@ class TitulkySubtitle(Subtitle):
return matches
class TitulkyProvider(Provider):
class TitulkyProvider(Provider, ProviderSubtitleArchiveMixin):
"""Titulky.com provider"""
languages = {Language(l) for l in ['ces', 'slk']}
@ -220,7 +237,7 @@ class TitulkyProvider(Provider):
res = self.session.get(url, timeout=self.timeout)
if res.status_code != 200:
raise ProviderError(f"Fetch failed with status code {res.status_code}")
raise HTTPError(f"Fetch failed with status code {res.status_code}")
if not res.text:
raise ProviderError("No response returned from the provider")
@ -235,7 +252,7 @@ class TitulkyProvider(Provider):
for key, value in params.items():
result += f'{key}={value}&'
# Remove last &
# Remove the last &
result = result[:-1]
# Remove spaces
@ -243,23 +260,34 @@ class TitulkyProvider(Provider):
return result
# Parse details of an individual subtitle: release, language, uploader, fps and year
# Parse details of an individual subtitle: imdb_id, release, language, uploader, fps and year
def parse_details(self, url):
html_src = self.fetch_page(url)
details_page_soup = ParserBeautifulSoup(html_src, ['lxml', 'html.parser'])
details_container = details_page_soup.find('div', class_='detail')
if not details_container:
# The subtitles were removed and got redirected to a different page. Better treat this silently.
# The subtitles could be removed and got redirected to a different page. Better treat this silently.
logger.debug("Titulky.com: Could not find details div container. Skipping.")
return False
### IMDB ID
imdb_id = None
imdb_tag = details_container.find('a', attrs={'target': 'imdb'})
if imdb_tag:
imdb_url = imdb_tag.get('href')
imdb_id = re.findall(r'tt(\d+)', imdb_url)[0]
if not imdb_id:
logger.debug("Titulky.com: No IMDB ID supplied on details page.")
### RELEASE
release = None
release_tag = details_container.find('div', class_='releas')
if not release_tag:
raise Error("Could not find release tag. Did the HTML source change?")
raise ParseResponseError("Could not find release tag. Did the HTML source change?")
release = release_tag.get_text(strip=True)
@ -284,12 +312,12 @@ class TitulkyProvider(Provider):
uploader_tag = details_container.find('div', class_='ulozil')
if not uploader_tag:
raise Error("Could not find uploader tag. Did the HTML source change?")
raise ParseResponseError("Could not find uploader tag. Did the HTML source change?")
uploader_anchor_tag = uploader_tag.find('a')
if not uploader_anchor_tag:
raise Error("Could not find uploader anchor tag. Did the HTML source change?")
raise ParseResponseError("Could not find uploader anchor tag. Did the HTML source change?")
uploader = uploader_anchor_tag.string.strip() if uploader_anchor_tag else None
@ -301,11 +329,11 @@ class TitulkyProvider(Provider):
fps_icon_tag_selection = details_container.select('img[src*=\'Movieroll\']')
if not fps_icon_tag_selection and not hasattr(fps_icon_tag_selection[0], 'parent'):
raise Error("Could not find parent of the fps icon tag. Did the HTML source change?")
raise ParseResponseError("Could not find parent of the fps icon tag. Did the HTML source change?")
fps_icon_tag = fps_icon_tag_selection[0]
parent_text = fps_icon_tag.parent.get_text(strip=True)
match = re.findall('(\d+,\d+) fps', parent_text)
match = re.findall(r'(\d+,\d+) fps', parent_text)
# If the match is found, change the decimal separator to a dot and convert to float
fps = float(match[0].replace(',', '.')) if len(match) > 0 else None
@ -318,7 +346,7 @@ class TitulkyProvider(Provider):
h1_tag = details_container.find('h1', id='titulky')
if not h1_tag:
raise Error("Could not find h1 tag. Did the HTML source change?")
raise ParseResponseError("Could not find h1 tag. Did the HTML source change?")
# The h1 tag contains the name of the subtitle and a year
h1_texts = [text for text in h1_tag.stripped_strings]
@ -334,11 +362,12 @@ class TitulkyProvider(Provider):
# Return the subtitle details
return {
'release': release,
'releases': [release],
'language': language,
'uploader': uploader,
'fps': fps,
'year': year
'year': year,
'imdb_id': imdb_id
}
def process_row(self, row, video_names, thread_id=None, threads_data=None):
@ -347,7 +376,7 @@ class TitulkyProvider(Provider):
anchor_tag = row.find_all('a')[1]
# The details link is relative, so we need to remove the dot at the beginning
details_link = f"{self.server_url}{anchor_tag.get('href')[1:]}"
id_match = re.findall('id=(\d+)', details_link)
id_match = re.findall(r'id=(\d+)', details_link)
sub_id = id_match[0] if len(id_match) > 0 else None
download_link = f"{self.download_url}{sub_id}"
@ -492,13 +521,13 @@ class TitulkyProvider(Provider):
table = search_page_soup.find('table', class_='table')
if not table:
logger.debug("Titulky.com: Could not find table")
raise Error("Could not find table. Did the HTML source change?")
raise ParseResponseError("Could not find table. Did the HTML source change?")
# Get table body containing rows of subtitles
table_body = table.find('tbody')
if not table_body:
logger.debug("Titulky.com: Could not find table body")
raise Error("Could not find table body. Did the HTML source change?")
raise ParseResponseError("Could not find table body. Did the HTML source change?")
## Loop over all subtitles on the first page and put them in a list
subtitles = []
@ -514,9 +543,12 @@ class TitulkyProvider(Provider):
# and we can instationate it and add it to the list
if sub_info:
logger.debug(f"Titulky.com: Sucessfully retrieved subtitle info, row: {i}")
# If we found the subtitle by IMDB ID, no need to get it from details page
sub_imdb_id = imdb_id or sub_info['imdb_id']
subtitle_instance = self.subtitle_class(sub_info['id'], sub_info['language'], sub_info['names'], season, episode, sub_info['year'], sub_info['release'], sub_info['fps'],
sub_info['uploader'], sub_info['approved'], sub_info['details_link'], sub_info['download_link'], skip_wrong_fps=self.skip_wrong_fps)
subtitle_instance = self.subtitle_class(sub_info['id'], sub_imdb_id, sub_info['language'], sub_info['names'], season, episode, sub_info['year'], sub_info['releases'], sub_info['fps'],
sub_info['uploader'], sub_info['approved'], sub_info['details_link'], sub_info['download_link'], skip_wrong_fps=self.skip_wrong_fps, asked_for_episode=(type == 'episode'))
subtitles.append(subtitle_instance)
else:
# No subtitle info was returned, i. e. something unexpected
@ -558,7 +590,7 @@ class TitulkyProvider(Provider):
# If the thread returned didn't return anything, but expected a dict object
if not thread_data:
raise Error(f"No data returned from thread ID: {i}")
raise ProviderError(f"No data returned from thread ID: {i}")
# If an exception was raised in a thread, raise it again here
if 'exception' in thread_data and thread_data['exception']:
@ -571,8 +603,11 @@ class TitulkyProvider(Provider):
logger.debug(f"Titulky.com: Sucessfully retrieved subtitle info, thread ID: {i}")
sub_info = thread_data['sub_info']
subtitle_instance = self.subtitle_class(sub_info['id'], sub_info['language'], sub_info['names'], season, episode, sub_info['year'], sub_info['release'], sub_info['fps'],
sub_info['uploader'], sub_info['approved'], sub_info['details_link'], sub_info['download_link'], skip_wrong_fps=self.skip_wrong_fps)
# If we found the subtitle by IMDB ID, no need to get it from details page
sub_imdb_id = imdb_id or sub_info['imdb_id']
subtitle_instance = self.subtitle_class(sub_info['id'], sub_imdb_id, sub_info['language'], sub_info['names'], season, episode, sub_info['year'], sub_info['releases'], sub_info['fps'],
sub_info['uploader'], sub_info['approved'], sub_info['details_link'], sub_info['download_link'], skip_wrong_fps=self.skip_wrong_fps, asked_for_episode=(type == 'episode'))
subtitles.append(subtitle_instance)
else:
# The thread returned data, but it didn't contain a subtitle info, i. e. something unexpected
@ -639,51 +674,34 @@ class TitulkyProvider(Provider):
return subtitles
# The rest is mostly old code from original implementation. Might want to redo it.
def download_subtitle(self, subtitle):
res = self.session.get(subtitle.download_link, headers={'Referer': subtitle.page_link},
timeout=self.timeout)
res.raise_for_status()
timeout=self.timeout)
try:
res.raise_for_status()
except:
raise HTTPError(f"An error occured during the download request to {subtitle.download_link}")
archive_stream = io.BytesIO(res.content)
archive = None
if rarfile.is_rarfile(archive_stream):
logger.debug("Titulky.com: Identified rar archive")
archive = rarfile.RarFile(archive_stream)
subtitle_content = _get_subtitle_from_archive(archive)
subtitle_content = self.get_subtitle_from_archive(subtitle, archive)
elif zipfile.is_zipfile(archive_stream):
logger.debug("Titulky.com: Identified zip archive")
archive = zipfile.ZipFile(archive_stream)
subtitle_content = _get_subtitle_from_archive(archive)
else:
subtitle_content = res.content
if subtitle_content:
subtitle.content = fix_line_ending(subtitle_content)
return subtitle_content
subtitle_content = self.get_subtitle_from_archive(subtitle, archive)
else:
logger.debug(f"Titulky.com: Could not extract subtitle from {archive}")
def _get_subtitle_from_archive(archive):
if '_info.txt' in archive.namelist():
info_content_binary = archive.read('_info.txt')
info_content = info_content_binary.decode(chardet.detect(info_content_binary)['encoding'])
if "nestaženo - překročen limit" in info_content:
raise DownloadLimitExceeded("The download limit has been exceeded")
for name in archive.namelist():
# discard hidden files
if os.path.split(name)[-1].startswith('.'):
continue
subtitle_content = fix_line_ending(res.content)
# discard non-subtitle files
if not name.lower().endswith(SUBTITLE_EXTENSIONS):
continue
if not subtitle_content:
logger.debug("Titulky.com: No subtitle content found. The downloading limit has been most likely exceeded.")
raise DownloadLimitExceeded("Subtitles download limit has been exceeded")
subtitle.content = subtitle_content
return archive.read(name)
return None
# Check if any element from source array is **contained** in any element from target array
# Returns on the first match
def _contains_element(_from=None, _in=None):
@ -695,4 +713,4 @@ def _contains_element(_from=None, _in=None):
if sanitize(source) in sanitize(target):
return True
return False
return False

Loading…
Cancel
Save