25개 이상의 토픽을 선택하실 수 없습니다. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

337 lines
11 KiB

  1. from __future__ import annotations
  2. import sys
  3. from collections.abc import Callable, Sequence
  4. from functools import partial
  5. from inspect import getmro, isclass
  6. from typing import TYPE_CHECKING, Generic, Type, TypeVar, cast, overload
  7. if sys.version_info < (3, 13):
  8. from typing_extensions import TypeVar
  9. _BaseExceptionT_co = TypeVar(
  10. "_BaseExceptionT_co", bound=BaseException, covariant=True, default=BaseException
  11. )
  12. _BaseExceptionT = TypeVar("_BaseExceptionT", bound=BaseException)
  13. _ExceptionT_co = TypeVar(
  14. "_ExceptionT_co", bound=Exception, covariant=True, default=Exception
  15. )
  16. _ExceptionT = TypeVar("_ExceptionT", bound=Exception)
  17. # using typing.Self would require a typing_extensions dependency on py<3.11
  18. _ExceptionGroupSelf = TypeVar("_ExceptionGroupSelf", bound="ExceptionGroup")
  19. _BaseExceptionGroupSelf = TypeVar("_BaseExceptionGroupSelf", bound="BaseExceptionGroup")
  20. def check_direct_subclass(
  21. exc: BaseException, parents: tuple[type[BaseException]]
  22. ) -> bool:
  23. for cls in getmro(exc.__class__)[:-1]:
  24. if cls in parents:
  25. return True
  26. return False
  27. def get_condition_filter(
  28. condition: type[_BaseExceptionT]
  29. | tuple[type[_BaseExceptionT], ...]
  30. | Callable[[_BaseExceptionT_co], bool],
  31. ) -> Callable[[_BaseExceptionT_co], bool]:
  32. if isclass(condition) and issubclass(
  33. cast(Type[BaseException], condition), BaseException
  34. ):
  35. return partial(check_direct_subclass, parents=(condition,))
  36. elif isinstance(condition, tuple):
  37. if all(isclass(x) and issubclass(x, BaseException) for x in condition):
  38. return partial(check_direct_subclass, parents=condition)
  39. elif callable(condition):
  40. return cast("Callable[[BaseException], bool]", condition)
  41. raise TypeError("expected a function, exception type or tuple of exception types")
  42. def _derive_and_copy_attributes(self, excs):
  43. eg = self.derive(excs)
  44. eg.__cause__ = self.__cause__
  45. eg.__context__ = self.__context__
  46. eg.__traceback__ = self.__traceback__
  47. if hasattr(self, "__notes__"):
  48. # Create a new list so that add_note() only affects one exceptiongroup
  49. eg.__notes__ = list(self.__notes__)
  50. return eg
  51. class BaseExceptionGroup(BaseException, Generic[_BaseExceptionT_co]):
  52. """A combination of multiple unrelated exceptions."""
  53. def __new__(
  54. cls: type[_BaseExceptionGroupSelf],
  55. __message: str,
  56. __exceptions: Sequence[_BaseExceptionT_co],
  57. ) -> _BaseExceptionGroupSelf:
  58. if not isinstance(__message, str):
  59. raise TypeError(f"argument 1 must be str, not {type(__message)}")
  60. if not isinstance(__exceptions, Sequence):
  61. raise TypeError("second argument (exceptions) must be a sequence")
  62. if not __exceptions:
  63. raise ValueError(
  64. "second argument (exceptions) must be a non-empty sequence"
  65. )
  66. for i, exc in enumerate(__exceptions):
  67. if not isinstance(exc, BaseException):
  68. raise ValueError(
  69. f"Item {i} of second argument (exceptions) is not an exception"
  70. )
  71. if cls is BaseExceptionGroup:
  72. if all(isinstance(exc, Exception) for exc in __exceptions):
  73. cls = ExceptionGroup
  74. if issubclass(cls, Exception):
  75. for exc in __exceptions:
  76. if not isinstance(exc, Exception):
  77. if cls is ExceptionGroup:
  78. raise TypeError(
  79. "Cannot nest BaseExceptions in an ExceptionGroup"
  80. )
  81. else:
  82. raise TypeError(
  83. f"Cannot nest BaseExceptions in {cls.__name__!r}"
  84. )
  85. instance = super().__new__(cls, __message, __exceptions)
  86. instance._exceptions = tuple(__exceptions)
  87. return instance
  88. def __init__(
  89. self,
  90. __message: str,
  91. __exceptions: Sequence[_BaseExceptionT_co],
  92. *args: object,
  93. ) -> None:
  94. BaseException.__init__(self, __message, __exceptions, *args)
  95. def add_note(self, note: str) -> None:
  96. if not isinstance(note, str):
  97. raise TypeError(
  98. f"Expected a string, got note={note!r} (type {type(note).__name__})"
  99. )
  100. if not hasattr(self, "__notes__"):
  101. self.__notes__: list[str] = []
  102. self.__notes__.append(note)
  103. @property
  104. def message(self) -> str:
  105. return self.args[0]
  106. @property
  107. def exceptions(
  108. self,
  109. ) -> tuple[_BaseExceptionT_co | BaseExceptionGroup[_BaseExceptionT_co], ...]:
  110. return tuple(self._exceptions)
  111. @overload
  112. def subgroup(
  113. self, __condition: type[_ExceptionT] | tuple[type[_ExceptionT], ...]
  114. ) -> ExceptionGroup[_ExceptionT] | None: ...
  115. @overload
  116. def subgroup(
  117. self, __condition: type[_BaseExceptionT] | tuple[type[_BaseExceptionT], ...]
  118. ) -> BaseExceptionGroup[_BaseExceptionT] | None: ...
  119. @overload
  120. def subgroup(
  121. self,
  122. __condition: Callable[[_BaseExceptionT_co | _BaseExceptionGroupSelf], bool],
  123. ) -> BaseExceptionGroup[_BaseExceptionT_co] | None: ...
  124. def subgroup(
  125. self,
  126. __condition: type[_BaseExceptionT]
  127. | tuple[type[_BaseExceptionT], ...]
  128. | Callable[[_BaseExceptionT_co | _BaseExceptionGroupSelf], bool],
  129. ) -> BaseExceptionGroup[_BaseExceptionT] | None:
  130. condition = get_condition_filter(__condition)
  131. modified = False
  132. if condition(self):
  133. return self
  134. exceptions: list[BaseException] = []
  135. for exc in self.exceptions:
  136. if isinstance(exc, BaseExceptionGroup):
  137. subgroup = exc.subgroup(__condition)
  138. if subgroup is not None:
  139. exceptions.append(subgroup)
  140. if subgroup is not exc:
  141. modified = True
  142. elif condition(exc):
  143. exceptions.append(exc)
  144. else:
  145. modified = True
  146. if not modified:
  147. return self
  148. elif exceptions:
  149. group = _derive_and_copy_attributes(self, exceptions)
  150. return group
  151. else:
  152. return None
  153. @overload
  154. def split(
  155. self, __condition: type[_ExceptionT] | tuple[type[_ExceptionT], ...]
  156. ) -> tuple[
  157. ExceptionGroup[_ExceptionT] | None,
  158. BaseExceptionGroup[_BaseExceptionT_co] | None,
  159. ]: ...
  160. @overload
  161. def split(
  162. self, __condition: type[_BaseExceptionT] | tuple[type[_BaseExceptionT], ...]
  163. ) -> tuple[
  164. BaseExceptionGroup[_BaseExceptionT] | None,
  165. BaseExceptionGroup[_BaseExceptionT_co] | None,
  166. ]: ...
  167. @overload
  168. def split(
  169. self,
  170. __condition: Callable[[_BaseExceptionT_co | _BaseExceptionGroupSelf], bool],
  171. ) -> tuple[
  172. BaseExceptionGroup[_BaseExceptionT_co] | None,
  173. BaseExceptionGroup[_BaseExceptionT_co] | None,
  174. ]: ...
  175. def split(
  176. self,
  177. __condition: type[_BaseExceptionT]
  178. | tuple[type[_BaseExceptionT], ...]
  179. | Callable[[_BaseExceptionT_co], bool],
  180. ) -> (
  181. tuple[
  182. ExceptionGroup[_ExceptionT] | None,
  183. BaseExceptionGroup[_BaseExceptionT_co] | None,
  184. ]
  185. | tuple[
  186. BaseExceptionGroup[_BaseExceptionT] | None,
  187. BaseExceptionGroup[_BaseExceptionT_co] | None,
  188. ]
  189. | tuple[
  190. BaseExceptionGroup[_BaseExceptionT_co] | None,
  191. BaseExceptionGroup[_BaseExceptionT_co] | None,
  192. ]
  193. ):
  194. condition = get_condition_filter(__condition)
  195. if condition(self):
  196. return self, None
  197. matching_exceptions: list[BaseException] = []
  198. nonmatching_exceptions: list[BaseException] = []
  199. for exc in self.exceptions:
  200. if isinstance(exc, BaseExceptionGroup):
  201. matching, nonmatching = exc.split(condition)
  202. if matching is not None:
  203. matching_exceptions.append(matching)
  204. if nonmatching is not None:
  205. nonmatching_exceptions.append(nonmatching)
  206. elif condition(exc):
  207. matching_exceptions.append(exc)
  208. else:
  209. nonmatching_exceptions.append(exc)
  210. matching_group: _BaseExceptionGroupSelf | None = None
  211. if matching_exceptions:
  212. matching_group = _derive_and_copy_attributes(self, matching_exceptions)
  213. nonmatching_group: _BaseExceptionGroupSelf | None = None
  214. if nonmatching_exceptions:
  215. nonmatching_group = _derive_and_copy_attributes(
  216. self, nonmatching_exceptions
  217. )
  218. return matching_group, nonmatching_group
  219. @overload
  220. def derive(self, __excs: Sequence[_ExceptionT]) -> ExceptionGroup[_ExceptionT]: ...
  221. @overload
  222. def derive(
  223. self, __excs: Sequence[_BaseExceptionT]
  224. ) -> BaseExceptionGroup[_BaseExceptionT]: ...
  225. def derive(
  226. self, __excs: Sequence[_BaseExceptionT]
  227. ) -> BaseExceptionGroup[_BaseExceptionT]:
  228. return BaseExceptionGroup(self.message, __excs)
  229. def __str__(self) -> str:
  230. suffix = "" if len(self._exceptions) == 1 else "s"
  231. return f"{self.message} ({len(self._exceptions)} sub-exception{suffix})"
  232. def __repr__(self) -> str:
  233. return f"{self.__class__.__name__}({self.args[0]!r}, {self.args[1]!r})"
  234. class ExceptionGroup(BaseExceptionGroup[_ExceptionT_co], Exception):
  235. def __new__(
  236. cls: type[_ExceptionGroupSelf],
  237. __message: str,
  238. __exceptions: Sequence[_ExceptionT_co],
  239. ) -> _ExceptionGroupSelf:
  240. return super().__new__(cls, __message, __exceptions)
  241. if TYPE_CHECKING:
  242. @property
  243. def exceptions(
  244. self,
  245. ) -> tuple[_ExceptionT_co | ExceptionGroup[_ExceptionT_co], ...]: ...
  246. @overload # type: ignore[override]
  247. def subgroup(
  248. self, __condition: type[_ExceptionT] | tuple[type[_ExceptionT], ...]
  249. ) -> ExceptionGroup[_ExceptionT] | None: ...
  250. @overload
  251. def subgroup(
  252. self, __condition: Callable[[_ExceptionT_co | _ExceptionGroupSelf], bool]
  253. ) -> ExceptionGroup[_ExceptionT_co] | None: ...
  254. def subgroup(
  255. self,
  256. __condition: type[_ExceptionT]
  257. | tuple[type[_ExceptionT], ...]
  258. | Callable[[_ExceptionT_co], bool],
  259. ) -> ExceptionGroup[_ExceptionT] | None:
  260. return super().subgroup(__condition)
  261. @overload
  262. def split(
  263. self, __condition: type[_ExceptionT] | tuple[type[_ExceptionT], ...]
  264. ) -> tuple[
  265. ExceptionGroup[_ExceptionT] | None, ExceptionGroup[_ExceptionT_co] | None
  266. ]: ...
  267. @overload
  268. def split(
  269. self, __condition: Callable[[_ExceptionT_co | _ExceptionGroupSelf], bool]
  270. ) -> tuple[
  271. ExceptionGroup[_ExceptionT_co] | None, ExceptionGroup[_ExceptionT_co] | None
  272. ]: ...
  273. def split(
  274. self: _ExceptionGroupSelf,
  275. __condition: type[_ExceptionT]
  276. | tuple[type[_ExceptionT], ...]
  277. | Callable[[_ExceptionT_co], bool],
  278. ) -> tuple[
  279. ExceptionGroup[_ExceptionT_co] | None, ExceptionGroup[_ExceptionT_co] | None
  280. ]:
  281. return super().split(__condition)