You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

142 lines
4.5 KiB

  1. import datetime
  2. import warnings
  3. from collections import deque
  4. from decimal import Decimal
  5. from enum import Enum
  6. from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network
  7. from pathlib import Path
  8. from re import Pattern
  9. from types import GeneratorType
  10. from typing import TYPE_CHECKING, Any, Callable, Union
  11. from uuid import UUID
  12. from typing_extensions import deprecated
  13. from .._internal._import_utils import import_cached_base_model
  14. from ..color import Color
  15. from ..networks import NameEmail
  16. from ..types import SecretBytes, SecretStr
  17. from ..warnings import PydanticDeprecatedSince20
  18. if not TYPE_CHECKING:
  19. # See PyCharm issues https://youtrack.jetbrains.com/issue/PY-21915
  20. # and https://youtrack.jetbrains.com/issue/PY-51428
  21. DeprecationWarning = PydanticDeprecatedSince20
  22. __all__ = 'pydantic_encoder', 'custom_pydantic_encoder', 'timedelta_isoformat'
  23. def isoformat(o: Union[datetime.date, datetime.time]) -> str:
  24. return o.isoformat()
  25. def decimal_encoder(dec_value: Decimal) -> Union[int, float]:
  26. """Encodes a Decimal as int of there's no exponent, otherwise float.
  27. This is useful when we use ConstrainedDecimal to represent Numeric(x,0)
  28. where a integer (but not int typed) is used. Encoding this as a float
  29. results in failed round-tripping between encode and parse.
  30. Our Id type is a prime example of this.
  31. >>> decimal_encoder(Decimal("1.0"))
  32. 1.0
  33. >>> decimal_encoder(Decimal("1"))
  34. 1
  35. """
  36. exponent = dec_value.as_tuple().exponent
  37. if isinstance(exponent, int) and exponent >= 0:
  38. return int(dec_value)
  39. else:
  40. return float(dec_value)
  41. ENCODERS_BY_TYPE: dict[type[Any], Callable[[Any], Any]] = {
  42. bytes: lambda o: o.decode(),
  43. Color: str,
  44. datetime.date: isoformat,
  45. datetime.datetime: isoformat,
  46. datetime.time: isoformat,
  47. datetime.timedelta: lambda td: td.total_seconds(),
  48. Decimal: decimal_encoder,
  49. Enum: lambda o: o.value,
  50. frozenset: list,
  51. deque: list,
  52. GeneratorType: list,
  53. IPv4Address: str,
  54. IPv4Interface: str,
  55. IPv4Network: str,
  56. IPv6Address: str,
  57. IPv6Interface: str,
  58. IPv6Network: str,
  59. NameEmail: str,
  60. Path: str,
  61. Pattern: lambda o: o.pattern,
  62. SecretBytes: str,
  63. SecretStr: str,
  64. set: list,
  65. UUID: str,
  66. }
  67. @deprecated(
  68. '`pydantic_encoder` is deprecated, use `pydantic_core.to_jsonable_python` instead.',
  69. category=None,
  70. )
  71. def pydantic_encoder(obj: Any) -> Any:
  72. warnings.warn(
  73. '`pydantic_encoder` is deprecated, use `pydantic_core.to_jsonable_python` instead.',
  74. category=PydanticDeprecatedSince20,
  75. stacklevel=2,
  76. )
  77. from dataclasses import asdict, is_dataclass
  78. BaseModel = import_cached_base_model()
  79. if isinstance(obj, BaseModel):
  80. return obj.model_dump()
  81. elif is_dataclass(obj):
  82. return asdict(obj) # type: ignore
  83. # Check the class type and its superclasses for a matching encoder
  84. for base in obj.__class__.__mro__[:-1]:
  85. try:
  86. encoder = ENCODERS_BY_TYPE[base]
  87. except KeyError:
  88. continue
  89. return encoder(obj)
  90. else: # We have exited the for loop without finding a suitable encoder
  91. raise TypeError(f"Object of type '{obj.__class__.__name__}' is not JSON serializable")
  92. # TODO: Add a suggested migration path once there is a way to use custom encoders
  93. @deprecated(
  94. '`custom_pydantic_encoder` is deprecated, use `BaseModel.model_dump` instead.',
  95. category=None,
  96. )
  97. def custom_pydantic_encoder(type_encoders: dict[Any, Callable[[type[Any]], Any]], obj: Any) -> Any:
  98. warnings.warn(
  99. '`custom_pydantic_encoder` is deprecated, use `BaseModel.model_dump` instead.',
  100. category=PydanticDeprecatedSince20,
  101. stacklevel=2,
  102. )
  103. # Check the class type and its superclasses for a matching encoder
  104. for base in obj.__class__.__mro__[:-1]:
  105. try:
  106. encoder = type_encoders[base]
  107. except KeyError:
  108. continue
  109. return encoder(obj)
  110. else: # We have exited the for loop without finding a suitable encoder
  111. return pydantic_encoder(obj)
  112. @deprecated('`timedelta_isoformat` is deprecated.', category=None)
  113. def timedelta_isoformat(td: datetime.timedelta) -> str:
  114. """ISO 8601 encoding for Python timedelta object."""
  115. warnings.warn('`timedelta_isoformat` is deprecated.', category=PydanticDeprecatedSince20, stacklevel=2)
  116. minutes, seconds = divmod(td.seconds, 60)
  117. hours, minutes = divmod(minutes, 60)
  118. return f'{"-" if td.days < 0 else ""}P{abs(td.days)}DT{hours:d}H{minutes:d}M{seconds:d}.{td.microseconds:06d}S'