Merge remote-tracking branch 'origin/development' into development

Branch: pull/1055/head
Author: Louis Vézina (4 years ago)
Commit: 9b048dd3ef

@ -124,6 +124,7 @@ class Notifications(Resource):
database.execute("UPDATE table_settings_notifier SET enabled = ?, url = ? WHERE name = ?",
(item['enabled'], item['url'], item['name']))
save_settings(zip(request.form.keys(), request.form.listvalues()))
return '', 204
@ -606,7 +607,8 @@ class EpisodesSubtitlesManualDownload(Resource):
subs_id = result[6]
subs_path = result[7]
history_log(2, sonarrSeriesId, sonarrEpisodeId, message, path, language_code, provider, score, subs_id, subs_path)
send_notifications(sonarrSeriesId, sonarrEpisodeId, message)
if not settings.general.getboolean('dont_notify_manual_actions'):
send_notifications(sonarrSeriesId, sonarrEpisodeId, message)
store_subtitles(path, episodePath)
return result, 201
except OSError:
@ -653,7 +655,8 @@ class EpisodesSubtitlesUpload(Resource):
provider = "manual"
score = 360
history_log(4, sonarrSeriesId, sonarrEpisodeId, message, path, language_code, provider, score, subtitles_path=subs_path)
send_notifications(sonarrSeriesId, sonarrEpisodeId, message)
if not settings.general.getboolean('dont_notify_manual_actions'):
send_notifications(sonarrSeriesId, sonarrEpisodeId, message)
store_subtitles(path, episodePath)
return result, 201
@ -1057,7 +1060,8 @@ class MovieSubtitlesManualDownload(Resource):
subs_id = result[6]
subs_path = result[7]
history_log_movie(2, radarrId, message, path, language_code, provider, score, subs_id, subs_path)
send_notifications_movie(radarrId, message)
if not settings.general.getboolean('dont_notify_manual_actions'):
send_notifications_movie(radarrId, message)
store_subtitles_movie(path, moviePath)
return result, 201
except OSError:
@ -1103,7 +1107,8 @@ class MovieSubtitlesUpload(Resource):
provider = "manual"
score = 120
history_log_movie(4, radarrId, message, path, language_code, provider, score, subtitles_path=subs_path)
send_notifications_movie(radarrId, message)
if not settings.general.getboolean('dont_notify_manual_actions'):
send_notifications_movie(radarrId, message)
store_subtitles_movie(path, moviePath)
return result, 201

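Each of the four API hunks above applies the same guard: the manual download and upload handlers only fire notifications when the new dont_notify_manual_actions option is off. A condensed sketch of the pattern, using only names that appear in the diff (surrounding handler logic elided); with the 'False' default added to config.py below, behaviour is unchanged unless the user opts in:

# settings comes from Bazarr's config module, as in the hunks above.
if not settings.general.getboolean('dont_notify_manual_actions'):
    send_notifications(sonarrSeriesId, sonarrEpisodeId, message)   # series/episode handlers
# ...and the movie-side equivalent:
if not settings.general.getboolean('dont_notify_manual_actions'):
    send_notifications_movie(radarrId, message)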
@ -58,7 +58,9 @@ defaults = {
'upgrade_manual': 'True',
'anti_captcha_provider': 'None',
'wanted_search_frequency': '3',
'wanted_search_frequency_movie': '3'
'wanted_search_frequency_movie': '3',
'subzero_mods': '',
'dont_notify_manual_actions': 'False'
},
'auth': {
'type': 'None',

@ -18,6 +18,7 @@ from subliminal import region, score as subliminal_scores, \
from subliminal_patch.core import SZAsyncProviderPool, download_best_subtitles, save_subtitles, download_subtitles, \
list_all_subtitles, get_subtitle_path
from subliminal_patch.score import compute_score
from subliminal_patch.subtitle import Subtitle
from get_languages import language_from_alpha3, alpha2_from_alpha3, alpha3_from_alpha2, language_from_alpha2, \
alpha2_from_language, alpha3_from_language
from config import settings
@ -177,12 +178,16 @@ def download_subtitle(path, language, audio_language, hi, forced, providers, pro
logging.info("BAZARR All providers are throttled")
return None
subz_mods = settings.general.subzero_mods.strip().split(',') if settings.general.subzero_mods.strip() else None
saved_any = False
if downloaded_subtitles:
for video, subtitles in downloaded_subtitles.items():
if not subtitles:
continue
for s in subtitles:
s.mods = subz_mods
try:
fld = get_target_folder(path)
chmod = int(settings.general.chmod, 8) if not sys.platform.startswith(
@ -417,6 +422,7 @@ def manual_download_subtitle(path, language, audio_language, hi, forced, subtitl
os.environ["SZ_KEEP_ENCODING"] = "True"
subtitle = pickle.loads(codecs.decode(subtitle.encode(), "base64"))
subtitle.mods = settings.general.subzero_mods.strip().split(',') if settings.general.subzero_mods.strip() else None
use_postprocessing = settings.general.getboolean('use_postprocessing')
postprocessing_cmd = settings.general.postprocessing_cmd
single = settings.general.getboolean('single_language')
@ -551,12 +557,6 @@ def manual_upload_subtitle(path, language, forced, title, scene_name, media_type
chmod = int(settings.general.chmod, 8) if not sys.platform.startswith(
'win') and settings.general.getboolean('chmod_enabled') else None
dest_directory = get_target_folder(path)
fake_video_path = None
if dest_directory:
fake_video_path = os.path.join(dest_directory, os.path.split(path)[1])
_, ext = os.path.splitext(subtitle.filename)
language = alpha3_from_alpha2(language)
if language == 'pob':
@ -567,48 +567,37 @@ def manual_upload_subtitle(path, language, forced, title, scene_name, media_type
if forced:
lang_obj = Language.rebuild(lang_obj, forced=True)
subtitle_path = get_subtitle_path(video_path=force_unicode(fake_video_path if fake_video_path else path),
language=None if single else lang_obj,
extension=ext,
forced_tag=forced)
sub = Subtitle(
lang_obj,
mods=settings.general.subzero_mods.strip().split(',') if settings.general.subzero_mods.strip() else None
)
subtitle_path = force_unicode(subtitle_path)
if os.path.exists(subtitle_path):
os.remove(subtitle_path)
sub.content = subtitle.read()
if not sub.is_valid():
logging.exception('BAZARR Invalid subtitle file: ' + subtitle.filename)
sub.mods = None
if settings.general.getboolean('utf8_encode'):
try:
os.remove(subtitle_path + ".tmp")
except:
pass
sub.set_encoding("utf-8")
subtitle.save(subtitle_path + ".tmp")
with open(subtitle_path + ".tmp", 'rb') as fr:
text = fr.read()
try:
guess = chardet.detect(text)
text = text.decode(guess["encoding"])
text = text.encode('utf-8')
except UnicodeError:
logging.exception("BAZARR subtitles file doesn't seems to be text based. Skipping this file: " +
subtitle_path)
else:
with open(subtitle_path, 'wb') as fw:
fw.write(text)
finally:
try:
os.remove(subtitle_path + ".tmp")
except:
pass
else:
subtitle.save(subtitle_path)
if chmod:
os.chmod(subtitle_path, chmod)
saved_subtitles = []
try:
saved_subtitles = save_subtitles(path,
[sub],
single=single,
tags=None, # fixme
directory=get_target_folder(path),
chmod=chmod,
# formats=("srt", "vtt")
path_decoder=force_unicode)
except:
pass
if len(saved_subtitles) < 1:
logging.exception('BAZARR Error saving Subtitles file to disk for this file:' + path)
return
subtitle_path = saved_subtitles[0].storage_path
message = language_from_alpha3(language) + (" forced" if forced else "") + " Subtitles manually uploaded."
uploaded_language_code3 = language

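The manual_upload_subtitle rewrite above drops the hand-rolled path and encoding handling in favour of a subliminal_patch Subtitle object and a single save_subtitles call. A condensed sketch of the new flow, using only names from the hunk (early-exit branches, history logging and notifications elided):

# subzero mods are attached to the Subtitle and applied on save.
sub = Subtitle(lang_obj, mods=subz_mods)
sub.content = subtitle.read()                      # raw bytes of the uploaded file
if not sub.is_valid():
    logging.exception('BAZARR Invalid subtitle file: ' + subtitle.filename)
    sub.mods = None                                # don't run mods on an unparsable file

saved_subtitles = save_subtitles(path, [sub],
                                 single=single,
                                 tags=None,
                                 directory=get_target_folder(path),
                                 chmod=chmod,
                                 path_decoder=force_unicode)
if saved_subtitles:
    subtitle_path = saved_subtitles[0].storage_path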
@ -57,7 +57,6 @@ class ZimukuSubtitle(Subtitle):
# episode
if isinstance(video, Episode):
# always make year a match
info = guessit(self.version, {"type": "episode"})
# other properties
matches |= guess_matches(video, info, partial=True)
@ -145,6 +144,19 @@ class ZimukuProvider(Provider):
logger.debug("No data returned from provider")
return []
html = r.content.decode("utf-8", "ignore")
# parse window location
pattern = r"url\s*=\s*'([^']*)'\s*\+\s*url"
parts = re.findall(pattern, html)
redirect_url = search_link
while parts:
parts.reverse()
redirect_url = urljoin(self.server_url, "".join(parts))
r = self.session.get(redirect_url, timeout=30)
html = r.content.decode("utf-8", "ignore")
parts = re.findall(pattern, html)
logger.debug("search url located: " + redirect_url)
soup = ParserBeautifulSoup(
r.content.decode("utf-8", "ignore"), ["lxml", "html.parser"]
)
@ -154,8 +166,12 @@ class ZimukuProvider(Provider):
logger.debug("enter a non-shooter page")
for item in soup.find_all("div", {"class": "item"}):
title_a = item.find("p", class_="tt clearfix").find("a")
subs_year = re.findall(r"\d{4}", title_a.text) or None
subs_year = year
if season:
# episode year in zimuku is the season's year not show's year
actual_subs_year = re.findall(r"\d{4}", title_a.text) or None
if actual_subs_year:
subs_year = int(actual_subs_year[0]) - season + 1
title = title_a.text
season_cn1 = re.search("第(.*)季", title)
if not season_cn1:

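Two fixes land in the Zimuku provider: the search request now follows the JavaScript "url = '...' + url" redirect chain until a real result page comes back, and listing years are normalised because Zimuku dates a series entry by its season rather than by the show. A worked example of that year adjustment, assuming (as the code effectively does) roughly one season per calendar year:

# Hypothetical listing: a "season 3" entry dated 2019, while the video metadata
# carries the show's first-season year.
season, listed_year = 3, 2019
subs_year = listed_year - season + 1   # 2017, now comparable with the show's year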
@ -0,0 +1,7 @@
# coding=utf-8
class EmptyEntryError(Exception):
pass
class EmptyLineError(Exception):
pass

@ -7,7 +7,8 @@ import pysubs2
import logging
import time
from .mods import EMPTY_TAG_PROCESSOR, EmptyEntryError
from .mods import EMPTY_TAG_PROCESSOR
from .exc import EmptyEntryError
from .registry import registry
from subzero.language import Language
import six
@ -15,8 +16,6 @@ import six
logger = logging.getLogger(__name__)
lowercase_re = re.compile(r'(?sux)[a-zà-ž]')
class SubtitleModifications(object):
debug = False
@ -189,7 +188,7 @@ class SubtitleModifications(object):
sub = processor.process(sub)
if sub.strip():
if lowercase_re.search(sub):
if not sub.isupper():
return False
entry_used = True
@ -302,11 +301,11 @@ class SubtitleModifications(object):
mod = self.initialized_mods[identifier]
try:
line = mod.modify(line.strip(), entry=entry.text, debug=self.debug, parent=self, index=index,
line = mod.modify(line.strip(), entry=t, debug=self.debug, parent=self, index=index,
**args)
except EmptyEntryError:
if self.debug:
logger.debug(u"%d: %s: %r -> ''", index, identifier, entry.text)
logger.debug(u"%d: %s: %r -> ''", index, identifier, t)
skip_entry = True
break
@ -331,11 +330,11 @@ class SubtitleModifications(object):
mod = self.initialized_mods[identifier]
try:
line = mod.modify(line.strip(), entry=entry.text, debug=self.debug, parent=self, index=index,
line = mod.modify(line.strip(), entry=t, debug=self.debug, parent=self, index=index,
procs=["last_process"], **args)
except EmptyEntryError:
if self.debug:
logger.debug(u"%d: %s: %r -> ''", index, identifier, entry.text)
logger.debug(u"%d: %s: %r -> ''", index, identifier, t)
skip_entry = True
break

@ -109,9 +109,3 @@ empty_line_post_processors = [
]
class EmptyEntryError(Exception):
pass
class EmptyLineError(Exception):
pass

@ -9,6 +9,7 @@ from subzero.modification.mods import SubtitleTextModification, empty_line_post_
from subzero.modification.processors import FuncProcessor
from subzero.modification.processors.re_processor import NReProcessor
from subzero.modification import registry
from tld import get_tld
ENGLISH = Language("eng")
@ -30,7 +31,7 @@ class CommonFixes(SubtitleTextModification):
NReProcessor(re.compile(r'(?u)(\w|\b|\s|^)(-\s?-{1,2})'), r"\1—", name="CM_multidash"),
# line = _/-/\s
NReProcessor(re.compile(r'(?u)(^\W*[-_.:>~]+\W*$)'), "", name="<CM_non_word_only"),
NReProcessor(re.compile(r'(?u)(^\W*[-_.:<>~"\']+\W*$)'), "", name="CM_non_word_only"),
# remove >>
NReProcessor(re.compile(r'(?u)^\s?>>\s*'), "", name="CM_leading_crocodiles"),
@ -115,7 +116,9 @@ class CommonFixes(SubtitleTextModification):
NReProcessor(re.compile(r'(?u)(?:(?<=^)|(?<=\w)) +([!?.,](?![!?.,]| \.))'), r"\1", name="CM_punctuation_space"),
# add space after punctuation
NReProcessor(re.compile(r'(?u)([!?.,:])([A-zÀ-ž]{2,})'), r"\1 \2", name="CM_punctuation_space2"),
NReProcessor(re.compile(r'(?u)(([^\s]*)([!?.,:])([A-zÀ-ž]{2,}))'),
lambda match: u"%s%s %s" % (match.group(2), match.group(3), match.group(4)) if not get_tld(match.group(1), fail_silently=True, fix_protocol=True) else match.group(1),
name="CM_punctuation_space2"),
# fix lowercase I in english
NReProcessor(re.compile(r'(?u)(\b)i(\b)'), r"\1I\2", name="CM_EN_lowercase_i",

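CM_punctuation_space2 used to insert a space after any punctuation followed by letters, which also broke domain names. The replacement leaves the match untouched when the whole token parses as a domain via the vendored tld package imported at the top of the hunk. Illustrative inputs, assuming the get_tld behaviour shown later in this diff:

from tld import get_tld

get_tld("youtube.com", fail_silently=True, fix_protocol=True)   # 'com'  -> keep token as-is
get_tld("wait,what", fail_silently=True, fix_protocol=True)     # None   -> insert the space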
@ -3,7 +3,8 @@ from __future__ import absolute_import
from __future__ import unicode_literals
import re
from subzero.modification.mods import SubtitleTextModification, empty_line_post_processors, EmptyEntryError, TAG
from subzero.modification.mods import SubtitleTextModification, empty_line_post_processors, TAG
from subzero.modification.exc import EmptyEntryError
from subzero.modification.processors.re_processor import NReProcessor
from subzero.modification import registry
@ -41,14 +42,14 @@ class HearingImpaired(SubtitleTextModification):
# possibly with a dash in front; try not breaking actual sentences with a colon at the end by not matching if
# a space is inside the text; ignore anything ending with a quote
NReProcessor(re.compile(r'(?u)(?:(?<=^)|(?<=[.\-!?\"]))([\s\->~]*((?=[A-zÀ-ž&+]\s*[A-zÀ-ž&+]\s*[A-zÀ-ž&+])'
r'[A-zÀ-ž-_0-9\s\"\'&+()\[\]]+:)(?![\"’ʼ❜‘‛”“‟„])\s*)(?![0-9])'),
r'[A-zÀ-ž-_0-9\s\"\'&+()\[\]]+:)(?![\"’ʼ❜‘‛”“‟„])\s*)(?![0-9]|//)'),
lambda match:
match.group(1) if (match.group(2).count(" ") > 0 or match.group(1).count("-") > 0)
else "" if not match.group(1).startswith(" ") else " ",
name="HI_before_colon_noncaps"),
# brackets (only remove if at least 3 chars in brackets)
NReProcessor(re.compile(r'(?sux)-?%(t)s[([][^([)\]]+?(?=[A-zÀ-ž"\'.]{3,})[^([)\]]+[)\]][\s:]*%(t)s' %
NReProcessor(re.compile(r'(?sux)-?%(t)s["\']*[([][^([)\]]+?(?=[A-zÀ-ž"\'.]{3,})[^([)\]]+[)\]]["\']*[\s:]*%(t)s' %
{"t": TAG}), "", name="HI_brackets"),
#NReProcessor(re.compile(r'(?sux)-?%(t)s[([]%(t)s(?=[A-zÀ-ž"\'.]{3,})[^([)\]]+%(t)s$' % {"t": TAG}),
@ -92,8 +93,8 @@ class HearingImpaired(SubtitleTextModification):
"", name="HI_music_symbols_only"),
# remove music entries
NReProcessor(re.compile(r'(?ums)(^[-\s>~]*[♫♪]+\s*.+|.+\s*[♫♪]+\s*$)'),
"", name="HI_music"),
NReProcessor(re.compile(r'(?ums)(^[-\s>~]*[*#¶♫♪]+\s*.+|.+\s*[*#¶♫♪]+\s*$)'),
"", name="HI_music", entry=True),
]

@ -3,6 +3,7 @@ from __future__ import absolute_import
import re
import logging
from subzero.modification.exc import EmptyEntryError
from subzero.modification.processors import Processor
logger = logging.getLogger(__name__)
@ -15,13 +16,22 @@ class ReProcessor(Processor):
pattern = None
replace_with = None
def __init__(self, pattern, replace_with, name=None, supported=None):
def __init__(self, pattern, replace_with, name=None, supported=None, entry=False, **kwargs):
super(ReProcessor, self).__init__(name=name, supported=supported)
self.pattern = pattern
self.replace_with = replace_with
self.use_entry = entry
def process(self, content, debug=False, **kwargs):
return self.pattern.sub(self.replace_with, content)
def process(self, content, debug=False, entry=None, **kwargs):
if not self.use_entry:
return self.pattern.sub(self.replace_with, content)
ret = self.pattern.sub(self.replace_with, entry)
if not ret:
raise EmptyEntryError()
elif ret != entry:
return ret
return content
class NReProcessor(ReProcessor):
@ -37,7 +47,7 @@ class MultipleWordReProcessor(ReProcessor):
}
replaces found key in pattern with the corresponding value in data
"""
def __init__(self, snr_dict, name=None, parent=None, supported=None):
def __init__(self, snr_dict, name=None, parent=None, supported=None, **kwargs):
super(ReProcessor, self).__init__(name=name, supported=supported)
self.snr_dict = snr_dict

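ReProcessor gains an entry mode: when built with entry=True the substitution runs against the full entry text, an empty result raises EmptyEntryError so the caller drops the entry, a changed result replaces it, and an unchanged result leaves the current line alone. A minimal sketch using the classes defined in this file and the music pattern from the HI mod above (inputs are illustrative):

import re
from subzero.modification.processors.re_processor import ReProcessor

music = re.compile(r'(?ums)(^[-\s>~]*[*#¶♫♪]+\s*.+|.+\s*[*#¶♫♪]+\s*$)')
proc = ReProcessor(music, "", name="HI_music", entry=True)

proc.process("♪ la la la ♪", entry="♪ la la la ♪")   # raises EmptyEntryError -> entry skipped
proc.process("- Hello.", entry="- Hello.")            # no match -> content returned unchanged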
@ -0,0 +1,24 @@
from .utils import (
get_fld,
get_tld,
get_tld_names,
is_tld,
parse_tld,
Result,
update_tld_names,
)
__title__ = 'tld'
__version__ = '0.12.2'
__author__ = 'Artur Barseghyan'
__copyright__ = '2013-2020 Artur Barseghyan'
__license__ = 'MPL-1.1 OR GPL-2.0-only OR LGPL-2.1-or-later'
__all__ = (
'get_fld',
'get_tld',
'get_tld_names',
'is_tld',
'parse_tld',
'Result',
'update_tld_names',
)

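Typical use of the helpers exported here, with results matching the bundled test suite further down:

from tld import get_fld, get_tld, parse_tld, is_tld

get_fld('http://www.google.co.uk')      # 'google.co.uk'
get_tld('http://www.google.co.uk')      # 'co.uk'
parse_tld('http://www.google.co.uk')    # ('co.uk', 'google', 'www')
is_tld('co.uk')                         # True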
@ -0,0 +1,68 @@
from codecs import open as codecs_open
from urllib.request import urlopen
from typing import Optional
from .exceptions import (
TldIOError,
TldImproperlyConfigured,
)
from .helpers import project_dir
from .registry import Registry
__author__ = 'Artur Barseghyan'
__copyright__ = '2013-2020 Artur Barseghyan'
__license__ = 'MPL-1.1 OR GPL-2.0-only OR LGPL-2.1-or-later'
__all__ = ('BaseTLDSourceParser',)
class BaseTLDSourceParser(metaclass=Registry):
"""Base TLD source parser."""
uid: Optional[str] = None
source_url: str
local_path: str
@classmethod
def validate(cls):
"""Constructor."""
if not cls.uid:
raise TldImproperlyConfigured(
"The `uid` property of the TLD source parser shall be defined."
)
@classmethod
def get_tld_names(cls, fail_silently: bool = False, retry_count: int = 0):
"""Get tld names.
:param fail_silently:
:param retry_count:
:return:
"""
cls.validate()
raise NotImplementedError(
"Your TLD source parser shall implement `get_tld_names` method."
)
@classmethod
def update_tld_names(cls, fail_silently: bool = False) -> bool:
"""Update the local copy of the TLD file.
:param fail_silently:
:return:
"""
try:
remote_file = urlopen(cls.source_url)
local_file = codecs_open(
project_dir(cls.local_path),
'wb',
encoding='utf8'
)
local_file.write(remote_file.read().decode('utf8'))
local_file.close()
remote_file.close()
except Exception as err:
if fail_silently:
return False
raise TldIOError(err)
return True

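Concrete parsers subclass this base with a uid, a source_url and a local_path; BaseMozillaTLDSourceParser (defined in utils.py later in this diff) supplies the actual parsing. A sketch mirroring the custom parser the test suite builds, with placeholder URL and path:

from tld.utils import BaseMozillaTLDSourceParser

class CustomMozillaTLDSourceParser(BaseMozillaTLDSourceParser):
    uid = 'custom_mozilla'
    source_url = 'https://example.com/public_suffix_list.dat'        # placeholder
    local_path = 'tests/res/effective_tld_names_custom.dat.txt'

# Passed explicitly where needed, e.g.:
# get_tld('http://www.foreverchild', parser_class=CustomMozillaTLDSourceParser)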
@ -0,0 +1,58 @@
from typing import Any
from . import defaults
__author__ = 'Artur Barseghyan'
__copyright__ = '2013-2020 Artur Barseghyan'
__license__ = 'MPL-1.1 OR GPL-2.0-only OR LGPL-2.1-or-later'
__all__ = (
'get_setting',
'reset_settings',
'set_setting',
'settings',
)
class Settings(object):
"""Settings registry."""
def __init__(self):
self._settings = {}
self._settings_get = self._settings.get
def set(self, name: str, value: Any) -> None:
"""
Override default settings.
:param str name:
:param mixed value:
"""
self._settings[name] = value
def get(self, name: str, default: Any = None) -> Any:
"""
Gets a variable from local settings.
:param str name:
:param mixed default: Default value.
:return mixed:
"""
if name in self._settings:
return self._settings_get(name, default)
elif hasattr(defaults, name):
return getattr(defaults, name, default)
return default
def reset(self) -> None:
"""Reset settings."""
for name in defaults.__all__:
self.set(name, getattr(defaults, name))
settings = Settings()
get_setting = settings.get
set_setting = settings.set
reset_settings = settings.reset

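Runtime override of a default setting, as exercised by test_4_override_settings below (module path assumes the vendored tld package layout):

from tld.conf import get_setting, set_setting, reset_settings

get_setting('DEBUG')        # False, taken from tld.defaults
set_setting('DEBUG', True)
get_setting('DEBUG')        # True
reset_settings()            # back to the values in the defaults module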
@ -0,0 +1,14 @@
from os.path import dirname
__author__ = 'Artur Barseghyan'
__copyright__ = '2013-2020 Artur Barseghyan'
__license__ = 'MPL-1.1 OR GPL-2.0-only OR LGPL-2.1-or-later'
__all__ = (
'DEBUG',
'NAMES_LOCAL_PATH_PARENT',
)
# Absolute base path that is prepended to NAMES_LOCAL_PATH
NAMES_LOCAL_PATH_PARENT = dirname(__file__)
DEBUG = False

@ -0,0 +1,56 @@
__author__ = 'Artur Barseghyan'
__copyright__ = '2013-2020 Artur Barseghyan'
__license__ = 'MPL-1.1 OR GPL-2.0-only OR LGPL-2.1-or-later'
__all__ = (
'TldBadUrl',
'TldDomainNotFound',
'TldImproperlyConfigured',
'TldIOError',
)
class TldIOError(IOError):
"""TldIOError.
Supposed to be thrown when problems with reading/writing occur.
"""
class TldDomainNotFound(ValueError):
"""TldDomainNotFound.
Supposed to be thrown when domain name is not found (didn't match) the
local TLD policy.
"""
def __init__(self, domain_name):
super(TldDomainNotFound, self).__init__(
"Domain %s didn't match any existing TLD name!" % domain_name
)
class TldBadUrl(ValueError):
"""TldBadUrl.
Supposed to be thrown when bad URL is given.
"""
def __init__(self, url):
super(TldBadUrl, self).__init__("Is not a valid URL %s!" % url)
class TldImproperlyConfigured(Exception):
"""TldImproperlyConfigured.
Supposed to be thrown when code is improperly configured. Typical use-case
is when user tries to use `get_tld` function with both `search_public` and
`search_private` set to False.
"""
def __init__(self, msg=None):
if msg is None:
msg = "Improperly configured."
else:
msg = "Improperly configured. %s" % msg
super(TldImproperlyConfigured, self).__init__(msg)

@ -0,0 +1,22 @@
from os.path import abspath, join
from .conf import get_setting
__author__ = 'Artur Barseghyan'
__copyright__ = '2013-2020 Artur Barseghyan'
__license__ = 'MPL-1.1 OR GPL-2.0-only OR LGPL-2.1-or-later'
__all__ = (
'project_dir',
'PROJECT_DIR',
)
def project_dir(base: str) -> str:
"""Project dir."""
tld_names_local_path_parent = get_setting('NAMES_LOCAL_PATH_PARENT')
return abspath(
join(tld_names_local_path_parent, base).replace('\\', '/')
)
PROJECT_DIR = project_dir

@ -0,0 +1,45 @@
from typing import Dict
__author__ = 'Artur Barseghyan'
__copyright__ = '2013-2020 Artur Barseghyan'
__license__ = 'MPL-1.1 OR GPL-2.0-only OR LGPL-2.1-or-later'
__all__ = (
'Registry',
)
class Registry(type):
REGISTRY = {} # type: Dict[str, Registry]
def __new__(mcs, name, bases, attrs):
new_cls = type.__new__(mcs, name, bases, attrs)
# Here the name of the class is used as key but it could be any class
# parameter.
if getattr(new_cls, '_uid', None):
mcs.REGISTRY[new_cls._uid] = new_cls
return new_cls
@property
def _uid(cls) -> str:
return getattr(cls, 'uid', cls.__name__)
@classmethod
def reset(mcs) -> None:
mcs.REGISTRY = {}
@classmethod
def get(mcs, key, default=None):
return mcs.REGISTRY.get(key, default)
@classmethod
def items(mcs):
return mcs.REGISTRY.items()
# @classmethod
# def get_registry(mcs) -> Dict[str, Type]:
# return dict(mcs.REGISTRY)
#
# @classmethod
# def pop(mcs, uid) -> None:
# mcs.REGISTRY.pop(uid)

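The metaclass keeps a uid-to-class map: any parser class declaring a uid is registered when the class is created, which is what lets update_tld_names (in utils.py below) resolve a parser from a parser_uid string. A small check, assuming the vendored package layout:

from tld.registry import Registry
from tld.utils import MozillaTLDSourceParser

Registry.get('mozilla') is MozillaTLDSourceParser   # True, registered by uid at class creation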
File diff suppressed because it is too large.

@ -0,0 +1,67 @@
from typing import Any, Dict
from urllib.parse import SplitResult
__author__ = 'Artur Barseghyan'
__copyright__ = '2013-2020 Artur Barseghyan'
__license__ = 'MPL-1.1 OR GPL-2.0-only OR LGPL-2.1-or-later'
__all__ = (
'Result',
)
class Result(object):
"""Container."""
__slots__ = ('subdomain', 'domain', 'tld', '__fld', 'parsed_url')
def __init__(self,
tld: str,
domain: str,
subdomain: str,
parsed_url: SplitResult):
self.tld = tld
self.domain = domain if domain != '' else tld
self.subdomain = subdomain
self.parsed_url = parsed_url
if domain:
self.__fld = f"{self.domain}.{self.tld}"
else:
self.__fld = self.tld
@property
def extension(self) -> str:
"""Alias of ``tld``.
:return str:
"""
return self.tld
suffix = extension
@property
def fld(self) -> str:
"""First level domain.
:return:
:rtype: str
"""
return self.__fld
def __str__(self) -> str:
return self.tld
__repr__ = __str__
@property
def __dict__(self) -> Dict[str, Any]: # type: ignore
"""Mimic __dict__ functionality.
:return:
:rtype: dict
"""
return {
'tld': self.tld,
'domain': self.domain,
'subdomain': self.subdomain,
'fld': self.fld,
'parsed_url': self.parsed_url,
}

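This is the object returned by get_tld(..., as_object=True); the values below match the assertions in the bundled tests:

from tld import get_tld

res = get_tld('http://www.google.co.uk', as_object=True)
res.tld         # 'co.uk'
res.domain      # 'google'
res.subdomain   # 'www'
res.fld         # 'google.co.uk'
res.suffix      # 'co.uk' (alias of tld)
str(res)        # 'co.uk'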
@ -0,0 +1,8 @@
import unittest
from .test_core import *
from .test_commands import *
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
from functools import lru_cache
import logging
import socket
__author__ = 'Artur Barseghyan'
__copyright__ = '2013-2020 Artur Barseghyan'
__license__ = 'MPL-1.1 OR GPL-2.0-only OR LGPL-2.1-or-later'
__all__ = (
'internet_available_only',
'log_info',
)
LOG_INFO = True
LOGGER = logging.getLogger(__name__)
def log_info(func):
"""Log some useful info."""
if not LOG_INFO:
return func
def inner(self, *args, **kwargs):
"""Inner."""
result = func(self, *args, **kwargs)
LOGGER.debug('\n\n%s', func.__name__)
LOGGER.debug('============================')
if func.__doc__:
LOGGER.debug('""" %s """', func.__doc__.strip())
LOGGER.debug('----------------------------')
if result is not None:
LOGGER.debug(result)
LOGGER.debug('\n++++++++++++++++++++++++++++')
return result
return inner
@lru_cache(maxsize=32)
def is_internet_available(host="8.8.8.8", port=53, timeout=3):
"""Check if internet is available.
Host: 8.8.8.8 (google-public-dns-a.google.com)
OpenPort: 53/tcp
Service: domain (DNS/TCP)
"""
try:
socket.setdefaulttimeout(timeout)
socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect((host, port))
return True
except socket.error as ex:
print(ex)
return False
def internet_available_only(func):
def inner(self, *args, **kwargs):
"""Inner."""
if not is_internet_available():
LOGGER.debug('\n\n%s', func.__name__)
LOGGER.debug('============================')
if func.__doc__:
LOGGER.debug('""" %s """', func.__doc__.strip())
LOGGER.debug('----------------------------')
LOGGER.debug("Skipping because no Internet connection available.")
LOGGER.debug('\n++++++++++++++++++++++++++++')
return None
result = func(self, *args, **kwargs)
return result
return inner

File diff suppressed because it is too large.

@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
import logging
import unittest
import subprocess
from .base import log_info, internet_available_only
__author__ = 'Artur Barseghyan'
__copyright__ = '2013-2020 Artur Barseghyan'
__license__ = 'GPL 2.0/LGPL 2.1'
__all__ = ('TestCommands',)
LOGGER = logging.getLogger(__name__)
class TestCommands(unittest.TestCase):
"""Tld commands tests."""
def setUp(self):
"""Set up."""
@internet_available_only
@log_info
def test_1_update_tld_names_command(self):
"""Test updating the tld names (re-fetch mozilla source)."""
res = subprocess.check_output(['update-tld-names']).strip()
self.assertEqual(res, b'')
return res
@internet_available_only
@log_info
def test_1_update_tld_names_mozilla_command(self):
"""Test updating the tld names (re-fetch mozilla source)."""
res = subprocess.check_output(['update-tld-names', 'mozilla']).strip()
self.assertEqual(res, b'')
return res
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,842 @@
# -*- coding: utf-8 -*-
import copy
import logging
from os.path import abspath, join
import unittest
from tempfile import gettempdir
from typing import Type
from urllib.parse import urlsplit
from faker import Faker # type: ignore
from .. import defaults
from ..base import BaseTLDSourceParser
from ..conf import get_setting, reset_settings, set_setting
from ..exceptions import (
TldBadUrl,
TldDomainNotFound,
TldImproperlyConfigured,
TldIOError,
)
from ..helpers import project_dir
from ..registry import Registry
from ..utils import (
get_fld,
get_tld,
get_tld_names,
get_tld_names_container,
is_tld,
MozillaTLDSourceParser,
BaseMozillaTLDSourceParser,
parse_tld,
reset_tld_names,
update_tld_names,
update_tld_names_cli,
)
from .base import internet_available_only, log_info
__author__ = 'Artur Barseghyan'
__copyright__ = '2013-2020 Artur Barseghyan'
__license__ = 'MPL-1.1 OR GPL-2.0-only OR LGPL-2.1-or-later'
__all__ = ('TestCore',)
LOGGER = logging.getLogger(__name__)
class TestCore(unittest.TestCase):
"""Core tld functionality tests."""
@classmethod
def setUpClass(cls):
cls.faker = Faker()
cls.temp_dir = gettempdir()
def setUp(self):
"""Set up."""
self.good_patterns = [
{
'url': 'http://www.google.co.uk',
'fld': 'google.co.uk',
'subdomain': 'www',
'domain': 'google',
'suffix': 'co.uk',
'tld': 'co.uk',
'kwargs': {'fail_silently': True},
},
{
'url': 'http://www.v2.google.co.uk',
'fld': 'google.co.uk',
'subdomain': 'www.v2',
'domain': 'google',
'suffix': 'co.uk',
'tld': 'co.uk',
'kwargs': {'fail_silently': True},
},
# No longer valid
# {
# 'url': 'http://www.me.congresodelalengua3.ar',
# 'tld': 'me.congresodelalengua3.ar',
# 'subdomain': 'www',
# 'domain': 'me',
# 'suffix': 'congresodelalengua3.ar',
# },
{
'url': u'http://хром.гугл.рф',
'fld': u'гугл.рф',
'subdomain': u'хром',
'domain': u'гугл',
'suffix': u'рф',
'tld': u'рф',
'kwargs': {'fail_silently': True},
},
{
'url': 'http://www.google.co.uk:8001/lorem-ipsum/',
'fld': 'google.co.uk',
'subdomain': 'www',
'domain': 'google',
'suffix': 'co.uk',
'tld': 'co.uk',
'kwargs': {'fail_silently': True},
},
{
'url': 'http://www.me.cloudfront.net',
'fld': 'me.cloudfront.net',
'subdomain': 'www',
'domain': 'me',
'suffix': 'cloudfront.net',
'tld': 'cloudfront.net',
'kwargs': {'fail_silently': True},
},
{
'url': 'http://www.v2.forum.tech.google.co.uk:8001/'
'lorem-ipsum/',
'fld': 'google.co.uk',
'subdomain': 'www.v2.forum.tech',
'domain': 'google',
'suffix': 'co.uk',
'tld': 'co.uk',
'kwargs': {'fail_silently': True},
},
{
'url': 'https://pantheon.io/',
'fld': 'pantheon.io',
'subdomain': '',
'domain': 'pantheon',
'suffix': 'io',
'tld': 'io',
'kwargs': {'fail_silently': True},
},
{
'url': 'v2.www.google.com',
'fld': 'google.com',
'subdomain': 'v2.www',
'domain': 'google',
'suffix': 'com',
'tld': 'com',
'kwargs': {'fail_silently': True, 'fix_protocol': True},
},
{
'url': '//v2.www.google.com',
'fld': 'google.com',
'subdomain': 'v2.www',
'domain': 'google',
'suffix': 'com',
'tld': 'com',
'kwargs': {'fail_silently': True, 'fix_protocol': True},
},
{
'url': 'http://foo@bar.com',
'fld': 'bar.com',
'subdomain': '',
'domain': 'bar',
'suffix': 'com',
'tld': 'com',
'kwargs': {'fail_silently': True},
},
{
'url': 'http://user:foo@bar.com',
'fld': 'bar.com',
'subdomain': '',
'domain': 'bar',
'suffix': 'com',
'tld': 'com',
'kwargs': {'fail_silently': True},
},
{
'url': 'https://faguoren.xn--fiqs8s',
'fld': 'faguoren.xn--fiqs8s',
'subdomain': '',
'domain': 'faguoren',
'suffix': 'xn--fiqs8s',
'tld': 'xn--fiqs8s',
'kwargs': {'fail_silently': True},
},
{
'url': 'blogs.lemonde.paris',
'fld': 'lemonde.paris',
'subdomain': 'blogs',
'domain': 'lemonde',
'suffix': 'paris',
'tld': 'paris',
'kwargs': {'fail_silently': True, 'fix_protocol': True},
},
{
'url': 'axel.brighton.ac.uk',
'fld': 'brighton.ac.uk',
'subdomain': 'axel',
'domain': 'brighton',
'suffix': 'ac.uk',
'tld': 'ac.uk',
'kwargs': {'fail_silently': True, 'fix_protocol': True},
},
{
'url': 'm.fr.blogspot.com.au',
'fld': 'fr.blogspot.com.au',
'subdomain': 'm',
'domain': 'fr',
'suffix': 'blogspot.com.au',
'tld': 'blogspot.com.au',
'kwargs': {'fail_silently': True, 'fix_protocol': True},
},
{
'url': u'help.www.福岡.jp',
'fld': u'www.福岡.jp',
'subdomain': 'help',
'domain': 'www',
'suffix': u'福岡.jp',
'tld': u'福岡.jp',
'kwargs': {'fail_silently': True, 'fix_protocol': True},
},
{
'url': u'syria.arabic.variant.سوريا',
'fld': u'variant.سوريا',
'subdomain': 'syria.arabic',
'domain': 'variant',
'suffix': u'سوريا',
'tld': u'سوريا',
'kwargs': {'fail_silently': True, 'fix_protocol': True},
},
{
'url': u'http://www.help.kawasaki.jp',
'fld': u'www.help.kawasaki.jp',
'subdomain': '',
'domain': 'www',
'suffix': u'help.kawasaki.jp',
'tld': u'help.kawasaki.jp',
'kwargs': {'fail_silently': True},
},
{
'url': u'http://www.city.kawasaki.jp',
'fld': u'city.kawasaki.jp',
'subdomain': 'www',
'domain': 'city',
'suffix': u'kawasaki.jp',
'tld': u'kawasaki.jp',
'kwargs': {'fail_silently': True},
},
{
'url': u'http://fedoraproject.org',
'fld': u'fedoraproject.org',
'subdomain': '',
'domain': 'fedoraproject',
'suffix': u'org',
'tld': u'org',
'kwargs': {'fail_silently': True},
},
{
'url': u'http://www.cloud.fedoraproject.org',
'fld': u'www.cloud.fedoraproject.org',
'subdomain': '',
'domain': 'www',
'suffix': u'cloud.fedoraproject.org',
'tld': u'cloud.fedoraproject.org',
'kwargs': {'fail_silently': True},
},
{
'url': u'https://www.john.app.os.fedoraproject.org',
'fld': u'john.app.os.fedoraproject.org',
'subdomain': 'www',
'domain': 'john',
'suffix': u'app.os.fedoraproject.org',
'tld': u'app.os.fedoraproject.org',
'kwargs': {'fail_silently': True},
},
{
'url': 'ftp://www.xn--mxail5aa.xn--11b4c3d',
'fld': 'xn--mxail5aa.xn--11b4c3d',
'subdomain': 'www',
'domain': 'xn--mxail5aa',
'suffix': 'xn--11b4c3d',
'tld': 'xn--11b4c3d',
'kwargs': {'fail_silently': True},
},
{
'url': 'http://cloud.fedoraproject.org',
'fld': 'cloud.fedoraproject.org',
'subdomain': '',
'domain': 'cloud.fedoraproject.org',
'suffix': 'cloud.fedoraproject.org',
'tld': 'cloud.fedoraproject.org',
'kwargs': {'fail_silently': True}
},
{
'url': 'github.io',
'fld': 'github.io',
'subdomain': '',
'domain': 'github.io',
'suffix': 'github.io',
'tld': 'github.io',
'kwargs': {'fail_silently': True, 'fix_protocol': True}
},
{
'url': urlsplit('http://lemonde.fr/article.html'),
'fld': 'lemonde.fr',
'subdomain': '',
'domain': 'lemonde',
'suffix': 'fr',
'tld': 'fr',
'kwargs': {'fail_silently': True}
},
{
'url': 'https://github.com....../barseghyanartur/tld/',
'fld': 'github.com',
'subdomain': '',
'domain': 'github',
'suffix': 'com',
'tld': 'com',
'kwargs': {'fail_silently': True}
},
]
self.bad_patterns = {
'v2.www.google.com': {
'exception': TldBadUrl,
},
'/index.php?a=1&b=2': {
'exception': TldBadUrl,
},
'http://www.tld.doesnotexist': {
'exception': TldDomainNotFound,
},
'https://2001:0db8:0000:85a3:0000:0000:ac1f:8001': {
'exception': TldDomainNotFound,
},
'http://192.169.1.1': {
'exception': TldDomainNotFound,
},
'http://localhost:8080': {
'exception': TldDomainNotFound,
},
'https://localhost': {
'exception': TldDomainNotFound,
},
'https://localhost2': {
'exception': TldImproperlyConfigured,
'kwargs': {'search_public': False, 'search_private': False},
},
}
self.invalid_tlds = {
'v2.www.google.com',
'tld.doesnotexist',
'2001:0db8:0000:85a3:0000:0000:ac1f',
'192.169.1.1',
'localhost',
'google.com',
}
self.tld_names_local_path_custom = project_dir(
join(
'tests',
'res',
'effective_tld_names_custom.dat.txt'
)
)
self.good_patterns_custom_parser = [
{
'url': 'http://www.foreverchild',
'fld': 'www.foreverchild',
'subdomain': '',
'domain': 'www',
'suffix': 'foreverchild',
'tld': 'foreverchild',
'kwargs': {
'fail_silently': True,
# 'parser_class': self.get_custom_parser_class(),
},
},
{
'url': 'http://www.v2.foreverchild',
'fld': 'v2.foreverchild',
'subdomain': 'www',
'domain': 'v2',
'suffix': 'foreverchild',
'tld': 'foreverchild',
'kwargs': {
'fail_silently': True,
# 'parser_class': self.get_custom_parser_class(),
},
},
]
reset_settings()
def tearDown(self):
"""Tear down."""
reset_settings()
Registry.reset()
@property
def good_url(self):
return self.good_patterns[0]['url']
@property
def bad_url(self):
return list(self.bad_patterns.keys())[0]
def get_custom_parser_class(
self,
uid: str = 'custom_mozilla',
source_url: str = None,
local_path: str = 'tests/res/effective_tld_names_custom.dat.txt'
) -> Type[BaseTLDSourceParser]:
# Define a custom TLD source parser class
parser_class = type(
'CustomMozillaTLDSourceParser',
(BaseMozillaTLDSourceParser,),
{
'uid': uid,
'source_url': source_url,
'local_path': local_path,
}
)
return parser_class
@log_info
def test_0_tld_names_loaded(self):
"""Test if tld names are loaded."""
get_fld('http://www.google.co.uk')
from ..utils import tld_names
res = len(tld_names) > 0
self.assertTrue(res)
return res
@internet_available_only
@log_info
def test_1_update_tld_names(self):
"""Test updating the tld names (re-fetch mozilla source)."""
res = update_tld_names(fail_silently=False)
self.assertTrue(res)
return res
@log_info
def test_2_fld_good_patterns_pass(self):
"""Test good URL patterns."""
res = []
for data in self.good_patterns:
_res = get_fld(data['url'], **data['kwargs'])
self.assertEqual(_res, data['fld'])
res.append(_res)
return res
@log_info
def test_3_fld_bad_patterns_pass(self):
"""Test bad URL patterns."""
res = []
for url, params in self.bad_patterns.items():
_res = get_fld(url, fail_silently=True)
self.assertEqual(_res, None)
res.append(_res)
return res
@log_info
def test_4_override_settings(self):
"""Testing settings override."""
def override_settings():
"""Override settings."""
return get_setting('DEBUG')
self.assertEqual(defaults.DEBUG, override_settings())
set_setting('DEBUG', True)
self.assertEqual(True, override_settings())
return override_settings()
@log_info
def test_5_tld_good_patterns_pass_parsed_object(self):
"""Test good URL patterns."""
res = []
for data in self.good_patterns:
kwargs = copy.copy(data['kwargs'])
kwargs['as_object'] = True
_res = get_tld(data['url'], **kwargs)
self.assertEqual(_res.tld, data['tld'])
self.assertEqual(_res.subdomain, data['subdomain'])
self.assertEqual(_res.domain, data['domain'])
self.assertEqual(_res.suffix, data['suffix'])
self.assertEqual(_res.fld, data['fld'])
self.assertEqual(
str(_res).encode('utf8'),
data['tld'].encode('utf8')
)
self.assertEqual(
_res.__dict__,
{
'tld': _res.tld,
'domain': _res.domain,
'subdomain': _res.subdomain,
'fld': _res.fld,
'parsed_url': _res.parsed_url,
}
)
res.append(_res)
return res
@log_info
def test_6_override_full_names_path(self):
default = project_dir('dummy.txt')
override_base = '/tmp/test'
set_setting('NAMES_LOCAL_PATH_PARENT', override_base)
modified = project_dir('dummy.txt')
self.assertNotEqual(default, modified)
self.assertEqual(modified, abspath('/tmp/test/dummy.txt'))
@log_info
def test_7_public_private(self):
res = get_fld(
'http://silly.cc.ua',
fail_silently=True,
search_private=False
)
self.assertEqual(res, None)
res = get_fld(
'http://silly.cc.ua',
fail_silently=True,
search_private=True
)
self.assertEqual(res, 'silly.cc.ua')
res = get_fld(
'mercy.compute.amazonaws.com',
fail_silently=True,
search_private=False,
fix_protocol=True
)
self.assertEqual(res, None)
res = get_fld(
'http://whatever.com',
fail_silently=True,
search_public=False
)
self.assertEqual(res, None)
@log_info
def test_8_fld_bad_patterns_exceptions(self):
"""Test exceptions."""
res = []
for url, params in self.bad_patterns.items():
kwargs = params['kwargs'] if 'kwargs' in params else {}
kwargs['fail_silently'] = False
with self.assertRaises(params['exception']):
_res = get_fld(url, **kwargs)
res.append(_res)
return res
@log_info
def test_9_tld_good_patterns_pass(self):
"""Test `get_tld` good URL patterns."""
res = []
for data in self.good_patterns:
_res = get_tld(data['url'], **data['kwargs'])
self.assertEqual(_res, data['tld'])
res.append(_res)
return res
@log_info
def test_10_tld_bad_patterns_pass(self):
"""Test `get_tld` bad URL patterns."""
res = []
for url, params in self.bad_patterns.items():
_res = get_tld(url, fail_silently=True)
self.assertEqual(_res, None)
res.append(_res)
return res
@log_info
def test_11_parse_tld_good_patterns(self):
"""Test `parse_tld` good URL patterns."""
res = []
for data in self.good_patterns:
_res = parse_tld(data['url'], **data['kwargs'])
self.assertEqual(
_res,
(data['tld'], data['domain'], data['subdomain'])
)
res.append(_res)
return res
@log_info
def test_12_is_tld_good_patterns(self):
"""Test `is_tld` good URL patterns."""
for data in self.good_patterns:
self.assertTrue(is_tld(data['tld']))
@log_info
def test_13_is_tld_bad_patterns(self):
"""Test `is_tld` bad URL patterns."""
for _tld in self.invalid_tlds:
self.assertFalse(is_tld(_tld))
@log_info
def test_14_fail_update_tld_names(self):
"""Test fail `update_tld_names`."""
parser_class = self.get_custom_parser_class(
uid='custom_mozilla_2',
source_url='i-do-not-exist'
)
# Assert raise TldIOError on wrong NAMES_SOURCE_URL
with self.assertRaises(TldIOError):
update_tld_names(fail_silently=False, parser_uid=parser_class.uid)
# Assert return False on wrong NAMES_SOURCE_URL
self.assertFalse(
update_tld_names(fail_silently=True, parser_uid=parser_class.uid)
)
@log_info
def test_15_fail_get_tld_names(self):
"""Test fail `update_tld_names`."""
parser_class = self.get_custom_parser_class(
uid='custom_mozilla_3',
source_url='i-do-not-exist',
local_path='/srv/tests/res/effective_tld_names_custom_3.dat.txt'
)
reset_tld_names()
# Assert raise TldIOError on wrong NAMES_SOURCE_URL
for params in self.good_patterns:
kwargs = {'url': params['url']}
kwargs.update(params['kwargs'])
kwargs['fail_silently'] = False
kwargs['parser_class'] = parser_class
with self.assertRaises(TldIOError):
get_tld(**kwargs)
@log_info
def test_15_fail_get_fld_wrong_kwargs(self):
"""Test fail `get_fld` with wrong kwargs."""
with self.assertRaises(TldImproperlyConfigured):
get_fld(self.good_url, as_object=True)
@log_info
def test_16_fail_parse_tld(self):
"""Test fail `parse_tld`.
Assert raise TldIOError on wrong `NAMES_SOURCE_URL` for `parse_tld`.
"""
parser_class = self.get_custom_parser_class(
source_url='i-do-not-exist'
)
parsed_tld = parse_tld(
self.bad_url,
fail_silently=False,
parser_class=parser_class
)
self.assertEqual(parsed_tld, (None, None, None))
@log_info
def test_17_get_tld_names_and_reset_tld_names(self):
"""Test fail `get_tld_names` and repair using `reset_tld_names`."""
tmp_filename = join(
gettempdir(),
f'{self.faker.uuid4()}.dat.txt'
)
parser_class = self.get_custom_parser_class(
source_url='i-do-not-exist',
local_path=tmp_filename
)
reset_tld_names()
with self.subTest('Assert raise TldIOError'):
# Assert raise TldIOError on wrong NAMES_SOURCE_URL for
# `get_tld_names`
with self.assertRaises(TldIOError):
get_tld_names(
fail_silently=False,
parser_class=parser_class
)
tmp_filename = join(
gettempdir(),
f'{self.faker.uuid4()}.dat.txt'
)
parser_class_2 = self.get_custom_parser_class(
source_url='i-do-not-exist-2',
local_path=tmp_filename
)
reset_tld_names()
with self.subTest('Assert get None'):
# Assert get None on wrong `NAMES_SOURCE_URL` for `get_tld_names`
self.assertIsNone(
get_tld_names(
fail_silently=True,
parser_class=parser_class_2
)
)
@internet_available_only
@log_info
def test_18_update_tld_names_cli(self):
"""Test the return code of the CLI version of `update_tld_names`."""
reset_tld_names()
res = update_tld_names_cli()
self.assertEqual(res, 0)
@log_info
def test_19_parse_tld_custom_tld_names_good_patterns(self):
"""Test `parse_tld` good URL patterns for custom tld names."""
res = []
for data in self.good_patterns_custom_parser:
kwargs = copy.copy(data['kwargs'])
kwargs['parser_class'] = self.get_custom_parser_class()
_res = parse_tld(data['url'], **kwargs)
self.assertEqual(
_res,
(data['tld'], data['domain'], data['subdomain'])
)
res.append(_res)
return res
@log_info
def test_20_tld_custom_tld_names_good_patterns_pass_parsed_object(self):
"""Test `get_tld` good URL patterns for custom tld names."""
res = []
for data in self.good_patterns_custom_parser:
kwargs = copy.copy(data['kwargs'])
kwargs.update({
'as_object': True,
'parser_class': self.get_custom_parser_class(),
})
_res = get_tld(data['url'], **kwargs)
self.assertEqual(_res.tld, data['tld'])
self.assertEqual(_res.subdomain, data['subdomain'])
self.assertEqual(_res.domain, data['domain'])
self.assertEqual(_res.suffix, data['suffix'])
self.assertEqual(_res.fld, data['fld'])
self.assertEqual(
str(_res).encode('utf8'),
data['tld'].encode('utf8')
)
self.assertEqual(
_res.__dict__,
{
'tld': _res.tld,
'domain': _res.domain,
'subdomain': _res.subdomain,
'fld': _res.fld,
'parsed_url': _res.parsed_url,
}
)
res.append(_res)
return res
@log_info
def test_21_reset_tld_names_for_custom_parser(self):
"""Test `reset_tld_names` for `tld_names_local_path`."""
res = []
parser_class = self.get_custom_parser_class()
for data in self.good_patterns_custom_parser:
kwargs = copy.copy(data['kwargs'])
kwargs.update({
'as_object': True,
'parser_class': self.get_custom_parser_class(),
})
_res = get_tld(data['url'], **kwargs)
self.assertEqual(_res.tld, data['tld'])
self.assertEqual(_res.subdomain, data['subdomain'])
self.assertEqual(_res.domain, data['domain'])
self.assertEqual(_res.suffix, data['suffix'])
self.assertEqual(_res.fld, data['fld'])
self.assertEqual(
str(_res).encode('utf8'),
data['tld'].encode('utf8')
)
self.assertEqual(
_res.__dict__,
{
'tld': _res.tld,
'domain': _res.domain,
'subdomain': _res.subdomain,
'fld': _res.fld,
'parsed_url': _res.parsed_url,
}
)
res.append(_res)
tld_names = get_tld_names_container()
self.assertIn(parser_class.local_path, tld_names)
reset_tld_names(parser_class.local_path)
self.assertNotIn(parser_class.local_path, tld_names)
return res
@log_info
def test_22_fail_define_custom_parser_class_without_uid(self):
"""Test fail define custom parser class without `uid`."""
class CustomParser(BaseTLDSourceParser):
pass
class AnotherCustomParser(BaseTLDSourceParser):
uid = 'another-custom-parser'
# Assert raise TldImproperlyConfigured
with self.assertRaises(TldImproperlyConfigured):
CustomParser.get_tld_names()
# Assert raise NotImplementedError
with self.assertRaises(NotImplementedError):
AnotherCustomParser.get_tld_names()
@log_info
def test_23_len_trie_nodes(self):
"""Test len of the trie nodes."""
get_tld('http://delusionalinsanity.com')
tld_names = get_tld_names_container()
self.assertGreater(
len(tld_names[MozillaTLDSourceParser.local_path]),
0
)
@log_info
def test_24_get_tld_names_no_arguments(self):
"""Test len of the trie nodes."""
tld_names = get_tld_names()
self.assertGreater(
len(tld_names),
0
)
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,64 @@
__author__ = 'Artur Barseghyan'
__copyright__ = '2013-2020 Artur Barseghyan'
__license__ = 'MPL-1.1 OR GPL-2.0-only OR LGPL-2.1-or-later'
__all__ = (
'Trie',
'TrieNode',
)
class TrieNode(object):
"""Class representing a single Trie node."""
__slots__ = ('children', 'exception', 'leaf', 'private')
def __init__(self):
self.children = None
self.exception = None
self.leaf = False
self.private = False
class Trie(object):
"""An adhoc Trie data structure to store tlds in reverse notation order."""
def __init__(self):
self.root = TrieNode()
self.__nodes = 0
def __len__(self):
return self.__nodes
def add(self, tld: str, private: bool = False) -> None:
node = self.root
# Iterating over the tld parts in reverse order
# for part in reversed(tld.split('.')):
tld_split = tld.split('.')
tld_split.reverse()
for part in tld_split:
if part.startswith('!'):
node.exception = part[1:]
break
# To save up some RAM, we initialize the children dict only
# when strictly necessary
if node.children is None:
node.children = {}
child = TrieNode()
else:
child = node.children.get(part)
if child is None:
child = TrieNode()
node.children[part] = child
node = child
node.leaf = True
if private:
node.private = True
self.__nodes += 1

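A small illustration of the reverse-notation trie, using entry formats from the public suffix list that the parser feeds it (module path assumes the vendored layout):

from tld.trie import Trie

trie = Trie()
trie.add('co.uk')               # stored as uk -> co
trie.add('*.kawasaki.jp')       # wildcard child under jp -> kawasaki
trie.add('!city.kawasaki.jp')   # exception recorded on the kawasaki node
len(trie)                       # 3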
@ -0,0 +1,624 @@
from __future__ import unicode_literals
import argparse
from codecs import open as codecs_open
from functools import lru_cache
# codecs_open = open
from os.path import isabs
import sys
from typing import Dict, Type, Union, Tuple, List, Optional
from urllib.parse import urlsplit, SplitResult
from .base import BaseTLDSourceParser
from .exceptions import (
TldBadUrl,
TldDomainNotFound,
TldImproperlyConfigured,
TldIOError,
)
from .helpers import project_dir
from .trie import Trie
from .registry import Registry
from .result import Result
__author__ = 'Artur Barseghyan'
__copyright__ = '2013-2020 Artur Barseghyan'
__license__ = 'MPL-1.1 OR GPL-2.0-only OR LGPL-2.1-or-later'
__all__ = (
'BaseMozillaTLDSourceParser',
'get_fld',
'get_tld',
'get_tld_names',
'get_tld_names_container',
'is_tld',
'MozillaTLDSourceParser',
'parse_tld',
'pop_tld_names_container',
'process_url',
'reset_tld_names',
'Result',
'tld_names',
'update_tld_names',
'update_tld_names_cli',
'update_tld_names_container',
)
tld_names: Dict[str, Trie] = {}
def get_tld_names_container() -> Dict[str, Trie]:
"""Get container of all tld names.
:return:
:rtype dict:
"""
global tld_names
return tld_names
def update_tld_names_container(tld_names_local_path: str,
trie_obj: Trie) -> None:
"""Update TLD Names container item.
:param tld_names_local_path:
:param trie_obj:
:return:
"""
global tld_names
# tld_names.update({tld_names_local_path: trie_obj})
tld_names[tld_names_local_path] = trie_obj
def pop_tld_names_container(tld_names_local_path: str) -> None:
"""Remove TLD names container item.
:param tld_names_local_path:
:return:
"""
global tld_names
tld_names.pop(tld_names_local_path, None)
@lru_cache(maxsize=128, typed=True)
def update_tld_names(
fail_silently: bool = False,
parser_uid: str = None
) -> bool:
"""Update TLD names.
:param fail_silently:
:param parser_uid:
:return:
"""
results: List[bool] = []
results_append = results.append
if parser_uid:
parser_cls = Registry.get(parser_uid, None)
if parser_cls and parser_cls.source_url:
results_append(
parser_cls.update_tld_names(fail_silently=fail_silently)
)
else:
for parser_uid, parser_cls in Registry.items():
if parser_cls and parser_cls.source_url:
results_append(
parser_cls.update_tld_names(fail_silently=fail_silently)
)
return all(results)
def update_tld_names_cli() -> int:
"""CLI wrapper for update_tld_names.
Since update_tld_names returns True on success, we need to negate the
result to match CLI semantics.
"""
parser = argparse.ArgumentParser(description='Update TLD names')
parser.add_argument(
'parser_uid',
nargs='?',
default=None,
help="UID of the parser to update TLD names for.",
)
parser.add_argument(
'--fail-silently',
dest="fail_silently",
default=False,
action='store_true',
help="Fail silently",
)
args = parser.parse_args(sys.argv[1:])
parser_uid = args.parser_uid
fail_silently = args.fail_silently
return int(
not update_tld_names(
parser_uid=parser_uid,
fail_silently=fail_silently
)
)
def get_tld_names(
fail_silently: bool = False,
retry_count: int = 0,
parser_class: Type[BaseTLDSourceParser] = None
) -> Dict[str, Trie]:
"""Build the ``tlds`` list if empty. Recursive.
:param fail_silently: If set to True, no exceptions are raised and None
is returned on failure.
:param retry_count: If greater than 1, we raise an exception in order
to avoid infinite loops.
:param parser_class:
:type fail_silently: bool
:type retry_count: int
:type parser_class: BaseTLDSourceParser
:return: List of TLD names
:rtype: obj:`tld.utils.Trie`
"""
if not parser_class:
parser_class = MozillaTLDSourceParser
return parser_class.get_tld_names(
fail_silently=fail_silently,
retry_count=retry_count
)
# **************************************************************************
# **************************** Parser classes ******************************
# **************************************************************************
class BaseMozillaTLDSourceParser(BaseTLDSourceParser):
@classmethod
def get_tld_names(
cls,
fail_silently: bool = False,
retry_count: int = 0
) -> Optional[Dict[str, Trie]]:
"""Parse.
:param fail_silently:
:param retry_count:
:return:
"""
if retry_count > 1:
if fail_silently:
return None
else:
raise TldIOError
global tld_names
_tld_names = tld_names
# _tld_names = get_tld_names_container()
# If already loaded, return
if (
cls.local_path in _tld_names
and _tld_names[cls.local_path] is not None
):
return _tld_names
try:
# Load the TLD names file
if isabs(cls.local_path):
local_path = cls.local_path
else:
local_path = project_dir(cls.local_path)
local_file = codecs_open(
local_path,
'r',
encoding='utf8'
)
trie = Trie()
trie_add = trie.add # Performance opt
# Make a list of it all, strip all garbage
private_section = False
for line in local_file:
if '===BEGIN PRIVATE DOMAINS===' in line:
private_section = True
# Puny code TLD names
if '// xn--' in line:
line = line.split()[1]
if line[0] in ('/', '\n'):
continue
trie_add(
f'{line.strip()}',
private=private_section
)
update_tld_names_container(cls.local_path, trie)
local_file.close()
except IOError as err:
# Grab the file
cls.update_tld_names(
fail_silently=fail_silently
)
# Increment ``retry_count`` in order to avoid infinite loops
retry_count += 1
# Run again
return cls.get_tld_names(
fail_silently=fail_silently,
retry_count=retry_count
)
except Exception as err:
if fail_silently:
return None
else:
raise err
finally:
try:
local_file.close()
except Exception:
pass
return _tld_names
class MozillaTLDSourceParser(BaseMozillaTLDSourceParser):
"""Mozilla TLD source."""
uid: str = 'mozilla'
source_url: str = 'https://publicsuffix.org/list/public_suffix_list.dat'
local_path: str = 'res/effective_tld_names.dat.txt'
# **************************************************************************
# **************************** Core functions ******************************
# **************************************************************************
def process_url(
url: str,
fail_silently: bool = False,
fix_protocol: bool = False,
search_public: bool = True,
search_private: bool = True,
parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser
) -> Union[Tuple[List[str], int, SplitResult], Tuple[None, None, SplitResult]]:
"""Process URL.
:param parser_class:
:param url:
:param fail_silently:
:param fix_protocol:
:param search_public:
:param search_private:
:return:
"""
if not (search_public or search_private):
raise TldImproperlyConfigured(
"Either `search_public` or `search_private` (or both) shall be "
"set to True."
)
# Init
_tld_names = get_tld_names(
fail_silently=fail_silently,
parser_class=parser_class
)
if not isinstance(url, SplitResult):
url = url.lower()
if (
fix_protocol and not url.startswith(('//', 'http://', 'https://'))
):
url = f'https://{url}'
# Get parsed URL as we might need it later
parsed_url = urlsplit(url)
else:
parsed_url = url
# Get (sub) domain name
domain_name = parsed_url.hostname
if not domain_name:
if fail_silently:
return None, None, parsed_url
else:
raise TldBadUrl(url=url)
# This will correctly handle dots at the end of domain name in URLs like
# https://github.com............/barseghyanartur/tld/
if domain_name.endswith('.'):
domain_name = domain_name.rstrip('.')
domain_parts = domain_name.split('.')
tld_names_local_path = parser_class.local_path
# Now we query our Trie iterating on the domain parts in reverse order
node = _tld_names[tld_names_local_path].root
current_length = 0
tld_length = 0
match = None
len_domain_parts = len(domain_parts)
for i in range(len_domain_parts-1, -1, -1):
part = domain_parts[i]
# Cannot go deeper
if node.children is None:
break
# Exception
if part == node.exception:
break
child = node.children.get(part)
# Wildcards
if child is None:
child = node.children.get('*')
# If the current part is not in current node's children, we can stop
if child is None:
break
# Else we move deeper and increment our tld offset
current_length += 1
node = child
if node.leaf:
tld_length = current_length
match = node
# Checking the node we finished on is a leaf and is one we allow
if (
(match is None) or
(not match.leaf) or
(not search_public and not match.private) or
(not search_private and match.private)
):
if fail_silently:
return None, None, parsed_url
else:
raise TldDomainNotFound(domain_name=domain_name)
if len_domain_parts == tld_length:
non_zero_i = -1 # hostname = tld
else:
non_zero_i = max(1, len_domain_parts - tld_length)
return domain_parts, non_zero_i, parsed_url
def get_fld(
url: str,
fail_silently: bool = False,
fix_protocol: bool = False,
search_public: bool = True,
search_private: bool = True,
parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser,
**kwargs
) -> Optional[str]:
"""Extract the first level domain.
Extract the top level domain based on the mozilla's effective TLD names
dat file. Returns a string. May throw ``TldBadUrl`` or
``TldDomainNotFound`` exceptions if there's bad URL provided or no TLD
match found respectively.
:param url: URL to get top level domain from.
:param fail_silently: If set to True, no exceptions are raised and None
is returned on failure.
:param fix_protocol: If set to True, missing or wrong protocol is
ignored (https is appended instead).
:param search_public: If set to True, search in public domains.
:param search_private: If set to True, search in private domains.
:param parser_class:
:type url: str
:type fail_silently: bool
:type fix_protocol: bool
:type search_public: bool
:type search_private: bool
:return: String with top level domain (if ``as_object`` argument
is set to False) or a ``tld.utils.Result`` object (if ``as_object``
argument is set to True); returns None on failure.
:rtype: str
"""
if 'as_object' in kwargs:
raise TldImproperlyConfigured(
"`as_object` argument is deprecated for `get_fld`. Use `get_tld` "
"instead."
)
domain_parts, non_zero_i, parsed_url = process_url(
url=url,
fail_silently=fail_silently,
fix_protocol=fix_protocol,
search_public=search_public,
search_private=search_private,
parser_class=parser_class
)
if domain_parts is None:
return None
# This should be None when domain_parts is None
# but mypy isn't quite smart enough to figure that out yet
assert non_zero_i is not None
if non_zero_i < 0:
# hostname = tld
return parsed_url.hostname
return ".".join(domain_parts[non_zero_i-1:])
def get_tld(
url: str,
fail_silently: bool = False,
as_object: bool = False,
fix_protocol: bool = False,
search_public: bool = True,
search_private: bool = True,
parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser
) -> Optional[Union[str, Result]]:
"""Extract the top level domain.
Extract the top level domain based on the mozilla's effective TLD names
dat file. Returns a string. May throw ``TldBadUrl`` or
``TldDomainNotFound`` exceptions if there's bad URL provided or no TLD
match found respectively.
:param url: URL to get top level domain from.
:param fail_silently: If set to True, no exceptions are raised and None
is returned on failure.
:param as_object: If set to True, ``tld.utils.Result`` object is returned,
``domain``, ``suffix`` and ``tld`` properties.
:param fix_protocol: If set to True, missing or wrong protocol is
ignored (https is appended instead).
:param search_public: If set to True, search in public domains.
:param search_private: If set to True, search in private domains.
:param parser_class:
:type url: str
:type fail_silently: bool
:type as_object: bool
:type fix_protocol: bool
:type search_public: bool
:type search_private: bool
:return: String with top level domain (if ``as_object`` argument
is set to False) or a ``tld.utils.Result`` object (if ``as_object``
argument is set to True); returns None on failure.
:rtype: str
"""
domain_parts, non_zero_i, parsed_url = process_url(
url=url,
fail_silently=fail_silently,
fix_protocol=fix_protocol,
search_public=search_public,
search_private=search_private,
parser_class=parser_class
)
if domain_parts is None:
return None
# This should be None when domain_parts is None
# but mypy isn't quite smart enough to figure that out yet
assert non_zero_i is not None
if not as_object:
if non_zero_i < 0:
# hostname = tld
return parsed_url.hostname
return ".".join(domain_parts[non_zero_i:])
if non_zero_i < 0:
# hostname = tld
subdomain = ""
domain = ""
# This is checked in process_url but the type is ambiguous (Optional[str])
# so this assertion is just to satisfy mypy
assert parsed_url.hostname is not None, "No hostname in URL"
_tld = parsed_url.hostname
else:
subdomain = ".".join(domain_parts[:non_zero_i-1])
domain = ".".join(
domain_parts[non_zero_i-1:non_zero_i]
)
_tld = ".".join(domain_parts[non_zero_i:])
return Result(
subdomain=subdomain,
domain=domain,
tld=_tld,
parsed_url=parsed_url
)
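# Usage sketch for get_tld (illustrative only; exact values depend on the
# effective TLD list the parser class loads):
#
#     get_tld("https://www.example.co.uk")                  # -> "co.uk"
#     res = get_tld("https://www.example.co.uk", as_object=True)
#     res.subdomain, res.domain, res.tld                    # -> ("www", "example", "co.uk")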
def parse_tld(
url: str,
fail_silently: bool = False,
fix_protocol: bool = False,
search_public: bool = True,
search_private: bool = True,
parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser
) -> Union[Tuple[None, None, None], Tuple[str, str, str]]:
"""Parse TLD into parts.
:param url:
:param fail_silently:
:param fix_protocol:
:param search_public:
:param search_private:
:param parser_class: TLD source parser class to use (defaults to
``MozillaTLDSourceParser``).
:return: Tuple (tld, domain, subdomain)
:rtype: tuple
"""
try:
obj = get_tld(
url,
fail_silently=fail_silently,
as_object=True,
fix_protocol=fix_protocol,
search_public=search_public,
search_private=search_private,
parser_class=parser_class
)
if obj is None:
return None, None, None
return obj.tld, obj.domain, obj.subdomain # type: ignore
except (
TldBadUrl,
TldDomainNotFound,
TldImproperlyConfigured,
TldIOError
):
pass
return None, None, None
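# Usage sketch for parse_tld (illustrative only): the tuple is ordered
# (tld, domain, subdomain) and (None, None, None) signals failure:
#
#     parse_tld("https://www.example.co.uk")    # -> ("co.uk", "example", "www")
#     parse_tld("no scheme and no tld")         # -> (None, None, None)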
def is_tld(
value: str,
search_public: bool = True,
search_private: bool = True,
parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser
) -> bool:
"""Check if given URL is tld.
:param value: URL or domain name to check.
:param search_public: If set to True, search in public domains.
:param search_private: If set to True, search in private domains.
:param parser_class: TLD source parser class to use (defaults to
``MozillaTLDSourceParser``).
:type value: str
:type search_public: bool
:type search_private: bool
:return: True if ``value`` is a top level domain, False otherwise.
:rtype: bool
"""
_tld = get_tld(
url=value,
fail_silently=True,
fix_protocol=True,
search_public=search_public,
search_private=search_private,
parser_class=parser_class
)
return value == _tld
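# Usage sketch for is_tld (illustrative only): only an exact top level domain
# matches, not a full domain name:
#
#     is_tld("co.uk")          # -> True
#     is_tld("example.co.uk")  # -> False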
def reset_tld_names(tld_names_local_path: Optional[str] = None) -> None:
"""Reset ``tld_names`` to an empty value.
If ``tld_names_local_path`` is given, only the specified
entry is removed from ``tld_names`` instead.
:param tld_names_local_path:
:type tld_names_local_path: str
:return:
"""
if tld_names_local_path:
pop_tld_names_container(tld_names_local_path)
else:
global tld_names
tld_names = {}
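# Usage sketch for reset_tld_names (illustrative only): calling it with no
# argument clears the whole in-memory cache, while passing a local path
# (hypothetical path below) drops only that entry:
#
#     reset_tld_names()
#     reset_tld_names("/tmp/custom_tld_names.dat")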

@ -48,6 +48,22 @@
</table>
</div>
</div>
<br>
<h4>Options</h4>
<hr/>
<div class="row">
<div class="col-sm-3 text-right">
<b>Do not notify for manual actions</b>
</div>
<div class="form-group col-sm-8">
<label class="custom-control custom-checkbox">
<input type="checkbox" class="custom-control-input" id="settings-general-dont_notify_manual_actions"
name="settings-general-dont_notify_manual_actions">
<span class="custom-control-label" for="settings-general-dont_notify_manual_actions"></span>
</label>
<label>Suppress notifications when manually downloading or uploading subtitles.</label>
</div>
</div>
</form>
</div>
@ -120,6 +136,9 @@
$('#save_button_checkmark').hide();
$('#save_button').prop('disabled', true).css('cursor', 'not-allowed');
// Set Checkbox input values
$('#settings-general-dont_notify_manual_actions').prop('checked', {{'true' if settings.general.getboolean('dont_notify_manual_actions') else 'false'}});
var table = $('#notification_providers').DataTable({
select: {
style: 'single'
@ -192,6 +211,11 @@
$('#save_button').on('click', function() {
var formdata = new FormData(document.getElementById("settings_form"));
// Make sure all checkbox inputs are sent with a true/false value
$('input[type=checkbox]').each(function () {
formdata.set($(this).prop('id'), $(this).prop('checked'));
});
formdata.append('notification_providers', JSON.stringify(table.rows().data().toArray()));
$.ajax({

@ -272,6 +272,117 @@
<label>Re-encode downloaded subtitles to UTF-8. Should be left enabled in most cases.</label>
</div>
</div>
<div class="row">
<div class="col-sm-3 text-right">
<b>Hearing Impaired</b>
</div>
<div class="form-group col-sm-8">
<label class="custom-control custom-checkbox">
<input type="checkbox" class="custom-control-input subzero_mods" id="remove_HI">
<span class="custom-control-label"></span>
</label>
<label>Removes tags, text and characters from subtitles that are meant for hearing-impaired people.</label>
</div>
</div>
<div class="row">
<div class="col-sm-3 text-right">
<b>Remove Tags</b>
</div>
<div class="form-group col-sm-8">
<label class="custom-control custom-checkbox">
<input type="checkbox" class="custom-control-input subzero_mods" id="remove_tags">
<span class="custom-control-label"></span>
</label>
<label>Removes all possible style tags from the subtitle, such as font, bold, color, etc.</label>
</div>
</div>
<div class="row">
<div class="col-sm-3 text-right">
<b>OCR Fixes</b>
</div>
<div class="form-group col-sm-8">
<label class="custom-control custom-checkbox">
<input type="checkbox" class="custom-control-input subzero_mods" id="OCR_fixes">
<span class="custom-control-label"></span>
</label>
<label>Fixes issues that occur when a subtitle is converted from bitmap to text through OCR.</label>
</div>
</div>
<div class="row">
<div class="col-sm-3 text-right">
<b>Common Fixes</b>
</div>
<div class="form-group col-sm-8">
<label class="custom-control custom-checkbox">
<input type="checkbox" class="custom-control-input subzero_mods" id="common">
<span class="custom-control-label"></span>
</label>
<label>Fixes common whitespace and punctuation issues in subtitles.</label>
</div>
</div>
<div class="row">
<div class="col-sm-3 text-right">
<b>Fix Uppercase</b>
</div>
<div class="form-group col-sm-8">
<label class="custom-control custom-checkbox">
<input type="checkbox" class="custom-control-input subzero_mods" id="fix_uppercase">
<span class="custom-control-label"></span>
</label>
<label>Tries to make completely uppercase subtitles readable.</label>
</div>
</div>
<div class="row">
<div class="col-sm-3 text-right">
<b>Color</b>
</div>
<div class="form-group col-sm-8">
<label class="custom-control custom-checkbox">
<input type="checkbox" class="custom-control-input subzero_mods" id="subzero_color">
<span class="custom-control-label"></span>
</label>
<label>Adds color to your subtitles (for playback devices/software that don't ship their own color modes; only works for players that support color tags).</label>
</div>
</div>
<div id="subzero_color_div">
<div class="row">
<div class="col-sm-4 text-right">
<b>Color Name</b>
</div>
<div class="form-group col-sm-5">
<select class="form-control selectpicker" id="subzero_color_name">
<option value="white">White</option>
<option value="light-grey">Light Grey</option>
<option value="red">Red</option>
<option value="green">Green</option>
<option value="yellow">Yellow</option>
<option value="blue">Blue</option>
<option value="magenta">Magenta</option>
<option value="cyan">Cyan</option>
<option value="black">Black</option>
<option value="dark-red">Dark Red</option>
<option value="dark-green">Dark Green</option>
<option value="dark-yellow">Dark Yellow</option>
<option value="dark-blue">Dark Blue</option>
<option value="dark-magenta">Dark Magenta</option>
<option value="dark-cyan">Dark Cyan</option>
<option value="dark-grey">Dark Grey</option>
</select>
</div>
</div>
</div>
<div class="row">
<div class="col-sm-3 text-right">
<b>Reverse RTL</b>
</div>
<div class="form-group col-sm-8">
<label class="custom-control custom-checkbox">
<input type="checkbox" class="custom-control-input subzero_mods" id="reverse_rtl">
<span class="custom-control-label"></span>
</label>
<label>Reverses the punctuation in right-to-left subtitles for problematic playback devices.</label>
</div>
</div>
{% if not os.startswith('win') %}
<div class="row">
<div class="col-sm-3 text-right">
@ -519,6 +630,14 @@
}
});
$('#subzero_color').on('change', function () {
if ($(this).prop('checked')) {
$('#subzero_color_div').show();
} else {
$('#subzero_color_div').hide();
}
});
$('#settings-general-chmod_enabled').on('change', function () {
if ($(this).prop('checked')) {
$('#chmod_div').show();
@ -566,9 +685,35 @@
$('#settings-general-use_postprocessing_threshold').prop('checked', {{'true' if settings.general.getboolean('use_postprocessing_threshold') else 'false'}}).trigger('change');
$('#settings-general-use_postprocessing_threshold_movie').prop('checked', {{'true' if settings.general.getboolean('use_postprocessing_threshold_movie') else 'false'}}).trigger('change');
$('.subzero_mods').prop('checked', false).trigger('change');
{% if settings.general.subzero_mods %}
$('{{settings.general.subzero_mods}}'.split(',')).each( function(i, item) {
if (item.startsWith('color'))
{
var color_name = 'white';
var m = item.match(/color\(name=(.*)\)/);
if (m != null && m.length > 1) color_name = m[1];
$('#subzero_color_name').val(color_name).trigger('change');
item = 'subzero_color';
}
$("[id=" + item + "]").prop('checked', true).trigger('change');
})
{% endif %}
$('#save_button').on('click', function () {
var formdata = new FormData(document.getElementById("settings_form"));
var enabled_subzero_mods = $(".subzero_mods").map(function () {
if ($(this).prop('checked')) {
if ($(this).attr('id') == 'subzero_color')
{
return 'color(name=' + $('#subzero_color_name').val() + ')';
}
else return $(this).attr('id');
}
}).get().join(',');
formdata.append('settings-general-subzero_mods', enabled_subzero_mods)
// Make sure all checkbox inputs are sent with a true/false value
$('input[type=checkbox]').each(function () {
formdata.set($(this).prop('id'), $(this).prop('checked'));
