You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
bazarr/libs/jinja2/utils.py

756 lines
23 KiB

import enum
5 years ago
import json
import os
import re
import typing as t
from collections import abc
5 years ago
from collections import deque
from random import choice
from random import randrange
5 years ago
from threading import Lock
from types import CodeType
from urllib.parse import quote_from_bytes
5 years ago
import markupsafe
5 years ago
if t.TYPE_CHECKING:
import typing_extensions as te
F = t.TypeVar("F", bound=t.Callable[..., t.Any])
5 years ago
# special singleton representing missing values for the runtime
missing: t.Any = type("MissingType", (), {"__repr__": lambda x: "missing"})()
5 years ago
internal_code: t.MutableSet[CodeType] = set()
5 years ago
concat = "".join
5 years ago
def pass_context(f: F) -> F:
"""Pass the :class:`~jinja2.runtime.Context` as the first argument
to the decorated function when called while rendering a template.
5 years ago
Can be used on functions, filters, and tests.
5 years ago
If only ``Context.eval_context`` is needed, use
:func:`pass_eval_context`. If only ``Context.environment`` is
needed, use :func:`pass_environment`.
.. versionadded:: 3.0.0
Replaces ``contextfunction`` and ``contextfilter``.
5 years ago
"""
f.jinja_pass_arg = _PassArg.context # type: ignore
5 years ago
return f
def pass_eval_context(f: F) -> F:
"""Pass the :class:`~jinja2.nodes.EvalContext` as the first argument
to the decorated function when called while rendering a template.
See :ref:`eval-context`.
5 years ago
Can be used on functions, filters, and tests.
If only ``EvalContext.environment`` is needed, use
:func:`pass_environment`.
.. versionadded:: 3.0.0
Replaces ``evalcontextfunction`` and ``evalcontextfilter``.
5 years ago
"""
f.jinja_pass_arg = _PassArg.eval_context # type: ignore
5 years ago
return f
def pass_environment(f: F) -> F:
"""Pass the :class:`~jinja2.Environment` as the first argument to
the decorated function when called while rendering a template.
Can be used on functions, filters, and tests.
.. versionadded:: 3.0.0
Replaces ``environmentfunction`` and ``environmentfilter``.
5 years ago
"""
f.jinja_pass_arg = _PassArg.environment # type: ignore
5 years ago
return f
class _PassArg(enum.Enum):
context = enum.auto()
eval_context = enum.auto()
environment = enum.auto()
@classmethod
def from_obj(cls, obj: F) -> t.Optional["_PassArg"]:
if hasattr(obj, "jinja_pass_arg"):
return obj.jinja_pass_arg # type: ignore
return None
def internalcode(f: F) -> F:
5 years ago
"""Marks the function as internally used"""
internal_code.add(f.__code__)
return f
def is_undefined(obj: t.Any) -> bool:
5 years ago
"""Check if the object passed is undefined. This does nothing more than
performing an instance check against :class:`Undefined` but looks nicer.
This can be used for custom filters or tests that want to react to
undefined variables. For example a custom default filter can look like
this::
def default(var, default=''):
if is_undefined(var):
return default
return var
"""
from .runtime import Undefined
5 years ago
return isinstance(obj, Undefined)
def consume(iterable: t.Iterable[t.Any]) -> None:
5 years ago
"""Consumes an iterable without doing anything with it."""
for _ in iterable:
5 years ago
pass
def clear_caches() -> None:
"""Jinja keeps internal caches for environments and lexers. These are
used so that Jinja doesn't have to recreate environments and lexers all
5 years ago
the time. Normally you don't have to care about that but if you are
measuring memory consumption you may want to clean the caches.
"""
from .environment import get_spontaneous_environment
from .lexer import _lexer_cache
get_spontaneous_environment.cache_clear()
5 years ago
_lexer_cache.clear()
def import_string(import_name: str, silent: bool = False) -> t.Any:
5 years ago
"""Imports an object based on a string. This is useful if you want to
use import paths as endpoints or something similar. An import path can
be specified either in dotted notation (``xml.sax.saxutils.escape``)
or with a colon as object delimiter (``xml.sax.saxutils:escape``).
If the `silent` is True the return value will be `None` if the import
fails.
:return: imported object
"""
try:
if ":" in import_name:
module, obj = import_name.split(":", 1)
elif "." in import_name:
module, _, obj = import_name.rpartition(".")
5 years ago
else:
return __import__(import_name)
return getattr(__import__(module, None, None, [obj]), obj)
except (ImportError, AttributeError):
if not silent:
raise
def open_if_exists(filename: str, mode: str = "rb") -> t.Optional[t.IO]:
5 years ago
"""Returns a file descriptor for the filename if that file exists,
otherwise ``None``.
5 years ago
"""
if not os.path.isfile(filename):
return None
return open(filename, mode)
5 years ago
def object_type_repr(obj: t.Any) -> str:
5 years ago
"""Returns the name of the object's type. For some recognized
singletons the name of the object is returned instead. (For
example for `None` and `Ellipsis`).
"""
if obj is None:
return "None"
5 years ago
elif obj is Ellipsis:
return "Ellipsis"
5 years ago
cls = type(obj)
5 years ago
if cls.__module__ == "builtins":
return f"{cls.__name__} object"
return f"{cls.__module__}.{cls.__name__} object"
5 years ago
def pformat(obj: t.Any) -> str:
"""Format an object using :func:`pprint.pformat`."""
from pprint import pformat # type: ignore
5 years ago
return pformat(obj)
5 years ago
_http_re = re.compile(
r"""
^
(
(https?://|www\.) # scheme or www
(([\w%-]+\.)+)? # subdomain
(
[a-z]{2,63} # basic tld
|
xn--[\w%]{2,59} # idna tld
)
|
([\w%-]{2,63}\.)+ # basic domain
(com|net|int|edu|gov|org|info|mil) # basic tld
|
(https?://) # scheme
(
(([\d]{1,3})(\.[\d]{1,3}){3}) # IPv4
|
(\[([\da-f]{0,4}:){2}([\da-f]{0,4}:?){1,6}]) # IPv6
)
)
(?::[\d]{1,5})? # port
(?:[/?#]\S*)? # path, query, and fragment
$
""",
re.IGNORECASE | re.VERBOSE,
)
_email_re = re.compile(r"^\S+@\w[\w.-]*\.\w+$")
def urlize(
text: str,
trim_url_limit: t.Optional[int] = None,
rel: t.Optional[str] = None,
target: t.Optional[str] = None,
extra_schemes: t.Optional[t.Iterable[str]] = None,
) -> str:
"""Convert URLs in text into clickable links.
This may not recognize links in some situations. Usually, a more
comprehensive formatter, such as a Markdown library, is a better
choice.
Works on ``http://``, ``https://``, ``www.``, ``mailto:``, and email
addresses. Links with trailing punctuation (periods, commas, closing
parentheses) and leading punctuation (opening parentheses) are
recognized excluding the punctuation. Email addresses that include
header fields are not recognized (for example,
``mailto:address@example.com?cc=copy@example.com``).
:param text: Original text containing URLs to link.
:param trim_url_limit: Shorten displayed URL values to this length.
:param target: Add the ``target`` attribute to links.
:param rel: Add the ``rel`` attribute to links.
:param extra_schemes: Recognize URLs that start with these schemes
in addition to the default behavior.
.. versionchanged:: 3.0
The ``extra_schemes`` parameter was added.
.. versionchanged:: 3.0
Generate ``https://`` links for URLs without a scheme.
.. versionchanged:: 3.0
The parsing rules were updated. Recognize email addresses with
or without the ``mailto:`` scheme. Validate IP addresses. Ignore
parentheses and brackets in more cases.
5 years ago
"""
if trim_url_limit is not None:
def trim_url(x: str) -> str:
if len(x) > trim_url_limit: # type: ignore
return f"{x[:trim_url_limit]}..."
return x
else:
def trim_url(x: str) -> str:
return x
words = re.split(r"(\s+)", str(markupsafe.escape(text)))
rel_attr = f' rel="{markupsafe.escape(rel)}"' if rel else ""
target_attr = f' target="{markupsafe.escape(target)}"' if target else ""
5 years ago
for i, word in enumerate(words):
head, middle, tail = "", word, ""
match = re.match(r"^([(<]|&lt;)+", middle)
5 years ago
if match:
head = match.group()
middle = middle[match.end() :]
# Unlike lead, which is anchored to the start of the string,
# need to check that the string ends with any of the characters
# before trying to match all of them, to avoid backtracking.
if middle.endswith((")", ">", ".", ",", "\n", "&gt;")):
match = re.search(r"([)>.,\n]|&gt;)+$", middle)
if match:
tail = match.group()
middle = middle[: match.start()]
# Prefer balancing parentheses in URLs instead of ignoring a
# trailing character.
for start_char, end_char in ("(", ")"), ("<", ">"), ("&lt;", "&gt;"):
start_count = middle.count(start_char)
if start_count <= middle.count(end_char):
# Balanced, or lighter on the left
continue
# Move as many as possible from the tail to balance
for _ in range(min(start_count, tail.count(end_char))):
end_index = tail.index(end_char) + len(end_char)
# Move anything in the tail before the end char too
middle += tail[:end_index]
tail = tail[end_index:]
if _http_re.match(middle):
if middle.startswith("https://") or middle.startswith("http://"):
middle = (
f'<a href="{middle}"{rel_attr}{target_attr}>{trim_url(middle)}</a>'
)
else:
middle = (
f'<a href="https://{middle}"{rel_attr}{target_attr}>'
f"{trim_url(middle)}</a>"
)
elif middle.startswith("mailto:") and _email_re.match(middle[7:]):
middle = f'<a href="{middle}">{middle[7:]}</a>'
elif (
"@" in middle
and not middle.startswith("www.")
and ":" not in middle
and _email_re.match(middle)
):
middle = f'<a href="mailto:{middle}">{middle}</a>'
elif extra_schemes is not None:
for scheme in extra_schemes:
if middle != scheme and middle.startswith(scheme):
middle = f'<a href="{middle}"{rel_attr}{target_attr}>{middle}</a>'
words[i] = f"{head}{middle}{tail}"
return "".join(words)
def generate_lorem_ipsum(
n: int = 5, html: bool = True, min: int = 20, max: int = 100
) -> str:
5 years ago
"""Generate some lorem ipsum for the template."""
from .constants import LOREM_IPSUM_WORDS
5 years ago
words = LOREM_IPSUM_WORDS.split()
result = []
for _ in range(n):
next_capitalized = True
last_comma = last_fullstop = 0
word = None
last = None
p = []
# each paragraph contains out of 20 to 100 words.
for idx, _ in enumerate(range(randrange(min, max))):
while True:
word = choice(words)
if word != last:
last = word
break
if next_capitalized:
word = word.capitalize()
next_capitalized = False
# add commas
if idx - randrange(3, 8) > last_comma:
last_comma = idx
last_fullstop += 2
word += ","
5 years ago
# add end of sentences
if idx - randrange(10, 20) > last_fullstop:
last_comma = last_fullstop = idx
word += "."
5 years ago
next_capitalized = True
p.append(word)
# ensure that the paragraph ends with a dot.
p_str = " ".join(p)
if p_str.endswith(","):
p_str = p_str[:-1] + "."
elif not p_str.endswith("."):
p_str += "."
result.append(p_str)
5 years ago
if not html:
return "\n\n".join(result)
return markupsafe.Markup(
"\n".join(f"<p>{markupsafe.escape(x)}</p>" for x in result)
)
5 years ago
def url_quote(obj: t.Any, charset: str = "utf-8", for_qs: bool = False) -> str:
"""Quote a string for use in a URL using the given charset.
5 years ago
:param obj: String or bytes to quote. Other types are converted to
string then encoded to bytes using the given charset.
:param charset: Encode text to bytes using this charset.
:param for_qs: Quote "/" and use "+" for spaces.
5 years ago
"""
if not isinstance(obj, bytes):
if not isinstance(obj, str):
obj = str(obj)
5 years ago
obj = obj.encode(charset)
safe = b"" if for_qs else b"/"
rv = quote_from_bytes(obj, safe)
5 years ago
if for_qs:
rv = rv.replace("%20", "+")
5 years ago
return rv
@abc.MutableMapping.register
class LRUCache:
5 years ago
"""A simple LRU Cache implementation."""
# this is fast for small capacities (something below 1000) but doesn't
# scale. But as long as it's only used as storage for templates this
# won't do any harm.
def __init__(self, capacity: int) -> None:
5 years ago
self.capacity = capacity
self._mapping: t.Dict[t.Any, t.Any] = {}
self._queue: "te.Deque[t.Any]" = deque()
5 years ago
self._postinit()
def _postinit(self) -> None:
5 years ago
# alias all queue methods for faster lookup
self._popleft = self._queue.popleft
self._pop = self._queue.pop
self._remove = self._queue.remove
self._wlock = Lock()
self._append = self._queue.append
def __getstate__(self) -> t.Mapping[str, t.Any]:
5 years ago
return {
"capacity": self.capacity,
"_mapping": self._mapping,
"_queue": self._queue,
5 years ago
}
def __setstate__(self, d: t.Mapping[str, t.Any]) -> None:
5 years ago
self.__dict__.update(d)
self._postinit()
def __getnewargs__(self) -> t.Tuple:
5 years ago
return (self.capacity,)
def copy(self) -> "LRUCache":
5 years ago
"""Return a shallow copy of the instance."""
rv = self.__class__(self.capacity)
rv._mapping.update(self._mapping)
rv._queue.extend(self._queue)
5 years ago
return rv
def get(self, key: t.Any, default: t.Any = None) -> t.Any:
5 years ago
"""Return an item from the cache dict or `default`"""
try:
return self[key]
except KeyError:
return default
def setdefault(self, key: t.Any, default: t.Any = None) -> t.Any:
5 years ago
"""Set `default` if the key is not in the cache otherwise
leave unchanged. Return the value of this key.
"""
try:
return self[key]
except KeyError:
self[key] = default
return default
5 years ago
def clear(self) -> None:
5 years ago
"""Clear the cache."""
with self._wlock:
5 years ago
self._mapping.clear()
self._queue.clear()
def __contains__(self, key: t.Any) -> bool:
5 years ago
"""Check if a key exists in this cache."""
return key in self._mapping
def __len__(self) -> int:
5 years ago
"""Return the current size of the cache."""
return len(self._mapping)
def __repr__(self) -> str:
return f"<{type(self).__name__} {self._mapping!r}>"
5 years ago
def __getitem__(self, key: t.Any) -> t.Any:
5 years ago
"""Get an item from the cache. Moves the item up so that it has the
highest priority then.
Raise a `KeyError` if it does not exist.
"""
with self._wlock:
5 years ago
rv = self._mapping[key]
5 years ago
if self._queue[-1] != key:
try:
self._remove(key)
except ValueError:
# if something removed the key from the container
# when we read, ignore the ValueError that we would
# get otherwise.
pass
5 years ago
self._append(key)
5 years ago
return rv
def __setitem__(self, key: t.Any, value: t.Any) -> None:
5 years ago
"""Sets the value for an item. Moves the item up so that it
has the highest priority then.
"""
with self._wlock:
5 years ago
if key in self._mapping:
self._remove(key)
elif len(self._mapping) == self.capacity:
del self._mapping[self._popleft()]
5 years ago
self._append(key)
self._mapping[key] = value
def __delitem__(self, key: t.Any) -> None:
5 years ago
"""Remove an item from the cache dict.
Raise a `KeyError` if it does not exist.
"""
with self._wlock:
5 years ago
del self._mapping[key]
5 years ago
try:
self._remove(key)
except ValueError:
pass
def items(self) -> t.Iterable[t.Tuple[t.Any, t.Any]]:
5 years ago
"""Return a list of items."""
result = [(key, self._mapping[key]) for key in list(self._queue)]
result.reverse()
return result
def values(self) -> t.Iterable[t.Any]:
5 years ago
"""Return a list of all values."""
return [x[1] for x in self.items()]
def keys(self) -> t.Iterable[t.Any]:
5 years ago
"""Return a list of all keys ordered by most recent usage."""
return list(self)
def __iter__(self) -> t.Iterator[t.Any]:
5 years ago
return reversed(tuple(self._queue))
def __reversed__(self) -> t.Iterator[t.Any]:
"""Iterate over the keys in the cache dict, oldest items
5 years ago
coming first.
"""
return iter(tuple(self._queue))
__copy__ = copy
def select_autoescape(
enabled_extensions: t.Collection[str] = ("html", "htm", "xml"),
disabled_extensions: t.Collection[str] = (),
default_for_string: bool = True,
default: bool = False,
) -> t.Callable[[t.Optional[str]], bool]:
5 years ago
"""Intelligently sets the initial value of autoescaping based on the
filename of the template. This is the recommended way to configure
autoescaping if you do not want to write a custom function yourself.
If you want to enable it for all templates created from strings or
for all templates with `.html` and `.xml` extensions::
from jinja2 import Environment, select_autoescape
env = Environment(autoescape=select_autoescape(
enabled_extensions=('html', 'xml'),
default_for_string=True,
))
Example configuration to turn it on at all times except if the template
ends with `.txt`::
from jinja2 import Environment, select_autoescape
env = Environment(autoescape=select_autoescape(
disabled_extensions=('txt',),
default_for_string=True,
default=True,
))
The `enabled_extensions` is an iterable of all the extensions that
autoescaping should be enabled for. Likewise `disabled_extensions` is
a list of all templates it should be disabled for. If a template is
loaded from a string then the default from `default_for_string` is used.
If nothing matches then the initial value of autoescaping is set to the
value of `default`.
For security reasons this function operates case insensitive.
.. versionadded:: 2.9
"""
enabled_patterns = tuple(f".{x.lstrip('.').lower()}" for x in enabled_extensions)
disabled_patterns = tuple(f".{x.lstrip('.').lower()}" for x in disabled_extensions)
def autoescape(template_name: t.Optional[str]) -> bool:
5 years ago
if template_name is None:
return default_for_string
template_name = template_name.lower()
if template_name.endswith(enabled_patterns):
return True
if template_name.endswith(disabled_patterns):
return False
return default
5 years ago
return autoescape
def htmlsafe_json_dumps(
obj: t.Any, dumps: t.Optional[t.Callable[..., str]] = None, **kwargs: t.Any
) -> markupsafe.Markup:
"""Serialize an object to a string of JSON with :func:`json.dumps`,
then replace HTML-unsafe characters with Unicode escapes and mark
the result safe with :class:`~markupsafe.Markup`.
This is available in templates as the ``|tojson`` filter.
The following characters are escaped: ``<``, ``>``, ``&``, ``'``.
The returned string is safe to render in HTML documents and
``<script>`` tags. The exception is in HTML attributes that are
double quoted; either use single quotes or the ``|forceescape``
filter.
5 years ago
:param obj: The object to serialize to JSON.
:param dumps: The ``dumps`` function to use. Defaults to
``env.policies["json.dumps_function"]``, which defaults to
:func:`json.dumps`.
:param kwargs: Extra arguments to pass to ``dumps``. Merged onto
``env.policies["json.dumps_kwargs"]``.
5 years ago
.. versionchanged:: 3.0
The ``dumper`` parameter is renamed to ``dumps``.
5 years ago
.. versionadded:: 2.9
5 years ago
"""
if dumps is None:
dumps = json.dumps
return markupsafe.Markup(
dumps(obj, **kwargs)
.replace("<", "\\u003c")
.replace(">", "\\u003e")
.replace("&", "\\u0026")
.replace("'", "\\u0027")
)
class Cycler:
"""Cycle through values by yield them one at a time, then restarting
once the end is reached. Available as ``cycler`` in templates.
5 years ago
Similar to ``loop.cycle``, but can be used outside loops or across
multiple loops. For example, render a list of folders and files in a
list, alternating giving them "odd" and "even" classes.
5 years ago
.. code-block:: html+jinja
5 years ago
{% set row_class = cycler("odd", "even") %}
<ul class="browser">
{% for folder in folders %}
<li class="folder {{ row_class.next() }}">{{ folder }}
{% endfor %}
{% for file in files %}
<li class="file {{ row_class.next() }}">{{ file }}
{% endfor %}
</ul>
:param items: Each positional argument will be yielded in the order
given for each cycle.
.. versionadded:: 2.1
"""
def __init__(self, *items: t.Any) -> None:
5 years ago
if not items:
raise RuntimeError("at least one item has to be provided")
5 years ago
self.items = items
self.pos = 0
5 years ago
def reset(self) -> None:
"""Resets the current item to the first item."""
5 years ago
self.pos = 0
@property
def current(self) -> t.Any:
"""Return the current item. Equivalent to the item that will be
returned next time :meth:`next` is called.
"""
5 years ago
return self.items[self.pos]
def next(self) -> t.Any:
"""Return the current item, then advance :attr:`current` to the
next item.
"""
5 years ago
rv = self.current
self.pos = (self.pos + 1) % len(self.items)
return rv
__next__ = next
class Joiner:
5 years ago
"""A joining helper for templates."""
def __init__(self, sep: str = ", ") -> None:
5 years ago
self.sep = sep
self.used = False
def __call__(self) -> str:
5 years ago
if not self.used:
self.used = True
return ""
5 years ago
return self.sep
class Namespace:
5 years ago
"""A namespace object that can hold arbitrary attributes. It may be
initialized from a dictionary or with keyword arguments."""
5 years ago
def __init__(*args: t.Any, **kwargs: t.Any) -> None: # noqa: B902
5 years ago
self, args = args[0], args[1:]
self.__attrs = dict(*args, **kwargs)
def __getattribute__(self, name: str) -> t.Any:
# __class__ is needed for the awaitable check in async mode
if name in {"_Namespace__attrs", "__class__"}:
5 years ago
return object.__getattribute__(self, name)
try:
return self.__attrs[name]
except KeyError:
raise AttributeError(name) from None
5 years ago
def __setitem__(self, name: str, value: t.Any) -> None:
5 years ago
self.__attrs[name] = value
def __repr__(self) -> str:
return f"<Namespace {self.__attrs!r}>"