from __future__ import unicode_literals

import argparse
from codecs import open as codecs_open
from functools import lru_cache

# codecs_open = open
from os.path import isabs
import sys
from typing import Dict, Type, Union, Tuple, List, Optional
from urllib.parse import urlsplit, SplitResult

from .base import BaseTLDSourceParser, Registry
from .exceptions import (
    TldBadUrl,
    TldDomainNotFound,
    TldImproperlyConfigured,
    TldIOError,
)
from .helpers import project_dir
from .trie import Trie
from .result import Result

__author__ = "Artur Barseghyan"
__copyright__ = "2013-2021 Artur Barseghyan"
__license__ = "MPL-1.1 OR GPL-2.0-only OR LGPL-2.1-or-later"
__all__ = (
    "BaseMozillaTLDSourceParser",
    "get_fld",
    "get_tld",
    "get_tld_names",
    "get_tld_names_container",
    "is_tld",
    "MozillaTLDSourceParser",
    "MozillaPublicOnlyTLDSourceParser",
    "parse_tld",
    "pop_tld_names_container",
    "process_url",
    "reset_tld_names",
    "Result",
    "tld_names",
    "update_tld_names",
    "update_tld_names_cli",
    "update_tld_names_container",
)

# Module-level cache of parsed TLD tries, keyed by the parser's ``local_path``.
tld_names: Dict[str, Trie] = {}


def get_tld_names_container() -> Dict[str, Trie]:
    """Get container of all tld names.

    :return: The module-level mapping of local path to ``Trie`` (the very
        same object, not a copy).
    :rtype dict:
    """
    return tld_names


def update_tld_names_container(
    tld_names_local_path: str, trie_obj: Trie
) -> None:
    """Update TLD Names container item.

    :param tld_names_local_path: Key under which the trie is stored.
    :param trie_obj: Trie to store; replaces any existing entry for the key.
    :return:
    """
    tld_names[tld_names_local_path] = trie_obj


def pop_tld_names_container(tld_names_local_path: str) -> None:
    """Remove TLD names container item.

    A missing key is ignored.

    :param tld_names_local_path: Key of the entry to remove.
    :return:
    """
    tld_names.pop(tld_names_local_path, None)


@lru_cache(maxsize=128, typed=True)
def update_tld_names(
    fail_silently: bool = False, parser_uid: Optional[str] = None
) -> bool:
    """Update TLD names.

    Calls ``update_tld_names`` on the parser registered under ``parser_uid``
    or, when no uid is given, on every registered parser. Parsers without a
    ``source_url`` are skipped.

    NOTE: the function is wrapped in ``lru_cache``, so a repeated call with
    the same arguments returns the memoised result without invoking the
    parsers again.

    :param fail_silently: Forwarded to each parser's ``update_tld_names``.
    :param parser_uid: UID of a single parser to update; all if not given.
    :return: True if every invoked parser reported success (also True when
        no parser was invoked).
    """
    results: List[bool] = []

    if parser_uid:
        parser_cls = Registry.get(parser_uid, None)
        if parser_cls and parser_cls.source_url:
            results.append(
                parser_cls.update_tld_names(fail_silently=fail_silently)
            )
    else:
        # The uid is not needed here; avoid shadowing the ``parser_uid``
        # argument with the loop variable.
        for _uid, parser_cls in Registry.items():
            if parser_cls and parser_cls.source_url:
                results.append(
                    parser_cls.update_tld_names(fail_silently=fail_silently)
                )

    return all(results)


def update_tld_names_cli() -> int:
    """CLI wrapper for update_tld_names.

    Since update_tld_names returns True on success, we need to negate the
    result to match CLI semantics.

    :return: 0 on success, 1 on failure.
    """
    parser = argparse.ArgumentParser(description="Update TLD names")
    parser.add_argument(
        "parser_uid",
        nargs="?",
        default=None,
        help="UID of the parser to update TLD names for.",
    )
    parser.add_argument(
        "--fail-silently",
        dest="fail_silently",
        default=False,
        action="store_true",
        help="Fail silently",
    )
    args = parser.parse_args(sys.argv[1:])
    parser_uid = args.parser_uid
    fail_silently = args.fail_silently
    return int(
        not update_tld_names(
            parser_uid=parser_uid, fail_silently=fail_silently
        )
    )


