You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

307 lines
12 KiB

  1. import os
  2. import warnings
  3. from pathlib import Path
  4. from typing import AbstractSet, Any, Callable, ClassVar, Dict, List, Mapping, Optional, Tuple, Type, Union
  5. from .config import BaseConfig, Extra
  6. from .fields import ModelField
  7. from .main import BaseModel
  8. from .typing import StrPath, display_as_type, get_origin, is_union
  9. from .utils import deep_update, path_type, sequence_like
# Unique string used as the default of `BaseSettings.__init__`'s `_env_file` argument, so that an
# explicitly passed `None` can be told apart from "argument not supplied"; only in the latter case
# does `_build_values` fall back to `Config.env_file`.
env_file_sentinel = str(object())

# Shape shared by every settings source: a callable that receives the settings instance and
# returns a dict of values.
SettingsSourceCallable = Callable[['BaseSettings'], Dict[str, Any]]
  12. class SettingsError(ValueError):
  13. pass
  14. class BaseSettings(BaseModel):
  15. """
  16. Base class for settings, allowing values to be overridden by environment variables.
  17. This is useful in production for secrets you do not wish to save in code, it plays nicely with docker(-compose),
  18. Heroku and any 12 factor app design.
  19. """
  20. def __init__(
  21. __pydantic_self__,
  22. _env_file: Optional[StrPath] = env_file_sentinel,
  23. _env_file_encoding: Optional[str] = None,
  24. _env_nested_delimiter: Optional[str] = None,
  25. _secrets_dir: Optional[StrPath] = None,
  26. **values: Any,
  27. ) -> None:
  28. # Uses something other than `self` the first arg to allow "self" as a settable attribute
  29. super().__init__(
  30. **__pydantic_self__._build_values(
  31. values,
  32. _env_file=_env_file,
  33. _env_file_encoding=_env_file_encoding,
  34. _env_nested_delimiter=_env_nested_delimiter,
  35. _secrets_dir=_secrets_dir,
  36. )
  37. )
  38. def _build_values(
  39. self,
  40. init_kwargs: Dict[str, Any],
  41. _env_file: Optional[StrPath] = None,
  42. _env_file_encoding: Optional[str] = None,
  43. _env_nested_delimiter: Optional[str] = None,
  44. _secrets_dir: Optional[StrPath] = None,
  45. ) -> Dict[str, Any]:
  46. # Configure built-in sources
  47. init_settings = InitSettingsSource(init_kwargs=init_kwargs)
  48. env_settings = EnvSettingsSource(
  49. env_file=(_env_file if _env_file != env_file_sentinel else self.__config__.env_file),
  50. env_file_encoding=(
  51. _env_file_encoding if _env_file_encoding is not None else self.__config__.env_file_encoding
  52. ),
  53. env_nested_delimiter=(
  54. _env_nested_delimiter if _env_nested_delimiter is not None else self.__config__.env_nested_delimiter
  55. ),
  56. )
  57. file_secret_settings = SecretsSettingsSource(secrets_dir=_secrets_dir or self.__config__.secrets_dir)
  58. # Provide a hook to set built-in sources priority and add / remove sources
  59. sources = self.__config__.customise_sources(
  60. init_settings=init_settings, env_settings=env_settings, file_secret_settings=file_secret_settings
  61. )
  62. if sources:
  63. return deep_update(*reversed([source(self) for source in sources]))
  64. else:
  65. # no one should mean to do this, but I think returning an empty dict is marginally preferable
  66. # to an informative error and much better than a confusing error
  67. return {}
  68. class Config(BaseConfig):
  69. env_prefix = ''
  70. env_file = None
  71. env_file_encoding = None
  72. env_nested_delimiter = None
  73. secrets_dir = None
  74. validate_all = True
  75. extra = Extra.forbid
  76. arbitrary_types_allowed = True
  77. case_sensitive = False
  78. @classmethod
  79. def prepare_field(cls, field: ModelField) -> None:
  80. env_names: Union[List[str], AbstractSet[str]]
  81. field_info_from_config = cls.get_field_info(field.name)
  82. env = field_info_from_config.get('env') or field.field_info.extra.get('env')
  83. if env is None:
  84. if field.has_alias:
  85. warnings.warn(
  86. 'aliases are no longer used by BaseSettings to define which environment variables to read. '
  87. 'Instead use the "env" field setting. '
  88. 'See https://pydantic-docs.helpmanual.io/usage/settings/#environment-variable-names',
  89. FutureWarning,
  90. )
  91. env_names = {cls.env_prefix + field.name}
  92. elif isinstance(env, str):
  93. env_names = {env}
  94. elif isinstance(env, (set, frozenset)):
  95. env_names = env
  96. elif sequence_like(env):
  97. env_names = list(env)
  98. else:
  99. raise TypeError(f'invalid field env: {env!r} ({display_as_type(env)}); should be string, list or set')
  100. if not cls.case_sensitive:
  101. env_names = env_names.__class__(n.lower() for n in env_names)
  102. field.field_info.extra['env_names'] = env_names
  103. @classmethod
  104. def customise_sources(
  105. cls,
  106. init_settings: SettingsSourceCallable,
  107. env_settings: SettingsSourceCallable,
  108. file_secret_settings: SettingsSourceCallable,
  109. ) -> Tuple[SettingsSourceCallable, ...]:
  110. return init_settings, env_settings, file_secret_settings
  111. # populated by the metaclass using the Config class defined above, annotated here to help IDEs only
  112. __config__: ClassVar[Type[Config]]
  113. class InitSettingsSource:
  114. __slots__ = ('init_kwargs',)
  115. def __init__(self, init_kwargs: Dict[str, Any]):
  116. self.init_kwargs = init_kwargs
  117. def __call__(self, settings: BaseSettings) -> Dict[str, Any]:
  118. return self.init_kwargs
  119. def __repr__(self) -> str:
  120. return f'InitSettingsSource(init_kwargs={self.init_kwargs!r})'
  121. class EnvSettingsSource:
  122. __slots__ = ('env_file', 'env_file_encoding', 'env_nested_delimiter')
  123. def __init__(
  124. self, env_file: Optional[StrPath], env_file_encoding: Optional[str], env_nested_delimiter: Optional[str] = None
  125. ):
  126. self.env_file: Optional[StrPath] = env_file
  127. self.env_file_encoding: Optional[str] = env_file_encoding
  128. self.env_nested_delimiter: Optional[str] = env_nested_delimiter
  129. def __call__(self, settings: BaseSettings) -> Dict[str, Any]: # noqa C901
  130. """
  131. Build environment variables suitable for passing to the Model.
  132. """
  133. d: Dict[str, Any] = {}
  134. if settings.__config__.case_sensitive:
  135. env_vars: Mapping[str, Optional[str]] = os.environ
  136. else:
  137. env_vars = {k.lower(): v for k, v in os.environ.items()}
  138. if self.env_file is not None:
  139. env_path = Path(self.env_file).expanduser()
  140. if env_path.is_file():
  141. env_vars = {
  142. **read_env_file(
  143. env_path, encoding=self.env_file_encoding, case_sensitive=settings.__config__.case_sensitive
  144. ),
  145. **env_vars,
  146. }
  147. for field in settings.__fields__.values():
  148. env_val: Optional[str] = None
  149. for env_name in field.field_info.extra['env_names']:
  150. env_val = env_vars.get(env_name)
  151. if env_val is not None:
  152. break
  153. is_complex, allow_json_failure = self.field_is_complex(field)
  154. if is_complex:
  155. if env_val is None:
  156. # field is complex but no value found so far, try explode_env_vars
  157. env_val_built = self.explode_env_vars(field, env_vars)
  158. if env_val_built:
  159. d[field.alias] = env_val_built
  160. else:
  161. # field is complex and there's a value, decode that as JSON, then add explode_env_vars
  162. try:
  163. env_val = settings.__config__.json_loads(env_val)
  164. except ValueError as e:
  165. if not allow_json_failure:
  166. raise SettingsError(f'error parsing JSON for "{env_name}"') from e
  167. if isinstance(env_val, dict):
  168. d[field.alias] = deep_update(env_val, self.explode_env_vars(field, env_vars))
  169. else:
  170. d[field.alias] = env_val
  171. elif env_val is not None:
  172. # simplest case, field is not complex, we only need to add the value if it was found
  173. d[field.alias] = env_val
  174. return d
  175. def field_is_complex(self, field: ModelField) -> Tuple[bool, bool]:
  176. """
  177. Find out if a field is complex, and if so whether JSON errors should be ignored
  178. """
  179. if field.is_complex():
  180. allow_json_failure = False
  181. elif is_union(get_origin(field.type_)) and field.sub_fields and any(f.is_complex() for f in field.sub_fields):
  182. allow_json_failure = True
  183. else:
  184. return False, False
  185. return True, allow_json_failure
  186. def explode_env_vars(self, field: ModelField, env_vars: Mapping[str, Optional[str]]) -> Dict[str, Any]:
  187. """
  188. Process env_vars and extract the values of keys containing env_nested_delimiter into nested dictionaries.
  189. This is applied to a single field, hence filtering by env_var prefix.
  190. """
  191. prefixes = [f'{env_name}{self.env_nested_delimiter}' for env_name in field.field_info.extra['env_names']]
  192. result: Dict[str, Any] = {}
  193. for env_name, env_val in env_vars.items():
  194. if not any(env_name.startswith(prefix) for prefix in prefixes):
  195. continue
  196. _, *keys, last_key = env_name.split(self.env_nested_delimiter)
  197. env_var = result
  198. for key in keys:
  199. env_var = env_var.setdefault(key, {})
  200. env_var[last_key] = env_val
  201. return result
  202. def __repr__(self) -> str:
  203. return (
  204. f'EnvSettingsSource(env_file={self.env_file!r}, env_file_encoding={self.env_file_encoding!r}, '
  205. f'env_nested_delimiter={self.env_nested_delimiter!r})'
  206. )
  207. class SecretsSettingsSource:
  208. __slots__ = ('secrets_dir',)
  209. def __init__(self, secrets_dir: Optional[StrPath]):
  210. self.secrets_dir: Optional[StrPath] = secrets_dir
  211. def __call__(self, settings: BaseSettings) -> Dict[str, Any]:
  212. """
  213. Build fields from "secrets" files.
  214. """
  215. secrets: Dict[str, Optional[str]] = {}
  216. if self.secrets_dir is None:
  217. return secrets
  218. secrets_path = Path(self.secrets_dir).expanduser()
  219. if not secrets_path.exists():
  220. warnings.warn(f'directory "{secrets_path}" does not exist')
  221. return secrets
  222. if not secrets_path.is_dir():
  223. raise SettingsError(f'secrets_dir must reference a directory, not a {path_type(secrets_path)}')
  224. for field in settings.__fields__.values():
  225. for env_name in field.field_info.extra['env_names']:
  226. path = secrets_path / env_name
  227. if path.is_file():
  228. secret_value = path.read_text().strip()
  229. if field.is_complex():
  230. try:
  231. secret_value = settings.__config__.json_loads(secret_value)
  232. except ValueError as e:
  233. raise SettingsError(f'error parsing JSON for "{env_name}"') from e
  234. secrets[field.alias] = secret_value
  235. elif path.exists():
  236. warnings.warn(
  237. f'attempted to load secret file "{path}" but found a {path_type(path)} instead.',
  238. stacklevel=4,
  239. )
  240. return secrets
  241. def __repr__(self) -> str:
  242. return f'SecretsSettingsSource(secrets_dir={self.secrets_dir!r})'
  243. def read_env_file(
  244. file_path: StrPath, *, encoding: str = None, case_sensitive: bool = False
  245. ) -> Dict[str, Optional[str]]:
  246. try:
  247. from dotenv import dotenv_values
  248. except ImportError as e:
  249. raise ImportError('python-dotenv is not installed, run `pip install pydantic[dotenv]`') from e
  250. file_vars: Dict[str, Optional[str]] = dotenv_values(file_path, encoding=encoding or 'utf8')
  251. if not case_sensitive:
  252. return {k.lower(): v for k, v in file_vars.items()}
  253. else:
  254. return file_vars