import re
import warnings
from collections import defaultdict
from dataclasses import is_dataclass
from datetime import date, datetime, time, timedelta
from decimal import Decimal
from enum import Enum
from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    ForwardRef,
    FrozenSet,
    Generic,
    Iterable,
    List,
    Optional,
    Pattern,
    Sequence,
    Set,
    Tuple,
    Type,
    TypeVar,
    Union,
    cast,
)
from uuid import UUID

from typing_extensions import Annotated, Literal

from pydantic.v1.fields import (
    MAPPING_LIKE_SHAPES,
    SHAPE_DEQUE,
    SHAPE_FROZENSET,
    SHAPE_GENERIC,
    SHAPE_ITERABLE,
    SHAPE_LIST,
    SHAPE_SEQUENCE,
    SHAPE_SET,
    SHAPE_SINGLETON,
    SHAPE_TUPLE,
    SHAPE_TUPLE_ELLIPSIS,
    FieldInfo,
    ModelField,
)
from pydantic.v1.json import pydantic_encoder
from pydantic.v1.networks import AnyUrl, EmailStr
from pydantic.v1.types import (
    ConstrainedDecimal,
    ConstrainedFloat,
    ConstrainedFrozenSet,
    ConstrainedInt,
    ConstrainedList,
    ConstrainedSet,
    ConstrainedStr,
    SecretBytes,
    SecretStr,
    StrictBytes,
    StrictStr,
    conbytes,
    condecimal,
    confloat,
    confrozenset,
    conint,
    conlist,
    conset,
    constr,
)
from pydantic.v1.typing import (
    all_literal_values,
    get_args,
    get_origin,
    get_sub_types,
    is_callable_type,
    is_literal_type,
    is_namedtuple,
    is_none_type,
    is_union,
)
from pydantic.v1.utils import ROOT_KEY, get_model, lenient_issubclass

if TYPE_CHECKING:
    from pydantic.v1.dataclasses import Dataclass
    from pydantic.v1.main import BaseModel

default_prefix = '#/definitions/'
default_ref_template = '#/definitions/{model}'

TypeModelOrEnum = Union[Type['BaseModel'], Type[Enum]]
TypeModelSet = Set[TypeModelOrEnum]


def _apply_modify_schema(
    modify_schema: Callable[..., None], field: Optional[ModelField], field_schema: Dict[str, Any]
) -> None:
    from inspect import signature

    sig = signature(modify_schema)
    args = set(sig.parameters.keys())
    if 'field' in args or 'kwargs' in args:
        modify_schema(field_schema, field=field)
    else:
        modify_schema(field_schema)
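

# Illustrative sketch (not part of this module): a custom type can hook into
# schema generation via a ``__modify_schema__`` classmethod; if it declares a
# ``field`` keyword, ``_apply_modify_schema`` passes the originating
# ``ModelField`` through as well. Uncommented, this would be valid code:
#
#     class Username(str):
#         @classmethod
#         def __modify_schema__(
#             cls, field_schema: Dict[str, Any], field: Optional[ModelField] = None
#         ) -> None:
#             field_schema['examples'] = ['alice', 'bob']
#             if field is not None:
#                 field_schema['description'] = f'username for {field.name}'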


def schema(
    models: Sequence[Union[Type['BaseModel'], Type['Dataclass']]],
    *,
    by_alias: bool = True,
    title: Optional[str] = None,
    description: Optional[str] = None,
    ref_prefix: Optional[str] = None,
    ref_template: str = default_ref_template,
) -> Dict[str, Any]:
    """
    Process a list of models and generate a single JSON Schema with all of them defined in the ``definitions``
    top-level JSON key, including their sub-models.

    :param models: a list of models to include in the generated JSON Schema
    :param by_alias: generate the schemas using the aliases defined, if any
    :param title: title for the generated schema that includes the definitions
    :param description: description for the generated schema
    :param ref_prefix: the JSON Pointer prefix for schema references with ``$ref``, if None, will be set to the
      default of ``#/definitions/``. Update it if you want the schemas to reference the definitions somewhere
      else, e.g. for OpenAPI use ``#/components/schemas/``. The resulting generated schemas will still be at the
      top-level key ``definitions``, so you can extract them from there. But all the references will have the set
      prefix.
    :param ref_template: Use a ``string.format()`` template for ``$ref`` instead of a prefix. This can be useful
      for references that cannot be represented by ``ref_prefix`` such as a definition stored in another file. For
      a sibling json file in a ``/schemas`` directory use ``"/schemas/{model}.json#"``.
    :return: dict with the JSON Schema with a ``definitions`` top-level key including the schema definitions for
      the models and sub-models passed in ``models``.
    """
    clean_models = [get_model(model) for model in models]
    flat_models = get_flat_models_from_models(clean_models)
    model_name_map = get_model_name_map(flat_models)
    definitions = {}
    output_schema: Dict[str, Any] = {}
    if title:
        output_schema['title'] = title
    if description:
        output_schema['description'] = description
    for model in clean_models:
        m_schema, m_definitions, m_nested_models = model_process_schema(
            model,
            by_alias=by_alias,
            model_name_map=model_name_map,
            ref_prefix=ref_prefix,
            ref_template=ref_template,
        )
        definitions.update(m_definitions)
        model_name = model_name_map[model]
        definitions[model_name] = m_schema
    if definitions:
        output_schema['definitions'] = definitions
    return output_schema
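

# A minimal usage sketch (``Pet`` and ``Owner`` are hypothetical models, not
# part of this module): all passed models plus their sub-models end up under a
# single ``definitions`` key, with ``$ref``s rewritten via ``ref_prefix``.
#
#     from pydantic.v1 import BaseModel
#
#     class Pet(BaseModel):
#         name: str
#
#     class Owner(BaseModel):
#         pet: Pet
#
#     top_level = schema([Owner], title='My API', ref_prefix='#/components/schemas/')
#     # top_level['definitions'] holds both 'Owner' and 'Pet'; every generated
#     # $ref uses the OpenAPI-style '#/components/schemas/' prefix.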


def model_schema(
    model: Union[Type['BaseModel'], Type['Dataclass']],
    by_alias: bool = True,
    ref_prefix: Optional[str] = None,
    ref_template: str = default_ref_template,
) -> Dict[str, Any]:
    """
    Generate a JSON Schema for one model. With all the sub-models defined in the ``definitions`` top-level
    JSON key.

    :param model: a Pydantic model (a class that inherits from BaseModel)
    :param by_alias: generate the schemas using the aliases defined, if any
    :param ref_prefix: the JSON Pointer prefix for schema references with ``$ref``, if None, will be set to the
      default of ``#/definitions/``. Update it if you want the schemas to reference the definitions somewhere
      else, e.g. for OpenAPI use ``#/components/schemas/``. The resulting generated schemas will still be at the
      top-level key ``definitions``, so you can extract them from there. But all the references will have the set
      prefix.
    :param ref_template: Use a ``string.format()`` template for ``$ref`` instead of a prefix. This can be useful for
      references that cannot be represented by ``ref_prefix`` such as a definition stored in another file. For a
      sibling json file in a ``/schemas`` directory use ``"/schemas/{model}.json#"``.
    :return: dict with the JSON Schema for the passed ``model``
    """
    model = get_model(model)
    flat_models = get_flat_models_from_model(model)
    model_name_map = get_model_name_map(flat_models)
    model_name = model_name_map[model]
    m_schema, m_definitions, nested_models = model_process_schema(
        model, by_alias=by_alias, model_name_map=model_name_map, ref_prefix=ref_prefix, ref_template=ref_template
    )
    if model_name in nested_models:
        # model_name is in nested models, it has circular references
        m_definitions[model_name] = m_schema
        m_schema = get_schema_ref(model_name, ref_prefix, ref_template, False)
    if m_definitions:
        m_schema.update({'definitions': m_definitions})
    return m_schema
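

# Sketch of the single-model variant, reusing the hypothetical ``Owner``/``Pet``
# models from the example above; the output shape shown is approximate:
#
#     s = model_schema(Owner)
#     # s is roughly {'title': 'Owner', 'type': 'object',
#     #               'properties': {'pet': {'$ref': '#/definitions/Pet'}},
#     #               'required': ['pet'],
#     #               'definitions': {'Pet': {...}}}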


def get_field_info_schema(field: ModelField, schema_overrides: bool = False) -> Tuple[Dict[str, Any], bool]:
    # If no title is explicitly set, we don't set title in the schema for enums.
    # The behaviour is the same as `BaseModel` reference, where the default title
    # is in the definitions part of the schema.
    schema_: Dict[str, Any] = {}
    if field.field_info.title or not lenient_issubclass(field.type_, Enum):
        schema_['title'] = field.field_info.title or field.alias.title().replace('_', ' ')

    if field.field_info.title:
        schema_overrides = True

    if field.field_info.description:
        schema_['description'] = field.field_info.description
        schema_overrides = True

    if not field.required and field.default is not None and not is_callable_type(field.outer_type_):
        schema_['default'] = encode_default(field.default)
        schema_overrides = True

    return schema_, schema_overrides


def field_schema(
    field: ModelField,
    *,
    by_alias: bool = True,
    model_name_map: Dict[TypeModelOrEnum, str],
    ref_prefix: Optional[str] = None,
    ref_template: str = default_ref_template,
    known_models: Optional[TypeModelSet] = None,
) -> Tuple[Dict[str, Any], Dict[str, Any], Set[str]]:
    """
    Process a Pydantic field and return a tuple with a JSON Schema for it as the first item.
    Also return a dictionary of definitions with models as keys and their schemas as values. If the passed field
    is a model and has sub-models, and those sub-models don't have overrides (as ``title``, ``default``, etc), they
    will be included in the definitions and referenced in the schema instead of included recursively.

    :param field: a Pydantic ``ModelField``
    :param by_alias: use the defined alias (if any) in the returned schema
    :param model_name_map: used to generate the JSON Schema references to other models included in the definitions
    :param ref_prefix: the JSON Pointer prefix to use for references to other schemas, if None, the default of
      #/definitions/ will be used
    :param ref_template: Use a ``string.format()`` template for ``$ref`` instead of a prefix. This can be useful for
      references that cannot be represented by ``ref_prefix`` such as a definition stored in another file. For a
      sibling json file in a ``/schemas`` directory use ``"/schemas/{model}.json#"``.
    :param known_models: used to solve circular references
    :return: tuple of the schema for this field and additional definitions
    """
    s, schema_overrides = get_field_info_schema(field)

    validation_schema = get_field_schema_validations(field)
    if validation_schema:
        s.update(validation_schema)
        schema_overrides = True

    f_schema, f_definitions, f_nested_models = field_type_schema(
        field,
        by_alias=by_alias,
        model_name_map=model_name_map,
        schema_overrides=schema_overrides,
        ref_prefix=ref_prefix,
        ref_template=ref_template,
        known_models=known_models or set(),
    )

    # $ref will only be returned when there are no schema_overrides
    if '$ref' in f_schema:
        return f_schema, f_definitions, f_nested_models
    else:
        s.update(f_schema)
        return s, f_definitions, f_nested_models


numeric_types = (int, float, Decimal)
_str_types_attrs: Tuple[Tuple[str, Union[type, Tuple[type, ...]], str], ...] = (
    ('max_length', numeric_types, 'maxLength'),
    ('min_length', numeric_types, 'minLength'),
    ('regex', str, 'pattern'),
)

_numeric_types_attrs: Tuple[Tuple[str, Union[type, Tuple[type, ...]], str], ...] = (
    ('gt', numeric_types, 'exclusiveMinimum'),
    ('lt', numeric_types, 'exclusiveMaximum'),
    ('ge', numeric_types, 'minimum'),
    ('le', numeric_types, 'maximum'),
    ('multiple_of', numeric_types, 'multipleOf'),
)


def get_field_schema_validations(field: ModelField) -> Dict[str, Any]:
    """
    Get the JSON Schema validation keywords for a ``field`` with an annotation of
    a Pydantic ``FieldInfo`` with validation arguments.
    """
    f_schema: Dict[str, Any] = {}

    if lenient_issubclass(field.type_, Enum):
        # schema is already updated by `enum_process_schema`; just update with field extra
        if field.field_info.extra:
            f_schema.update(field.field_info.extra)
        return f_schema

    if lenient_issubclass(field.type_, (str, bytes)):
        for attr_name, t, keyword in _str_types_attrs:
            attr = getattr(field.field_info, attr_name, None)
            if isinstance(attr, t):
                f_schema[keyword] = attr
    if lenient_issubclass(field.type_, numeric_types) and not issubclass(field.type_, bool):
        for attr_name, t, keyword in _numeric_types_attrs:
            attr = getattr(field.field_info, attr_name, None)
            if isinstance(attr, t):
                f_schema[keyword] = attr
    if field.field_info is not None and field.field_info.const:
        f_schema['const'] = field.default
    if field.field_info.extra:
        f_schema.update(field.field_info.extra)
    modify_schema = getattr(field.outer_type_, '__modify_schema__', None)
    if modify_schema:
        _apply_modify_schema(modify_schema, field, f_schema)
    return f_schema


def get_model_name_map(unique_models: TypeModelSet) -> Dict[TypeModelOrEnum, str]:
    """
    Process a set of models and generate unique names for them to be used as keys in the JSON Schema
    definitions. By default the names are the same as the class name. But if two models in different Python
    modules have the same name (e.g. "users.Model" and "items.Model"), the generated names will be
    based on the Python module path for those conflicting models to prevent name collisions.

    :param unique_models: a Python set of models
    :return: dict mapping models to names
    """
    name_model_map = {}
    conflicting_names: Set[str] = set()
    for model in unique_models:
        model_name = normalize_name(model.__name__)
        if model_name in conflicting_names:
            model_name = get_long_model_name(model)
            name_model_map[model_name] = model
        elif model_name in name_model_map:
            conflicting_names.add(model_name)
            conflicting_model = name_model_map.pop(model_name)
            name_model_map[get_long_model_name(conflicting_model)] = conflicting_model
            name_model_map[get_long_model_name(model)] = model
        else:
            name_model_map[model_name] = model
    return {v: k for k, v in name_model_map.items()}
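

# Name-collision sketch (hypothetical modules ``users`` and ``items``, each
# defining a class named ``Model``): the clashing short name is abandoned and
# both models fall back to get_long_model_name():
#
#     get_model_name_map({users.Model, items.Model, Other})
#     # -> {users.Model: 'users__Model',
#     #     items.Model: 'items__Model',
#     #     Other: 'Other'}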


def get_flat_models_from_model(model: Type['BaseModel'], known_models: Optional[TypeModelSet] = None) -> TypeModelSet:
    """
    Take a single ``model`` and generate a set with itself and all the sub-models in the tree. I.e. if you pass
    model ``Foo`` (subclass of Pydantic ``BaseModel``) as ``model``, and it has a field of type ``Bar`` (also
    subclass of ``BaseModel``) and that model ``Bar`` has a field of type ``Baz`` (also subclass of ``BaseModel``),
    the return value will be ``set([Foo, Bar, Baz])``.

    :param model: a Pydantic ``BaseModel`` subclass
    :param known_models: used to solve circular references
    :return: a set with the initial model and all its sub-models
    """
    known_models = known_models or set()
    flat_models: TypeModelSet = set()
    flat_models.add(model)
    known_models |= flat_models
    fields = cast(Sequence[ModelField], model.__fields__.values())
    flat_models |= get_flat_models_from_fields(fields, known_models=known_models)
    return flat_models
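

# Sketch of the traversal described in the docstring (hypothetical chain
# Foo -> Bar -> Baz, each a BaseModel subclass):
#
#     class Baz(BaseModel):
#         z: int
#
#     class Bar(BaseModel):
#         baz: Baz
#
#     class Foo(BaseModel):
#         bar: Bar
#
#     assert get_flat_models_from_model(Foo) == {Foo, Bar, Baz}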


def get_flat_models_from_field(field: ModelField, known_models: TypeModelSet) -> TypeModelSet:
    """
    Take a single Pydantic ``ModelField`` (from a model) that could have been declared as a subclass of BaseModel
    (so, it could be a submodel), and generate a set with its model and all the sub-models in the tree.
    I.e. if you pass a field that was declared to be of type ``Foo`` (subclass of BaseModel) as ``field``, and that
    model ``Foo`` has a field of type ``Bar`` (also subclass of ``BaseModel``) and that model ``Bar`` has a field of
    type ``Baz`` (also subclass of ``BaseModel``), the return value will be ``set([Foo, Bar, Baz])``.

    :param field: a Pydantic ``ModelField``
    :param known_models: used to solve circular references
    :return: a set with the model used in the declaration for this field, if any, and all its sub-models
    """
    from pydantic.v1.main import BaseModel

    flat_models: TypeModelSet = set()

    field_type = field.type_
    if lenient_issubclass(getattr(field_type, '__pydantic_model__', None), BaseModel):
        field_type = field_type.__pydantic_model__

    if field.sub_fields and not lenient_issubclass(field_type, BaseModel):
        flat_models |= get_flat_models_from_fields(field.sub_fields, known_models=known_models)
    elif lenient_issubclass(field_type, BaseModel) and field_type not in known_models:
        flat_models |= get_flat_models_from_model(field_type, known_models=known_models)
    elif lenient_issubclass(field_type, Enum):
        flat_models.add(field_type)
    return flat_models


def get_flat_models_from_fields(fields: Sequence[ModelField], known_models: TypeModelSet) -> TypeModelSet:
    """
    Take a list of Pydantic ``ModelField``s (from a model) that could have been declared as subclasses of ``BaseModel``
    (so, any of them could be a submodel), and generate a set with their models and all the sub-models in the tree.
    I.e. if you pass the fields of a model ``Foo`` (subclass of ``BaseModel``) as ``fields``, and one of them has a
    field of type ``Bar`` (also subclass of ``BaseModel``) and that model ``Bar`` has a field of type ``Baz`` (also
    subclass of ``BaseModel``), the return value will be ``set([Foo, Bar, Baz])``.

    :param fields: a list of Pydantic ``ModelField``s
    :param known_models: used to solve circular references
    :return: a set with any model declared in the fields, and all their sub-models
    """
    flat_models: TypeModelSet = set()
    for field in fields:
        flat_models |= get_flat_models_from_field(field, known_models=known_models)
    return flat_models


def get_flat_models_from_models(models: Sequence[Type['BaseModel']]) -> TypeModelSet:
    """
    Take a list of ``models`` and generate a set with them and all their sub-models in their trees. I.e. if you pass
    a list of two models, ``Foo`` and ``Bar``, both subclasses of Pydantic ``BaseModel`` as models, and ``Bar`` has
    a field of type ``Baz`` (also subclass of ``BaseModel``), the return value will be ``set([Foo, Bar, Baz])``.
    """
    flat_models: TypeModelSet = set()
    for model in models:
        flat_models |= get_flat_models_from_model(model)
    return flat_models


def get_long_model_name(model: TypeModelOrEnum) -> str:
    return f'{model.__module__}__{model.__qualname__}'.replace('.', '__')


def field_type_schema(
    field: ModelField,
    *,
    by_alias: bool,
    model_name_map: Dict[TypeModelOrEnum, str],
    ref_template: str,
    schema_overrides: bool = False,
    ref_prefix: Optional[str] = None,
    known_models: TypeModelSet,
) -> Tuple[Dict[str, Any], Dict[str, Any], Set[str]]:
    """
    Used by ``field_schema()``, you probably should be using that function.

    Take a single ``field`` and generate the schema for its type only, not including additional
    information as title, etc. Also return additional schema definitions, from sub-models.
    """
    from pydantic.v1.main import BaseModel  # noqa: F811

    definitions = {}
    nested_models: Set[str] = set()
    f_schema: Dict[str, Any]
    if field.shape in {
        SHAPE_LIST,
        SHAPE_TUPLE_ELLIPSIS,
        SHAPE_SEQUENCE,
        SHAPE_SET,
        SHAPE_FROZENSET,
        SHAPE_ITERABLE,
        SHAPE_DEQUE,
    }:
        items_schema, f_definitions, f_nested_models = field_singleton_schema(
            field,
            by_alias=by_alias,
            model_name_map=model_name_map,
            ref_prefix=ref_prefix,
            ref_template=ref_template,
            known_models=known_models,
        )
        definitions.update(f_definitions)
        nested_models.update(f_nested_models)
        f_schema = {'type': 'array', 'items': items_schema}
        if field.shape in {SHAPE_SET, SHAPE_FROZENSET}:
            f_schema['uniqueItems'] = True
    elif field.shape in MAPPING_LIKE_SHAPES:
        f_schema = {'type': 'object'}
        key_field = cast(ModelField, field.key_field)
        regex = getattr(key_field.type_, 'regex', None)
        items_schema, f_definitions, f_nested_models = field_singleton_schema(
            field,
            by_alias=by_alias,
            model_name_map=model_name_map,
            ref_prefix=ref_prefix,
            ref_template=ref_template,
            known_models=known_models,
        )
        definitions.update(f_definitions)
        nested_models.update(f_nested_models)
        if regex:
            # Dict keys have a regex pattern
            # items_schema might be a schema or empty dict, add it either way
            f_schema['patternProperties'] = {ConstrainedStr._get_pattern(regex): items_schema}
        if items_schema:
            # The dict values are not simply Any, so they need a schema
            f_schema['additionalProperties'] = items_schema
    elif field.shape == SHAPE_TUPLE or (field.shape == SHAPE_GENERIC and not issubclass(field.type_, BaseModel)):
        sub_schema = []
        sub_fields = cast(List[ModelField], field.sub_fields)
        for sf in sub_fields:
            sf_schema, sf_definitions, sf_nested_models = field_type_schema(
                sf,
                by_alias=by_alias,
                model_name_map=model_name_map,
                ref_prefix=ref_prefix,
                ref_template=ref_template,
                known_models=known_models,
            )
            definitions.update(sf_definitions)
            nested_models.update(sf_nested_models)
            sub_schema.append(sf_schema)

        sub_fields_len = len(sub_fields)
        if field.shape == SHAPE_GENERIC:
            all_of_schemas = sub_schema[0] if sub_fields_len == 1 else {'type': 'array', 'items': sub_schema}
            f_schema = {'allOf': [all_of_schemas]}
        else:
            f_schema = {
                'type': 'array',
                'minItems': sub_fields_len,
                'maxItems': sub_fields_len,
            }
            if sub_fields_len >= 1:
                f_schema['items'] = sub_schema
    else:
        assert field.shape in {SHAPE_SINGLETON, SHAPE_GENERIC}, field.shape
        f_schema, f_definitions, f_nested_models = field_singleton_schema(
            field,
            by_alias=by_alias,
            model_name_map=model_name_map,
            schema_overrides=schema_overrides,
            ref_prefix=ref_prefix,
            ref_template=ref_template,
            known_models=known_models,
        )
        definitions.update(f_definitions)
        nested_models.update(f_nested_models)

    # check field type to avoid repeated calls to the same __modify_schema__ method
    if field.type_ != field.outer_type_:
        if field.shape == SHAPE_GENERIC:
            field_type = field.type_
        else:
            field_type = field.outer_type_
        modify_schema = getattr(field_type, '__modify_schema__', None)
        if modify_schema:
            _apply_modify_schema(modify_schema, field, f_schema)
    return f_schema, definitions, nested_models


def model_process_schema(
    model: TypeModelOrEnum,
    *,
    by_alias: bool = True,
    model_name_map: Dict[TypeModelOrEnum, str],
    ref_prefix: Optional[str] = None,
    ref_template: str = default_ref_template,
    known_models: Optional[TypeModelSet] = None,
    field: Optional[ModelField] = None,
) -> Tuple[Dict[str, Any], Dict[str, Any], Set[str]]:
    """
    Used by ``model_schema()``, you probably should be using that function.

    Take a single ``model`` and generate its schema. Also return additional schema definitions, from sub-models. The
    sub-models of the returned schema will be referenced, but their definitions will not be included in the schema. All
    the definitions are returned as the second value.
    """
    from inspect import getdoc, signature

    known_models = known_models or set()
    if lenient_issubclass(model, Enum):
        model = cast(Type[Enum], model)
        s = enum_process_schema(model, field=field)
        return s, {}, set()
    model = cast(Type['BaseModel'], model)
    s = {'title': model.__config__.title or model.__name__}
    doc = getdoc(model)
    if doc:
        s['description'] = doc
    known_models.add(model)
    m_schema, m_definitions, nested_models = model_type_schema(
        model,
        by_alias=by_alias,
        model_name_map=model_name_map,
        ref_prefix=ref_prefix,
        ref_template=ref_template,
        known_models=known_models,
    )
    s.update(m_schema)
    schema_extra = model.__config__.schema_extra
    if callable(schema_extra):
        if len(signature(schema_extra).parameters) == 1:
            schema_extra(s)
        else:
            schema_extra(s, model)
    else:
        s.update(schema_extra)
    return s, m_definitions, nested_models


def model_type_schema(
    model: Type['BaseModel'],
    *,
    by_alias: bool,
    model_name_map: Dict[TypeModelOrEnum, str],
    ref_template: str,
    ref_prefix: Optional[str] = None,
    known_models: TypeModelSet,
) -> Tuple[Dict[str, Any], Dict[str, Any], Set[str]]:
    """
    You probably should be using ``model_schema()``, this function is indirectly used by that function.

    Take a single ``model`` and generate the schema for its type only, not including additional
    information as title, etc. Also return additional schema definitions, from sub-models.
    """
    properties = {}
    required = []
    definitions: Dict[str, Any] = {}
    nested_models: Set[str] = set()
    for k, f in model.__fields__.items():
        try:
            f_schema, f_definitions, f_nested_models = field_schema(
                f,
                by_alias=by_alias,
                model_name_map=model_name_map,
                ref_prefix=ref_prefix,
                ref_template=ref_template,
                known_models=known_models,
            )
        except SkipField as skip:
            warnings.warn(skip.message, UserWarning)
            continue
        definitions.update(f_definitions)
        nested_models.update(f_nested_models)
        if by_alias:
            properties[f.alias] = f_schema
            if f.required:
                required.append(f.alias)
        else:
            properties[k] = f_schema
            if f.required:
                required.append(k)
    if ROOT_KEY in properties:
        out_schema = properties[ROOT_KEY]
        out_schema['title'] = model.__config__.title or model.__name__
    else:
        out_schema = {'type': 'object', 'properties': properties}
        if required:
            out_schema['required'] = required
    if model.__config__.extra == 'forbid':
        out_schema['additionalProperties'] = False
    return out_schema, definitions, nested_models


def enum_process_schema(enum: Type[Enum], *, field: Optional[ModelField] = None) -> Dict[str, Any]:
    """
    Take a single `enum` and generate its schema.

    This is similar to the `model_process_schema` function, but applies to ``Enum`` objects.
    """
    import inspect

    schema_: Dict[str, Any] = {
        'title': enum.__name__,
        # Python assigns all enums a default docstring value of 'An enumeration', so
        # all enums will have a description field even if not explicitly provided.
        'description': inspect.cleandoc(enum.__doc__ or 'An enumeration.'),
        # Add enum values and the enum field type to the schema.
        'enum': [item.value for item in cast(Iterable[Enum], enum)],
    }

    add_field_type_to_schema(enum, schema_)

    modify_schema = getattr(enum, '__modify_schema__', None)
    if modify_schema:
        _apply_modify_schema(modify_schema, field, schema_)

    return schema_
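

# Sketch with a hypothetical ``Color`` enum: member values are listed under
# 'enum' and, because the enum also subclasses ``str``, add_field_type_to_schema
# should add 'type': 'string' as well:
#
#     class Color(str, Enum):
#         """Supported colors."""
#         RED = 'red'
#         GREEN = 'green'
#
#     enum_process_schema(Color)
#     # -> {'title': 'Color', 'description': 'Supported colors.',
#     #     'enum': ['red', 'green'], 'type': 'string'}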


def field_singleton_sub_fields_schema(
    field: ModelField,
    *,
    by_alias: bool,
    model_name_map: Dict[TypeModelOrEnum, str],
    ref_template: str,
    schema_overrides: bool = False,
    ref_prefix: Optional[str] = None,
    known_models: TypeModelSet,
) -> Tuple[Dict[str, Any], Dict[str, Any], Set[str]]:
    """
    This function is indirectly used by ``field_schema()``, you probably should be using that function.

    Take a list of Pydantic ``ModelField`` from the declaration of a type with parameters, and generate their
    schema. I.e., fields used as "type parameters", like ``str`` and ``int`` in ``Tuple[str, int]``.
    """
    sub_fields = cast(List[ModelField], field.sub_fields)
    definitions = {}
    nested_models: Set[str] = set()

    if len(sub_fields) == 1:
        return field_type_schema(
            sub_fields[0],
            by_alias=by_alias,
            model_name_map=model_name_map,
            schema_overrides=schema_overrides,
            ref_prefix=ref_prefix,
            ref_template=ref_template,
            known_models=known_models,
        )
    else:
        s: Dict[str, Any] = {}
        # https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.2.md#discriminator-object
        field_has_discriminator: bool = field.discriminator_key is not None
        if field_has_discriminator:
            assert field.sub_fields_mapping is not None

            discriminator_models_refs: Dict[str, Union[str, Dict[str, Any]]] = {}

            for discriminator_value, sub_field in field.sub_fields_mapping.items():
                if isinstance(discriminator_value, Enum):
                    discriminator_value = str(discriminator_value.value)
                # sub_field is either a `BaseModel` or directly an `Annotated` `Union` of many
                if is_union(get_origin(sub_field.type_)):
                    sub_models = get_sub_types(sub_field.type_)
                    discriminator_models_refs[discriminator_value] = {
                        model_name_map[sub_model]: get_schema_ref(
                            model_name_map[sub_model], ref_prefix, ref_template, False
                        )
                        for sub_model in sub_models
                    }
                else:
                    sub_field_type = sub_field.type_
                    if hasattr(sub_field_type, '__pydantic_model__'):
                        sub_field_type = sub_field_type.__pydantic_model__

                    discriminator_model_name = model_name_map[sub_field_type]
                    discriminator_model_ref = get_schema_ref(discriminator_model_name, ref_prefix, ref_template, False)
                    discriminator_models_refs[discriminator_value] = discriminator_model_ref['$ref']

            s['discriminator'] = {
                'propertyName': field.discriminator_alias if by_alias else field.discriminator_key,
                'mapping': discriminator_models_refs,
            }

        sub_field_schemas = []
        for sf in sub_fields:
            sub_schema, sub_definitions, sub_nested_models = field_type_schema(
                sf,
                by_alias=by_alias,
                model_name_map=model_name_map,
                schema_overrides=schema_overrides,
                ref_prefix=ref_prefix,
                ref_template=ref_template,
                known_models=known_models,
            )
            definitions.update(sub_definitions)
            if schema_overrides and 'allOf' in sub_schema:
                # if the sub_field is a referenced schema we only need the referenced
                # object. Otherwise we will end up with several allOf inside anyOf/oneOf.
                # See https://github.com/pydantic/pydantic/issues/1209
                sub_schema = sub_schema['allOf'][0]

            if sub_schema.keys() == {'discriminator', 'oneOf'}:
                # we don't want discriminator information inside oneOf choices, this is dealt with elsewhere
                sub_schema.pop('discriminator')
            sub_field_schemas.append(sub_schema)
            nested_models.update(sub_nested_models)
        s['oneOf' if field_has_discriminator else 'anyOf'] = sub_field_schemas
        return s, definitions, nested_models


# Order is important, e.g. subclasses of str must go before str
# this is used only for standard library types, custom types should use __modify_schema__ instead
field_class_to_schema: Tuple[Tuple[Any, Dict[str, Any]], ...] = (
    (Path, {'type': 'string', 'format': 'path'}),
    (datetime, {'type': 'string', 'format': 'date-time'}),
    (date, {'type': 'string', 'format': 'date'}),
    (time, {'type': 'string', 'format': 'time'}),
    (timedelta, {'type': 'number', 'format': 'time-delta'}),
    (IPv4Network, {'type': 'string', 'format': 'ipv4network'}),
    (IPv6Network, {'type': 'string', 'format': 'ipv6network'}),
    (IPv4Interface, {'type': 'string', 'format': 'ipv4interface'}),
    (IPv6Interface, {'type': 'string', 'format': 'ipv6interface'}),
    (IPv4Address, {'type': 'string', 'format': 'ipv4'}),
    (IPv6Address, {'type': 'string', 'format': 'ipv6'}),
    (Pattern, {'type': 'string', 'format': 'regex'}),
    (str, {'type': 'string'}),
    (bytes, {'type': 'string', 'format': 'binary'}),
    (bool, {'type': 'boolean'}),
    (int, {'type': 'integer'}),
    (float, {'type': 'number'}),
    (Decimal, {'type': 'number'}),
    (UUID, {'type': 'string', 'format': 'uuid'}),
    (dict, {'type': 'object'}),
    (list, {'type': 'array', 'items': {}}),
    (tuple, {'type': 'array', 'items': {}}),
    (set, {'type': 'array', 'items': {}, 'uniqueItems': True}),
    (frozenset, {'type': 'array', 'items': {}, 'uniqueItems': True}),
)

json_scheme = {'type': 'string', 'format': 'json-string'}


def add_field_type_to_schema(field_type: Any, schema_: Dict[str, Any]) -> None:
    """
    Update the given `schema` with the type-specific metadata for the given `field_type`.

    This function looks through `field_class_to_schema` for a class that matches the given `field_type`,
    and then modifies the given `schema` with the information from that type.
    """
    for type_, t_schema in field_class_to_schema:
        # Fallback for `typing.Pattern` and `re.Pattern` as they are not a valid class
        if lenient_issubclass(field_type, type_) or field_type is type_ is Pattern:
            schema_.update(t_schema)
            break
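

# Ordering sketch: ``bool`` subclasses ``int``, so the ``bool`` row must come
# before ``int`` in the table above for the first match to pick 'boolean'
# rather than 'integer':
#
#     s: Dict[str, Any] = {}
#     add_field_type_to_schema(bool, s)
#     assert s == {'type': 'boolean'}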


def get_schema_ref(name: str, ref_prefix: Optional[str], ref_template: str, schema_overrides: bool) -> Dict[str, Any]:
    if ref_prefix:
        schema_ref = {'$ref': ref_prefix + name}
    else:
        schema_ref = {'$ref': ref_template.format(model=name)}
    return {'allOf': [schema_ref]} if schema_overrides else schema_ref
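

# Behaviour sketch: ``ref_prefix`` wins over ``ref_template`` when set, and
# ``schema_overrides`` wraps the reference in an allOf so sibling keys are legal:
#
#     get_schema_ref('Pet', None, default_ref_template, False)
#     # -> {'$ref': '#/definitions/Pet'}
#     get_schema_ref('Pet', '#/components/schemas/', default_ref_template, True)
#     # -> {'allOf': [{'$ref': '#/components/schemas/Pet'}]}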


def field_singleton_schema(  # noqa: C901 (ignore complexity)
    field: ModelField,
    *,
    by_alias: bool,
    model_name_map: Dict[TypeModelOrEnum, str],
    ref_template: str,
    schema_overrides: bool = False,
    ref_prefix: Optional[str] = None,
    known_models: TypeModelSet,
) -> Tuple[Dict[str, Any], Dict[str, Any], Set[str]]:
    """
    This function is indirectly used by ``field_schema()``, you should probably be using that function.

    Take a single Pydantic ``ModelField``, and return its schema and any additional definitions from sub-models.
    """
    from pydantic.v1.main import BaseModel

    definitions: Dict[str, Any] = {}
    nested_models: Set[str] = set()
    field_type = field.type_

    # Recurse into this field if it contains sub_fields and is NOT a
    # BaseModel OR that BaseModel is a const
    if field.sub_fields and (
        (field.field_info and field.field_info.const) or not lenient_issubclass(field_type, BaseModel)
    ):
        return field_singleton_sub_fields_schema(
            field,
            by_alias=by_alias,
            model_name_map=model_name_map,
            schema_overrides=schema_overrides,
            ref_prefix=ref_prefix,
            ref_template=ref_template,
            known_models=known_models,
        )
    if field_type is Any or field_type is object or field_type.__class__ == TypeVar or get_origin(field_type) is type:
        return {}, definitions, nested_models  # no restrictions
    if is_none_type(field_type):
        return {'type': 'null'}, definitions, nested_models
    if is_callable_type(field_type):
        raise SkipField(f'Callable {field.name} was excluded from schema since JSON schema has no equivalent type.')
    f_schema: Dict[str, Any] = {}
    if field.field_info is not None and field.field_info.const:
        f_schema['const'] = field.default

    if is_literal_type(field_type):
        values = tuple(x.value if isinstance(x, Enum) else x for x in all_literal_values(field_type))

        if len({v.__class__ for v in values}) > 1:
            return field_schema(
                multitypes_literal_field_for_schema(values, field),
                by_alias=by_alias,
                model_name_map=model_name_map,
                ref_prefix=ref_prefix,
                ref_template=ref_template,
                known_models=known_models,
            )

        # All values have the same type
        field_type = values[0].__class__
        f_schema['enum'] = list(values)
        add_field_type_to_schema(field_type, f_schema)
    elif lenient_issubclass(field_type, Enum):
        enum_name = model_name_map[field_type]
        f_schema, schema_overrides = get_field_info_schema(field, schema_overrides)
        f_schema.update(get_schema_ref(enum_name, ref_prefix, ref_template, schema_overrides))
        definitions[enum_name] = enum_process_schema(field_type, field=field)
    elif is_namedtuple(field_type):
        sub_schema, *_ = model_process_schema(
            field_type.__pydantic_model__,
            by_alias=by_alias,
            model_name_map=model_name_map,
            ref_prefix=ref_prefix,
            ref_template=ref_template,
            known_models=known_models,
            field=field,
        )
        items_schemas = list(sub_schema['properties'].values())
        f_schema.update(
            {
                'type': 'array',
                'items': items_schemas,
                'minItems': len(items_schemas),
                'maxItems': len(items_schemas),
            }
        )
    elif not hasattr(field_type, '__pydantic_model__'):
        add_field_type_to_schema(field_type, f_schema)

        modify_schema = getattr(field_type, '__modify_schema__', None)
        if modify_schema:
            _apply_modify_schema(modify_schema, field, f_schema)

    if f_schema:
        return f_schema, definitions, nested_models

    # Handle dataclass-based models
    if lenient_issubclass(getattr(field_type, '__pydantic_model__', None), BaseModel):
        field_type = field_type.__pydantic_model__
    if issubclass(field_type, BaseModel):
        model_name = model_name_map[field_type]
        if field_type not in known_models:
            sub_schema, sub_definitions, sub_nested_models = model_process_schema(
                field_type,
                by_alias=by_alias,
                model_name_map=model_name_map,
                ref_prefix=ref_prefix,
                ref_template=ref_template,
                known_models=known_models,
                field=field,
            )
            definitions.update(sub_definitions)
            definitions[model_name] = sub_schema
            nested_models.update(sub_nested_models)
        else:
            nested_models.add(model_name)
        schema_ref = get_schema_ref(model_name, ref_prefix, ref_template, schema_overrides)
        return schema_ref, definitions, nested_models

    # For generics with no args
    args = get_args(field_type)
    if args is not None and not args and Generic in field_type.__bases__:
        return f_schema, definitions, nested_models

    raise ValueError(f'Value not declarable with JSON Schema, field: {field}')


def multitypes_literal_field_for_schema(values: Tuple[Any, ...], field: ModelField) -> ModelField:
    """
    To support `Literal` with values of different types, we split it into multiple `Literal` with same type
    e.g. `Literal['qwe', 'asd', 1, 2]` becomes `Union[Literal['qwe', 'asd'], Literal[1, 2]]`
    """
    literal_distinct_types = defaultdict(list)
    for v in values:
        literal_distinct_types[v.__class__].append(v)
    distinct_literals = (Literal[tuple(same_type_values)] for same_type_values in literal_distinct_types.values())

    return ModelField(
        name=field.name,
        type_=Union[tuple(distinct_literals)],  # type: ignore
        class_validators=field.class_validators,
        model_config=field.model_config,
        default=field.default,
        required=field.required,
        alias=field.alias,
        field_info=field.field_info,
    )


def encode_default(dft: Any) -> Any:
    from pydantic.v1.main import BaseModel

    if isinstance(dft, BaseModel) or is_dataclass(dft):
        dft = cast('dict[str, Any]', pydantic_encoder(dft))

    if isinstance(dft, dict):
        return {encode_default(k): encode_default(v) for k, v in dft.items()}
    elif isinstance(dft, Enum):
        return dft.value
    elif isinstance(dft, (int, float, str)):
        return dft
    elif isinstance(dft, (list, tuple)):
        t = dft.__class__
        seq_args = (encode_default(v) for v in dft)
        return t(*seq_args) if is_namedtuple(t) else t(seq_args)
    elif dft is None:
        return None
    else:
        return pydantic_encoder(dft)
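

# Sketch of default-value encoding (the ``Color`` enum is the hypothetical one
# from the enum_process_schema example): enum members collapse to their value,
# dicts and sequences are rebuilt element-wise, everything else goes through
# pydantic_encoder:
#
#     encode_default({'color': Color.RED, 'sizes': (1, 2)})
#     # -> {'color': 'red', 'sizes': (1, 2)}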


_map_types_constraint: Dict[Any, Callable[..., type]] = {int: conint, float: confloat, Decimal: condecimal}


def get_annotation_from_field_info(
    annotation: Any, field_info: FieldInfo, field_name: str, validate_assignment: bool = False
) -> Type[Any]:
    """
    Get an annotation with validation implemented for numbers and strings based on the field_info.

    :param annotation: an annotation from a field specification, as ``str``, ``ConstrainedStr``
    :param field_info: an instance of FieldInfo, possibly with declarations for validations and JSON Schema
    :param field_name: name of the field for use in error messages
    :param validate_assignment: default False, flag for BaseModel Config value of validate_assignment
    :return: the same ``annotation`` if unmodified or a new annotation with validation in place
    """
    constraints = field_info.get_constraints()
    used_constraints: Set[str] = set()
    if constraints:
        annotation, used_constraints = get_annotation_with_constraints(annotation, field_info)
    if validate_assignment:
        used_constraints.add('allow_mutation')

    unused_constraints = constraints - used_constraints
    if unused_constraints:
        raise ValueError(
            f'On field "{field_name}" the following field constraints are set but not enforced: '
            f'{", ".join(unused_constraints)}. '
            f'\nFor more details see https://docs.pydantic.dev/usage/schema/#unenforced-field-constraints'
        )

    return annotation
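

# Sketch (field names are hypothetical): a plain ``str`` annotation combined
# with a FieldInfo carrying ``max_length=10`` is rewritten to a constr-style
# subclass so the constraint is actually enforced, while a constraint the type
# cannot honour raises the ValueError above:
#
#     from pydantic.v1 import Field
#
#     fi = Field(max_length=10)  # returns a FieldInfo
#     StrMax10 = get_annotation_from_field_info(str, fi, 'nickname')
#     # StrMax10 is a ConstrainedStr subclass with max_length == 10
#
#     get_annotation_from_field_info(bool, fi, 'flag')
#     # raises ValueError: 'On field "flag" the following field constraints
#     # are set but not enforced: max_length. ...'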


def get_annotation_with_constraints(annotation: Any, field_info: FieldInfo) -> Tuple[Type[Any], Set[str]]:  # noqa: C901
    """
    Get an annotation with used constraints implemented for numbers and strings based on the field_info.

    :param annotation: an annotation from a field specification, as ``str``, ``ConstrainedStr``
    :param field_info: an instance of FieldInfo, possibly with declarations for validations and JSON Schema
    :return: the same ``annotation`` if unmodified or a new annotation along with the used constraints.
    """
    used_constraints: Set[str] = set()

    def go(type_: Any) -> Type[Any]:
        if (
            is_literal_type(type_)
            or isinstance(type_, ForwardRef)
            or lenient_issubclass(type_, (ConstrainedList, ConstrainedSet, ConstrainedFrozenSet))
        ):
            return type_
        origin = get_origin(type_)
        if origin is not None:
            args: Tuple[Any, ...] = get_args(type_)
            if any(isinstance(a, ForwardRef) for a in args):
                # forward refs cause infinite recursion below
                return type_

            if origin is Annotated:
                return go(args[0])
            if is_union(origin):
                return Union[tuple(go(a) for a in args)]  # type: ignore

            if issubclass(origin, List) and (
                field_info.min_items is not None
                or field_info.max_items is not None
                or field_info.unique_items is not None
            ):
                used_constraints.update({'min_items', 'max_items', 'unique_items'})
                return conlist(
                    go(args[0]),
                    min_items=field_info.min_items,
                    max_items=field_info.max_items,
                    unique_items=field_info.unique_items,
                )

            if issubclass(origin, Set) and (field_info.min_items is not None or field_info.max_items is not None):
                used_constraints.update({'min_items', 'max_items'})
                return conset(go(args[0]), min_items=field_info.min_items, max_items=field_info.max_items)

            if issubclass(origin, FrozenSet) and (field_info.min_items is not None or field_info.max_items is not None):
                used_constraints.update({'min_items', 'max_items'})
                return confrozenset(go(args[0]), min_items=field_info.min_items, max_items=field_info.max_items)

            for t in (Tuple, List, Set, FrozenSet, Sequence):
                if issubclass(origin, t):  # type: ignore
                    return t[tuple(go(a) for a in args)]  # type: ignore

            if issubclass(origin, Dict):
                return Dict[args[0], go(args[1])]  # type: ignore

        attrs: Optional[Tuple[str, ...]] = None
        constraint_func: Optional[Callable[..., type]] = None
        if isinstance(type_, type):
            if issubclass(type_, (SecretStr, SecretBytes)):
                attrs = ('max_length', 'min_length')

                def constraint_func(**kw: Any) -> Type[Any]:  # noqa: F811
                    return type(type_.__name__, (type_,), kw)

            elif issubclass(type_, str) and not issubclass(type_, (EmailStr, AnyUrl)):
                attrs = ('max_length', 'min_length', 'regex')
                if issubclass(type_, StrictStr):

                    def constraint_func(**kw: Any) -> Type[Any]:
                        return type(type_.__name__, (type_,), kw)

                else:
                    constraint_func = constr
            elif issubclass(type_, bytes):
                attrs = ('max_length', 'min_length', 'regex')
                if issubclass(type_, StrictBytes):

                    def constraint_func(**kw: Any) -> Type[Any]:
                        return type(type_.__name__, (type_,), kw)

                else:
                    constraint_func = conbytes
            elif issubclass(type_, numeric_types) and not issubclass(
                type_,
                (
                    ConstrainedInt,
                    ConstrainedFloat,
                    ConstrainedDecimal,
                    ConstrainedList,
                    ConstrainedSet,
                    ConstrainedFrozenSet,
                    bool,
                ),
            ):
                # Is numeric type
                attrs = ('gt', 'lt', 'ge', 'le', 'multiple_of')
                if issubclass(type_, float):
                    attrs += ('allow_inf_nan',)
                if issubclass(type_, Decimal):
                    attrs += ('max_digits', 'decimal_places')
                numeric_type = next(t for t in numeric_types if issubclass(type_, t))  # pragma: no branch
                constraint_func = _map_types_constraint[numeric_type]

        if attrs:
            used_constraints.update(set(attrs))
            kwargs = {
                attr_name: attr
                for attr_name, attr in ((attr_name, getattr(field_info, attr_name)) for attr_name in attrs)
                if attr is not None
            }
            if kwargs:
                constraint_func = cast(Callable[..., type], constraint_func)
                return constraint_func(**kwargs)
        return type_

    return go(annotation), used_constraints


def normalize_name(name: str) -> str:
    """
    Normalizes the given name. This can be applied to either a model *or* enum.
    """
    return re.sub(r'[^a-zA-Z0-9.\-_]', '_', name)
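

# e.g. the brackets of a parametrized generic name are replaced with underscores:
#
#     assert normalize_name('Response[List[int]]') == 'Response_List_int__'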


class SkipField(Exception):
    """
    Utility exception used to exclude fields from schema.
    """

    def __init__(self, message: str) -> None:
        self.message = message