You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

1797 line
69 KiB

  1. from __future__ import annotations
  2. # Emitter expects events obeying the following grammar:
  3. # stream ::= STREAM-START document* STREAM-END
  4. # document ::= DOCUMENT-START node DOCUMENT-END
  5. # node ::= SCALAR | sequence | mapping
  6. # sequence ::= SEQUENCE-START node* SEQUENCE-END
  7. # mapping ::= MAPPING-START (node node)* MAPPING-END
  8. import sys
  9. from ruamel.yaml.error import YAMLError, YAMLStreamError
  10. from ruamel.yaml.events import * # NOQA
  11. # fmt: off
  12. from ruamel.yaml.compat import nprint, dbg, DBG_EVENT, \
  13. check_anchorname_char, nprintf # NOQA
  14. # fmt: on
  15. if False: # MYPY
  16. from typing import Any, Dict, List, Union, Text, Tuple, Optional # NOQA
  17. from ruamel.yaml.compat import StreamType # NOQA
  18. __all__ = ['Emitter', 'EmitterError']
  19. class EmitterError(YAMLError):
  20. pass
  21. class ScalarAnalysis:
  22. def __init__(
  23. self,
  24. scalar: Any,
  25. empty: Any,
  26. multiline: Any,
  27. allow_flow_plain: bool,
  28. allow_block_plain: bool,
  29. allow_single_quoted: bool,
  30. allow_double_quoted: bool,
  31. allow_block: bool,
  32. ) -> None:
  33. self.scalar = scalar
  34. self.empty = empty
  35. self.multiline = multiline
  36. self.allow_flow_plain = allow_flow_plain
  37. self.allow_block_plain = allow_block_plain
  38. self.allow_single_quoted = allow_single_quoted
  39. self.allow_double_quoted = allow_double_quoted
  40. self.allow_block = allow_block
  41. def __repr__(self) -> str:
  42. return f'scalar={self.scalar!r}, empty={self.empty}, multiline={self.multiline}, allow_flow_plain={self.allow_flow_plain}, allow_block_plain={self.allow_block_plain}, allow_single_quoted={self.allow_single_quoted}, allow_double_quoted={self.allow_double_quoted}, allow_block={self.allow_block}' # NOQA
  43. class Indents:
  44. # replacement for the list based stack of None/int
  45. def __init__(self) -> None:
  46. self.values: List[Tuple[Any, bool]] = []
  47. def append(self, val: Any, seq: Any) -> None:
  48. self.values.append((val, seq))
  49. def pop(self) -> Any:
  50. return self.values.pop()[0]
  51. def seq_seq(self) -> bool:
  52. try:
  53. if self.values[-2][1] and self.values[-1][1]:
  54. return True
  55. except IndexError:
  56. pass
  57. return False
  58. def last_seq(self) -> bool:
  59. # return the seq(uence) value for the element added before the last one
  60. # in increase_indent()
  61. try:
  62. return self.values[-2][1]
  63. except IndexError:
  64. return False
  65. def seq_flow_align(
  66. self, seq_indent: int, column: int, pre_comment: Optional[bool] = False,
  67. ) -> int:
  68. # extra spaces because of dash
  69. # nprint('seq_flow_align', self.values, pre_comment)
  70. if len(self.values) < 2 or not self.values[-1][1]:
  71. if len(self.values) == 0 or not pre_comment:
  72. return 0
  73. base = self.values[-1][0] if self.values[-1][0] is not None else 0
  74. if pre_comment:
  75. return base + seq_indent # type: ignore
  76. # return (len(self.values)) * seq_indent
  77. # -1 for the dash
  78. return base + seq_indent - column - 1 # type: ignore
  79. def __len__(self) -> int:
  80. return len(self.values)
