You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

278 lines
8.2 KiB

  1. cdef __pipe_init_uv_handle(UVStream handle, Loop loop):
  2. cdef int err
  3. handle._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_pipe_t))
  4. if handle._handle is NULL:
  5. handle._abort_init()
  6. raise MemoryError()
  7. # Initialize pipe handle with ipc=0.
  8. # ipc=1 means that libuv will use recvmsg/sendmsg
  9. # instead of recv/send.
  10. err = uv.uv_pipe_init(handle._loop.uvloop,
  11. <uv.uv_pipe_t*>handle._handle,
  12. 0)
  13. if err < 0:
  14. handle._abort_init()
  15. raise convert_error(err)
  16. handle._finish_init()
  17. cdef __pipe_open(UVStream handle, int fd):
  18. cdef int err
  19. err = uv.uv_pipe_open(<uv.uv_pipe_t *>handle._handle,
  20. <uv.uv_file>fd)
  21. if err < 0:
  22. exc = convert_error(err)
  23. raise exc
  24. cdef __pipe_get_socket(UVSocketHandle handle):
  25. fileno = handle._fileno()
  26. return PseudoSocket(uv.AF_UNIX, uv.SOCK_STREAM, 0, fileno)
  27. @cython.no_gc_clear
  28. cdef class UnixServer(UVStreamServer):
  29. @staticmethod
  30. cdef UnixServer new(Loop loop, object protocol_factory, Server server,
  31. object backlog,
  32. object ssl,
  33. object ssl_handshake_timeout,
  34. object ssl_shutdown_timeout):
  35. cdef UnixServer handle
  36. handle = UnixServer.__new__(UnixServer)
  37. handle._init(loop, protocol_factory, server, backlog,
  38. ssl, ssl_handshake_timeout, ssl_shutdown_timeout)
  39. __pipe_init_uv_handle(<UVStream>handle, loop)
  40. return handle
  41. cdef _new_socket(self):
  42. return __pipe_get_socket(<UVSocketHandle>self)
  43. cdef _open(self, int sockfd):
  44. self._ensure_alive()
  45. __pipe_open(<UVStream>self, sockfd)
  46. self._mark_as_open()
  47. cdef bind(self, str path):
  48. cdef int err
  49. self._ensure_alive()
  50. err = uv.uv_pipe_bind(<uv.uv_pipe_t *>self._handle,
  51. path.encode())
  52. if err < 0:
  53. exc = convert_error(err)
  54. self._fatal_error(exc, True)
  55. return
  56. self._mark_as_open()
  57. cdef UVStream _make_new_transport(self, object protocol, object waiter):
  58. cdef UnixTransport tr
  59. tr = UnixTransport.new(self._loop, protocol, self._server, waiter)
  60. return <UVStream>tr
  61. @cython.no_gc_clear
  62. cdef class UnixTransport(UVStream):
  63. @staticmethod
  64. cdef UnixTransport new(Loop loop, object protocol, Server server,
  65. object waiter):
  66. cdef UnixTransport handle
  67. handle = UnixTransport.__new__(UnixTransport)
  68. handle._init(loop, protocol, server, waiter)
  69. __pipe_init_uv_handle(<UVStream>handle, loop)
  70. return handle
  71. cdef _new_socket(self):
  72. return __pipe_get_socket(<UVSocketHandle>self)
  73. cdef _open(self, int sockfd):
  74. __pipe_open(<UVStream>self, sockfd)
  75. cdef connect(self, char* addr):
  76. cdef _PipeConnectRequest req
  77. req = _PipeConnectRequest(self._loop, self)
  78. req.connect(addr)
  79. @cython.no_gc_clear
  80. cdef class ReadUnixTransport(UVStream):
  81. @staticmethod
  82. cdef ReadUnixTransport new(Loop loop, object protocol, Server server,
  83. object waiter):
  84. cdef ReadUnixTransport handle
  85. handle = ReadUnixTransport.__new__(ReadUnixTransport)
  86. handle._init(loop, protocol, server, waiter)
  87. __pipe_init_uv_handle(<UVStream>handle, loop)
  88. return handle
  89. cdef _new_socket(self):
  90. return __pipe_get_socket(<UVSocketHandle>self)
  91. cdef _open(self, int sockfd):
  92. __pipe_open(<UVStream>self, sockfd)
  93. def get_write_buffer_limits(self):
  94. raise NotImplementedError
  95. def set_write_buffer_limits(self, high=None, low=None):
  96. raise NotImplementedError
  97. def get_write_buffer_size(self):
  98. raise NotImplementedError
  99. def write(self, data):
  100. raise NotImplementedError
  101. def writelines(self, list_of_data):
  102. raise NotImplementedError
  103. def write_eof(self):
  104. raise NotImplementedError
  105. def can_write_eof(self):
  106. raise NotImplementedError
  107. def abort(self):
  108. raise NotImplementedError
  109. @cython.no_gc_clear
  110. cdef class WriteUnixTransport(UVStream):
  111. def __cinit__(self):
  112. self.disconnect_listener_inited = False
  113. self.disconnect_listener.data = NULL
  114. @staticmethod
  115. cdef WriteUnixTransport new(Loop loop, object protocol, Server server,
  116. object waiter):
  117. cdef WriteUnixTransport handle
  118. handle = WriteUnixTransport.__new__(WriteUnixTransport)
  119. # We listen for read events on write-end of the pipe. When
  120. # the read-end is close, the uv_stream_t.read callback will
  121. # receive an error -- we want to silence that error, and just
  122. # close the transport.
  123. handle._close_on_read_error()
  124. handle._init(loop, protocol, server, waiter)
  125. __pipe_init_uv_handle(<UVStream>handle, loop)
  126. return handle
  127. cdef _start_reading(self):
  128. # A custom implementation for monitoring for EOF:
  129. # libuv since v1.23.1 prohibits using uv_read_start on
  130. # write-only FDs, so we use a throw-away uv_poll_t handle
  131. # for that purpose, as suggested in
  132. # https://github.com/libuv/libuv/issues/2058.
  133. cdef int err
  134. if not self.disconnect_listener_inited:
  135. err = uv.uv_poll_init(self._loop.uvloop,
  136. &self.disconnect_listener,
  137. self._fileno())
  138. if err < 0:
  139. raise convert_error(err)
  140. self.disconnect_listener.data = <void*>self
  141. self.disconnect_listener_inited = True
  142. err = uv.uv_poll_start(&self.disconnect_listener,
  143. uv.UV_READABLE | uv.UV_DISCONNECT,
  144. __on_write_pipe_poll_event)
  145. if err < 0:
  146. raise convert_error(err)
  147. cdef _stop_reading(self):
  148. cdef int err
  149. if not self.disconnect_listener_inited:
  150. return
  151. err = uv.uv_poll_stop(&self.disconnect_listener)
  152. if err < 0:
  153. raise convert_error(err)
  154. cdef _close(self):
  155. if self.disconnect_listener_inited:
  156. self.disconnect_listener.data = NULL
  157. uv.uv_close(<uv.uv_handle_t *>(&self.disconnect_listener), NULL)
  158. self.disconnect_listener_inited = False
  159. UVStream._close(self)
  160. cdef _new_socket(self):
  161. return __pipe_get_socket(<UVSocketHandle>self)
  162. cdef _open(self, int sockfd):
  163. __pipe_open(<UVStream>self, sockfd)
  164. def pause_reading(self):
  165. raise NotImplementedError
  166. def resume_reading(self):
  167. raise NotImplementedError
  168. cdef void __on_write_pipe_poll_event(uv.uv_poll_t* handle,
  169. int status, int events) with gil:
  170. cdef WriteUnixTransport tr
  171. if handle.data is NULL:
  172. return
  173. tr = <WriteUnixTransport>handle.data
  174. if tr._closed:
  175. return
  176. if events & uv.UV_DISCONNECT:
  177. try:
  178. tr._stop_reading()
  179. tr._on_eof()
  180. except BaseException as ex:
  181. tr._fatal_error(ex, False)
  182. cdef class _PipeConnectRequest(UVRequest):
  183. cdef:
  184. UnixTransport transport
  185. uv.uv_connect_t _req_data
  186. def __cinit__(self, loop, transport):
  187. self.request = <uv.uv_req_t*> &self._req_data
  188. self.request.data = <void*>self
  189. self.transport = transport
  190. cdef connect(self, char* addr):
  191. # uv_pipe_connect returns void
  192. uv.uv_pipe_connect(<uv.uv_connect_t*>self.request,
  193. <uv.uv_pipe_t*>self.transport._handle,
  194. addr,
  195. __pipe_connect_callback)
  196. cdef void __pipe_connect_callback(uv.uv_connect_t* req, int status) with gil:
  197. cdef:
  198. _PipeConnectRequest wrapper
  199. UnixTransport transport
  200. wrapper = <_PipeConnectRequest> req.data
  201. transport = wrapper.transport
  202. if status < 0:
  203. exc = convert_error(status)
  204. else:
  205. exc = None
  206. try:
  207. transport._on_connect(exc)
  208. except BaseException as ex:
  209. wrapper.transport._fatal_error(ex, False)
  210. finally:
  211. wrapper.on_done()