def get_tld_names(
    fail_silently: bool = False,
    retry_count: int = 0,
    parser_class: Optional[Type[BaseTLDSourceParser]] = None,
) -> Optional[Dict[str, Trie]]:
    """Build the ``tlds`` list if empty.

    Delegates to ``parser_class.get_tld_names``.

    :param fail_silently: If set to True, no exceptions are raised and None
        is returned on failure.
    :param retry_count: If greater than 1, we raise an exception in order
        to avoid infinite loops.
    :param parser_class: Parser to use; defaults to
        ``MozillaTLDSourceParser``.
    :type fail_silently: bool
    :type retry_count: int
    :type parser_class: BaseTLDSourceParser
    :return: Whatever the parser returns: the TLD names container, or None
        on failure when ``fail_silently`` is set.
    :rtype: dict
    """
    if not parser_class:
        parser_class = MozillaTLDSourceParser

    return parser_class.get_tld_names(
        fail_silently=fail_silently, retry_count=retry_count
    )


# **************************************************************************
# **************************** Parser classes ******************************
# **************************************************************************


class BaseMozillaTLDSourceParser(BaseTLDSourceParser):
    """Base parser for TLD sources in the Mozilla public suffix format."""

    @classmethod
    def get_tld_names(
        cls, fail_silently: bool = False, retry_count: int = 0
    ) -> Optional[Dict[str, Trie]]:
        """Parse.

        Loads the file at ``cls.local_path`` into a ``Trie`` and stores it
        in the module-level container. If reading the file raises
        ``IOError``, ``cls.update_tld_names`` is called and the load is
        retried with an incremented ``retry_count``.

        :param fail_silently: If True, return None instead of raising.
        :param retry_count: Number of retries made so far; values greater
            than 1 abort the attempt.
        :return: The TLD names container, or None on failure when
            ``fail_silently`` is set.
        :raises TldIOError: If ``retry_count`` exceeds 1 and
            ``fail_silently`` is False.
        """
        if retry_count > 1:
            if fail_silently:
                return None
            raise TldIOError

        _tld_names = tld_names
        # _tld_names = get_tld_names_container()

        # If already loaded, return
        if (
            cls.local_path in _tld_names
            and _tld_names[cls.local_path] is not None
        ):
            return _tld_names

        try:
            # Load the TLD names file; relative paths go through
            # ``project_dir``.
            if isabs(cls.local_path):
                local_path = cls.local_path
            else:
                local_path = project_dir(cls.local_path)

            trie = Trie()
            trie_add = trie.add  # Performance opt
            private_section = False
            include_private = cls.include_private

            # The context manager closes the file on every exit path.
            with codecs_open(local_path, "r", encoding="utf8") as local_file:
                for line in local_file:
                    if "===BEGIN PRIVATE DOMAINS===" in line:
                        private_section = True

                    # Everything after the marker is private; stop early
                    # when the parser does not want private entries.
                    if private_section and not include_private:
                        break

                    # Puny code TLD names
                    if "// xn--" in line:
                        line = line.split()[1]

                    # Skip comments and blank lines
                    if line[0] in ("/", "\n"):
                        continue

                    trie_add(f"{line.strip()}", private=private_section)

            update_tld_names_container(cls.local_path, trie)
        except IOError:
            # Grab the file
            cls.update_tld_names(fail_silently=fail_silently)
            # Run again with an incremented ``retry_count`` in order to
            # avoid infinite loops
            return cls.get_tld_names(
                fail_silently=fail_silently, retry_count=retry_count + 1
            )
        except Exception:
            if fail_silently:
                return None
            raise

        return _tld_names


class MozillaTLDSourceParser(BaseMozillaTLDSourceParser):
    """Mozilla TLD source."""

    uid: str = "mozilla"
    source_url: str = "https://publicsuffix.org/list/public_suffix_list.dat"
    local_path: str = "res/effective_tld_names.dat.txt"


class MozillaPublicOnlyTLDSourceParser(BaseMozillaTLDSourceParser):
    """Mozilla TLD source with ``include_private`` disabled.

    Parsing stops at the private domains marker, so only entries before it
    are loaded.
    """

    uid: str = "mozilla_public_only"
    source_url: str = (
        "https://publicsuffix.org/list/public_suffix_list.dat?publiconly"
    )
    local_path: str = "res/effective_tld_names_public_only.dat.txt"
    include_private: bool = False