class Emitter:
    """State machine that consumes YAML events and writes them to a stream."""

    # Default mapping from tag prefix to the handle written for it; a copy is
    # stored in self.tag_prefixes at the start of every document.
    # fmt: off
    DEFAULT_TAG_PREFIXES = {
        '!': '!',
        'tag:yaml.org,2002:': '!!',
        '!!': '!!',
    }
    # fmt: on
    # check_simple_key() only accepts keys whose combined anchor, tag and
    # scalar length stays below this value.
    MAX_SIMPLE_KEY_LENGTH = 128
    # Indicators written for flow sequences and flow mappings.
    flow_seq_start = '['
    flow_seq_end = ']'
    flow_seq_separator = ','
    flow_map_start = '{'
    flow_map_end = '}'
    flow_map_separator = ','
    def __init__(
        self,
        stream: StreamType,
        canonical: Any = None,
        indent: Optional[int] = None,
        width: Optional[int] = None,
        allow_unicode: Optional[bool] = None,
        line_break: Any = None,
        block_seq_indent: Optional[int] = None,
        top_level_colon_align: Optional[bool] = None,
        prefix_colon: Any = None,
        brace_single_entry_mapping_in_flow_sequence: Optional[bool] = None,
        dumper: Any = None,
    ) -> None:
        """Set up the emitter state and the formatting preferences.

        stream must offer a write() method (None is accepted and leaves the
        stream unset). indent is only honoured when 1 < indent < 10, width
        only when larger than twice the sequence indent, and line_break only
        when it is one of '\\r', '\\n' or '\\r\\n'; otherwise the defaults
        (2, 80 and '\\n') are used.
        """
        # NOQA
        self.dumper = dumper
        # register this emitter on the dumper unless it already has one
        if self.dumper is not None and getattr(self.dumper, '_emitter', None) is None:
            self.dumper._emitter = self
        # goes through the stream property setter, which validates the value
        self.stream = stream

        # Encoding can be overridden by STREAM-START.
        self.encoding: Optional[Text] = None
        self.allow_space_break = None

        # Emitter is a state machine with a stack of states to handle nested
        # structures.
        self.states: List[Any] = []
        self.state: Any = self.expect_stream_start

        # Current event and the event queue.
        self.events: List[Any] = []
        self.event: Any = None

        # The current indentation level and the stack of previous indents.
        self.indents = Indents()
        self.indent: Optional[int] = None

        # flow_context is an expanding/shrinking list consisting of '{' and '['
        # for each unclosed flow context. If empty list that means block context
        self.flow_context: List[Text] = []

        # Contexts.
        self.root_context = False
        self.sequence_context = False
        self.mapping_context = False
        self.simple_key_context = False

        # Characteristics of the last emitted character:
        # - current position.
        # - is it a whitespace?
        # - is it an indention character
        # (indentation space, '-', '?', or ':')?
        self.line = 0
        self.column = 0
        self.whitespace = True
        self.indention = True
        self.compact_seq_seq = True  # dash after dash
        self.compact_seq_map = True  # key after dash
        # self.compact_ms = False # dash after key, only when explicit key with ?
        self.no_newline: Optional[bool] = None  # set if directly after `- `

        # Whether the document requires an explicit document end indicator
        self.open_ended = False

        # colon handling
        self.colon = ':'
        self.prefixed_colon = self.colon if prefix_colon is None else prefix_colon + self.colon
        # single entry mappings in flow sequence
        self.brace_single_entry_mapping_in_flow_sequence = (
            brace_single_entry_mapping_in_flow_sequence  # NOQA
        )

        # Formatting details.
        self.canonical = canonical
        self.allow_unicode = allow_unicode
        # set to False to get "\Uxxxxxxxx" for non-basic unicode like emojis
        self.unicode_supplementary = sys.maxunicode > 0xFFFF
        self.sequence_dash_offset = block_seq_indent if block_seq_indent else 0
        self.top_level_colon_align = top_level_colon_align
        self.best_sequence_indent = 2
        self.requested_indent = indent  # specific for literal zero indent
        if indent and 1 < indent < 10:
            self.best_sequence_indent = indent
        self.best_map_indent = self.best_sequence_indent
        # if self.best_sequence_indent < self.sequence_dash_offset + 1:
        # self.best_sequence_indent = self.sequence_dash_offset + 1
        self.best_width = 80
        if width and width > self.best_sequence_indent * 2:
            self.best_width = width
        self.best_line_break: Any = '\n'
        if line_break in ['\r', '\n', '\r\n']:
            self.best_line_break = line_break

        # Tag prefixes.
        self.tag_prefixes: Any = None

        # Prepared anchor and tag.
        self.prepared_anchor: Any = None
        self.prepared_tag: Any = None

        # Scalar analysis and style.
        self.analysis: Any = None
        self.style: Any = None

        self.scalar_after_indicator = True  # write a scalar on the same line as `---`

        # text substituted by process_tag() for an empty, single quoted null scalar
        self.alt_null = 'null'
  188. @property
  189. def stream(self) -> Any:
  190. try:
  191. return self._stream
  192. except AttributeError:
  193. raise YAMLStreamError('output stream needs to be specified')
  194. @stream.setter
  195. def stream(self, val: Any) -> None:
  196. if val is None:
  197. return
  198. if not hasattr(val, 'write'):
  199. raise YAMLStreamError('stream argument needs to have a write() method')
  200. self._stream = val
  201. @property
  202. def serializer(self) -> Any:
  203. try:
  204. if hasattr(self.dumper, 'typ'):
  205. return self.dumper.serializer
  206. return self.dumper._serializer
  207. except AttributeError:
  208. return self # cyaml
  209. @property
  210. def flow_level(self) -> int:
  211. return len(self.flow_context)
  212. def dispose(self) -> None:
  213. # Reset the state attributes (to clear self-references)
  214. self.states = []
  215. self.state = None
  216. def emit(self, event: Any) -> None:
  217. if dbg(DBG_EVENT):
  218. nprint(event)
  219. self.events.append(event)
  220. while not self.need_more_events():
  221. self.event = self.events.pop(0)
  222. self.state()
  223. self.event = None
  224. # In some cases, we wait for a few next events before emitting.
  225. def need_more_events(self) -> bool:
  226. if not self.events:
  227. return True
  228. event = self.events[0]
  229. if isinstance(event, DocumentStartEvent):
  230. return self.need_events(1)
  231. elif isinstance(event, SequenceStartEvent):
  232. return self.need_events(2)
  233. elif isinstance(event, MappingStartEvent):
  234. return self.need_events(3)
  235. else:
  236. return False
  237. def need_events(self, count: int) -> bool:
  238. level = 0
  239. for event in self.events[1:]:
  240. if isinstance(event, (DocumentStartEvent, CollectionStartEvent)):
  241. level += 1
  242. elif isinstance(event, (DocumentEndEvent, CollectionEndEvent)):
  243. level -= 1
  244. elif isinstance(event, StreamEndEvent):
  245. level = -1
  246. if level < 0:
  247. return False
  248. return len(self.events) < count + 1
  249. def increase_indent(
  250. self, flow: bool = False, sequence: Optional[bool] = None, indentless: bool = False,
  251. ) -> None:
  252. self.indents.append(self.indent, sequence)
  253. if self.indent is None: # top level
  254. if flow:
  255. # self.indent = self.best_sequence_indent if self.indents.last_seq() else \
  256. # self.best_map_indent
  257. # self.indent = self.best_sequence_indent
  258. self.indent = self.requested_indent
  259. else:
  260. self.indent = 0
  261. elif not indentless:
  262. self.indent += (
  263. self.best_sequence_indent if self.indents.last_seq() else self.best_map_indent
  264. )
  265. # if self.indents.last_seq():
  266. # if self.indent == 0: # top level block sequence
  267. # self.indent = self.best_sequence_indent - self.sequence_dash_offset
  268. # else:
  269. # self.indent += self.best_sequence_indent
  270. # else:
  271. # self.indent += self.best_map_indent
  272. # States.
  273. # Stream handlers.
  274. def expect_stream_start(self) -> None:
  275. if isinstance(self.event, StreamStartEvent):
  276. if self.event.encoding and not hasattr(self.stream, 'encoding'):
  277. self.encoding = self.event.encoding
  278. self.write_stream_start()
  279. self.state = self.expect_first_document_start
  280. else:
  281. raise EmitterError(f'expected StreamStartEvent, but got {self.event!s}')
  282. def expect_nothing(self) -> None:
  283. raise EmitterError(f'expected nothing, but got {self.event!s}')
  284. # Document handlers.
  285. def expect_first_document_start(self) -> Any:
  286. return self.expect_document_start(first=True)
    def expect_document_start(self, first: bool = False) -> None:
        """Handle a DocumentStartEvent or the StreamEndEvent.

        For a document start the version/tag directives and, unless the
        document is implicit, the `---` indicator are written. For a stream
        end the stream is finished. Any other event raises EmitterError.
        """
        if isinstance(self.event, DocumentStartEvent):
            # directives after an open ended document need the `...` end marker first
            if (self.event.version or self.event.tags) and self.open_ended:
                self.write_indicator('...', True)
                self.write_indent()
            if self.event.version:
                version_text = self.prepare_version(self.event.version)
                self.write_version_directive(version_text)
            # every document starts again from the default tag prefixes
            self.tag_prefixes = self.DEFAULT_TAG_PREFIXES.copy()
            if self.event.tags:
                # handles are emitted in sorted order
                handles = sorted(self.event.tags.keys())
                for handle in handles:
                    prefix = self.event.tags[handle]
                    self.tag_prefixes[prefix] = handle
                    handle_text = self.prepare_tag_handle(handle)
                    prefix_text = self.prepare_tag_prefix(prefix)
                    self.write_tag_directive(handle_text, prefix_text)
            # only the first document can go without `---`, and only if nothing
            # (explicit flag, canonical, directives, empty document) requires it
            implicit = (
                first
                and not self.event.explicit
                and not self.canonical
                and not self.event.version
                and not self.event.tags
                and not self.check_empty_document()
            )
            if not implicit:
                self.write_indent()
                self.write_indicator('---', True)
                if self.canonical:
                    self.write_indent()
            self.state = self.expect_document_root
        elif isinstance(self.event, StreamEndEvent):
            if self.open_ended:
                self.write_indicator('...', True)
                self.write_indent()
            self.write_stream_end()
            self.state = self.expect_nothing
        else:
            raise EmitterError(f'expected DocumentStartEvent, but got {self.event!s}')
  326. def expect_document_end(self) -> None:
  327. if isinstance(self.event, DocumentEndEvent):
  328. self.write_indent()
  329. if self.event.explicit:
  330. self.write_indicator('...', True)
  331. self.write_indent()
  332. self.flush_stream()
  333. self.state = self.expect_document_start
  334. else:
  335. raise EmitterError(f'expected DocumentEndEvent, but got {self.event!s}')
  336. def expect_document_root(self) -> None:
  337. self.states.append(self.expect_document_end)
  338. self.expect_node(root=True)
  339. # Node handlers.
  340. def expect_node(
  341. self,
  342. root: bool = False,
  343. sequence: bool = False,
  344. mapping: bool = False,
  345. simple_key: bool = False,
  346. ) -> None:
  347. self.root_context = root
  348. self.sequence_context = sequence # not used in PyYAML
  349. force_flow_indent = False
  350. self.mapping_context = mapping
  351. self.simple_key_context = simple_key
  352. if isinstance(self.event, AliasEvent):
  353. self.expect_alias()
  354. elif isinstance(self.event, (ScalarEvent, CollectionStartEvent)):
  355. if (
  356. self.process_anchor('&')
  357. and isinstance(self.event, ScalarEvent)
  358. and self.sequence_context
  359. ):
  360. self.sequence_context = False
  361. if (
  362. root
  363. and isinstance(self.event, ScalarEvent)
  364. and not self.scalar_after_indicator
  365. ):
  366. self.write_indent()
  367. self.process_tag()
  368. if isinstance(self.event, ScalarEvent):
  369. # nprint('@', self.indention, self.no_newline, self.column)
  370. self.expect_scalar()
  371. elif isinstance(self.event, SequenceStartEvent):
  372. i2, n2 = self.indention, self.no_newline # NOQA
  373. if self.event.comment:
  374. if self.event.flow_style is False:
  375. if self.write_post_comment(self.event):
  376. self.indention = False
  377. self.no_newline = True
  378. if self.event.flow_style:
  379. column = self.column
  380. if self.write_pre_comment(self.event):
  381. if self.event.flow_style:
  382. # force_flow_indent = True
  383. force_flow_indent = not self.indents.values[-1][1]
  384. self.indention = i2
  385. self.no_newline = not self.indention
  386. if self.event.flow_style:
  387. self.column = column
  388. if (
  389. self.flow_level
  390. or self.canonical
  391. or self.event.flow_style
  392. or self.check_empty_sequence()
  393. ):
  394. self.expect_flow_sequence(force_flow_indent)
  395. else:
  396. self.expect_block_sequence()
  397. if self.indents.seq_seq():
  398. # - -
  399. self.indention = True
  400. self.no_newline = False
  401. elif isinstance(self.event, MappingStartEvent):
  402. if self.event.flow_style is False and self.event.comment:
  403. self.write_post_comment(self.event)
  404. if self.event.comment and self.event.comment[1]:
  405. self.write_pre_comment(self.event)
  406. if self.event.flow_style and self.indents.values:
  407. force_flow_indent = not self.indents.values[-1][1]
  408. if (
  409. self.flow_level
  410. or self.canonical
  411. or self.event.flow_style
  412. or self.check_empty_mapping()
  413. ):
  414. self.expect_flow_mapping(
  415. single=self.event.nr_items == 1, force_flow_indent=force_flow_indent,
  416. )
  417. else:
  418. self.expect_block_mapping()
  419. else:
  420. raise EmitterError(f'expected NodeEvent, but got {self.event!s}')
  421. def expect_alias(self) -> None:
  422. if self.event.anchor is None:
  423. raise EmitterError('anchor is not specified for alias')
  424. self.process_anchor('*')
  425. self.state = self.states.pop()
  426. def expect_scalar(self) -> None:
  427. self.increase_indent(flow=True)
  428. self.process_scalar()
  429. self.indent = self.indents.pop()
  430. self.state = self.states.pop()
  431. # Flow sequence handlers.
  432. def expect_flow_sequence(self, force_flow_indent: Optional[bool] = False) -> None:
  433. if force_flow_indent:
  434. self.increase_indent(flow=True, sequence=True)
  435. ind = self.indents.seq_flow_align(
  436. self.best_sequence_indent, self.column, force_flow_indent,
  437. )
  438. self.write_indicator(' ' * ind + self.flow_seq_start, True, whitespace=True)
  439. if not force_flow_indent:
  440. self.increase_indent(flow=True, sequence=True)
  441. self.flow_context.append('[')
  442. self.state = self.expect_first_flow_sequence_item
  443. def expect_first_flow_sequence_item(self) -> None:
  444. if isinstance(self.event, SequenceEndEvent):
  445. self.indent = self.indents.pop()
  446. popped = self.flow_context.pop()
  447. assert popped == '['
  448. self.write_indicator(self.flow_seq_end, False)
  449. if self.event.comment and self.event.comment[0]:
  450. # eol comment on empty flow sequence
  451. self.write_post_comment(self.event)
  452. elif self.flow_level == 0:
  453. self.write_line_break()
  454. self.state = self.states.pop()
  455. else:
  456. if self.canonical or self.column > self.best_width:
  457. self.write_indent()
  458. self.states.append(self.expect_flow_sequence_item)
  459. self.expect_node(sequence=True)
  460. def expect_flow_sequence_item(self) -> None:
  461. if isinstance(self.event, SequenceEndEvent):
  462. self.indent = self.indents.pop()
  463. popped = self.flow_context.pop()
  464. assert popped == '['
  465. if self.canonical:
  466. # ToDo: so-39595807, maybe add a space to the flow_seq_separator
  467. # and strip the last space, if space then indent, else do not
  468. # not sure that [1,2,3] is a valid YAML seq
  469. self.write_indicator(self.flow_seq_separator, False)
  470. self.write_indent()
  471. self.write_indicator(self.flow_seq_end, False)
  472. if self.event.comment and self.event.comment[0]:
  473. # eol comment on flow sequence
  474. self.write_post_comment(self.event)
  475. else:
  476. self.no_newline = False
  477. self.state = self.states.pop()
  478. else:
  479. self.write_indicator(self.flow_seq_separator, False)
  480. if self.canonical or self.column > self.best_width:
  481. self.write_indent()
  482. self.states.append(self.expect_flow_sequence_item)
  483. self.expect_node(sequence=True)
  484. # Flow mapping handlers.
  485. def expect_flow_mapping(
  486. self, single: Optional[bool] = False, force_flow_indent: Optional[bool] = False,
  487. ) -> None:
  488. if force_flow_indent:
  489. self.increase_indent(flow=True, sequence=False)
  490. ind = self.indents.seq_flow_align(
  491. self.best_sequence_indent, self.column, force_flow_indent,
  492. )
  493. map_init = self.flow_map_start
  494. if (
  495. single
  496. and self.flow_level
  497. and self.flow_context[-1] == '['
  498. and not self.canonical
  499. and not self.brace_single_entry_mapping_in_flow_sequence
  500. ):
  501. # single map item with flow context, no curly braces necessary
  502. map_init = ''
  503. self.write_indicator(' ' * ind + map_init, True, whitespace=True)
  504. self.flow_context.append(map_init)
  505. if not force_flow_indent:
  506. self.increase_indent(flow=True, sequence=False)
  507. self.state = self.expect_first_flow_mapping_key
  508. def expect_first_flow_mapping_key(self) -> None:
  509. if isinstance(self.event, MappingEndEvent):
  510. self.indent = self.indents.pop()
  511. popped = self.flow_context.pop()
  512. assert popped == '{' # empty flow mapping
  513. self.write_indicator(self.flow_map_end, False)
  514. if self.event.comment and self.event.comment[0]:
  515. # eol comment on empty mapping
  516. self.write_post_comment(self.event)
  517. elif self.flow_level == 0:
  518. self.write_line_break()
  519. self.state = self.states.pop()
  520. else:
  521. if self.canonical or self.column > self.best_width:
  522. self.write_indent()
  523. if not self.canonical and self.check_simple_key():
  524. self.states.append(self.expect_flow_mapping_simple_value)
  525. self.expect_node(mapping=True, simple_key=True)
  526. else:
  527. self.write_indicator('?', True)
  528. self.states.append(self.expect_flow_mapping_value)
  529. self.expect_node(mapping=True)
  530. def expect_flow_mapping_key(self) -> None:
  531. if isinstance(self.event, MappingEndEvent):
  532. # if self.event.comment and self.event.comment[1]:
  533. # self.write_pre_comment(self.event)
  534. self.indent = self.indents.pop()
  535. popped = self.flow_context.pop()
  536. assert popped in ['{', '']
  537. if self.canonical:
  538. self.write_indicator(self.flow_map_separator, False)
  539. self.write_indent()
  540. if popped != '':
  541. self.write_indicator(self.flow_map_end, False)
  542. if self.event.comment and self.event.comment[0]:
  543. # eol comment on flow mapping, never reached on empty mappings
  544. self.write_post_comment(self.event)
  545. else:
  546. self.no_newline = False
  547. self.state = self.states.pop()
  548. else:
  549. self.write_indicator(self.flow_map_separator, False)
  550. if self.canonical or self.column > self.best_width:
  551. self.write_indent()
  552. if not self.canonical and self.check_simple_key():
  553. self.states.append(self.expect_flow_mapping_simple_value)
  554. self.expect_node(mapping=True, simple_key=True)
  555. else:
  556. self.write_indicator('?', True)
  557. self.states.append(self.expect_flow_mapping_value)
  558. self.expect_node(mapping=True)
  559. def expect_flow_mapping_simple_value(self) -> None:
  560. if getattr(self.event, 'style', '?') != '-': # suppress for flow style sets
  561. self.write_indicator(self.prefixed_colon, False)
  562. self.states.append(self.expect_flow_mapping_key)
  563. self.expect_node(mapping=True)
  564. def expect_flow_mapping_value(self) -> None:
  565. if self.canonical or self.column > self.best_width:
  566. self.write_indent()
  567. self.write_indicator(self.prefixed_colon, True)
  568. self.states.append(self.expect_flow_mapping_key)
  569. self.expect_node(mapping=True)
  570. # Block sequence handlers.
  571. def expect_block_sequence(self) -> None:
  572. if self.mapping_context:
  573. indentless = not self.indention
  574. else:
  575. indentless = False
  576. if not self.compact_seq_seq and self.column != 0:
  577. self.write_line_break()
  578. self.increase_indent(flow=False, sequence=True, indentless=indentless)
  579. self.state = self.expect_first_block_sequence_item
  580. def expect_first_block_sequence_item(self) -> Any:
  581. return self.expect_block_sequence_item(first=True)
  582. def expect_block_sequence_item(self, first: bool = False) -> None:
  583. if not first and isinstance(self.event, SequenceEndEvent):
  584. if self.event.comment and self.event.comment[1]:
  585. # final comments on a block list e.g. empty line
  586. self.write_pre_comment(self.event)
  587. self.indent = self.indents.pop()
  588. self.state = self.states.pop()
  589. self.no_newline = False
  590. else:
  591. if self.event.comment and self.event.comment[1]:
  592. self.write_pre_comment(self.event)
  593. nonl = self.no_newline if self.column == 0 else False
  594. self.write_indent()
  595. ind = self.sequence_dash_offset # if len(self.indents) > 1 else 0
  596. self.write_indicator(' ' * ind + '-', True, indention=True)
  597. if nonl or self.sequence_dash_offset + 2 > self.best_sequence_indent:
  598. self.no_newline = True
  599. self.states.append(self.expect_block_sequence_item)
  600. self.expect_node(sequence=True)
  601. # Block mapping handlers.
  602. def expect_block_mapping(self) -> None:
  603. if not self.mapping_context and not (self.compact_seq_map or self.column == 0):
  604. self.write_line_break()
  605. self.increase_indent(flow=False, sequence=False)
  606. self.state = self.expect_first_block_mapping_key
  607. def expect_first_block_mapping_key(self) -> None:
  608. return self.expect_block_mapping_key(first=True)
  609. def expect_block_mapping_key(self, first: Any = False) -> None:
  610. if not first and isinstance(self.event, MappingEndEvent):
  611. if self.event.comment and self.event.comment[1]:
  612. # final comments from a doc
  613. self.write_pre_comment(self.event)
  614. self.indent = self.indents.pop()
  615. self.state = self.states.pop()
  616. else:
  617. if self.event.comment and self.event.comment[1]:
  618. # final comments from a doc
  619. self.write_pre_comment(self.event)
  620. self.write_indent()
  621. if self.check_simple_key():
  622. if not isinstance(
  623. self.event, (SequenceStartEvent, MappingStartEvent),
  624. ): # sequence keys
  625. try:
  626. if self.event.style == '?':
  627. self.write_indicator('?', True, indention=True)
  628. except AttributeError: # aliases have no style
  629. pass
  630. self.states.append(self.expect_block_mapping_simple_value)
  631. self.expect_node(mapping=True, simple_key=True)
  632. # test on style for alias in !!set
  633. if isinstance(self.event, AliasEvent) and not self.event.style == '?':
  634. self.stream.write(' ')
  635. else:
  636. self.write_indicator('?', True, indention=True)
  637. self.states.append(self.expect_block_mapping_value)
  638. self.expect_node(mapping=True)
  639. def expect_block_mapping_simple_value(self) -> None:
  640. if getattr(self.event, 'style', None) != '?':
  641. # prefix = ''
  642. if self.indent == 0 and self.top_level_colon_align is not None:
  643. # write non-prefixed colon
  644. c = ' ' * (self.top_level_colon_align - self.column) + self.colon
  645. else:
  646. c = self.prefixed_colon
  647. self.write_indicator(c, False)
  648. self.states.append(self.expect_block_mapping_key)
  649. self.expect_node(mapping=True)
  650. def expect_block_mapping_value(self) -> None:
  651. self.write_indent()
  652. self.write_indicator(self.prefixed_colon, True, indention=True)
  653. self.states.append(self.expect_block_mapping_key)
  654. self.expect_node(mapping=True)
  655. # Checkers.
  656. def check_empty_sequence(self) -> bool:
  657. return (
  658. isinstance(self.event, SequenceStartEvent)
  659. and bool(self.events)
  660. and isinstance(self.events[0], SequenceEndEvent)
  661. )
  662. def check_empty_mapping(self) -> bool:
  663. return (
  664. isinstance(self.event, MappingStartEvent)
  665. and bool(self.events)
  666. and isinstance(self.events[0], MappingEndEvent)
  667. )
  668. def check_empty_document(self) -> bool:
  669. if not isinstance(self.event, DocumentStartEvent) or not self.events:
  670. return False
  671. event = self.events[0]
  672. return (
  673. isinstance(event, ScalarEvent)
  674. and event.anchor is None
  675. and event.tag is None
  676. and event.implicit
  677. and event.value == ""
  678. )
    def check_simple_key(self) -> bool:
        """Return whether the current event can be written as a simple (inline) key.

        As a side effect the prepared anchor, prepared tag and scalar
        analysis are computed and stored on the emitter if not yet available.
        """
        # combined length of anchor, tag and scalar text
        length = 0
        if isinstance(self.event, NodeEvent) and self.event.anchor is not None:
            if self.prepared_anchor is None:
                self.prepared_anchor = self.prepare_anchor(self.event.anchor)
            length += len(self.prepared_anchor)
        if (
            isinstance(self.event, (ScalarEvent, CollectionStartEvent))
            and self.event.tag is not None
        ):
            if self.prepared_tag is None:
                self.prepared_tag = self.prepare_tag(self.event.ctag)
            length += len(self.prepared_tag)
        if isinstance(self.event, ScalarEvent):
            if self.analysis is None:
                self.analysis = self.analyze_scalar(self.event.value)
            length += len(self.analysis.scalar)
        # short enough, and one of: alias, flow style collection, non-multiline
        # scalar, or empty collection
        return length < self.MAX_SIMPLE_KEY_LENGTH and (
            isinstance(self.event, AliasEvent)
            or (isinstance(self.event, SequenceStartEvent) and self.event.flow_style is True)
            or (isinstance(self.event, MappingStartEvent) and self.event.flow_style is True)
            or (
                isinstance(self.event, ScalarEvent)
                # if there is an explicit style for an empty string, it is a simple key
                and not (self.analysis.empty and self.style and self.style not in '\'"')
                and not self.analysis.multiline
            )
            or self.check_empty_sequence()
            or self.check_empty_mapping()
        )
  709. # Anchor, Tag, and Scalar processors.
  710. def process_anchor(self, indicator: Any) -> bool:
  711. if self.event.anchor is None:
  712. self.prepared_anchor = None
  713. return False
  714. if self.prepared_anchor is None:
  715. self.prepared_anchor = self.prepare_anchor(self.event.anchor)
  716. if self.prepared_anchor:
  717. self.write_indicator(indicator + self.prepared_anchor, True)
  718. # issue 288
  719. self.no_newline = False
  720. self.prepared_anchor = None
  721. return True
    def process_tag(self) -> None:
        """Write the tag of the current event, unless the tag may be left implicit.

        For scalar events the scalar style is chosen here first, because
        whether the tag can be omitted depends on the style.
        Raises EmitterError when a tag has to be written but none is specified.
        """
        tag = self.event.tag
        if isinstance(self.event, ScalarEvent):
            if self.style is None:
                self.style = self.choose_scalar_style()
            # an empty, single quoted null scalar is replaced by the alt_null
            # text; analysis and style are then recomputed for the new value
            if (
                self.event.value == ''
                and self.style == "'"
                and tag == 'tag:yaml.org,2002:null'
                and self.alt_null is not None
            ):
                self.event.value = self.alt_null
                self.analysis = None
                self.style = self.choose_scalar_style()
            # implicit[0] applies to the plain style, implicit[1] to all others
            if (not self.canonical or tag is None) and (
                (self.style == "" and self.event.implicit[0])
                or (self.style != "" and self.event.implicit[1])
            ):
                self.prepared_tag = None
                return
            if self.event.implicit[0] and tag is None:
                tag = '!'
                self.prepared_tag = None
        else:
            if (not self.canonical or tag is None) and self.event.implicit:
                self.prepared_tag = None
                return
        if tag is None:
            raise EmitterError('tag is not specified')
        if self.prepared_tag is None:
            self.prepared_tag = self.prepare_tag(self.event.ctag)
        if self.prepared_tag:
            self.write_indicator(self.prepared_tag, True)
            if (
                self.sequence_context
                and not self.flow_level
                and isinstance(self.event, ScalarEvent)
            ):
                self.no_newline = True
        self.prepared_tag = None
    def choose_scalar_style(self) -> Any:
        """Select the style for the current scalar event.

        Returns "" for plain, "'" or '"' for quoted, '|' or '>' for block
        styles, or None for an empty value whose ctag handle is '!!'.
        The scalar analysis is computed and cached on the emitter if needed.
        """
        # issue 449 needs this otherwise emits single quoted empty string
        if self.event.value == '' and self.event.ctag.handle == '!!':
            return None
        if self.analysis is None:
            self.analysis = self.analyze_scalar(self.event.value)
        if self.event.style == '"' or self.canonical:
            return '"'
        # plain style: only without explicit style (or with the '?' / '-' styles)
        # and when the analysis allows plain output in the current context
        if (not self.event.style or self.event.style == '?' or self.event.style == '-') and (
            self.event.implicit[0] or not self.event.implicit[2]
        ):
            if not (
                self.simple_key_context and (self.analysis.empty or self.analysis.multiline)
            ) and (
                self.flow_level
                and self.analysis.allow_flow_plain
                or (not self.flow_level and self.analysis.allow_block_plain)
            ):
                return ""
        if self.event.style == '-':
            return ""
        # NOTE(review): overrides the analysis result, so the allow_block test
        # below always passes
        self.analysis.allow_block = True
        # block styles are not used in flow context or for simple keys
        if self.event.style and self.event.style in '|>':
            if (
                not self.flow_level
                and not self.simple_key_context
                and self.analysis.allow_block
            ):
                return self.event.style
        # without explicit style, values containing a single quote or a
        # newline are double quoted
        if not self.event.style and self.analysis.allow_double_quoted:
            if "'" in self.event.value or '\n' in self.event.value:
                return '"'
        if not self.event.style or self.event.style == "'":
            if self.analysis.allow_single_quoted and not (
                self.simple_key_context and self.analysis.multiline
            ):
                return "'"
        # fallback: double quoted
        return '"'
  800. def process_scalar(self) -> None:
  801. if self.analysis is None:
  802. self.analysis = self.analyze_scalar(self.event.value)
  803. if self.style is None:
  804. self.style = self.choose_scalar_style()
  805. split = not self.simple_key_context
  806. # if self.analysis.multiline and split \
  807. # and (not self.style or self.style in '\'\"'):
  808. # self.write_indent()
  809. # nprint('xx', self.sequence_context, self.flow_level)
  810. if self.sequence_context and not self.flow_level:
  811. self.write_indent()
  812. if self.style == '"':
  813. self.write_double_quoted(self.analysis.scalar, split)
  814. elif self.style == "'":
  815. self.write_single_quoted(self.analysis.scalar, split)
  816. elif self.style == '>':
  817. try:
  818. cmx = self.event.comment[1][0]
  819. except (IndexError, TypeError) as e: # NOQA
  820. cmx = ""
  821. self.write_folded(self.analysis.scalar, cmx)
  822. if (
  823. self.event.comment
  824. and self.event.comment[0]
  825. and self.event.comment[0].column >= self.indent
  826. ):
  827. # comment following a folded scalar must dedent (issue 376)
  828. self.event.comment[0].column = self.indent - 1 # type: ignore
  829. elif self.style == '|':
  830. # self.write_literal(self.analysis.scalar, self.event.comment)
  831. try:
  832. cmx = self.event.comment[1][0]
  833. except (IndexError, TypeError):
  834. cmx = ""
  835. self.write_literal(self.analysis.scalar, cmx)
  836. if (
  837. self.event.comment
  838. and self.event.comment[0]
  839. and self.event.comment[0].column >= self.indent
  840. ):
  841. # comment following a literal scalar must dedent (issue 376)
  842. self.event.comment[0].column = self.indent - 1 # type: ignore
  843. else:
  844. self.write_plain(self.analysis.scalar, split)
  845. self.analysis = None
  846. self.style = None
  847. if self.event.comment:
  848. self.write_post_comment(self.event)
  849. # Analyzers.
  850. def prepare_version(self, version: Any) -> Any:
  851. major, minor = version
  852. if major != 1:
  853. raise EmitterError(f'unsupported YAML version: {major:d}.{minor:d}')
  854. return f'{major:d}.{minor:d}'
  855. def prepare_tag_handle(self, handle: Any) -> Any:
  856. if not handle:
  857. raise EmitterError('tag handle must not be empty')
  858. if handle[0] != '!' or handle[-1] != '!':
  859. raise EmitterError(f"tag handle must start and end with '!': {handle!r}")
  860. for ch in handle[1:-1]:
  861. if not ('0' <= ch <= '9' or 'A' <= ch <= 'Z' or 'a' <= ch <= 'z' or ch in '-_'):
  862. raise EmitterError(f'invalid character {ch!r} in the tag handle: {handle!r}')
  863. return handle
  864. def prepare_tag_prefix(self, prefix: Any) -> Any:
  865. if not prefix:
  866. raise EmitterError('tag prefix must not be empty')
  867. chunks: List[Any] = []
  868. start = end = 0
  869. if prefix[0] == '!':
  870. end = 1
  871. ch_set = "-;/?:@&=+$,_.~*'()[]"
  872. if self.dumper:
  873. version = getattr(self.dumper, 'version', (1, 2))
  874. if version is None or version >= (1, 2):
  875. ch_set += '#'
  876. while end < len(prefix):
  877. ch = prefix[end]
  878. if '0' <= ch <= '9' or 'A' <= ch <= 'Z' or 'a' <= ch <= 'z' or ch in ch_set:
  879. end += 1
  880. else:
  881. if start < end:
  882. chunks.append(prefix[start:end])
  883. start = end = end + 1
  884. data = ch
  885. for ch in data:
  886. chunks.append(f'%{ord(ch):02X}')
  887. if start < end:
  888. chunks.append(prefix[start:end])
  889. return "".join(chunks)
  890. def prepare_tag(self, tag: Any) -> Any:
  891. if not tag:
  892. raise EmitterError('tag must not be empty')
  893. tag = str(tag)
  894. if tag == '!' or tag == '!!':
  895. return tag
  896. handle = None
  897. suffix = tag
  898. prefixes = sorted(self.tag_prefixes.keys())
  899. for prefix in prefixes:
  900. if tag.startswith(prefix) and (prefix == '!' or len(prefix) < len(tag)):
  901. handle = self.tag_prefixes[prefix]
  902. suffix = tag[len(prefix) :]
  903. chunks: List[Any] = []
  904. start = end = 0
  905. ch_set = "-;/?:@&=+$,_.~*'()[]"
  906. if self.dumper:
  907. version = getattr(self.dumper, 'version', (1, 2))
  908. if version is None or version >= (1, 2):
  909. ch_set += '#'
  910. while end < len(suffix):
  911. ch = suffix[end]
  912. if (
  913. '0' <= ch <= '9'
  914. or 'A' <= ch <= 'Z'
  915. or 'a' <= ch <= 'z'
  916. or ch in ch_set
  917. or (ch == '!' and handle != '!')
  918. ):
  919. end += 1
  920. else:
  921. if start < end:
  922. chunks.append(suffix[start:end])
  923. start = end = end + 1
  924. data = ch
  925. for ch in data:
  926. chunks.append(f'%{ord(ch):02X}')
  927. if start < end:
  928. chunks.append(suffix[start:end])
  929. suffix_text = "".join(chunks)
  930. if handle:
  931. return f'{handle!s}{suffix_text!s}'
  932. else:
  933. return f'!<{suffix_text!s}>'
  934. def prepare_anchor(self, anchor: Any) -> Any:
  935. if not anchor:
  936. raise EmitterError('anchor must not be empty')
  937. for ch in anchor:
  938. if not check_anchorname_char(ch):
  939. raise EmitterError(f'invalid character {ch!r} in the anchor: {anchor!r}')
  940. return anchor
    def analyze_scalar(self, scalar: Any) -> Any:
        """Inspect ``scalar`` and report which emission styles are usable.

        Walks the text once, recording indicator characters, line breaks,
        "special" characters and significant whitespace combinations, then
        turns those observations into the ``allow_*`` flags of a
        ScalarAnalysis.  Reads ``self.serializer.use_version``,
        ``self.unicode_supplementary``, ``self.allow_unicode`` and
        ``self.allow_space_break``.
        """
        # Empty scalar is a special case.
        if not scalar:
            return ScalarAnalysis(
                scalar=scalar,
                empty=True,
                multiline=False,
                allow_flow_plain=False,
                allow_block_plain=True,
                allow_single_quoted=True,
                allow_double_quoted=True,
                allow_block=False,
            )
        # Indicators and special characters.
        block_indicators = False
        flow_indicators = False
        line_breaks = False
        special_characters = False
        # Important whitespace combinations.
        leading_space = False
        leading_break = False
        trailing_space = False
        trailing_break = False
        break_space = False
        space_break = False
        # Check document indicators.
        if scalar.startswith('---') or scalar.startswith('...'):
            block_indicators = True
            flow_indicators = True
        # First character or preceded by a whitespace.
        preceeded_by_whitespace = True
        # Last character or followed by a whitespace.
        followed_by_whitespace = len(scalar) == 1 or scalar[1] in '\0 \t\r\n\x85\u2028\u2029'
        # The previous character is a space.
        previous_space = False
        # The previous character is a break.
        previous_break = False
        index = 0
        while index < len(scalar):
            ch = scalar[index]
            # Check for indicators.
            if index == 0:
                # Leading indicators are special characters.
                if ch in '#,[]{}&*!|>\'"%@`':
                    flow_indicators = True
                    block_indicators = True
                if ch in '?:':  # ToDo
                    # with version (1, 1) a leading '?' / ':' always blocks
                    # flow plain; otherwise only when it is the whole scalar
                    if self.serializer.use_version == (1, 1):
                        flow_indicators = True
                    elif len(scalar) == 1:  # single character
                        flow_indicators = True
                    if followed_by_whitespace:
                        block_indicators = True
                if ch == '-' and followed_by_whitespace:
                    flow_indicators = True
                    block_indicators = True
            else:
                # Some indicators cannot appear within a scalar as well.
                if ch in ',[]{}':  # http://yaml.org/spec/1.2/spec.html#id2788859
                    flow_indicators = True
                if ch == '?' and self.serializer.use_version == (1, 1):
                    flow_indicators = True
                if ch == ':':
                    if followed_by_whitespace:
                        flow_indicators = True
                        block_indicators = True
                if ch == '#' and preceeded_by_whitespace:
                    flow_indicators = True
                    block_indicators = True
            # Check for line breaks, special, and unicode characters.
            if ch in '\n\x85\u2028\u2029':
                line_breaks = True
            if not (ch == '\n' or '\x20' <= ch <= '\x7E'):
                # Outside printable ASCII: characters in the listed unicode
                # ranges only count as special when unicode output is
                # disabled; everything else (and the BOM) is always special.
                if (
                    ch == '\x85'
                    or '\xA0' <= ch <= '\uD7FF'
                    or '\uE000' <= ch <= '\uFFFD'
                    or (self.unicode_supplementary and ('\U00010000' <= ch <= '\U0010FFFF'))
                ) and ch != '\uFEFF':
                    # unicode_characters = True
                    if not self.allow_unicode:
                        special_characters = True
                else:
                    special_characters = True
            # Detect important whitespace combinations.
            if ch == ' ':
                if index == 0:
                    leading_space = True
                if index == len(scalar) - 1:
                    trailing_space = True
                if previous_break:
                    break_space = True
                previous_space = True
                previous_break = False
            elif ch in '\n\x85\u2028\u2029':
                if index == 0:
                    leading_break = True
                if index == len(scalar) - 1:
                    trailing_break = True
                if previous_space:
                    space_break = True
                previous_space = False
                previous_break = True
            else:
                previous_space = False
                previous_break = False
            # Prepare for the next character.
            index += 1
            preceeded_by_whitespace = ch in '\0 \t\r\n\x85\u2028\u2029'
            # index already points at the next character here, so this looks
            # at the character after it
            followed_by_whitespace = (
                index + 1 >= len(scalar) or scalar[index + 1] in '\0 \t\r\n\x85\u2028\u2029'
            )
        # Let's decide what styles are allowed.
        allow_flow_plain = True
        allow_block_plain = True
        allow_single_quoted = True
        allow_double_quoted = True
        allow_block = True
        # Leading and trailing whitespaces are bad for plain scalars.
        if leading_space or leading_break or trailing_space or trailing_break:
            allow_flow_plain = allow_block_plain = False
        # We do not permit trailing spaces for block scalars.
        if trailing_space:
            allow_block = False
        # Spaces at the beginning of a new line are only acceptable for block
        # scalars.
        if break_space:
            allow_flow_plain = allow_block_plain = allow_single_quoted = False
        # Spaces followed by breaks, as well as special character are only
        # allowed for double quoted scalars.
        if special_characters:
            allow_flow_plain = allow_block_plain = allow_single_quoted = allow_block = False
        elif space_break:
            allow_flow_plain = allow_block_plain = allow_single_quoted = False
            if not self.allow_space_break:
                allow_block = False
        # Although the plain scalar writer supports breaks, we never emit
        # multiline plain scalars.
        if line_breaks:
            allow_flow_plain = allow_block_plain = False
        # Flow indicators are forbidden for flow plain scalars.
        if flow_indicators:
            allow_flow_plain = False
        # Block indicators are forbidden for block plain scalars.
        if block_indicators:
            allow_block_plain = False
        return ScalarAnalysis(
            scalar=scalar,
            empty=False,
            multiline=line_breaks,
            allow_flow_plain=allow_flow_plain,
            allow_block_plain=allow_block_plain,
            allow_single_quoted=allow_single_quoted,
            allow_double_quoted=allow_double_quoted,
            allow_block=allow_block,
        )
  1097. # Writers.
  1098. def flush_stream(self) -> None:
  1099. if hasattr(self.stream, 'flush'):
  1100. self.stream.flush()
  1101. def write_stream_start(self) -> None:
  1102. # Write BOM if needed.
  1103. if self.encoding and self.encoding.startswith('utf-16'):
  1104. self.stream.write('\uFEFF'.encode(self.encoding))
  1105. def write_stream_end(self) -> None:
  1106. self.flush_stream()
  1107. def write_indicator(
  1108. self,
  1109. indicator: Any,
  1110. need_whitespace: Any,
  1111. whitespace: bool = False,
  1112. indention: bool = False,
  1113. ) -> None:
  1114. if self.whitespace or not need_whitespace:
  1115. data = indicator
  1116. else:
  1117. data = ' ' + indicator
  1118. self.whitespace = whitespace
  1119. self.indention = self.indention and indention
  1120. self.column += len(data)
  1121. self.open_ended = False
  1122. if bool(self.encoding):
  1123. data = data.encode(self.encoding)
  1124. self.stream.write(data)
  1125. def write_indent(self) -> None:
  1126. indent = self.indent or 0
  1127. if (
  1128. not self.indention
  1129. or self.column > indent
  1130. or (self.column == indent and not self.whitespace)
  1131. ):
  1132. if bool(self.no_newline):
  1133. self.no_newline = False
  1134. else:
  1135. self.write_line_break()
  1136. if self.column < indent:
  1137. self.whitespace = True
  1138. data = ' ' * (indent - self.column)
  1139. self.column = indent
  1140. if self.encoding:
  1141. data = data.encode(self.encoding) # type: ignore
  1142. self.stream.write(data)
  1143. def write_line_break(self, data: Any = None) -> None:
  1144. if data is None:
  1145. data = self.best_line_break
  1146. self.whitespace = True
  1147. self.indention = True
  1148. self.line += 1
  1149. self.column = 0
  1150. if bool(self.encoding):
  1151. data = data.encode(self.encoding)
  1152. self.stream.write(data)
  1153. def write_version_directive(self, version_text: Any) -> None:
  1154. data: Any = f'%YAML {version_text!s}'
  1155. if self.encoding:
  1156. data = data.encode(self.encoding)
  1157. self.stream.write(data)
  1158. self.write_line_break()
  1159. def write_tag_directive(self, handle_text: Any, prefix_text: Any) -> None:
  1160. data: Any = f'%TAG {handle_text!s} {prefix_text!s}'
  1161. if self.encoding:
  1162. data = data.encode(self.encoding)
  1163. self.stream.write(data)
  1164. self.write_line_break()
  1165. # Scalar streams.
    def write_single_quoted(self, text: Any, split: Any = True) -> None:
        """Write ``text`` as a single-quoted scalar.

        The text is scanned one position past its end (``ch`` is ``None``
        there) so the final run is flushed.  The scanner is in one of three
        states: inside a run of spaces, inside a run of line breaks, or
        inside ordinary characters.  Single quotes are doubled.  When
        ``split`` is true a lone space past ``self.best_width`` is replaced
        by a line fold.
        """
        if self.root_context:
            # a requested indent puts a root-level scalar on its own line
            if self.requested_indent is not None:
                self.write_line_break()
                if self.requested_indent != 0:
                    self.write_indent()
        self.write_indicator("'", True)
        spaces = False
        breaks = False
        start = end = 0
        while end <= len(text):
            ch = None
            if end < len(text):
                ch = text[end]
            if spaces:
                if ch is None or ch != ' ':
                    # End of a run of spaces: fold here only for a single
                    # space that is neither at the start nor at the end of
                    # the text, once the line is too wide.
                    if (
                        start + 1 == end
                        and self.column > self.best_width
                        and split
                        and start != 0
                        and end != len(text)
                    ):
                        self.write_indent()
                    else:
                        data = text[start:end]
                        self.column += len(data)
                        if bool(self.encoding):
                            data = data.encode(self.encoding)
                        self.stream.write(data)
                    start = end
            elif breaks:
                if ch is None or ch not in '\n\x85\u2028\u2029':
                    # End of a run of breaks: a run starting with '\n' gets
                    # one extra line break before the run itself is written.
                    if text[start] == '\n':
                        self.write_line_break()
                    for br in text[start:end]:
                        if br == '\n':
                            self.write_line_break()
                        else:
                            self.write_line_break(br)
                    self.write_indent()
                    start = end
            else:
                if ch is None or ch in ' \n\x85\u2028\u2029' or ch == "'":
                    # flush the pending run of ordinary characters
                    if start < end:
                        data = text[start:end]
                        self.column += len(data)
                        if bool(self.encoding):
                            data = data.encode(self.encoding)
                        self.stream.write(data)
                        start = end
            if ch == "'":
                # a quote inside the scalar is written doubled
                data = "''"
                self.column += 2
                if bool(self.encoding):
                    data = data.encode(self.encoding)
                self.stream.write(data)
                start = end + 1
            if ch is not None:
                spaces = ch == ' '
                breaks = ch in '\n\x85\u2028\u2029'
            end += 1
        self.write_indicator("'", False)
    # Maps a character to the text that follows the backslash in its
    # double-quoted escape sequence; write_double_quoted looks characters up
    # here before falling back to numeric \x / \u / \U escapes.
    ESCAPE_REPLACEMENTS = {
        '\0': '0',
        '\x07': 'a',
        '\x08': 'b',
        '\x09': 't',
        '\x0A': 'n',
        '\x0B': 'v',
        '\x0C': 'f',
        '\x0D': 'r',
        '\x1B': 'e',
        '"': '"',
        '\\': '\\',
        '\x85': 'N',
        '\xA0': '_',
        '\u2028': 'L',
        '\u2029': 'P',
    }
    def write_double_quoted(self, text: Any, split: Any = True) -> None:
        """
        Write ``text`` as a double-quoted scalar.

        Runs of characters that can be written verbatim are flushed as they
        are; every other character is written as an escape sequence, taken
        from ESCAPE_REPLACEMENTS or built as a numeric ``\\x`` / ``\\u`` /
        ``\\U`` escape.  When ``split`` is true, lines longer than
        ``self.best_width`` are folded.

        a newline, as written by self.write_indent(), might need to be escaped with a backslash
        as on reading this will produce a possibly unwanted space.
        """
        if self.root_context:
            # a requested indent puts a root-level scalar on its own line
            if self.requested_indent is not None:
                self.write_line_break()
                if self.requested_indent != 0:
                    self.write_indent()
        self.write_indicator('"', True)
        start = end = 0
        while end <= len(text):
            ch = None
            if end < len(text):
                ch = text[end]
            # ch is None one position past the end, which flushes the tail.
            if (
                ch is None
                or ch in '"\\\x85\u2028\u2029\uFEFF'
                or not (
                    '\x20' <= ch <= '\x7E'
                    or (
                        self.allow_unicode
                        and (
                            ('\xA0' <= ch <= '\uD7FF')
                            or ('\uE000' <= ch <= '\uFFFD')
                            or ('\U00010000' <= ch <= '\U0010FFFF')
                        )
                    )
                )
            ):
                # flush the verbatim run that precedes ch
                if start < end:
                    data = text[start:end]
                    self.column += len(data)
                    if bool(self.encoding):
                        data = data.encode(self.encoding)
                    self.stream.write(data)
                    start = end
                if ch is not None:
                    # write ch as an escape sequence
                    if ch in self.ESCAPE_REPLACEMENTS:
                        data = '\\' + self.ESCAPE_REPLACEMENTS[ch]
                    elif ch <= '\xFF':
                        data = '\\x%02X' % ord(ch)
                    elif ch <= '\uFFFF':
                        data = '\\u%04X' % ord(ch)
                    else:
                        data = '\\U%08X' % ord(ch)
                    self.column += len(data)
                    if bool(self.encoding):
                        data = data.encode(self.encoding)
                    self.stream.write(data)
                    start = end + 1
            # Fold the line: only strictly inside the text, at a space or
            # right after an escape, once the pending output would pass
            # best_width.
            if (
                0 < end < len(text) - 1
                and (ch == ' ' or start >= end)
                and self.column + (end - start) > self.best_width
                and split
            ):
                # SO https://stackoverflow.com/a/75634614/1307905
                # data = text[start:end] + u'\\'  # <<< replaced with following lines
                need_backslash = True
                if len(text) > end:
                    try:
                        # look at the word that follows the fold position: it
                        # ends at the next space, or at a newline before it
                        space_pos = text.index(' ', end)
                        try:
                            space_pos = text.index('\n', end, space_pos)
                        except (ValueError, IndexError):
                            pass
                        # nprint('backslash?', space_pos, repr(text[:space_pos]), repr(text[space_pos:]), (text[space_pos] == '\n' and text[space_pos+1] == ' '))  # NOQA
                        if (text[space_pos] == '\n' and text[space_pos + 1] != ' '):
                            pass
                        elif (
                            '"' not in text[end:space_pos]
                            and "'" not in text[end:space_pos]
                            # and text[space_pos + 1] != ' '
                            and text[space_pos + 1] not in ' \n'
                            # NOTE(review): the slice is two characters long
                            # here (0 < end < len(text) - 1), so comparing it
                            # with a one-character string is always true;
                            # possibly a two-space literal was intended or
                            # was collapsed when this listing was extracted
                            # -- confirm against the repository.
                            and text[end - 1 : end + 1] != ' '
                            and start != end
                        ):
                            need_backslash = False
                    except (ValueError, IndexError):
                        # no following space, or a lookup past the end of the
                        # text: keep the escaped fold
                        pass
                data = text[start:end] + ('\\' if need_backslash else '')
                if start < end:
                    start = end
                self.column += len(data)
                if bool(self.encoding):
                    data = data.encode(self.encoding)
                self.stream.write(data)
                self.write_indent()
                self.whitespace = False
                self.indention = False
                if text[start] == ' ':
                    if not need_backslash:
                        # remove leading space it will load from the newline
                        start += 1
                    # data = u'\\'  # <<< replaced with following line
                    data = '\\' if need_backslash else ''
                    self.column += len(data)
                    if bool(self.encoding):
                        data = data.encode(self.encoding)
                    self.stream.write(data)
            end += 1
        self.write_indicator('"', False)
  1350. def determine_block_hints(self, text: Any) -> Any:
  1351. indent = 0
  1352. indicator = ''
  1353. hints = ''
  1354. if text:
  1355. if text[0] in ' \n\x85\u2028\u2029':
  1356. indent = 2
  1357. hints += str(indent)
  1358. elif self.root_context:
  1359. for end in ['\n---', '\n...']:
  1360. pos = 0
  1361. while True:
  1362. pos = text.find(end, pos)
  1363. if pos == -1:
  1364. break
  1365. try:
  1366. if text[pos + 4] in ' \r\n':
  1367. break
  1368. except IndexError:
  1369. pass
  1370. pos += 1
  1371. if pos > -1:
  1372. break
  1373. if pos > 0:
  1374. indent = 2
  1375. if text[-1] not in '\n\x85\u2028\u2029':
  1376. indicator = '-'
  1377. elif len(text) == 1 or text[-2] in '\n\x85\u2028\u2029':
  1378. indicator = '+'
  1379. hints += indicator
  1380. return hints, indent, indicator
  1381. def write_folded(self, text: Any, comment: Any) -> None:
  1382. hints, _indent, _indicator = self.determine_block_hints(text)
  1383. if not isinstance(comment, str):
  1384. comment = ''
  1385. self.write_indicator('>' + hints + comment, True)
  1386. if _indicator == '+':
  1387. self.open_ended = True
  1388. self.write_line_break()
  1389. leading_space = True
  1390. spaces = False
  1391. breaks = True
  1392. start = end = 0
  1393. while end <= len(text):
  1394. ch = None
  1395. if end < len(text):
  1396. ch = text[end]
  1397. if breaks:
  1398. if ch is None or ch not in '\n\x85\u2028\u2029\a':
  1399. if (
  1400. not leading_space
  1401. and ch is not None
  1402. and ch != ' '
  1403. and text[start] == '\n'
  1404. ):
  1405. self.write_line_break()
  1406. leading_space = ch == ' '
  1407. for br in text[start:end]:
  1408. if br == '\n':
  1409. self.write_line_break()
  1410. else:
  1411. self.write_line_break(br)
  1412. if ch is not None:
  1413. self.write_indent()
  1414. start = end
  1415. elif spaces:
  1416. if ch != ' ':
  1417. if start + 1 == end and self.column > self.best_width:
  1418. self.write_indent()
  1419. else:
  1420. data = text[start:end]
  1421. self.column += len(data)
  1422. if bool(self.encoding):
  1423. data = data.encode(self.encoding)
  1424. self.stream.write(data)
  1425. start = end
  1426. else:
  1427. if ch is None or ch in ' \n\x85\u2028\u2029\a':
  1428. data = text[start:end]
  1429. self.column += len(data)
  1430. if bool(self.encoding):
  1431. data = data.encode(self.encoding)
  1432. self.stream.write(data)
  1433. if ch == '\a':
  1434. if end < (len(text) - 1) and not text[end + 2].isspace():
  1435. self.write_line_break()
  1436. self.write_indent()
  1437. end += 2 # \a and the space that is inserted on the fold
  1438. else:
  1439. raise EmitterError('unexcpected fold indicator \\a before space')
  1440. if ch is None:
  1441. self.write_line_break()
  1442. start = end
  1443. if ch is not None:
  1444. breaks = ch in '\n\x85\u2028\u2029'
  1445. spaces = ch == ' '
  1446. end += 1
  1447. def write_literal(self, text: Any, comment: Any = None) -> None:
  1448. hints, _indent, _indicator = self.determine_block_hints(text)
  1449. # if comment is not None:
  1450. # try:
  1451. # hints += comment[1][0]
  1452. # except (TypeError, IndexError) as e:
  1453. # pass
  1454. if not isinstance(comment, str):
  1455. comment = ''
  1456. self.write_indicator('|' + hints + comment, True)
  1457. # try:
  1458. # nprintf('selfev', comment)
  1459. # cmx = comment[1][0]
  1460. # if cmx:
  1461. # self.stream.write(cmx)
  1462. # except (TypeError, IndexError) as e:
  1463. # pass
  1464. if _indicator == '+':
  1465. self.open_ended = True
  1466. self.write_line_break()
  1467. breaks = True
  1468. start = end = 0
  1469. while end <= len(text):
  1470. ch = None
  1471. if end < len(text):
  1472. ch = text[end]
  1473. if breaks:
  1474. if ch is None or ch not in '\n\x85\u2028\u2029':
  1475. for br in text[start:end]:
  1476. if br == '\n':
  1477. self.write_line_break()
  1478. else:
  1479. self.write_line_break(br)
  1480. if ch is not None:
  1481. if self.root_context:
  1482. idnx = self.indent if self.indent is not None else 0
  1483. self.stream.write(' ' * (_indent + idnx))
  1484. else:
  1485. self.write_indent()
  1486. start = end
  1487. else:
  1488. if ch is None or ch in '\n\x85\u2028\u2029':
  1489. data = text[start:end]
  1490. if bool(self.encoding):
  1491. data = data.encode(self.encoding)
  1492. self.stream.write(data)
  1493. if ch is None:
  1494. self.write_line_break()
  1495. start = end
  1496. if ch is not None:
  1497. breaks = ch in '\n\x85\u2028\u2029'
  1498. end += 1
  1499. def write_plain(self, text: Any, split: Any = True) -> None:
  1500. if self.root_context:
  1501. if self.requested_indent is not None:
  1502. self.write_line_break()
  1503. if self.requested_indent != 0:
  1504. self.write_indent()
  1505. else:
  1506. self.open_ended = True
  1507. if not text:
  1508. return
  1509. if not self.whitespace:
  1510. data = ' '
  1511. self.column += len(data)
  1512. if self.encoding:
  1513. data = data.encode(self.encoding) # type: ignore
  1514. self.stream.write(data)
  1515. self.whitespace = False
  1516. self.indention = False
  1517. spaces = False
  1518. breaks = False
  1519. start = end = 0
  1520. while end <= len(text):
  1521. # ToDo: there is an empty space at the end of the wrapped line, if that line
  1522. # does not exceed self.best_width, that space is superfluous if wrapping is on
  1523. ch = None
  1524. if end < len(text):
  1525. ch = text[end]
  1526. if spaces:
  1527. if ch != ' ':
  1528. if start + 1 == end and self.column >= self.best_width and split:
  1529. self.write_indent()
  1530. self.whitespace = False
  1531. self.indention = False
  1532. else:
  1533. data = text[start:end]
  1534. self.column += len(data)
  1535. if self.encoding:
  1536. data = data.encode(self.encoding) # type: ignore
  1537. self.stream.write(data)
  1538. start = end
  1539. elif breaks:
  1540. if ch not in '\n\x85\u2028\u2029': # type: ignore
  1541. if text[start] == '\n':
  1542. self.write_line_break()
  1543. for br in text[start:end]:
  1544. if br == '\n':
  1545. self.write_line_break()
  1546. else:
  1547. self.write_line_break(br)
  1548. self.write_indent()
  1549. self.whitespace = False
  1550. self.indention = False
  1551. start = end
  1552. else:
  1553. if ch is None or ch in ' \n\x85\u2028\u2029':
  1554. data = text[start:end]
  1555. if (
  1556. (len(data) + self.column) > self.best_width
  1557. and self.indent is not None
  1558. and self.column > self.indent
  1559. ):
  1560. # words longer than line length get a line of their own
  1561. self.write_indent()
  1562. self.column += len(data)
  1563. if self.encoding:
  1564. data = data.encode(self.encoding) # type: ignore
  1565. try:
  1566. self.stream.write(data)
  1567. except: # NOQA
  1568. sys.stdout.write(repr(data) + '\n')
  1569. raise
  1570. start = end
  1571. if ch is not None:
  1572. spaces = ch == ' '
  1573. breaks = ch in '\n\x85\u2028\u2029'
  1574. end += 1
  1575. def write_comment(self, comment: Any, pre: bool = False) -> None:
  1576. value = comment.value
  1577. # nprintf(f'{self.column:02d} {comment.start_mark.column:02d} {value!r}')
  1578. if not pre and value[-1] == '\n':
  1579. value = value[:-1]
  1580. try:
  1581. # get original column position
  1582. col = comment.start_mark.column
  1583. if comment.value and comment.value.startswith('\n'):
  1584. # never inject extra spaces if the comment starts with a newline
  1585. # and not a real comment (e.g. if you have an empty line following a key-value
  1586. col = self.column
  1587. elif col < self.column + 1:
  1588. ValueError
  1589. except ValueError:
  1590. col = self.column + 1
  1591. # nprint('post_comment', self.line, self.column, value)
  1592. try:
  1593. # at least one space if the current column >= the start column of the comment
  1594. # but not at the start of a line
  1595. nr_spaces = col - self.column
  1596. if self.column and value.strip() and nr_spaces < 1 and value[0] != '\n':
  1597. nr_spaces = 1
  1598. value = ' ' * nr_spaces + value
  1599. try:
  1600. if bool(self.encoding):
  1601. value = value.encode(self.encoding)
  1602. except UnicodeDecodeError:
  1603. pass
  1604. self.stream.write(value)
  1605. except TypeError:
  1606. raise
  1607. if not pre:
  1608. self.write_line_break()
  1609. def write_pre_comment(self, event: Any) -> bool:
  1610. if event.comment is None:
  1611. return False
  1612. comments = event.comment[1]
  1613. if comments is None:
  1614. return False
  1615. try:
  1616. start_events = (MappingStartEvent, SequenceStartEvent)
  1617. for comment in comments:
  1618. if isinstance(event, start_events) and getattr(comment, 'pre_done', None):
  1619. continue
  1620. if self.column != 0:
  1621. self.write_line_break()
  1622. self.write_comment(comment, pre=True)
  1623. if isinstance(event, start_events):
  1624. comment.pre_done = True
  1625. except TypeError:
  1626. sys.stdout.write(f'eventtt {type(event)} {event}')
  1627. raise
  1628. return True
  1629. def write_post_comment(self, event: Any) -> bool:
  1630. if self.event.comment[0] is None:
  1631. return False
  1632. comment = event.comment[0]
  1633. self.write_comment(comment)
  1634. return True
  1635. class RoundTripEmitter(Emitter):
  1636. def prepare_tag(self, ctag: Any) -> Any:
  1637. if not ctag:
  1638. raise EmitterError('tag must not be empty')
  1639. tag = str(ctag)
  1640. if tag == '!' or tag == '!!':
  1641. return tag
  1642. handle = ctag.handle
  1643. suffix = ctag.suffix
  1644. prefixes = sorted(self.tag_prefixes.keys())
  1645. # print('handling', repr(tag), repr(suffix), repr(handle))
  1646. if handle is None:
  1647. for prefix in prefixes:
  1648. if tag.startswith(prefix) and (prefix == '!' or len(prefix) < len(tag)):
  1649. handle = self.tag_prefixes[prefix]
  1650. suffix = suffix[len(prefix) :]
  1651. if handle:
  1652. return f'{handle!s}{suffix!s}'
  1653. else:
  1654. return f'!<{suffix!s}>'