Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.
 
 
 
 

1518 рядки
55 KiB

  1. from __future__ import annotations
  2. import sys
  3. import os
  4. import warnings
  5. import glob
  6. from importlib import import_module
  7. import ruamel.yaml
  8. from ruamel.yaml.error import UnsafeLoaderWarning, YAMLError # NOQA
  9. from ruamel.yaml.tokens import * # NOQA
  10. from ruamel.yaml.events import * # NOQA
  11. from ruamel.yaml.nodes import * # NOQA
  12. from ruamel.yaml.loader import BaseLoader, SafeLoader, Loader, RoundTripLoader # NOQA
  13. from ruamel.yaml.dumper import BaseDumper, SafeDumper, Dumper, RoundTripDumper # NOQA
  14. from ruamel.yaml.compat import StringIO, BytesIO, with_metaclass, nprint, nprintf # NOQA
  15. from ruamel.yaml.resolver import VersionedResolver, Resolver # NOQA
  16. from ruamel.yaml.representer import (
  17. BaseRepresenter,
  18. SafeRepresenter,
  19. Representer,
  20. RoundTripRepresenter,
  21. )
  22. from ruamel.yaml.constructor import (
  23. BaseConstructor,
  24. SafeConstructor,
  25. Constructor,
  26. RoundTripConstructor,
  27. )
  28. from ruamel.yaml.loader import Loader as UnsafeLoader # NOQA
  29. from ruamel.yaml.comments import CommentedMap, CommentedSeq, C_PRE
  30. from ruamel.yaml.docinfo import DocInfo, version, Version
  31. if False: # MYPY
  32. from typing import List, Set, Dict, Tuple, Union, Any, Callable, Optional, Text, Type # NOQA
  33. from ruamel.yaml.compat import StreamType, StreamTextType, VersionType # NOQA
  34. from types import TracebackType
  35. from pathlib import Path
  36. try:
  37. from _ruamel_yaml import CParser, CEmitter # type: ignore
  38. except: # NOQA
  39. CParser = CEmitter = None
  40. # import io
  41. # YAML is an acronym, i.e. spoken: rhymes with "camel". And thus a
  42. # subset of abbreviations, which should be all caps according to PEP8
  43. class YAML:
  44. def __init__(
  45. self: Any,
  46. *,
  47. typ: Optional[Union[List[Text], Text]] = None,
  48. pure: Any = False,
  49. output: Any = None,
  50. plug_ins: Any = None,
  51. ) -> None: # input=None,
  52. """
  53. typ: 'rt'/None -> RoundTripLoader/RoundTripDumper, (default)
  54. 'safe' -> SafeLoader/SafeDumper,
  55. 'unsafe' -> normal/unsafe Loader/Dumper (pending deprecation)
  56. 'full' -> full Dumper only, including python built-ins that are
  57. potentially unsafe to load
  58. 'base' -> baseloader
  59. pure: if True only use Python modules
  60. input/output: needed to work as context manager
  61. plug_ins: a list of plug-in files
  62. """
  63. self.typ = ['rt'] if typ is None else (typ if isinstance(typ, list) else [typ])
  64. self.pure = pure
  65. # self._input = input
  66. self._output = output
  67. self._context_manager: Any = None
  68. self.plug_ins: List[Any] = []
  69. for pu in ([] if plug_ins is None else plug_ins) + self.official_plug_ins():
  70. file_name = pu.replace(os.sep, '.')
  71. self.plug_ins.append(import_module(file_name))
  72. self.Resolver: Any = ruamel.yaml.resolver.VersionedResolver
  73. self.allow_unicode = True
  74. self.Reader: Any = None
  75. self.Representer: Any = None
  76. self.Constructor: Any = None
  77. self.Scanner: Any = None
  78. self.Serializer: Any = None
  79. self.default_flow_style: Any = None
  80. self.comment_handling = None
  81. typ_found = 1
  82. setup_rt = False
  83. if 'rt' in self.typ:
  84. setup_rt = True
  85. elif 'safe' in self.typ:
  86. self.Emitter = (
  87. ruamel.yaml.emitter.Emitter if pure or CEmitter is None else CEmitter
  88. )
  89. self.Representer = ruamel.yaml.representer.SafeRepresenter
  90. self.Parser = ruamel.yaml.parser.Parser if pure or CParser is None else CParser
  91. self.Composer = ruamel.yaml.composer.Composer
  92. self.Constructor = ruamel.yaml.constructor.SafeConstructor
  93. elif 'base' in self.typ:
  94. self.Emitter = ruamel.yaml.emitter.Emitter
  95. self.Representer = ruamel.yaml.representer.BaseRepresenter
  96. self.Parser = ruamel.yaml.parser.Parser if pure or CParser is None else CParser
  97. self.Composer = ruamel.yaml.composer.Composer
  98. self.Constructor = ruamel.yaml.constructor.BaseConstructor
  99. elif 'unsafe' in self.typ:
  100. warnings.warn(
  101. "\nyou should no longer specify 'unsafe'.\nFor **dumping only** use yaml=YAML(typ='full')\n", # NOQA
  102. PendingDeprecationWarning,
  103. stacklevel=2,
  104. )
  105. self.Emitter = (
  106. ruamel.yaml.emitter.Emitter if pure or CEmitter is None else CEmitter
  107. )
  108. self.Representer = ruamel.yaml.representer.Representer
  109. self.Parser = ruamel.yaml.parser.Parser if pure or CParser is None else CParser
  110. self.Composer = ruamel.yaml.composer.Composer
  111. self.Constructor = ruamel.yaml.constructor.Constructor
  112. elif 'full' in self.typ:
  113. self.Emitter = (
  114. ruamel.yaml.emitter.Emitter if pure or CEmitter is None else CEmitter
  115. )
  116. self.Representer = ruamel.yaml.representer.Representer
  117. self.Parser = ruamel.yaml.parser.Parser if pure or CParser is None else CParser
  118. # self.Composer = ruamel.yaml.composer.Composer
  119. # self.Constructor = ruamel.yaml.constructor.Constructor
  120. elif 'rtsc' in self.typ:
  121. self.default_flow_style = False
  122. # no optimized rt-dumper yet
  123. self.Emitter = ruamel.yaml.emitter.RoundTripEmitter
  124. self.Serializer = ruamel.yaml.serializer.Serializer
  125. self.Representer = ruamel.yaml.representer.RoundTripRepresenter
  126. self.Scanner = ruamel.yaml.scanner.RoundTripScannerSC
  127. # no optimized rt-parser yet
  128. self.Parser = ruamel.yaml.parser.RoundTripParserSC
  129. self.Composer = ruamel.yaml.composer.Composer
  130. self.Constructor = ruamel.yaml.constructor.RoundTripConstructor
  131. self.comment_handling = C_PRE
  132. else:
  133. setup_rt = True
  134. typ_found = 0
  135. if setup_rt:
  136. self.default_flow_style = False
  137. # no optimized rt-dumper yet
  138. self.Emitter = ruamel.yaml.emitter.RoundTripEmitter
  139. self.Serializer = ruamel.yaml.serializer.Serializer
  140. self.Representer = ruamel.yaml.representer.RoundTripRepresenter
  141. self.Scanner = ruamel.yaml.scanner.RoundTripScanner
  142. # no optimized rt-parser yet
  143. self.Parser = ruamel.yaml.parser.RoundTripParser
  144. self.Composer = ruamel.yaml.composer.Composer
  145. self.Constructor = ruamel.yaml.constructor.RoundTripConstructor
  146. del setup_rt
  147. self.stream = None
  148. self.canonical = None
  149. self.old_indent = None
  150. self.width: Union[int, None] = None
  151. self.line_break = None
  152. self.map_indent: Union[int, None] = None
  153. self.sequence_indent: Union[int, None] = None
  154. self.sequence_dash_offset: int = 0
  155. self.compact_seq_seq = None
  156. self.compact_seq_map = None
  157. self.sort_base_mapping_type_on_output = None # default: sort
  158. self.top_level_colon_align = None
  159. self.prefix_colon = None
  160. self._version: Optional[Any] = None
  161. self.preserve_quotes: Optional[bool] = None
  162. self.allow_duplicate_keys = False # duplicate keys in map, set
  163. self.encoding = 'utf-8'
  164. self.explicit_start: Union[bool, None] = None
  165. self.explicit_end: Union[bool, None] = None
  166. self._tags = None
  167. self.doc_infos: List[DocInfo] = []
  168. self.default_style = None
  169. self.top_level_block_style_scalar_no_indent_error_1_1 = False
  170. # directives end indicator with single scalar document
  171. self.scalar_after_indicator: Optional[bool] = None
  172. # [a, b: 1, c: {d: 2}] vs. [a, {b: 1}, {c: {d: 2}}]
  173. self.brace_single_entry_mapping_in_flow_sequence = False
  174. for module in self.plug_ins:
  175. if getattr(module, 'typ', None) in self.typ:
  176. typ_found += 1
  177. module.init_typ(self)
  178. break
  179. if typ_found == 0:
  180. raise NotImplementedError(
  181. f'typ "{self.typ}" not recognised (need to install plug-in?)',
  182. )
  183. @property
  184. def reader(self) -> Any:
  185. try:
  186. return self._reader # type: ignore
  187. except AttributeError:
  188. self._reader = self.Reader(None, loader=self)
  189. return self._reader
  190. @property
  191. def scanner(self) -> Any:
  192. try:
  193. return self._scanner # type: ignore
  194. except AttributeError:
  195. if self.Scanner is None:
  196. raise
  197. self._scanner = self.Scanner(loader=self)
  198. return self._scanner
  199. @property
  200. def parser(self) -> Any:
  201. attr = '_' + sys._getframe().f_code.co_name
  202. if not hasattr(self, attr):
  203. if self.Parser is not CParser:
  204. setattr(self, attr, self.Parser(loader=self))
  205. else:
  206. if getattr(self, '_stream', None) is None:
  207. # wait for the stream
  208. return None
  209. else:
  210. # if not hasattr(self._stream, 'read') and hasattr(self._stream, 'open'):
  211. # # pathlib.Path() instance
  212. # setattr(self, attr, CParser(self._stream))
  213. # else:
  214. setattr(self, attr, CParser(self._stream))
  215. # self._parser = self._composer = self
  216. # nprint('scanner', self.loader.scanner)
  217. return getattr(self, attr)
  218. @property
  219. def composer(self) -> Any:
  220. attr = '_' + sys._getframe().f_code.co_name
  221. if not hasattr(self, attr):
  222. setattr(self, attr, self.Composer(loader=self))
  223. return getattr(self, attr)
  224. @property
  225. def constructor(self) -> Any:
  226. attr = '_' + sys._getframe().f_code.co_name
  227. if not hasattr(self, attr):
  228. if self.Constructor is None:
  229. if 'full' in self.typ:
  230. raise YAMLError(
  231. "\nyou can only use yaml=YAML(typ='full') for dumping\n", # NOQA
  232. )
  233. cnst = self.Constructor(preserve_quotes=self.preserve_quotes, loader=self) # type: ignore # NOQA
  234. cnst.allow_duplicate_keys = self.allow_duplicate_keys
  235. setattr(self, attr, cnst)
  236. return getattr(self, attr)
  237. @property
  238. def resolver(self) -> Any:
  239. try:
  240. rslvr = self._resolver # type: ignore
  241. except AttributeError:
  242. rslvr = None
  243. if rslvr is None or rslvr._loader_version != self.version:
  244. rslvr = self._resolver = self.Resolver(version=self.version, loader=self)
  245. return rslvr
  246. @property
  247. def emitter(self) -> Any:
  248. attr = '_' + sys._getframe().f_code.co_name
  249. if not hasattr(self, attr):
  250. if self.Emitter is not CEmitter:
  251. _emitter = self.Emitter(
  252. None,
  253. canonical=self.canonical,
  254. indent=self.old_indent,
  255. width=self.width,
  256. allow_unicode=self.allow_unicode,
  257. line_break=self.line_break,
  258. prefix_colon=self.prefix_colon,
  259. brace_single_entry_mapping_in_flow_sequence=self.brace_single_entry_mapping_in_flow_sequence, # NOQA
  260. dumper=self,
  261. )
  262. setattr(self, attr, _emitter)
  263. if self.map_indent is not None:
  264. _emitter.best_map_indent = self.map_indent
  265. if self.sequence_indent is not None:
  266. _emitter.best_sequence_indent = self.sequence_indent
  267. if self.sequence_dash_offset is not None:
  268. _emitter.sequence_dash_offset = self.sequence_dash_offset
  269. # _emitter.block_seq_indent = self.sequence_dash_offset
  270. if self.compact_seq_seq is not None:
  271. _emitter.compact_seq_seq = self.compact_seq_seq
  272. if self.compact_seq_map is not None:
  273. _emitter.compact_seq_map = self.compact_seq_map
  274. else:
  275. if getattr(self, '_stream', None) is None:
  276. # wait for the stream
  277. return None
  278. return None
  279. return getattr(self, attr)
  280. @property
  281. def serializer(self) -> Any:
  282. attr = '_' + sys._getframe().f_code.co_name
  283. if not hasattr(self, attr):
  284. setattr(
  285. self,
  286. attr,
  287. self.Serializer(
  288. encoding=self.encoding,
  289. explicit_start=self.explicit_start,
  290. explicit_end=self.explicit_end,
  291. version=self.version,
  292. tags=self.tags,
  293. dumper=self,
  294. ),
  295. )
  296. return getattr(self, attr)
  297. @property
  298. def representer(self) -> Any:
  299. attr = '_' + sys._getframe().f_code.co_name
  300. if not hasattr(self, attr):
  301. repres = self.Representer(
  302. default_style=self.default_style,
  303. default_flow_style=self.default_flow_style,
  304. dumper=self,
  305. )
  306. if self.sort_base_mapping_type_on_output is not None:
  307. repres.sort_base_mapping_type_on_output = self.sort_base_mapping_type_on_output
  308. setattr(self, attr, repres)
  309. return getattr(self, attr)
  310. def scan(self, stream: StreamTextType) -> Any:
  311. """
  312. Scan a YAML stream and produce scanning tokens.
  313. """
  314. if not hasattr(stream, 'read') and hasattr(stream, 'open'):
  315. # pathlib.Path() instance
  316. with stream.open('rb') as fp:
  317. return self.scan(fp)
  318. self.doc_infos.append(DocInfo(requested_version=version(self.version)))
  319. self.tags = {}
  320. _, parser = self.get_constructor_parser(stream)
  321. try:
  322. while self.scanner.check_token():
  323. yield self.scanner.get_token()
  324. finally:
  325. parser.dispose()
  326. for comp in ('reader', 'scanner'):
  327. try:
  328. getattr(getattr(self, '_' + comp), f'reset_{comp}')()
  329. except AttributeError:
  330. pass
  331. def parse(self, stream: StreamTextType) -> Any:
  332. """
  333. Parse a YAML stream and produce parsing events.
  334. """
  335. if not hasattr(stream, 'read') and hasattr(stream, 'open'):
  336. # pathlib.Path() instance
  337. with stream.open('rb') as fp:
  338. return self.parse(fp)
  339. self.doc_infos.append(DocInfo(requested_version=version(self.version)))
  340. self.tags = {}
  341. _, parser = self.get_constructor_parser(stream)
  342. try:
  343. while parser.check_event():
  344. yield parser.get_event()
  345. finally:
  346. parser.dispose()
  347. for comp in ('reader', 'scanner'):
  348. try:
  349. getattr(getattr(self, '_' + comp), f'reset_{comp}')()
  350. except AttributeError:
  351. pass
  352. def compose(self, stream: Union[Path, StreamTextType]) -> Any:
  353. """
  354. Parse the first YAML document in a stream
  355. and produce the corresponding representation tree.
  356. """
  357. if not hasattr(stream, 'read') and hasattr(stream, 'open'):
  358. # pathlib.Path() instance
  359. with stream.open('rb') as fp:
  360. return self.compose(fp)
  361. self.doc_infos.append(DocInfo(requested_version=version(self.version)))
  362. self.tags = {}
  363. constructor, parser = self.get_constructor_parser(stream)
  364. try:
  365. return constructor.composer.get_single_node()
  366. finally:
  367. parser.dispose()
  368. for comp in ('reader', 'scanner'):
  369. try:
  370. getattr(getattr(self, '_' + comp), f'reset_{comp}')()
  371. except AttributeError:
  372. pass
  373. def compose_all(self, stream: Union[Path, StreamTextType]) -> Any:
  374. """
  375. Parse all YAML documents in a stream
  376. and produce corresponding representation trees.
  377. """
  378. self.doc_infos.append(DocInfo(requested_version=version(self.version)))
  379. self.tags = {}
  380. constructor, parser = self.get_constructor_parser(stream)
  381. try:
  382. while constructor.composer.check_node():
  383. yield constructor.composer.get_node()
  384. finally:
  385. parser.dispose()
  386. for comp in ('reader', 'scanner'):
  387. try:
  388. getattr(getattr(self, '_' + comp), f'reset_{comp}')()
  389. except AttributeError:
  390. pass
  391. # separate output resolver?
  392. # def load(self, stream=None):
  393. # if self._context_manager:
  394. # if not self._input:
  395. # raise TypeError("Missing input stream while dumping from context manager")
  396. # for data in self._context_manager.load():
  397. # yield data
  398. # return
  399. # if stream is None:
  400. # raise TypeError("Need a stream argument when not loading from context manager")
  401. # return self.load_one(stream)
  402. def load(self, stream: Union[Path, StreamTextType]) -> Any:
  403. """
  404. at this point you either have the non-pure Parser (which has its own reader and
  405. scanner) or you have the pure Parser.
  406. If the pure Parser is set, then set the Reader and Scanner, if not already set.
  407. If either the Scanner or Reader are set, you cannot use the non-pure Parser,
  408. so reset it to the pure parser and set the Reader resp. Scanner if necessary
  409. """
  410. if not hasattr(stream, 'read') and hasattr(stream, 'open'):
  411. # pathlib.Path() instance
  412. with stream.open('rb') as fp:
  413. return self.load(fp)
  414. self.doc_infos.append(DocInfo(requested_version=version(self.version)))
  415. self.tags = {}
  416. constructor, parser = self.get_constructor_parser(stream)
  417. try:
  418. return constructor.get_single_data()
  419. finally:
  420. parser.dispose()
  421. for comp in ('reader', 'scanner'):
  422. try:
  423. getattr(getattr(self, '_' + comp), f'reset_{comp}')()
  424. except AttributeError:
  425. pass
  426. def load_all(self, stream: Union[Path, StreamTextType]) -> Any: # *, skip=None):
  427. if not hasattr(stream, 'read') and hasattr(stream, 'open'):
  428. # pathlib.Path() instance
  429. with stream.open('r') as fp:
  430. for d in self.load_all(fp):
  431. yield d
  432. return
  433. # if skip is None:
  434. # skip = []
  435. # elif isinstance(skip, int):
  436. # skip = [skip]
  437. self.doc_infos.append(DocInfo(requested_version=version(self.version)))
  438. self.tags = {}
  439. constructor, parser = self.get_constructor_parser(stream)
  440. try:
  441. while constructor.check_data():
  442. yield constructor.get_data()
  443. self.doc_infos.append(DocInfo(requested_version=version(self.version)))
  444. finally:
  445. parser.dispose()
  446. for comp in ('reader', 'scanner'):
  447. try:
  448. getattr(getattr(self, '_' + comp), f'reset_{comp}')()
  449. except AttributeError:
  450. pass
  451. def get_constructor_parser(self, stream: StreamTextType) -> Any:
  452. """
  453. the old cyaml needs special setup, and therefore the stream
  454. """
  455. if self.Constructor is None:
  456. if 'full' in self.typ:
  457. raise YAMLError(
  458. "\nyou can only use yaml=YAML(typ='full') for dumping\n", # NOQA
  459. )
  460. if self.Parser is not CParser:
  461. if self.Reader is None:
  462. self.Reader = ruamel.yaml.reader.Reader
  463. if self.Scanner is None:
  464. self.Scanner = ruamel.yaml.scanner.Scanner
  465. self.reader.stream = stream
  466. else:
  467. if self.Reader is not None:
  468. if self.Scanner is None:
  469. self.Scanner = ruamel.yaml.scanner.Scanner
  470. self.Parser = ruamel.yaml.parser.Parser
  471. self.reader.stream = stream
  472. elif self.Scanner is not None:
  473. if self.Reader is None:
  474. self.Reader = ruamel.yaml.reader.Reader
  475. self.Parser = ruamel.yaml.parser.Parser
  476. self.reader.stream = stream
  477. else:
  478. # combined C level reader>scanner>parser
  479. # does some calls to the resolver, e.g. BaseResolver.descend_resolver
  480. # if you just initialise the CParser, too much of resolver.py
  481. # is actually used
  482. rslvr = self.Resolver
  483. # if rslvr is ruamel.yaml.resolver.VersionedResolver:
  484. # rslvr = ruamel.yaml.resolver.Resolver
  485. class XLoader(self.Parser, self.Constructor, rslvr): # type: ignore
  486. def __init__(
  487. selfx,
  488. stream: StreamTextType,
  489. version: Optional[VersionType] = self.version,
  490. preserve_quotes: Optional[bool] = None,
  491. ) -> None:
  492. # NOQA
  493. CParser.__init__(selfx, stream)
  494. selfx._parser = selfx._composer = selfx
  495. self.Constructor.__init__(selfx, loader=selfx)
  496. selfx.allow_duplicate_keys = self.allow_duplicate_keys
  497. rslvr.__init__(selfx, version=version, loadumper=selfx)
  498. self._stream = stream
  499. loader = XLoader(stream)
  500. self._scanner = loader
  501. return loader, loader
  502. return self.constructor, self.parser
  503. def emit(self, events: Any, stream: Any) -> None:
  504. """
  505. Emit YAML parsing events into a stream.
  506. If stream is None, return the produced string instead.
  507. """
  508. _, _, emitter = self.get_serializer_representer_emitter(stream, None)
  509. try:
  510. for event in events:
  511. emitter.emit(event)
  512. finally:
  513. try:
  514. emitter.dispose()
  515. except AttributeError:
  516. raise
  517. def serialize(self, node: Any, stream: Optional[StreamType]) -> Any:
  518. """
  519. Serialize a representation tree into a YAML stream.
  520. If stream is None, return the produced string instead.
  521. """
  522. self.serialize_all([node], stream)
  523. def serialize_all(self, nodes: Any, stream: Optional[StreamType]) -> Any:
  524. """
  525. Serialize a sequence of representation trees into a YAML stream.
  526. If stream is None, return the produced string instead.
  527. """
  528. serializer, _, emitter = self.get_serializer_representer_emitter(stream, None)
  529. try:
  530. serializer.open()
  531. for node in nodes:
  532. serializer.serialize(node)
  533. serializer.close()
  534. finally:
  535. try:
  536. emitter.dispose()
  537. except AttributeError:
  538. raise
  539. def dump(
  540. self: Any, data: Union[Path, StreamType], stream: Any = None, *, transform: Any = None,
  541. ) -> Any:
  542. if self._context_manager:
  543. if not self._output:
  544. raise TypeError('Missing output stream while dumping from context manager')
  545. if transform is not None:
  546. x = self.__class__.__name__
  547. raise TypeError(
  548. f'{x}.dump() in the context manager cannot have transform keyword',
  549. )
  550. self._context_manager.dump(data)
  551. else: # old style
  552. if stream is None:
  553. raise TypeError('Need a stream argument when not dumping from context manager')
  554. return self.dump_all([data], stream, transform=transform)
  555. def dump_all(
  556. self, documents: Any, stream: Union[Path, StreamType], *, transform: Any = None,
  557. ) -> Any:
  558. if self._context_manager:
  559. raise NotImplementedError
  560. self._output = stream
  561. self._context_manager = YAMLContextManager(self, transform=transform)
  562. for data in documents:
  563. self._context_manager.dump(data)
  564. self._context_manager.teardown_output()
  565. self._output = None
  566. self._context_manager = None
  567. def Xdump_all(self, documents: Any, stream: Any, *, transform: Any = None) -> Any:
  568. """
  569. Serialize a sequence of Python objects into a YAML stream.
  570. """
  571. if not hasattr(stream, 'write') and hasattr(stream, 'open'):
  572. # pathlib.Path() instance
  573. with stream.open('w') as fp:
  574. return self.dump_all(documents, fp, transform=transform)
  575. # The stream should have the methods `write` and possibly `flush`.
  576. if self.top_level_colon_align is True:
  577. tlca: Any = max([len(str(x)) for x in documents[0]])
  578. else:
  579. tlca = self.top_level_colon_align
  580. if transform is not None:
  581. fstream = stream
  582. if self.encoding is None:
  583. stream = StringIO()
  584. else:
  585. stream = BytesIO()
  586. serializer, representer, emitter = self.get_serializer_representer_emitter(
  587. stream, tlca,
  588. )
  589. try:
  590. self.serializer.open()
  591. for data in documents:
  592. try:
  593. self.representer.represent(data)
  594. except AttributeError:
  595. # nprint(dir(dumper._representer))
  596. raise
  597. self.serializer.close()
  598. finally:
  599. try:
  600. self.emitter.dispose()
  601. except AttributeError:
  602. raise
  603. # self.dumper.dispose() # cyaml
  604. delattr(self, '_serializer')
  605. delattr(self, '_emitter')
  606. if transform:
  607. val = stream.getvalue()
  608. if self.encoding:
  609. val = val.decode(self.encoding)
  610. if fstream is None:
  611. transform(val)
  612. else:
  613. fstream.write(transform(val))
  614. return None
  615. def get_serializer_representer_emitter(self, stream: StreamType, tlca: Any) -> Any:
  616. # we have only .Serializer to deal with (vs .Reader & .Scanner), much simpler
  617. if self.Emitter is not CEmitter:
  618. if self.Serializer is None:
  619. self.Serializer = ruamel.yaml.serializer.Serializer
  620. self.emitter.stream = stream
  621. self.emitter.top_level_colon_align = tlca
  622. if self.scalar_after_indicator is not None:
  623. self.emitter.scalar_after_indicator = self.scalar_after_indicator
  624. return self.serializer, self.representer, self.emitter
  625. if self.Serializer is not None:
  626. # cannot set serializer with CEmitter
  627. self.Emitter = ruamel.yaml.emitter.Emitter
  628. self.emitter.stream = stream
  629. self.emitter.top_level_colon_align = tlca
  630. if self.scalar_after_indicator is not None:
  631. self.emitter.scalar_after_indicator = self.scalar_after_indicator
  632. return self.serializer, self.representer, self.emitter
  633. # C routines
  634. rslvr = (
  635. ruamel.yaml.resolver.BaseResolver
  636. if 'base' in self.typ
  637. else ruamel.yaml.resolver.Resolver
  638. )
  639. class XDumper(CEmitter, self.Representer, rslvr): # type: ignore
  640. def __init__(
  641. selfx: StreamType,
  642. stream: Any,
  643. default_style: Any = None,
  644. default_flow_style: Any = None,
  645. canonical: Optional[bool] = None,
  646. indent: Optional[int] = None,
  647. width: Optional[int] = None,
  648. allow_unicode: Optional[bool] = None,
  649. line_break: Any = None,
  650. encoding: Any = None,
  651. explicit_start: Optional[bool] = None,
  652. explicit_end: Optional[bool] = None,
  653. version: Any = None,
  654. tags: Any = None,
  655. block_seq_indent: Any = None,
  656. top_level_colon_align: Any = None,
  657. prefix_colon: Any = None,
  658. ) -> None:
  659. # NOQA
  660. CEmitter.__init__(
  661. selfx,
  662. stream,
  663. canonical=canonical,
  664. indent=indent,
  665. width=width,
  666. encoding=encoding,
  667. allow_unicode=allow_unicode,
  668. line_break=line_break,
  669. explicit_start=explicit_start,
  670. explicit_end=explicit_end,
  671. version=version,
  672. tags=tags,
  673. )
  674. selfx._emitter = selfx._serializer = selfx._representer = selfx
  675. self.Representer.__init__(
  676. selfx, default_style=default_style, default_flow_style=default_flow_style,
  677. )
  678. rslvr.__init__(selfx)
  679. self._stream = stream
  680. dumper = XDumper(
  681. stream,
  682. default_style=self.default_style,
  683. default_flow_style=self.default_flow_style,
  684. canonical=self.canonical,
  685. indent=self.old_indent,
  686. width=self.width,
  687. allow_unicode=self.allow_unicode,
  688. line_break=self.line_break,
  689. encoding=self.encoding,
  690. explicit_start=self.explicit_start,
  691. explicit_end=self.explicit_end,
  692. version=self.version,
  693. tags=self.tags,
  694. )
  695. self._emitter = self._serializer = dumper
  696. return dumper, dumper, dumper
  697. # basic types
  698. def map(self, **kw: Any) -> Any:
  699. if 'rt' in self.typ:
  700. return CommentedMap(**kw)
  701. else:
  702. return dict(**kw)
  703. def seq(self, *args: Any) -> Any:
  704. if 'rt' in self.typ:
  705. return CommentedSeq(*args)
  706. else:
  707. return list(*args)
  708. # helpers
  709. def official_plug_ins(self) -> Any:
  710. """search for list of subdirs that are plug-ins, if __file__ is not available, e.g.
  711. single file installers that are not properly emulating a file-system (issue 324)
  712. no plug-ins will be found. If any are packaged, you know which file that are
  713. and you can explicitly provide it during instantiation:
  714. yaml = ruamel.yaml.YAML(plug_ins=['ruamel/yaml/jinja2/__plug_in__'])
  715. """
  716. try:
  717. bd = os.path.dirname(__file__)
  718. except NameError:
  719. return []
  720. gpbd = os.path.dirname(os.path.dirname(bd))
  721. res = [x.replace(gpbd, "")[1:-3] for x in glob.glob(bd + '/*/__plug_in__.py')]
  722. return res
  723. def register_class(self, cls: Any) -> Any:
  724. """
  725. register a class for dumping/loading
  726. - if it has attribute yaml_tag use that to register, else use class name
  727. - if it has methods to_yaml/from_yaml use those to dump/load else dump attributes
  728. as mapping
  729. """
  730. tag = getattr(cls, 'yaml_tag', '!' + cls.__name__)
  731. try:
  732. self.representer.add_representer(cls, cls.to_yaml)
  733. except AttributeError:
  734. def t_y(representer: Any, data: Any) -> Any:
  735. return representer.represent_yaml_object(
  736. tag, data, cls, flow_style=representer.default_flow_style,
  737. )
  738. self.representer.add_representer(cls, t_y)
  739. try:
  740. self.constructor.add_constructor(tag, cls.from_yaml)
  741. except AttributeError:
  742. def f_y(constructor: Any, node: Any) -> Any:
  743. return constructor.construct_yaml_object(node, cls)
  744. self.constructor.add_constructor(tag, f_y)
  745. return cls
  746. # ### context manager
  747. def __enter__(self) -> Any:
  748. self._context_manager = YAMLContextManager(self)
  749. return self
  750. def __exit__(
  751. self,
  752. typ: Optional[Type[BaseException]],
  753. value: Optional[BaseException],
  754. traceback: Optional[TracebackType],
  755. ) -> None:
  756. if typ:
  757. nprint('typ', typ)
  758. self._context_manager.teardown_output()
  759. # self._context_manager.teardown_input()
  760. self._context_manager = None
  761. # ### backwards compatibility
  762. def _indent(self, mapping: Any = None, sequence: Any = None, offset: Any = None) -> None:
  763. if mapping is not None:
  764. self.map_indent = mapping
  765. if sequence is not None:
  766. self.sequence_indent = sequence
  767. if offset is not None:
  768. self.sequence_dash_offset = offset
  769. @property
  770. def version(self) -> Optional[Tuple[int, int]]:
  771. return self._version
  772. @version.setter
  773. def version(self, val: VersionType) -> None:
  774. if val is None:
  775. self._version = val
  776. return
  777. elif isinstance(val, str):
  778. sval = tuple(int(x) for x in val.split('.'))
  779. elif isinstance(val, (list, tuple)):
  780. sval = tuple(int(x) for x in val)
  781. elif isinstance(val, Version):
  782. sval = (val.major, val.minor)
  783. else:
  784. raise TypeError(f'unknown version type {type(val)}')
  785. assert len(sval) == 2, f'version can only have major.minor, got {val}'
  786. assert sval[0] == 1, f'version major part can only be 1, got {val}'
  787. assert sval[1] in [1, 2], f'version minor part can only be 2 or 1, got {val}'
  788. self._version = sval
  789. @property
  790. def tags(self) -> Any:
  791. return self._tags
  792. @tags.setter
  793. def tags(self, val: Any) -> None:
  794. self._tags = val
  795. @property
  796. def indent(self) -> Any:
  797. return self._indent
  798. @indent.setter
  799. def indent(self, val: Any) -> None:
  800. self.old_indent = val
  801. @property
  802. def block_seq_indent(self) -> Any:
  803. return self.sequence_dash_offset
  804. @block_seq_indent.setter
  805. def block_seq_indent(self, val: Any) -> None:
  806. self.sequence_dash_offset = val
  807. def compact(self, seq_seq: Any = None, seq_map: Any = None) -> None:
  808. self.compact_seq_seq = seq_seq
  809. self.compact_seq_map = seq_map
  810. class YAMLContextManager:
  811. def __init__(self, yaml: Any, transform: Any = None) -> None:
  812. # used to be: (Any, Optional[Callable]) -> None
  813. self._yaml = yaml
  814. self._output_inited = False
  815. self._output_path = None
  816. self._output = self._yaml._output
  817. self._transform = transform
  818. # self._input_inited = False
  819. # self._input = input
  820. # self._input_path = None
  821. # self._transform = yaml.transform
  822. # self._fstream = None
  823. if not hasattr(self._output, 'write') and hasattr(self._output, 'open'):
  824. # pathlib.Path() instance, open with the same mode
  825. self._output_path = self._output
  826. self._output = self._output_path.open('w')
  827. # if not hasattr(self._stream, 'write') and hasattr(stream, 'open'):
  828. # if not hasattr(self._input, 'read') and hasattr(self._input, 'open'):
  829. # # pathlib.Path() instance, open with the same mode
  830. # self._input_path = self._input
  831. # self._input = self._input_path.open('r')
  832. if self._transform is not None:
  833. self._fstream = self._output
  834. if self._yaml.encoding is None:
  835. self._output = StringIO()
  836. else:
  837. self._output = BytesIO()
  838. def teardown_output(self) -> None:
  839. if self._output_inited:
  840. self._yaml.serializer.close()
  841. else:
  842. return
  843. try:
  844. self._yaml.emitter.dispose()
  845. except AttributeError:
  846. raise
  847. # self.dumper.dispose() # cyaml
  848. try:
  849. delattr(self._yaml, '_serializer')
  850. delattr(self._yaml, '_emitter')
  851. except AttributeError:
  852. raise
  853. if self._transform:
  854. val = self._output.getvalue()
  855. if self._yaml.encoding:
  856. val = val.decode(self._yaml.encoding)
  857. if self._fstream is None:
  858. self._transform(val)
  859. else:
  860. self._fstream.write(self._transform(val))
  861. self._fstream.flush()
  862. self._output = self._fstream # maybe not necessary
  863. if self._output_path is not None:
  864. self._output.close()
  865. def init_output(self, first_data: Any) -> None:
  866. if self._yaml.top_level_colon_align is True:
  867. tlca: Any = max([len(str(x)) for x in first_data])
  868. else:
  869. tlca = self._yaml.top_level_colon_align
  870. self._yaml.get_serializer_representer_emitter(self._output, tlca)
  871. self._yaml.serializer.open()
  872. self._output_inited = True
  873. def dump(self, data: Any) -> None:
  874. if not self._output_inited:
  875. self.init_output(data)
  876. try:
  877. self._yaml.representer.represent(data)
  878. except AttributeError:
  879. # nprint(dir(dumper._representer))
  880. raise
  881. # def teardown_input(self):
  882. # pass
  883. #
  884. # def init_input(self):
  885. # # set the constructor and parser on YAML() instance
  886. # self._yaml.get_constructor_parser(stream)
  887. #
  888. # def load(self):
  889. # if not self._input_inited:
  890. # self.init_input()
  891. # try:
  892. # while self._yaml.constructor.check_data():
  893. # yield self._yaml.constructor.get_data()
  894. # finally:
  895. # parser.dispose()
  896. # try:
  897. # self._reader.reset_reader() # type: ignore
  898. # except AttributeError:
  899. # pass
  900. # try:
  901. # self._scanner.reset_scanner() # type: ignore
  902. # except AttributeError:
  903. # pass
  904. def yaml_object(yml: Any) -> Any:
  905. """ decorator for classes that needs to dump/load objects
  906. The tag for such objects is taken from the class attribute yaml_tag (or the
  907. class name in lowercase in case unavailable)
  908. If methods to_yaml and/or from_yaml are available, these are called for dumping resp.
  909. loading, default routines (dumping a mapping of the attributes) used otherwise.
  910. """
  911. def yo_deco(cls: Any) -> Any:
  912. tag = getattr(cls, 'yaml_tag', '!' + cls.__name__)
  913. try:
  914. yml.representer.add_representer(cls, cls.to_yaml)
  915. except AttributeError:
  916. def t_y(representer: Any, data: Any) -> Any:
  917. return representer.represent_yaml_object(
  918. tag, data, cls, flow_style=representer.default_flow_style,
  919. )
  920. yml.representer.add_representer(cls, t_y)
  921. try:
  922. yml.constructor.add_constructor(tag, cls.from_yaml)
  923. except AttributeError:
  924. def f_y(constructor: Any, node: Any) -> Any:
  925. return constructor.construct_yaml_object(node, cls)
  926. yml.constructor.add_constructor(tag, f_y)
  927. return cls
  928. return yo_deco
  929. ########################################################################################
  930. def warn_deprecation(fun: Any, method: Any, arg: str = '') -> None:
  931. warnings.warn(
  932. f'\n{fun} will be removed, use\n\n yaml=YAML({arg})\n yaml.{method}(...)\n\ninstead', # NOQA
  933. PendingDeprecationWarning, # this will show when testing with pytest/tox
  934. stacklevel=3,
  935. )
  936. def error_deprecation(fun: Any, method: Any, arg: str = '', comment: str = 'instead of') -> None: # NOQA
  937. import inspect
  938. s = f'\n"{fun}()" has been removed, use\n\n yaml = YAML({arg})\n yaml.{method}(...)\n\n{comment}' # NOQA
  939. try:
  940. info = inspect.getframeinfo(inspect.stack()[2][0])
  941. context = '' if info.code_context is None else "".join(info.code_context)
  942. s += f' file "{info.filename}", line {info.lineno}\n\n{context}'
  943. except Exception as e:
  944. _ = e
  945. s += '\n'
  946. if sys.version_info < (3, 10):
  947. raise AttributeError(s)
  948. else:
  949. raise AttributeError(s, name=None)
  950. _error_dep_arg = "typ='rt'"
  951. _error_dep_comment = "and register any classes that you use, or check the tag attribute on the loaded data,\ninstead of" # NOQA
  952. ########################################################################################
  953. def scan(stream: StreamTextType, Loader: Any = Loader) -> Any:
  954. """
  955. Scan a YAML stream and produce scanning tokens.
  956. """
  957. error_deprecation('scan', 'scan', arg=_error_dep_arg, comment=_error_dep_comment)
  958. def parse(stream: StreamTextType, Loader: Any = Loader) -> Any:
  959. """
  960. Parse a YAML stream and produce parsing events.
  961. """
  962. error_deprecation('parse', 'parse', arg=_error_dep_arg, comment=_error_dep_comment)
  963. def compose(stream: StreamTextType, Loader: Any = Loader) -> Any:
  964. """
  965. Parse the first YAML document in a stream
  966. and produce the corresponding representation tree.
  967. """
  968. error_deprecation('compose', 'compose', arg=_error_dep_arg, comment=_error_dep_comment)
  969. def compose_all(stream: StreamTextType, Loader: Any = Loader) -> Any:
  970. """
  971. Parse all YAML documents in a stream
  972. and produce corresponding representation trees.
  973. """
  974. error_deprecation('compose', 'compose', arg=_error_dep_arg, comment=_error_dep_comment)
  975. def load(
  976. stream: Any, Loader: Any = None, version: Any = None, preserve_quotes: Any = None,
  977. ) -> Any:
  978. """
  979. Parse the first YAML document in a stream
  980. and produce the corresponding Python object.
  981. """
  982. error_deprecation('load', 'load', arg=_error_dep_arg, comment=_error_dep_comment)
  983. def load_all(
  984. stream: Any, Loader: Any = None, version: Any = None, preserve_quotes: Any = None,
  985. ) -> Any:
  986. # NOQA
  987. """
  988. Parse all YAML documents in a stream
  989. and produce corresponding Python objects.
  990. """
  991. error_deprecation('load_all', 'load_all', arg=_error_dep_arg, comment=_error_dep_comment)
  992. def safe_load(stream: StreamTextType, version: Optional[VersionType] = None) -> Any:
  993. """
  994. Parse the first YAML document in a stream
  995. and produce the corresponding Python object.
  996. Resolve only basic YAML tags.
  997. """
  998. error_deprecation('safe_load', 'load', arg="typ='safe', pure=True")
  999. def safe_load_all(stream: StreamTextType, version: Optional[VersionType] = None) -> Any:
  1000. """
  1001. Parse all YAML documents in a stream
  1002. and produce corresponding Python objects.
  1003. Resolve only basic YAML tags.
  1004. """
  1005. error_deprecation('safe_load_all', 'load_all', arg="typ='safe', pure=True")
  1006. def round_trip_load(
  1007. stream: StreamTextType,
  1008. version: Optional[VersionType] = None,
  1009. preserve_quotes: Optional[bool] = None,
  1010. ) -> Any:
  1011. """
  1012. Parse the first YAML document in a stream
  1013. and produce the corresponding Python object.
  1014. Resolve only basic YAML tags.
  1015. """
  1016. error_deprecation('round_trip_load_all', 'load')
  1017. def round_trip_load_all(
  1018. stream: StreamTextType,
  1019. version: Optional[VersionType] = None,
  1020. preserve_quotes: Optional[bool] = None,
  1021. ) -> Any:
  1022. """
  1023. Parse all YAML documents in a stream
  1024. and produce corresponding Python objects.
  1025. Resolve only basic YAML tags.
  1026. """
  1027. error_deprecation('round_trip_load_all', 'load_all')
  1028. def emit(
  1029. events: Any,
  1030. stream: Optional[StreamType] = None,
  1031. Dumper: Any = Dumper,
  1032. canonical: Optional[bool] = None,
  1033. indent: Union[int, None] = None,
  1034. width: Optional[int] = None,
  1035. allow_unicode: Optional[bool] = None,
  1036. line_break: Any = None,
  1037. ) -> Any:
  1038. # NOQA
  1039. """
  1040. Emit YAML parsing events into a stream.
  1041. If stream is None, return the produced string instead.
  1042. """
  1043. error_deprecation('emit', 'emit', arg="typ='safe', pure=True")
  1044. enc = None
  1045. def serialize_all(
  1046. nodes: Any,
  1047. stream: Optional[StreamType] = None,
  1048. Dumper: Any = Dumper,
  1049. canonical: Any = None,
  1050. indent: Optional[int] = None,
  1051. width: Optional[int] = None,
  1052. allow_unicode: Optional[bool] = None,
  1053. line_break: Any = None,
  1054. encoding: Any = enc,
  1055. explicit_start: Optional[bool] = None,
  1056. explicit_end: Optional[bool] = None,
  1057. version: Optional[VersionType] = None,
  1058. tags: Any = None,
  1059. ) -> Any:
  1060. # NOQA
  1061. """
  1062. Serialize a sequence of representation trees into a YAML stream.
  1063. If stream is None, return the produced string instead.
  1064. """
  1065. error_deprecation('serialize_all', 'serialize_all', arg="typ='safe', pure=True")
  1066. def serialize(
  1067. node: Any, stream: Optional[StreamType] = None, Dumper: Any = Dumper, **kwds: Any,
  1068. ) -> Any:
  1069. """
  1070. Serialize a representation tree into a YAML stream.
  1071. If stream is None, return the produced string instead.
  1072. """
  1073. error_deprecation('serialize', 'serialize', arg="typ='safe', pure=True")
  1074. def dump_all(
  1075. documents: Any,
  1076. stream: Optional[StreamType] = None,
  1077. Dumper: Any = Dumper,
  1078. default_style: Any = None,
  1079. default_flow_style: Any = None,
  1080. canonical: Optional[bool] = None,
  1081. indent: Optional[int] = None,
  1082. width: Optional[int] = None,
  1083. allow_unicode: Optional[bool] = None,
  1084. line_break: Any = None,
  1085. encoding: Any = enc,
  1086. explicit_start: Optional[bool] = None,
  1087. explicit_end: Optional[bool] = None,
  1088. version: Any = None,
  1089. tags: Any = None,
  1090. block_seq_indent: Any = None,
  1091. top_level_colon_align: Any = None,
  1092. prefix_colon: Any = None,
  1093. ) -> Any:
  1094. # NOQA
  1095. """
  1096. Serialize a sequence of Python objects into a YAML stream.
  1097. If stream is None, return the produced string instead.
  1098. """
  1099. error_deprecation('dump_all', 'dump_all', arg="typ='unsafe', pure=True")
  1100. def dump(
  1101. data: Any,
  1102. stream: Optional[StreamType] = None,
  1103. Dumper: Any = Dumper,
  1104. default_style: Any = None,
  1105. default_flow_style: Any = None,
  1106. canonical: Optional[bool] = None,
  1107. indent: Optional[int] = None,
  1108. width: Optional[int] = None,
  1109. allow_unicode: Optional[bool] = None,
  1110. line_break: Any = None,
  1111. encoding: Any = enc,
  1112. explicit_start: Optional[bool] = None,
  1113. explicit_end: Optional[bool] = None,
  1114. version: Optional[VersionType] = None,
  1115. tags: Any = None,
  1116. block_seq_indent: Any = None,
  1117. ) -> Any:
  1118. # NOQA
  1119. """
  1120. Serialize a Python object into a YAML stream.
  1121. If stream is None, return the produced string instead.
  1122. default_style ∈ None, '', '"', "'", '|', '>'
  1123. """
  1124. error_deprecation('dump', 'dump', arg="typ='unsafe', pure=True")
  1125. def safe_dump(data: Any, stream: Optional[StreamType] = None, **kwds: Any) -> Any:
  1126. """
  1127. Serialize a Python object into a YAML stream.
  1128. Produce only basic YAML tags.
  1129. If stream is None, return the produced string instead.
  1130. """
  1131. error_deprecation('safe_dump', 'dump', arg="typ='safe', pure=True")
  1132. def round_trip_dump(
  1133. data: Any,
  1134. stream: Optional[StreamType] = None,
  1135. Dumper: Any = RoundTripDumper,
  1136. default_style: Any = None,
  1137. default_flow_style: Any = None,
  1138. canonical: Optional[bool] = None,
  1139. indent: Optional[int] = None,
  1140. width: Optional[int] = None,
  1141. allow_unicode: Optional[bool] = None,
  1142. line_break: Any = None,
  1143. encoding: Any = enc,
  1144. explicit_start: Optional[bool] = None,
  1145. explicit_end: Optional[bool] = None,
  1146. version: Optional[VersionType] = None,
  1147. tags: Any = None,
  1148. block_seq_indent: Any = None,
  1149. top_level_colon_align: Any = None,
  1150. prefix_colon: Any = None,
  1151. ) -> Any:
  1152. allow_unicode = True if allow_unicode is None else allow_unicode
  1153. error_deprecation('round_trip_dump', 'dump')
  1154. # Loader/Dumper are no longer composites, to get to the associated
  1155. # Resolver()/Representer(), etc., you need to instantiate the class
  1156. def add_implicit_resolver(
  1157. tag: Any,
  1158. regexp: Any,
  1159. first: Any = None,
  1160. Loader: Any = None,
  1161. Dumper: Any = None,
  1162. resolver: Any = Resolver,
  1163. ) -> None:
  1164. """
  1165. Add an implicit scalar detector.
  1166. If an implicit scalar value matches the given regexp,
  1167. the corresponding tag is assigned to the scalar.
  1168. first is a sequence of possible initial characters or None.
  1169. """
  1170. if Loader is None and Dumper is None:
  1171. resolver.add_implicit_resolver(tag, regexp, first)
  1172. return
  1173. if Loader:
  1174. if hasattr(Loader, 'add_implicit_resolver'):
  1175. Loader.add_implicit_resolver(tag, regexp, first)
  1176. elif issubclass(
  1177. Loader, (BaseLoader, SafeLoader, ruamel.yaml.loader.Loader, RoundTripLoader),
  1178. ):
  1179. Resolver.add_implicit_resolver(tag, regexp, first)
  1180. else:
  1181. raise NotImplementedError
  1182. if Dumper:
  1183. if hasattr(Dumper, 'add_implicit_resolver'):
  1184. Dumper.add_implicit_resolver(tag, regexp, first)
  1185. elif issubclass(
  1186. Dumper, (BaseDumper, SafeDumper, ruamel.yaml.dumper.Dumper, RoundTripDumper),
  1187. ):
  1188. Resolver.add_implicit_resolver(tag, regexp, first)
  1189. else:
  1190. raise NotImplementedError
  1191. # this code currently not tested
  1192. def add_path_resolver(
  1193. tag: Any,
  1194. path: Any,
  1195. kind: Any = None,
  1196. Loader: Any = None,
  1197. Dumper: Any = None,
  1198. resolver: Any = Resolver,
  1199. ) -> None:
  1200. """
  1201. Add a path based resolver for the given tag.
  1202. A path is a list of keys that forms a path
  1203. to a node in the representation tree.
  1204. Keys can be string values, integers, or None.
  1205. """
  1206. if Loader is None and Dumper is None:
  1207. resolver.add_path_resolver(tag, path, kind)
  1208. return
  1209. if Loader:
  1210. if hasattr(Loader, 'add_path_resolver'):
  1211. Loader.add_path_resolver(tag, path, kind)
  1212. elif issubclass(
  1213. Loader, (BaseLoader, SafeLoader, ruamel.yaml.loader.Loader, RoundTripLoader),
  1214. ):
  1215. Resolver.add_path_resolver(tag, path, kind)
  1216. else:
  1217. raise NotImplementedError
  1218. if Dumper:
  1219. if hasattr(Dumper, 'add_path_resolver'):
  1220. Dumper.add_path_resolver(tag, path, kind)
  1221. elif issubclass(
  1222. Dumper, (BaseDumper, SafeDumper, ruamel.yaml.dumper.Dumper, RoundTripDumper),
  1223. ):
  1224. Resolver.add_path_resolver(tag, path, kind)
  1225. else:
  1226. raise NotImplementedError
  1227. def add_constructor(
  1228. tag: Any, object_constructor: Any, Loader: Any = None, constructor: Any = Constructor,
  1229. ) -> None:
  1230. """
  1231. Add an object constructor for the given tag.
  1232. object_onstructor is a function that accepts a Loader instance
  1233. and a node object and produces the corresponding Python object.
  1234. """
  1235. if Loader is None:
  1236. constructor.add_constructor(tag, object_constructor)
  1237. else:
  1238. if hasattr(Loader, 'add_constructor'):
  1239. Loader.add_constructor(tag, object_constructor)
  1240. return
  1241. if issubclass(Loader, BaseLoader):
  1242. BaseConstructor.add_constructor(tag, object_constructor)
  1243. elif issubclass(Loader, SafeLoader):
  1244. SafeConstructor.add_constructor(tag, object_constructor)
  1245. elif issubclass(Loader, Loader):
  1246. Constructor.add_constructor(tag, object_constructor)
  1247. elif issubclass(Loader, RoundTripLoader):
  1248. RoundTripConstructor.add_constructor(tag, object_constructor)
  1249. else:
  1250. raise NotImplementedError
  1251. def add_multi_constructor(
  1252. tag_prefix: Any, multi_constructor: Any, Loader: Any = None, constructor: Any = Constructor, # NOQA
  1253. ) -> None:
  1254. """
  1255. Add a multi-constructor for the given tag prefix.
  1256. Multi-constructor is called for a node if its tag starts with tag_prefix.
  1257. Multi-constructor accepts a Loader instance, a tag suffix,
  1258. and a node object and produces the corresponding Python object.
  1259. """
  1260. if Loader is None:
  1261. constructor.add_multi_constructor(tag_prefix, multi_constructor)
  1262. else:
  1263. if False and hasattr(Loader, 'add_multi_constructor'):
  1264. Loader.add_multi_constructor(tag_prefix, constructor)
  1265. return
  1266. if issubclass(Loader, BaseLoader):
  1267. BaseConstructor.add_multi_constructor(tag_prefix, multi_constructor)
  1268. elif issubclass(Loader, SafeLoader):
  1269. SafeConstructor.add_multi_constructor(tag_prefix, multi_constructor)
  1270. elif issubclass(Loader, ruamel.yaml.loader.Loader):
  1271. Constructor.add_multi_constructor(tag_prefix, multi_constructor)
  1272. elif issubclass(Loader, RoundTripLoader):
  1273. RoundTripConstructor.add_multi_constructor(tag_prefix, multi_constructor)
  1274. else:
  1275. raise NotImplementedError
  1276. def add_representer(
  1277. data_type: Any, object_representer: Any, Dumper: Any = None, representer: Any = Representer, # NOQA
  1278. ) -> None:
  1279. """
  1280. Add a representer for the given type.
  1281. object_representer is a function accepting a Dumper instance
  1282. and an instance of the given data type
  1283. and producing the corresponding representation node.
  1284. """
  1285. if Dumper is None:
  1286. representer.add_representer(data_type, object_representer)
  1287. else:
  1288. if hasattr(Dumper, 'add_representer'):
  1289. Dumper.add_representer(data_type, object_representer)
  1290. return
  1291. if issubclass(Dumper, BaseDumper):
  1292. BaseRepresenter.add_representer(data_type, object_representer)
  1293. elif issubclass(Dumper, SafeDumper):
  1294. SafeRepresenter.add_representer(data_type, object_representer)
  1295. elif issubclass(Dumper, Dumper):
  1296. Representer.add_representer(data_type, object_representer)
  1297. elif issubclass(Dumper, RoundTripDumper):
  1298. RoundTripRepresenter.add_representer(data_type, object_representer)
  1299. else:
  1300. raise NotImplementedError
  1301. # this code currently not tested
  1302. def add_multi_representer(
  1303. data_type: Any, multi_representer: Any, Dumper: Any = None, representer: Any = Representer,
  1304. ) -> None:
  1305. """
  1306. Add a representer for the given type.
  1307. multi_representer is a function accepting a Dumper instance
  1308. and an instance of the given data type or subtype
  1309. and producing the corresponding representation node.
  1310. """
  1311. if Dumper is None:
  1312. representer.add_multi_representer(data_type, multi_representer)
  1313. else:
  1314. if hasattr(Dumper, 'add_multi_representer'):
  1315. Dumper.add_multi_representer(data_type, multi_representer)
  1316. return
  1317. if issubclass(Dumper, BaseDumper):
  1318. BaseRepresenter.add_multi_representer(data_type, multi_representer)
  1319. elif issubclass(Dumper, SafeDumper):
  1320. SafeRepresenter.add_multi_representer(data_type, multi_representer)
  1321. elif issubclass(Dumper, Dumper):
  1322. Representer.add_multi_representer(data_type, multi_representer)
  1323. elif issubclass(Dumper, RoundTripDumper):
  1324. RoundTripRepresenter.add_multi_representer(data_type, multi_representer)
  1325. else:
  1326. raise NotImplementedError
  1327. class YAMLObjectMetaclass(type):
  1328. """
  1329. The metaclass for YAMLObject.
  1330. """
  1331. def __init__(cls, name: Any, bases: Any, kwds: Any) -> None:
  1332. super().__init__(name, bases, kwds)
  1333. if 'yaml_tag' in kwds and kwds['yaml_tag'] is not None:
  1334. cls.yaml_constructor.add_constructor(cls.yaml_tag, cls.from_yaml) # type: ignore
  1335. cls.yaml_representer.add_representer(cls, cls.to_yaml) # type: ignore
  1336. class YAMLObject(with_metaclass(YAMLObjectMetaclass)): # type: ignore
  1337. """
  1338. An object that can dump itself to a YAML stream
  1339. and load itself from a YAML stream.
  1340. """
  1341. __slots__ = () # no direct instantiation, so allow immutable subclasses
  1342. yaml_constructor = Constructor
  1343. yaml_representer = Representer
  1344. yaml_tag: Any = None
  1345. yaml_flow_style: Any = None
  1346. @classmethod
  1347. def from_yaml(cls, constructor: Any, node: Any) -> Any:
  1348. """
  1349. Convert a representation node to a Python object.
  1350. """
  1351. return constructor.construct_yaml_object(node, cls)
  1352. @classmethod
  1353. def to_yaml(cls, representer: Any, data: Any) -> Any:
  1354. """
  1355. Convert a Python object to a representation node.
  1356. """
  1357. return representer.represent_yaml_object(
  1358. cls.yaml_tag, data, cls, flow_style=cls.yaml_flow_style,
  1359. )