# **************************************************************************
# **************************** Core functions ******************************
# **************************************************************************


def process_url(
    url: Union[str, SplitResult],
    fail_silently: bool = False,
    fix_protocol: bool = False,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser,
) -> Union[Tuple[List[str], int, SplitResult], Tuple[None, None, SplitResult]]:
    """Process URL.

    :param parser_class: Parser whose trie is queried.
    :param url: URL string or an already split URL.
    :param fail_silently: If True, failures yield ``(None, None,
        parsed_url)`` instead of raising.
    :param fix_protocol: If True and ``url`` is a string that does not
        start with ``//``, ``http://`` or ``https://``, ``https://`` is
        prepended.
    :param search_public: If True, accept public suffix matches.
    :param search_private: If True, accept private suffix matches.
    :return: Tuple ``(domain_parts, non_zero_i, parsed_url)``.
        ``domain_parts`` is the lower-cased hostname split on dots.
        ``non_zero_i`` is the index in ``domain_parts`` where the TLD
        starts, or -1 when the whole hostname is the TLD.
    :raises TldImproperlyConfigured: If both search flags are False.
    :raises TldBadUrl: If no hostname can be extracted.
    :raises TldDomainNotFound: If no acceptable TLD matches.
    :raises TldIOError: If the TLD names could not be obtained.
    """
    if not (search_public or search_private):
        raise TldImproperlyConfigured(
            "Either `search_public` or `search_private` (or both) shall be "
            "set to True."
        )

    # Init
    _tld_names = get_tld_names(
        fail_silently=fail_silently, parser_class=parser_class
    )

    if not isinstance(url, SplitResult):
        if fix_protocol and not url.startswith(("//", "http://", "https://")):
            url = f"https://{url}"

        # Get parsed URL as we might need it later
        try:
            parsed_url = urlsplit(url)
        except ValueError:
            if fail_silently:
                # Keep the raw value; the hostname lookup below then fails
                # and is handled as a missing domain.
                parsed_url = url
            else:
                raise
    else:
        parsed_url = url

    # Get (sub) domain name
    try:
        domain_name = parsed_url.hostname
    except AttributeError:
        if fail_silently:
            domain_name = None
        else:
            raise

    if not domain_name:
        if fail_silently:
            return None, None, parsed_url
        raise TldBadUrl(url=url)

    # ``get_tld_names`` returns None on failure when ``fail_silently`` is
    # set; without this guard the trie lookup below raised ``TypeError``.
    if _tld_names is None:
        if fail_silently:
            return None, None, parsed_url
        raise TldIOError

    # This will correctly handle dots at the end of domain name in URLs like
    # https://github.com............/barseghyanartur/tld/
    domain_name = domain_name.lower().rstrip(".")

    domain_parts = domain_name.split(".")
    tld_names_local_path = parser_class.local_path

    # Now we query our Trie iterating on the domain parts in reverse order
    node = _tld_names[tld_names_local_path].root
    current_length = 0
    tld_length = 0
    match = None
    len_domain_parts = len(domain_parts)
    for part in reversed(domain_parts):
        # Cannot go deeper
        if node.children is None:
            break

        # Exception
        if part == node.exception:
            break

        child = node.children.get(part)

        # Wildcards
        if child is None:
            child = node.children.get("*")

        # If the current part is not in current node's children, we can stop
        if child is None:
            break

        # Else we move deeper and increment our tld offset
        current_length += 1
        node = child

        # Remember the deepest leaf seen so far
        if node.leaf:
            tld_length = current_length
            match = node

    # Checking the node we finished on is a leaf and is one we allow
    if (
        (match is None)
        or (not match.leaf)
        or (not search_public and not match.private)
        or (not search_private and match.private)
    ):
        if fail_silently:
            return None, None, parsed_url
        raise TldDomainNotFound(domain_name=domain_name)

    if len_domain_parts == tld_length:
        non_zero_i = -1  # hostname = tld
    else:
        non_zero_i = max(1, len_domain_parts - tld_length)

    return domain_parts, non_zero_i, parsed_url


