You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

131 rivejä
4.8 KiB

  1. import codecs
  2. from dataclasses import InitVar, dataclass, field
  3. from typing import Any, Callable, Mapping, Tuple
  4. from ..abc import (
  5. AnyByteReceiveStream, AnyByteSendStream, AnyByteStream, ObjectReceiveStream, ObjectSendStream,
  6. ObjectStream)
  7. @dataclass(eq=False)
  8. class TextReceiveStream(ObjectReceiveStream[str]):
  9. """
  10. Stream wrapper that decodes bytes to strings using the given encoding.
  11. Decoding is done using :class:`~codecs.IncrementalDecoder` which returns any completely
  12. received unicode characters as soon as they come in.
  13. :param transport_stream: any bytes-based receive stream
  14. :param encoding: character encoding to use for decoding bytes to strings (defaults to
  15. ``utf-8``)
  16. :param errors: handling scheme for decoding errors (defaults to ``strict``; see the
  17. `codecs module documentation`_ for a comprehensive list of options)
  18. .. _codecs module documentation: https://docs.python.org/3/library/codecs.html#codec-objects
  19. """
  20. transport_stream: AnyByteReceiveStream
  21. encoding: InitVar[str] = 'utf-8'
  22. errors: InitVar[str] = 'strict'
  23. _decoder: codecs.IncrementalDecoder = field(init=False)
  24. def __post_init__(self, encoding: str, errors: str) -> None:
  25. decoder_class = codecs.getincrementaldecoder(encoding)
  26. self._decoder = decoder_class(errors=errors)
  27. async def receive(self) -> str:
  28. while True:
  29. chunk = await self.transport_stream.receive()
  30. decoded = self._decoder.decode(chunk)
  31. if decoded:
  32. return decoded
  33. async def aclose(self) -> None:
  34. await self.transport_stream.aclose()
  35. self._decoder.reset()
  36. @property
  37. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  38. return self.transport_stream.extra_attributes
  39. @dataclass(eq=False)
  40. class TextSendStream(ObjectSendStream[str]):
  41. """
  42. Sends strings to the wrapped stream as bytes using the given encoding.
  43. :param AnyByteSendStream transport_stream: any bytes-based send stream
  44. :param str encoding: character encoding to use for encoding strings to bytes (defaults to
  45. ``utf-8``)
  46. :param str errors: handling scheme for encoding errors (defaults to ``strict``; see the
  47. `codecs module documentation`_ for a comprehensive list of options)
  48. .. _codecs module documentation: https://docs.python.org/3/library/codecs.html#codec-objects
  49. """
  50. transport_stream: AnyByteSendStream
  51. encoding: InitVar[str] = 'utf-8'
  52. errors: str = 'strict'
  53. _encoder: Callable[..., Tuple[bytes, int]] = field(init=False)
  54. def __post_init__(self, encoding: str) -> None:
  55. self._encoder = codecs.getencoder(encoding)
  56. async def send(self, item: str) -> None:
  57. encoded = self._encoder(item, self.errors)[0]
  58. await self.transport_stream.send(encoded)
  59. async def aclose(self) -> None:
  60. await self.transport_stream.aclose()
  61. @property
  62. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  63. return self.transport_stream.extra_attributes
  64. @dataclass(eq=False)
  65. class TextStream(ObjectStream[str]):
  66. """
  67. A bidirectional stream that decodes bytes to strings on receive and encodes strings to bytes on
  68. send.
  69. Extra attributes will be provided from both streams, with the receive stream providing the
  70. values in case of a conflict.
  71. :param AnyByteStream transport_stream: any bytes-based stream
  72. :param str encoding: character encoding to use for encoding/decoding strings to/from bytes
  73. (defaults to ``utf-8``)
  74. :param str errors: handling scheme for encoding errors (defaults to ``strict``; see the
  75. `codecs module documentation`_ for a comprehensive list of options)
  76. .. _codecs module documentation: https://docs.python.org/3/library/codecs.html#codec-objects
  77. """
  78. transport_stream: AnyByteStream
  79. encoding: InitVar[str] = 'utf-8'
  80. errors: InitVar[str] = 'strict'
  81. _receive_stream: TextReceiveStream = field(init=False)
  82. _send_stream: TextSendStream = field(init=False)
  83. def __post_init__(self, encoding: str, errors: str) -> None:
  84. self._receive_stream = TextReceiveStream(self.transport_stream, encoding=encoding,
  85. errors=errors)
  86. self._send_stream = TextSendStream(self.transport_stream, encoding=encoding, errors=errors)
  87. async def receive(self) -> str:
  88. return await self._receive_stream.receive()
  89. async def send(self, item: str) -> None:
  90. await self._send_stream.send(item)
  91. async def send_eof(self) -> None:
  92. await self.transport_stream.send_eof()
  93. async def aclose(self) -> None:
  94. await self._send_stream.aclose()
  95. await self._receive_stream.aclose()
  96. @property
  97. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  98. return {**self._send_stream.extra_attributes, **self._receive_stream.extra_attributes}