You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

157 line
5.5 KiB

  1. import socket
  2. from abc import abstractmethod
  3. from io import IOBase
  4. from ipaddress import IPv4Address, IPv6Address
  5. from socket import AddressFamily
  6. from types import TracebackType
  7. from typing import (
  8. Any, AsyncContextManager, Callable, Collection, Dict, List, Mapping, Optional, Tuple, Type,
  9. TypeVar, Union)
  10. from .._core._typedattr import TypedAttributeProvider, TypedAttributeSet, typed_attribute
  11. from ._streams import ByteStream, Listener, T_Stream, UnreliableObjectStream
  12. from ._tasks import TaskGroup
  13. IPAddressType = Union[str, IPv4Address, IPv6Address]
  14. IPSockAddrType = Tuple[str, int]
  15. SockAddrType = Union[IPSockAddrType, str]
  16. UDPPacketType = Tuple[bytes, IPSockAddrType]
  17. T_Retval = TypeVar('T_Retval')
  18. class _NullAsyncContextManager:
  19. async def __aenter__(self) -> None:
  20. pass
  21. async def __aexit__(self, exc_type: Optional[Type[BaseException]],
  22. exc_val: Optional[BaseException],
  23. exc_tb: Optional[TracebackType]) -> Optional[bool]:
  24. return None
  25. class SocketAttribute(TypedAttributeSet):
  26. #: the address family of the underlying socket
  27. family: AddressFamily = typed_attribute()
  28. #: the local socket address of the underlying socket
  29. local_address: SockAddrType = typed_attribute()
  30. #: for IP addresses, the local port the underlying socket is bound to
  31. local_port: int = typed_attribute()
  32. #: the underlying stdlib socket object
  33. raw_socket: socket.socket = typed_attribute()
  34. #: the remote address the underlying socket is connected to
  35. remote_address: SockAddrType = typed_attribute()
  36. #: for IP addresses, the remote port the underlying socket is connected to
  37. remote_port: int = typed_attribute()
  38. class _SocketProvider(TypedAttributeProvider):
  39. @property
  40. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  41. from .._core._sockets import convert_ipv6_sockaddr as convert
  42. attributes: Dict[Any, Callable[[], Any]] = {
  43. SocketAttribute.family: lambda: self._raw_socket.family,
  44. SocketAttribute.local_address: lambda: convert(self._raw_socket.getsockname()),
  45. SocketAttribute.raw_socket: lambda: self._raw_socket
  46. }
  47. try:
  48. peername: Optional[Tuple[str, int]] = convert(self._raw_socket.getpeername())
  49. except OSError:
  50. peername = None
  51. # Provide the remote address for connected sockets
  52. if peername is not None:
  53. attributes[SocketAttribute.remote_address] = lambda: peername
  54. # Provide local and remote ports for IP based sockets
  55. if self._raw_socket.family in (AddressFamily.AF_INET, AddressFamily.AF_INET6):
  56. attributes[SocketAttribute.local_port] = lambda: self._raw_socket.getsockname()[1]
  57. if peername is not None:
  58. remote_port = peername[1]
  59. attributes[SocketAttribute.remote_port] = lambda: remote_port
  60. return attributes
  61. @property
  62. @abstractmethod
  63. def _raw_socket(self) -> socket.socket:
  64. pass
  65. class SocketStream(ByteStream, _SocketProvider):
  66. """
  67. Transports bytes over a socket.
  68. Supports all relevant extra attributes from :class:`~SocketAttribute`.
  69. """
  70. class UNIXSocketStream(SocketStream):
  71. @abstractmethod
  72. async def send_fds(self, message: bytes, fds: Collection[Union[int, IOBase]]) -> None:
  73. """
  74. Send file descriptors along with a message to the peer.
  75. :param message: a non-empty bytestring
  76. :param fds: a collection of files (either numeric file descriptors or open file or socket
  77. objects)
  78. """
  79. @abstractmethod
  80. async def receive_fds(self, msglen: int, maxfds: int) -> Tuple[bytes, List[int]]:
  81. """
  82. Receive file descriptors along with a message from the peer.
  83. :param msglen: length of the message to expect from the peer
  84. :param maxfds: maximum number of file descriptors to expect from the peer
  85. :return: a tuple of (message, file descriptors)
  86. """
  87. class SocketListener(Listener[SocketStream], _SocketProvider):
  88. """
  89. Listens to incoming socket connections.
  90. Supports all relevant extra attributes from :class:`~SocketAttribute`.
  91. """
  92. @abstractmethod
  93. async def accept(self) -> SocketStream:
  94. """Accept an incoming connection."""
  95. async def serve(self, handler: Callable[[T_Stream], Any],
  96. task_group: Optional[TaskGroup] = None) -> None:
  97. from .. import create_task_group
  98. context_manager: AsyncContextManager
  99. if task_group is None:
  100. task_group = context_manager = create_task_group()
  101. else:
  102. # Can be replaced with AsyncExitStack once on py3.7+
  103. context_manager = _NullAsyncContextManager()
  104. async with context_manager:
  105. while True:
  106. stream = await self.accept()
  107. task_group.start_soon(handler, stream)
  108. class UDPSocket(UnreliableObjectStream[UDPPacketType], _SocketProvider):
  109. """
  110. Represents an unconnected UDP socket.
  111. Supports all relevant extra attributes from :class:`~SocketAttribute`.
  112. """
  113. async def sendto(self, data: bytes, host: str, port: int) -> None:
  114. """Alias for :meth:`~.UnreliableObjectSendStream.send` ((data, (host, port)))."""
  115. return await self.send((data, (host, port)))
  116. class ConnectedUDPSocket(UnreliableObjectStream[bytes], _SocketProvider):
  117. """
  118. Represents an connected UDP socket.
  119. Supports all relevant extra attributes from :class:`~SocketAttribute`.
  120. """