def get_fld(
    url: Union[str, SplitResult],
    fail_silently: bool = False,
    fix_protocol: bool = False,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Optional[Type[BaseTLDSourceParser]] = None,
    **kwargs,
) -> Optional[str]:
    """Extract the first level domain.

    Extract the first level domain based on the mozilla's effective TLD
    names dat file. Returns a string. May throw ``TldBadUrl`` or
    ``TldDomainNotFound`` exceptions if there's bad URL provided or no TLD
    match found respectively.

    :param url: URL to get first level domain from.
    :param fail_silently: If set to True, no exceptions are raised and None
        is returned on failure.
    :param fix_protocol: If set to True, missing or wrong protocol is
        ignored (https is appended instead).
    :param search_public: If set to True, search in public domains.
    :param search_private: If set to True, search in private domains.
    :param parser_class: Parser to use. Defaults to
        ``MozillaTLDSourceParser`` if ``search_private`` is set, otherwise
        ``MozillaPublicOnlyTLDSourceParser``.
    :type url: str | SplitResult
    :type fail_silently: bool
    :type fix_protocol: bool
    :type search_public: bool
    :type search_private: bool
    :return: String with first level domain; returns None on failure.
    :rtype: str
    :raises TldImproperlyConfigured: If ``as_object`` is passed.
    """
    if "as_object" in kwargs:
        raise TldImproperlyConfigured(
            "`as_object` argument is deprecated for `get_fld`. Use `get_tld` "
            "instead."
        )

    if not parser_class:
        parser_class = (
            MozillaTLDSourceParser
            if search_private
            else MozillaPublicOnlyTLDSourceParser
        )

    domain_parts, non_zero_i, parsed_url = process_url(
        url=url,
        fail_silently=fail_silently,
        fix_protocol=fix_protocol,
        search_public=search_public,
        search_private=search_private,
        parser_class=parser_class,
    )

    if domain_parts is None:
        return None

    # This should be None when domain_parts is None
    # but mypy isn't quite smart enough to figure that out yet
    assert non_zero_i is not None
    if non_zero_i < 0:
        # hostname = tld
        return parsed_url.hostname

    # One label before the TLD plus the TLD itself
    return ".".join(domain_parts[non_zero_i - 1 :])


def get_tld(
    url: Union[str, SplitResult],
    fail_silently: bool = False,
    as_object: bool = False,
    fix_protocol: bool = False,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Optional[Type[BaseTLDSourceParser]] = None,
) -> Optional[Union[str, Result]]:
    """Extract the top level domain.

    Extract the top level domain based on the mozilla's effective TLD names
    dat file. Returns a string. May throw ``TldBadUrl`` or
    ``TldDomainNotFound`` exceptions if there's bad URL provided or no TLD
    match found respectively.

    :param url: URL to get top level domain from.
    :param fail_silently: If set to True, no exceptions are raised and None
        is returned on failure.
    :param as_object: If set to True, ``tld.utils.Result`` object is
        returned, ``domain``, ``suffix`` and ``tld`` properties.
    :param fix_protocol: If set to True, missing or wrong protocol is
        ignored (https is appended instead).
    :param search_public: If set to True, search in public domains.
    :param search_private: If set to True, search in private domains.
    :param parser_class: Parser to use. Defaults to
        ``MozillaTLDSourceParser`` if ``search_private`` is set, otherwise
        ``MozillaPublicOnlyTLDSourceParser``.
    :type url: str | SplitResult
    :type fail_silently: bool
    :type as_object: bool
    :type fix_protocol: bool
    :type search_public: bool
    :type search_private: bool
    :return: String with top level domain (if ``as_object`` argument is set
        to False) or a ``tld.utils.Result`` object (if ``as_object``
        argument is set to True); returns None on failure.
    :rtype: str
    """
    if not parser_class:
        parser_class = (
            MozillaTLDSourceParser
            if search_private
            else MozillaPublicOnlyTLDSourceParser
        )

    domain_parts, non_zero_i, parsed_url = process_url(
        url=url,
        fail_silently=fail_silently,
        fix_protocol=fix_protocol,
        search_public=search_public,
        search_private=search_private,
        parser_class=parser_class,
    )

    if domain_parts is None:
        return None

    # This should be None when domain_parts is None
    # but mypy isn't quite smart enough to figure that out yet
    assert non_zero_i is not None

    if not as_object:
        if non_zero_i < 0:
            # hostname = tld
            return parsed_url.hostname
        return ".".join(domain_parts[non_zero_i:])

    if non_zero_i < 0:
        # hostname = tld
        subdomain = ""
        domain = ""
        # This is checked in `process_url`, but the type is
        # ambiguous (Optional[str]) so this assertion is just to satisfy mypy
        assert parsed_url.hostname is not None, "No hostname in URL"
        _tld = parsed_url.hostname
    else:
        subdomain = ".".join(domain_parts[: non_zero_i - 1])
        domain = ".".join(domain_parts[non_zero_i - 1 : non_zero_i])
        _tld = ".".join(domain_parts[non_zero_i:])

    return Result(
        subdomain=subdomain, domain=domain, tld=_tld, parsed_url=parsed_url
    )


