You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
351 lines
14 KiB
351 lines
14 KiB
11 months ago
|
import os
|
||
|
import warnings
|
||
|
from pathlib import Path
|
||
|
from typing import AbstractSet, Any, Callable, ClassVar, Dict, List, Mapping, Optional, Tuple, Type, Union
|
||
|
|
||
|
from .config import BaseConfig, Extra
|
||
|
from .fields import ModelField
|
||
|
from .main import BaseModel
|
||
|
from .types import JsonWrapper
|
||
|
from .typing import StrPath, display_as_type, get_origin, is_union
|
||
|
from .utils import deep_update, lenient_issubclass, path_type, sequence_like
|
||
|
|
||
|
env_file_sentinel = str(object())
|
||
|
|
||
|
SettingsSourceCallable = Callable[['BaseSettings'], Dict[str, Any]]
|
||
|
DotenvType = Union[StrPath, List[StrPath], Tuple[StrPath, ...]]
|
||
|
|
||
|
|
||
|
class SettingsError(ValueError):
|
||
|
pass
|
||
|
|
||
|
|
||
|
class BaseSettings(BaseModel):
|
||
|
"""
|
||
|
Base class for settings, allowing values to be overridden by environment variables.
|
||
|
|
||
|
This is useful in production for secrets you do not wish to save in code, it plays nicely with docker(-compose),
|
||
|
Heroku and any 12 factor app design.
|
||
|
"""
|
||
|
|
||
|
def __init__(
|
||
|
__pydantic_self__,
|
||
|
_env_file: Optional[DotenvType] = env_file_sentinel,
|
||
|
_env_file_encoding: Optional[str] = None,
|
||
|
_env_nested_delimiter: Optional[str] = None,
|
||
|
_secrets_dir: Optional[StrPath] = None,
|
||
|
**values: Any,
|
||
|
) -> None:
|
||
|
# Uses something other than `self` the first arg to allow "self" as a settable attribute
|
||
|
super().__init__(
|
||
|
**__pydantic_self__._build_values(
|
||
|
values,
|
||
|
_env_file=_env_file,
|
||
|
_env_file_encoding=_env_file_encoding,
|
||
|
_env_nested_delimiter=_env_nested_delimiter,
|
||
|
_secrets_dir=_secrets_dir,
|
||
|
)
|
||
|
)
|
||
|
|
||
|
def _build_values(
|
||
|
self,
|
||
|
init_kwargs: Dict[str, Any],
|
||
|
_env_file: Optional[DotenvType] = None,
|
||
|
_env_file_encoding: Optional[str] = None,
|
||
|
_env_nested_delimiter: Optional[str] = None,
|
||
|
_secrets_dir: Optional[StrPath] = None,
|
||
|
) -> Dict[str, Any]:
|
||
|
# Configure built-in sources
|
||
|
init_settings = InitSettingsSource(init_kwargs=init_kwargs)
|
||
|
env_settings = EnvSettingsSource(
|
||
|
env_file=(_env_file if _env_file != env_file_sentinel else self.__config__.env_file),
|
||
|
env_file_encoding=(
|
||
|
_env_file_encoding if _env_file_encoding is not None else self.__config__.env_file_encoding
|
||
|
),
|
||
|
env_nested_delimiter=(
|
||
|
_env_nested_delimiter if _env_nested_delimiter is not None else self.__config__.env_nested_delimiter
|
||
|
),
|
||
|
env_prefix_len=len(self.__config__.env_prefix),
|
||
|
)
|
||
|
file_secret_settings = SecretsSettingsSource(secrets_dir=_secrets_dir or self.__config__.secrets_dir)
|
||
|
# Provide a hook to set built-in sources priority and add / remove sources
|
||
|
sources = self.__config__.customise_sources(
|
||
|
init_settings=init_settings, env_settings=env_settings, file_secret_settings=file_secret_settings
|
||
|
)
|
||
|
if sources:
|
||
|
return deep_update(*reversed([source(self) for source in sources]))
|
||
|
else:
|
||
|
# no one should mean to do this, but I think returning an empty dict is marginally preferable
|
||
|
# to an informative error and much better than a confusing error
|
||
|
return {}
|
||
|
|
||
|
class Config(BaseConfig):
|
||
|
env_prefix: str = ''
|
||
|
env_file: Optional[DotenvType] = None
|
||
|
env_file_encoding: Optional[str] = None
|
||
|
env_nested_delimiter: Optional[str] = None
|
||
|
secrets_dir: Optional[StrPath] = None
|
||
|
validate_all: bool = True
|
||
|
extra: Extra = Extra.forbid
|
||
|
arbitrary_types_allowed: bool = True
|
||
|
case_sensitive: bool = False
|
||
|
|
||
|
@classmethod
|
||
|
def prepare_field(cls, field: ModelField) -> None:
|
||
|
env_names: Union[List[str], AbstractSet[str]]
|
||
|
field_info_from_config = cls.get_field_info(field.name)
|
||
|
|
||
|
env = field_info_from_config.get('env') or field.field_info.extra.get('env')
|
||
|
if env is None:
|
||
|
if field.has_alias:
|
||
|
warnings.warn(
|
||
|
'aliases are no longer used by BaseSettings to define which environment variables to read. '
|
||
|
'Instead use the "env" field setting. '
|
||
|
'See https://pydantic-docs.helpmanual.io/usage/settings/#environment-variable-names',
|
||
|
FutureWarning,
|
||
|
)
|
||
|
env_names = {cls.env_prefix + field.name}
|
||
|
elif isinstance(env, str):
|
||
|
env_names = {env}
|
||
|
elif isinstance(env, (set, frozenset)):
|
||
|
env_names = env
|
||
|
elif sequence_like(env):
|
||
|
env_names = list(env)
|
||
|
else:
|
||
|
raise TypeError(f'invalid field env: {env!r} ({display_as_type(env)}); should be string, list or set')
|
||
|
|
||
|
if not cls.case_sensitive:
|
||
|
env_names = env_names.__class__(n.lower() for n in env_names)
|
||
|
field.field_info.extra['env_names'] = env_names
|
||
|
|
||
|
@classmethod
|
||
|
def customise_sources(
|
||
|
cls,
|
||
|
init_settings: SettingsSourceCallable,
|
||
|
env_settings: SettingsSourceCallable,
|
||
|
file_secret_settings: SettingsSourceCallable,
|
||
|
) -> Tuple[SettingsSourceCallable, ...]:
|
||
|
return init_settings, env_settings, file_secret_settings
|
||
|
|
||
|
@classmethod
|
||
|
def parse_env_var(cls, field_name: str, raw_val: str) -> Any:
|
||
|
return cls.json_loads(raw_val)
|
||
|
|
||
|
# populated by the metaclass using the Config class defined above, annotated here to help IDEs only
|
||
|
__config__: ClassVar[Type[Config]]
|
||
|
|
||
|
|
||
|
class InitSettingsSource:
|
||
|
__slots__ = ('init_kwargs',)
|
||
|
|
||
|
def __init__(self, init_kwargs: Dict[str, Any]):
|
||
|
self.init_kwargs = init_kwargs
|
||
|
|
||
|
def __call__(self, settings: BaseSettings) -> Dict[str, Any]:
|
||
|
return self.init_kwargs
|
||
|
|
||
|
def __repr__(self) -> str:
|
||
|
return f'InitSettingsSource(init_kwargs={self.init_kwargs!r})'
|
||
|
|
||
|
|
||
|
class EnvSettingsSource:
|
||
|
__slots__ = ('env_file', 'env_file_encoding', 'env_nested_delimiter', 'env_prefix_len')
|
||
|
|
||
|
def __init__(
|
||
|
self,
|
||
|
env_file: Optional[DotenvType],
|
||
|
env_file_encoding: Optional[str],
|
||
|
env_nested_delimiter: Optional[str] = None,
|
||
|
env_prefix_len: int = 0,
|
||
|
):
|
||
|
self.env_file: Optional[DotenvType] = env_file
|
||
|
self.env_file_encoding: Optional[str] = env_file_encoding
|
||
|
self.env_nested_delimiter: Optional[str] = env_nested_delimiter
|
||
|
self.env_prefix_len: int = env_prefix_len
|
||
|
|
||
|
def __call__(self, settings: BaseSettings) -> Dict[str, Any]: # noqa C901
|
||
|
"""
|
||
|
Build environment variables suitable for passing to the Model.
|
||
|
"""
|
||
|
d: Dict[str, Any] = {}
|
||
|
|
||
|
if settings.__config__.case_sensitive:
|
||
|
env_vars: Mapping[str, Optional[str]] = os.environ
|
||
|
else:
|
||
|
env_vars = {k.lower(): v for k, v in os.environ.items()}
|
||
|
|
||
|
dotenv_vars = self._read_env_files(settings.__config__.case_sensitive)
|
||
|
if dotenv_vars:
|
||
|
env_vars = {**dotenv_vars, **env_vars}
|
||
|
|
||
|
for field in settings.__fields__.values():
|
||
|
env_val: Optional[str] = None
|
||
|
for env_name in field.field_info.extra['env_names']:
|
||
|
env_val = env_vars.get(env_name)
|
||
|
if env_val is not None:
|
||
|
break
|
||
|
|
||
|
is_complex, allow_parse_failure = self.field_is_complex(field)
|
||
|
if is_complex:
|
||
|
if env_val is None:
|
||
|
# field is complex but no value found so far, try explode_env_vars
|
||
|
env_val_built = self.explode_env_vars(field, env_vars)
|
||
|
if env_val_built:
|
||
|
d[field.alias] = env_val_built
|
||
|
else:
|
||
|
# field is complex and there's a value, decode that as JSON, then add explode_env_vars
|
||
|
try:
|
||
|
env_val = settings.__config__.parse_env_var(field.name, env_val)
|
||
|
except ValueError as e:
|
||
|
if not allow_parse_failure:
|
||
|
raise SettingsError(f'error parsing env var "{env_name}"') from e
|
||
|
|
||
|
if isinstance(env_val, dict):
|
||
|
d[field.alias] = deep_update(env_val, self.explode_env_vars(field, env_vars))
|
||
|
else:
|
||
|
d[field.alias] = env_val
|
||
|
elif env_val is not None:
|
||
|
# simplest case, field is not complex, we only need to add the value if it was found
|
||
|
d[field.alias] = env_val
|
||
|
|
||
|
return d
|
||
|
|
||
|
def _read_env_files(self, case_sensitive: bool) -> Dict[str, Optional[str]]:
|
||
|
env_files = self.env_file
|
||
|
if env_files is None:
|
||
|
return {}
|
||
|
|
||
|
if isinstance(env_files, (str, os.PathLike)):
|
||
|
env_files = [env_files]
|
||
|
|
||
|
dotenv_vars = {}
|
||
|
for env_file in env_files:
|
||
|
env_path = Path(env_file).expanduser()
|
||
|
if env_path.is_file():
|
||
|
dotenv_vars.update(
|
||
|
read_env_file(env_path, encoding=self.env_file_encoding, case_sensitive=case_sensitive)
|
||
|
)
|
||
|
|
||
|
return dotenv_vars
|
||
|
|
||
|
def field_is_complex(self, field: ModelField) -> Tuple[bool, bool]:
|
||
|
"""
|
||
|
Find out if a field is complex, and if so whether JSON errors should be ignored
|
||
|
"""
|
||
|
if lenient_issubclass(field.annotation, JsonWrapper):
|
||
|
return False, False
|
||
|
|
||
|
if field.is_complex():
|
||
|
allow_parse_failure = False
|
||
|
elif is_union(get_origin(field.type_)) and field.sub_fields and any(f.is_complex() for f in field.sub_fields):
|
||
|
allow_parse_failure = True
|
||
|
else:
|
||
|
return False, False
|
||
|
|
||
|
return True, allow_parse_failure
|
||
|
|
||
|
def explode_env_vars(self, field: ModelField, env_vars: Mapping[str, Optional[str]]) -> Dict[str, Any]:
|
||
|
"""
|
||
|
Process env_vars and extract the values of keys containing env_nested_delimiter into nested dictionaries.
|
||
|
|
||
|
This is applied to a single field, hence filtering by env_var prefix.
|
||
|
"""
|
||
|
prefixes = [f'{env_name}{self.env_nested_delimiter}' for env_name in field.field_info.extra['env_names']]
|
||
|
result: Dict[str, Any] = {}
|
||
|
for env_name, env_val in env_vars.items():
|
||
|
if not any(env_name.startswith(prefix) for prefix in prefixes):
|
||
|
continue
|
||
|
# we remove the prefix before splitting in case the prefix has characters in common with the delimiter
|
||
|
env_name_without_prefix = env_name[self.env_prefix_len :]
|
||
|
_, *keys, last_key = env_name_without_prefix.split(self.env_nested_delimiter)
|
||
|
env_var = result
|
||
|
for key in keys:
|
||
|
env_var = env_var.setdefault(key, {})
|
||
|
env_var[last_key] = env_val
|
||
|
|
||
|
return result
|
||
|
|
||
|
def __repr__(self) -> str:
|
||
|
return (
|
||
|
f'EnvSettingsSource(env_file={self.env_file!r}, env_file_encoding={self.env_file_encoding!r}, '
|
||
|
f'env_nested_delimiter={self.env_nested_delimiter!r})'
|
||
|
)
|
||
|
|
||
|
|
||
|
class SecretsSettingsSource:
|
||
|
__slots__ = ('secrets_dir',)
|
||
|
|
||
|
def __init__(self, secrets_dir: Optional[StrPath]):
|
||
|
self.secrets_dir: Optional[StrPath] = secrets_dir
|
||
|
|
||
|
def __call__(self, settings: BaseSettings) -> Dict[str, Any]:
|
||
|
"""
|
||
|
Build fields from "secrets" files.
|
||
|
"""
|
||
|
secrets: Dict[str, Optional[str]] = {}
|
||
|
|
||
|
if self.secrets_dir is None:
|
||
|
return secrets
|
||
|
|
||
|
secrets_path = Path(self.secrets_dir).expanduser()
|
||
|
|
||
|
if not secrets_path.exists():
|
||
|
warnings.warn(f'directory "{secrets_path}" does not exist')
|
||
|
return secrets
|
||
|
|
||
|
if not secrets_path.is_dir():
|
||
|
raise SettingsError(f'secrets_dir must reference a directory, not a {path_type(secrets_path)}')
|
||
|
|
||
|
for field in settings.__fields__.values():
|
||
|
for env_name in field.field_info.extra['env_names']:
|
||
|
path = find_case_path(secrets_path, env_name, settings.__config__.case_sensitive)
|
||
|
if not path:
|
||
|
# path does not exist, we currently don't return a warning for this
|
||
|
continue
|
||
|
|
||
|
if path.is_file():
|
||
|
secret_value = path.read_text().strip()
|
||
|
if field.is_complex():
|
||
|
try:
|
||
|
secret_value = settings.__config__.parse_env_var(field.name, secret_value)
|
||
|
except ValueError as e:
|
||
|
raise SettingsError(f'error parsing env var "{env_name}"') from e
|
||
|
|
||
|
secrets[field.alias] = secret_value
|
||
|
else:
|
||
|
warnings.warn(
|
||
|
f'attempted to load secret file "{path}" but found a {path_type(path)} instead.',
|
||
|
stacklevel=4,
|
||
|
)
|
||
|
return secrets
|
||
|
|
||
|
def __repr__(self) -> str:
|
||
|
return f'SecretsSettingsSource(secrets_dir={self.secrets_dir!r})'
|
||
|
|
||
|
|
||
|
def read_env_file(
|
||
|
file_path: StrPath, *, encoding: str = None, case_sensitive: bool = False
|
||
|
) -> Dict[str, Optional[str]]:
|
||
|
try:
|
||
|
from dotenv import dotenv_values
|
||
|
except ImportError as e:
|
||
|
raise ImportError('python-dotenv is not installed, run `pip install pydantic[dotenv]`') from e
|
||
|
|
||
|
file_vars: Dict[str, Optional[str]] = dotenv_values(file_path, encoding=encoding or 'utf8')
|
||
|
if not case_sensitive:
|
||
|
return {k.lower(): v for k, v in file_vars.items()}
|
||
|
else:
|
||
|
return file_vars
|
||
|
|
||
|
|
||
|
def find_case_path(dir_path: Path, file_name: str, case_sensitive: bool) -> Optional[Path]:
|
||
|
"""
|
||
|
Find a file within path's directory matching filename, optionally ignoring case.
|
||
|
"""
|
||
|
for f in dir_path.iterdir():
|
||
|
if f.name == file_name:
|
||
|
return f
|
||
|
elif not case_sensitive and f.name.lower() == file_name.lower():
|
||
|
return f
|
||
|
return None
|