def parse_tld(
    url: Union[str, SplitResult],
    fail_silently: bool = False,
    fix_protocol: bool = False,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Optional[Type[BaseTLDSourceParser]] = None,
) -> Union[Tuple[None, None, None], Tuple[str, str, str]]:
    """Parse TLD into parts.

    The ``TldBadUrl``, ``TldDomainNotFound``, ``TldImproperlyConfigured``
    and ``TldIOError`` exceptions are swallowed regardless of
    ``fail_silently``; ``(None, None, None)`` is returned instead.

    :param url: URL string or an already split URL.
    :param fail_silently: Forwarded to ``get_tld``.
    :param fix_protocol: Forwarded to ``get_tld``.
    :param search_public: If set to True, search in public domains.
    :param search_private: If set to True, search in private domains.
    :param parser_class: Parser to use. Defaults to
        ``MozillaTLDSourceParser`` if ``search_private`` is set, otherwise
        ``MozillaPublicOnlyTLDSourceParser``.
    :return: Tuple (tld, domain, subdomain)
    :rtype: tuple
    """
    if not parser_class:
        parser_class = (
            MozillaTLDSourceParser
            if search_private
            else MozillaPublicOnlyTLDSourceParser
        )

    try:
        obj = get_tld(
            url,
            fail_silently=fail_silently,
            as_object=True,
            fix_protocol=fix_protocol,
            search_public=search_public,
            search_private=search_private,
            parser_class=parser_class,
        )
        if obj is None:
            return None, None, None

        return obj.tld, obj.domain, obj.subdomain  # type: ignore

    except (TldBadUrl, TldDomainNotFound, TldImproperlyConfigured, TldIOError):
        pass

    return None, None, None


def is_tld(
    value: Union[str, SplitResult],
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Optional[Type[BaseTLDSourceParser]] = None,
) -> bool:
    """Check if given URL is tld.

    The check passes when ``get_tld`` (called with ``fail_silently`` and
    ``fix_protocol`` enabled) returns a value equal to ``value``.

    :param value: URL to get top level domain from.
    :param search_public: If set to True, search in public domains.
    :param search_private: If set to True, search in private domains.
    :param parser_class: Parser to use. Defaults to
        ``MozillaTLDSourceParser`` if ``search_private`` is set, otherwise
        ``MozillaPublicOnlyTLDSourceParser``.
    :type value: str
    :type search_public: bool
    :type search_private: bool
    :return: True if ``value`` equals its own extracted TLD.
    :rtype: bool
    """
    if not parser_class:
        parser_class = (
            MozillaTLDSourceParser
            if search_private
            else MozillaPublicOnlyTLDSourceParser
        )

    _tld = get_tld(
        url=value,
        fail_silently=True,
        fix_protocol=True,
        search_public=search_public,
        search_private=search_private,
        parser_class=parser_class,
    )
    return value == _tld


def reset_tld_names(tld_names_local_path: Optional[str] = None) -> None:
    """Reset the ``tld_names`` to empty value.

    If ``tld_names_local_path`` is given, removes specified entry from
    ``tld_names`` instead. Without it, the module-level name is rebound to
    a new empty dict (the previous dict object is left untouched).

    :param tld_names_local_path: Key of the single entry to remove.
    :type tld_names_local_path: str
    :return:
    """

    if tld_names_local_path:
        pop_tld_names_container(tld_names_local_path)
    else:
        global tld_names
        tld_names = {}