您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 

1020 行
31 KiB

# Inject a C enum constant through a verbatim-C extern block.
# `__PREALLOCED_BUFS` is used below both as the dimension of the
# fixed-size `uv_bufs_sml` / `py_bufs_sml` arrays embedded in
# `_StreamWriteContext` and as the threshold compared against
# `len(buffers)` in `_StreamWriteContext.new`.
# NOTE(review): it is declared `bint` here although it is used as an
# integer count -- confirm this is intentional.
cdef extern from *:
    '''
    enum {__PREALLOCED_BUFS = 4};
    '''
    const bint __PREALLOCED_BUFS
  6. @cython.no_gc_clear
  7. @cython.freelist(DEFAULT_FREELIST_SIZE)
  8. cdef class _StreamWriteContext:
  9. # used to hold additional write request information for uv_write
  10. cdef:
  11. uv.uv_write_t req
  12. list buffers
  13. uv.uv_buf_t uv_bufs_sml[__PREALLOCED_BUFS]
  14. Py_buffer py_bufs_sml[__PREALLOCED_BUFS]
  15. bint py_bufs_sml_inuse
  16. uv.uv_buf_t* uv_bufs
  17. Py_buffer* py_bufs
  18. size_t py_bufs_len
  19. uv.uv_buf_t* uv_bufs_start
  20. size_t uv_bufs_len
  21. UVStream stream
  22. bint closed
  23. cdef free_bufs(self):
  24. cdef size_t i
  25. if self.uv_bufs is not NULL:
  26. PyMem_RawFree(self.uv_bufs)
  27. self.uv_bufs = NULL
  28. if UVLOOP_DEBUG:
  29. if self.py_bufs_sml_inuse:
  30. raise RuntimeError(
  31. '_StreamWriteContext.close: uv_bufs != NULL and '
  32. 'py_bufs_sml_inuse is True')
  33. if self.py_bufs is not NULL:
  34. for i from 0 <= i < self.py_bufs_len:
  35. PyBuffer_Release(&self.py_bufs[i])
  36. PyMem_RawFree(self.py_bufs)
  37. self.py_bufs = NULL
  38. if UVLOOP_DEBUG:
  39. if self.py_bufs_sml_inuse:
  40. raise RuntimeError(
  41. '_StreamWriteContext.close: py_bufs != NULL and '
  42. 'py_bufs_sml_inuse is True')
  43. if self.py_bufs_sml_inuse:
  44. for i from 0 <= i < self.py_bufs_len:
  45. PyBuffer_Release(&self.py_bufs_sml[i])
  46. self.py_bufs_sml_inuse = 0
  47. self.py_bufs_len = 0
  48. self.buffers = None
  49. cdef close(self):
  50. if self.closed:
  51. return
  52. self.closed = 1
  53. self.free_bufs()
  54. Py_DECREF(self)
  55. cdef advance_uv_buf(self, size_t sent):
  56. # Advance the pointer to first uv_buf and the
  57. # pointer to first byte in that buffer.
  58. #
  59. # We do this after a "uv_try_write" call, which
  60. # sometimes sends only a portion of data.
  61. # We then call "advance_uv_buf" on the write
  62. # context, and reuse it in a "uv_write" call.
  63. cdef:
  64. uv.uv_buf_t* buf
  65. size_t idx
  66. for idx from 0 <= idx < self.uv_bufs_len:
  67. buf = &self.uv_bufs_start[idx]
  68. if buf.len > sent:
  69. buf.len -= sent
  70. buf.base = buf.base + sent
  71. self.uv_bufs_start = buf
  72. self.uv_bufs_len -= idx
  73. return
  74. else:
  75. sent -= self.uv_bufs_start[idx].len
  76. if UVLOOP_DEBUG:
  77. if sent < 0:
  78. raise RuntimeError('fatal: sent < 0 in advance_uv_buf')
  79. raise RuntimeError('fatal: Could not advance _StreamWriteContext')
  80. @staticmethod
  81. cdef _StreamWriteContext new(UVStream stream, list buffers):
  82. cdef:
  83. _StreamWriteContext ctx
  84. int uv_bufs_idx = 0
  85. size_t py_bufs_len = 0
  86. int i
  87. Py_buffer* p_pybufs
  88. uv.uv_buf_t* p_uvbufs
  89. ctx = _StreamWriteContext.__new__(_StreamWriteContext)
  90. ctx.stream = None
  91. ctx.closed = 1
  92. ctx.py_bufs_len = 0
  93. ctx.py_bufs_sml_inuse = 0
  94. ctx.uv_bufs = NULL
  95. ctx.py_bufs = NULL
  96. ctx.buffers = buffers
  97. ctx.stream = stream
  98. if len(buffers) <= __PREALLOCED_BUFS:
  99. # We've got a small number of buffers to write, don't
  100. # need to use malloc.
  101. ctx.py_bufs_sml_inuse = 1
  102. p_pybufs = <Py_buffer*>&ctx.py_bufs_sml
  103. p_uvbufs = <uv.uv_buf_t*>&ctx.uv_bufs_sml
  104. else:
  105. for buf in buffers:
  106. if UVLOOP_DEBUG:
  107. if not isinstance(buf, (bytes, bytearray, memoryview)):
  108. raise RuntimeError(
  109. 'invalid data in writebuf: an instance of '
  110. 'bytes, bytearray or memoryview was expected, '
  111. 'got {}'.format(type(buf)))
  112. if not PyBytes_CheckExact(buf):
  113. py_bufs_len += 1
  114. if py_bufs_len > 0:
  115. ctx.py_bufs = <Py_buffer*>PyMem_RawMalloc(
  116. py_bufs_len * sizeof(Py_buffer))
  117. if ctx.py_bufs is NULL:
  118. raise MemoryError()
  119. ctx.uv_bufs = <uv.uv_buf_t*>PyMem_RawMalloc(
  120. len(buffers) * sizeof(uv.uv_buf_t))
  121. if ctx.uv_bufs is NULL:
  122. raise MemoryError()
  123. p_pybufs = ctx.py_bufs
  124. p_uvbufs = ctx.uv_bufs
  125. py_bufs_len = 0
  126. for buf in buffers:
  127. if PyBytes_CheckExact(buf):
  128. # We can only use this hack for bytes since it's
  129. # immutable. For everything else it is only safe to
  130. # use buffer protocol.
  131. p_uvbufs[uv_bufs_idx].base = PyBytes_AS_STRING(buf)
  132. p_uvbufs[uv_bufs_idx].len = Py_SIZE(buf)
  133. else:
  134. try:
  135. PyObject_GetBuffer(
  136. buf, &p_pybufs[py_bufs_len], PyBUF_SIMPLE)
  137. except Exception:
  138. # This shouldn't ever happen, as `UVStream._buffer_write`
  139. # casts non-bytes objects to `memoryviews`.
  140. ctx.py_bufs_len = py_bufs_len
  141. ctx.free_bufs()
  142. raise
  143. p_uvbufs[uv_bufs_idx].base = <char*>p_pybufs[py_bufs_len].buf
  144. p_uvbufs[uv_bufs_idx].len = p_pybufs[py_bufs_len].len
  145. py_bufs_len += 1
  146. uv_bufs_idx += 1
  147. ctx.uv_bufs_start = p_uvbufs
  148. ctx.uv_bufs_len = uv_bufs_idx
  149. ctx.py_bufs_len = py_bufs_len
  150. ctx.req.data = <void*> ctx
  151. if UVLOOP_DEBUG:
  152. stream._loop._debug_stream_write_ctx_total += 1
  153. stream._loop._debug_stream_write_ctx_cnt += 1
  154. # Do incref after everything else is done.
  155. # Under no circumstances we want `ctx` to be GCed while
  156. # libuv is still working with `ctx.uv_bufs`.
  157. Py_INCREF(ctx)
  158. ctx.closed = 0
  159. return ctx
  160. def __dealloc__(self):
  161. if not self.closed:
  162. # Because we do an INCREF in _StreamWriteContext.new,
  163. # __dealloc__ shouldn't ever happen with `self.closed == 1`
  164. raise RuntimeError(
  165. 'open _StreamWriteContext is being deallocated')
  166. if UVLOOP_DEBUG:
  167. if self.stream is not None:
  168. self.stream._loop._debug_stream_write_ctx_cnt -= 1
  169. self.stream = None
  170. @cython.no_gc_clear
  171. cdef class UVStream(UVBaseTransport):
  172. def __cinit__(self):
  173. self.__shutting_down = 0
  174. self.__reading = 0
  175. self.__read_error_close = 0
  176. self.__buffered = 0
  177. self._eof = 0
  178. self._buffer = []
  179. self._buffer_size = 0
  180. self._protocol_get_buffer = None
  181. self._protocol_buffer_updated = None
  182. self._read_pybuf_acquired = False
  183. cdef _set_protocol(self, object protocol):
  184. if protocol is None:
  185. raise TypeError('protocol is required')
  186. UVBaseTransport._set_protocol(self, protocol)
  187. if (hasattr(protocol, 'get_buffer') and
  188. not isinstance(protocol, aio_Protocol)):
  189. try:
  190. self._protocol_get_buffer = protocol.get_buffer
  191. self._protocol_buffer_updated = protocol.buffer_updated
  192. self.__buffered = 1
  193. except AttributeError:
  194. pass
  195. else:
  196. self.__buffered = 0
  197. cdef _clear_protocol(self):
  198. UVBaseTransport._clear_protocol(self)
  199. self._protocol_get_buffer = None
  200. self._protocol_buffer_updated = None
  201. self.__buffered = 0
  202. cdef inline _shutdown(self):
  203. cdef int err
  204. if self.__shutting_down:
  205. return
  206. self.__shutting_down = 1
  207. self._ensure_alive()
  208. self._shutdown_req.data = <void*> self
  209. err = uv.uv_shutdown(&self._shutdown_req,
  210. <uv.uv_stream_t*> self._handle,
  211. __uv_stream_on_shutdown)
  212. if err < 0:
  213. exc = convert_error(err)
  214. self._fatal_error(exc, True)
  215. return
  216. cdef inline _accept(self, UVStream server):
  217. cdef int err
  218. self._ensure_alive()
  219. err = uv.uv_accept(<uv.uv_stream_t*>server._handle,
  220. <uv.uv_stream_t*>self._handle)
  221. if err < 0:
  222. exc = convert_error(err)
  223. self._fatal_error(exc, True)
  224. return
  225. self._on_accept()
  226. cdef inline _close_on_read_error(self):
  227. self.__read_error_close = 1
  228. cdef bint _is_reading(self):
  229. return self.__reading
  230. cdef _start_reading(self):
  231. cdef int err
  232. if self._closing:
  233. return
  234. self._ensure_alive()
  235. if self.__reading:
  236. return
  237. if self.__buffered:
  238. err = uv.uv_read_start(<uv.uv_stream_t*>self._handle,
  239. __uv_stream_buffered_alloc,
  240. __uv_stream_buffered_on_read)
  241. else:
  242. err = uv.uv_read_start(<uv.uv_stream_t*>self._handle,
  243. __loop_alloc_buffer,
  244. __uv_stream_on_read)
  245. if err < 0:
  246. exc = convert_error(err)
  247. self._fatal_error(exc, True)
  248. return
  249. else:
  250. # UVStream must live until the read callback is called
  251. self.__reading_started()
  252. cdef inline __reading_started(self):
  253. if self.__reading:
  254. return
  255. self.__reading = 1
  256. Py_INCREF(self)
  257. cdef inline __reading_stopped(self):
  258. if not self.__reading:
  259. return
  260. self.__reading = 0
  261. Py_DECREF(self)
  262. cdef _stop_reading(self):
  263. cdef int err
  264. if not self.__reading:
  265. return
  266. self._ensure_alive()
  267. # From libuv docs:
  268. # This function is idempotent and may be safely
  269. # called on a stopped stream.
  270. err = uv.uv_read_stop(<uv.uv_stream_t*>self._handle)
  271. if err < 0:
  272. exc = convert_error(err)
  273. self._fatal_error(exc, True)
  274. return
  275. else:
  276. self.__reading_stopped()
  277. cdef inline _try_write(self, object data):
  278. cdef:
  279. ssize_t written
  280. bint used_buf = 0
  281. Py_buffer py_buf
  282. void* buf
  283. size_t blen
  284. int saved_errno
  285. int fd
  286. if (<uv.uv_stream_t*>self._handle).write_queue_size != 0:
  287. raise RuntimeError(
  288. 'UVStream._try_write called with data in uv buffers')
  289. if PyBytes_CheckExact(data):
  290. # We can only use this hack for bytes since it's
  291. # immutable. For everything else it is only safe to
  292. # use buffer protocol.
  293. buf = <void*>PyBytes_AS_STRING(data)
  294. blen = Py_SIZE(data)
  295. else:
  296. PyObject_GetBuffer(data, &py_buf, PyBUF_SIMPLE)
  297. used_buf = 1
  298. buf = py_buf.buf
  299. blen = py_buf.len
  300. if blen == 0:
  301. # Empty data, do nothing.
  302. return 0
  303. fd = self._fileno()
  304. # Use `unistd.h/write` directly, it's faster than
  305. # uv_try_write -- less layers of code. The error
  306. # checking logic is copied from libuv.
  307. written = system.write(fd, buf, blen)
  308. while written == -1 and (
  309. errno.errno == errno.EINTR or
  310. (system.PLATFORM_IS_APPLE and
  311. errno.errno == errno.EPROTOTYPE)):
  312. # From libuv code (unix/stream.c):
  313. # Due to a possible kernel bug at least in OS X 10.10 "Yosemite",
  314. # EPROTOTYPE can be returned while trying to write to a socket
  315. # that is shutting down. If we retry the write, we should get
  316. # the expected EPIPE instead.
  317. written = system.write(fd, buf, blen)
  318. saved_errno = errno.errno
  319. if used_buf:
  320. PyBuffer_Release(&py_buf)
  321. if written < 0:
  322. if saved_errno == errno.EAGAIN or \
  323. saved_errno == system.EWOULDBLOCK:
  324. return -1
  325. else:
  326. exc = convert_error(-saved_errno)
  327. self._fatal_error(exc, True)
  328. return
  329. if UVLOOP_DEBUG:
  330. self._loop._debug_stream_write_tries += 1
  331. if <size_t>written == blen:
  332. return 0
  333. return written
  334. cdef inline _buffer_write(self, object data):
  335. cdef int dlen
  336. if not PyBytes_CheckExact(data):
  337. data = memoryview(data).cast('b')
  338. dlen = len(data)
  339. if not dlen:
  340. return
  341. self._buffer_size += dlen
  342. self._buffer.append(data)
  343. cdef inline _initiate_write(self):
  344. if (not self._protocol_paused and
  345. (<uv.uv_stream_t*>self._handle).write_queue_size == 0 and
  346. self._buffer_size > self._high_water):
  347. # Fast-path. If:
  348. # - the protocol isn't yet paused,
  349. # - there is no data in libuv buffers for this stream,
  350. # - the protocol will be paused if we continue to buffer data
  351. #
  352. # Then:
  353. # - Try to write all buffered data right now.
  354. all_sent = self._exec_write()
  355. if UVLOOP_DEBUG:
  356. if self._buffer_size != 0 or self._buffer != []:
  357. raise RuntimeError(
  358. '_buffer_size is not 0 after a successful _exec_write')
  359. # There is no need to call `_queue_write` anymore,
  360. # as `uv_write` should be called already.
  361. if not all_sent:
  362. # If not all of the data was sent successfully,
  363. # we might need to pause the protocol.
  364. self._maybe_pause_protocol()
  365. elif self._buffer_size > 0:
  366. self._maybe_pause_protocol()
  367. self._loop._queue_write(self)
  368. cdef inline _exec_write(self):
  369. cdef:
  370. int err
  371. int buf_len
  372. _StreamWriteContext ctx = None
  373. if self._closed:
  374. # If the handle is closed, just return, it's too
  375. # late to do anything.
  376. return
  377. buf_len = len(self._buffer)
  378. if not buf_len:
  379. return
  380. if (<uv.uv_stream_t*>self._handle).write_queue_size == 0:
  381. # libuv internal write buffers for this stream are empty.
  382. if buf_len == 1:
  383. # If we only have one piece of data to send, let's
  384. # use our fast implementation of try_write.
  385. data = self._buffer[0]
  386. sent = self._try_write(data)
  387. if sent is None:
  388. # A `self._fatal_error` was called.
  389. # It might not raise an exception under some
  390. # conditions.
  391. self._buffer_size = 0
  392. self._buffer.clear()
  393. if not self._closing:
  394. # This should never happen.
  395. raise RuntimeError(
  396. 'stream is open after UVStream._try_write '
  397. 'returned None')
  398. return
  399. if sent == 0:
  400. # All data was successfully written.
  401. self._buffer_size = 0
  402. self._buffer.clear()
  403. # on_write will call "maybe_resume_protocol".
  404. self._on_write()
  405. return True
  406. if sent > 0:
  407. if UVLOOP_DEBUG:
  408. if sent == len(data):
  409. raise RuntimeError(
  410. '_try_write sent all data and returned '
  411. 'non-zero')
  412. if PyBytes_CheckExact(data):
  413. # Cast bytes to memoryview to avoid copying
  414. # data that wasn't sent.
  415. data = memoryview(data)
  416. data = data[sent:]
  417. self._buffer_size -= sent
  418. self._buffer[0] = data
  419. # At this point it's either data was sent partially,
  420. # or an EAGAIN has happened.
  421. else:
  422. ctx = _StreamWriteContext.new(self, self._buffer)
  423. err = uv.uv_try_write(<uv.uv_stream_t*>self._handle,
  424. ctx.uv_bufs_start,
  425. ctx.uv_bufs_len)
  426. if err > 0:
  427. # Some data was successfully sent.
  428. if <size_t>err == self._buffer_size:
  429. # Everything was sent.
  430. ctx.close()
  431. self._buffer.clear()
  432. self._buffer_size = 0
  433. # on_write will call "maybe_resume_protocol".
  434. self._on_write()
  435. return True
  436. try:
  437. # Advance pointers to uv_bufs in `ctx`,
  438. # we will reuse it soon for a uv_write
  439. # call.
  440. ctx.advance_uv_buf(<ssize_t>err)
  441. except Exception as ex: # This should never happen.
  442. # Let's try to close the `ctx` anyways.
  443. ctx.close()
  444. self._fatal_error(ex, True)
  445. self._buffer.clear()
  446. self._buffer_size = 0
  447. return
  448. elif err != uv.UV_EAGAIN:
  449. ctx.close()
  450. exc = convert_error(err)
  451. self._fatal_error(exc, True)
  452. self._buffer.clear()
  453. self._buffer_size = 0
  454. return
  455. # fall through
  456. if ctx is None:
  457. ctx = _StreamWriteContext.new(self, self._buffer)
  458. err = uv.uv_write(&ctx.req,
  459. <uv.uv_stream_t*>self._handle,
  460. ctx.uv_bufs_start,
  461. ctx.uv_bufs_len,
  462. __uv_stream_on_write)
  463. self._buffer_size = 0
  464. # Can't use `_buffer.clear()` here: `ctx` holds a reference to
  465. # the `_buffer`.
  466. self._buffer = []
  467. if err < 0:
  468. # close write context
  469. ctx.close()
  470. exc = convert_error(err)
  471. self._fatal_error(exc, True)
  472. return
  473. self._maybe_resume_protocol()
  474. cdef size_t _get_write_buffer_size(self):
  475. if self._handle is NULL:
  476. return 0
  477. return ((<uv.uv_stream_t*>self._handle).write_queue_size +
  478. self._buffer_size)
  479. cdef _close(self):
  480. try:
  481. if self._read_pybuf_acquired:
  482. # Should never happen. libuv always calls uv_alloc/uv_read
  483. # in pairs.
  484. self._loop.call_exception_handler({
  485. 'transport': self,
  486. 'message': 'XXX: an allocated buffer in transport._close()'
  487. })
  488. self._read_pybuf_acquired = 0
  489. PyBuffer_Release(&self._read_pybuf)
  490. self._stop_reading()
  491. finally:
  492. UVSocketHandle._close(<UVHandle>self)
  493. cdef inline _on_accept(self):
  494. # Ultimately called by __uv_stream_on_listen.
  495. self._init_protocol()
  496. cdef inline _on_eof(self):
  497. # Any exception raised here will be caught in
  498. # __uv_stream_on_read.
  499. try:
  500. meth = self._protocol.eof_received
  501. except AttributeError:
  502. keep_open = False
  503. else:
  504. keep_open = run_in_context(self.context, meth)
  505. if keep_open:
  506. # We're keeping the connection open so the
  507. # protocol can write more, but we still can't
  508. # receive more, so remove the reader callback.
  509. self._stop_reading()
  510. else:
  511. self.close()
  512. cdef inline _on_write(self):
  513. self._maybe_resume_protocol()
  514. if not self._get_write_buffer_size():
  515. if self._closing:
  516. self._schedule_call_connection_lost(None)
  517. elif self._eof:
  518. self._shutdown()
  519. cdef inline _init(self, Loop loop, object protocol, Server server,
  520. object waiter, object context):
  521. self.context = context
  522. self._set_protocol(protocol)
  523. self._start_init(loop)
  524. if server is not None:
  525. self._set_server(server)
  526. if waiter is not None:
  527. self._set_waiter(waiter)
  528. cdef inline _on_connect(self, object exc):
  529. # Called from __tcp_connect_callback (tcp.pyx) and
  530. # __pipe_connect_callback (pipe.pyx).
  531. if exc is None:
  532. self._init_protocol()
  533. else:
  534. if self._waiter is None:
  535. self._fatal_error(exc, False, "connect failed")
  536. elif self._waiter.cancelled():
  537. # Connect call was cancelled; just close the transport
  538. # silently.
  539. self._close()
  540. elif self._waiter.done():
  541. self._fatal_error(exc, False, "connect failed")
  542. else:
  543. self._waiter.set_exception(exc)
  544. self._close()
  545. # === Public API ===
  546. def __repr__(self):
  547. return '<{} closed={} reading={} {:#x}>'.format(
  548. self.__class__.__name__,
  549. self._closed,
  550. self.__reading,
  551. id(self))
  552. def write(self, object buf):
  553. self._ensure_alive()
  554. if self._eof:
  555. raise RuntimeError('Cannot call write() after write_eof()')
  556. if not buf:
  557. return
  558. if self._conn_lost:
  559. self._conn_lost += 1
  560. return
  561. self._buffer_write(buf)
  562. self._initiate_write()
  563. def writelines(self, bufs):
  564. self._ensure_alive()
  565. if self._eof:
  566. raise RuntimeError('Cannot call writelines() after write_eof()')
  567. if self._conn_lost:
  568. self._conn_lost += 1
  569. return
  570. for buf in bufs:
  571. self._buffer_write(buf)
  572. self._initiate_write()
  573. def write_eof(self):
  574. self._ensure_alive()
  575. if self._eof:
  576. return
  577. self._eof = 1
  578. if not self._get_write_buffer_size():
  579. self._shutdown()
  580. def can_write_eof(self):
  581. return True
  582. def is_reading(self):
  583. return self._is_reading()
  584. def pause_reading(self):
  585. if self._closing or not self._is_reading():
  586. return
  587. self._stop_reading()
  588. def resume_reading(self):
  589. if self._is_reading() or self._closing:
  590. return
  591. self._start_reading()
  592. cdef void __uv_stream_on_shutdown(uv.uv_shutdown_t* req,
  593. int status) noexcept with gil:
  594. # callback for uv_shutdown
  595. if req.data is NULL:
  596. aio_logger.error(
  597. 'UVStream.shutdown callback called with NULL req.data, status=%r',
  598. status)
  599. return
  600. cdef UVStream stream = <UVStream> req.data
  601. if status < 0 and status != uv.UV_ECANCELED:
  602. # From libuv source code:
  603. # The ECANCELED error code is a lie, the shutdown(2) syscall is a
  604. # fait accompli at this point. Maybe we should revisit this in
  605. # v0.11. A possible reason for leaving it unchanged is that it
  606. # informs the callee that the handle has been destroyed.
  607. if UVLOOP_DEBUG:
  608. stream._loop._debug_stream_shutdown_errors_total += 1
  609. exc = convert_error(status)
  610. stream._fatal_error(
  611. exc, False, "error status in uv_stream_t.shutdown callback")
  612. return
# Shared front part of the read callbacks: deals with a closed stream,
# EOF, empty reads and read errors.
#
# Returns True when the event has been fully handled here (the caller
# must not deliver data to the protocol) and False when `nread` is a
# positive byte count the caller still has to process.
cdef inline bint __uv_stream_on_read_common(
    UVStream sc,
    Loop loop,
    ssize_t nread,
):
    if sc._closed:
        # The stream was closed, there is no reason to
        # do any work now.
        sc.__reading_stopped()  # Just in case.
        return True

    if nread == uv.UV_EOF:
        # From libuv docs:
        #     The callee is responsible for stopping/closing the stream
        #     when an error happens by calling uv_read_stop() or uv_close().
        #     Trying to read from the stream again is undefined.
        try:
            if UVLOOP_DEBUG:
                loop._debug_stream_read_eof_total += 1

            sc._stop_reading()
            sc._on_eof()
        except BaseException as ex:
            if UVLOOP_DEBUG:
                loop._debug_stream_read_eof_cb_errors_total += 1

            sc._fatal_error(ex, False)
        finally:
            # Returning from `finally` also discards any exception that
            # is still propagating at this point (e.g. one raised by
            # `_fatal_error` itself).
            return True

    if nread == 0:
        # From libuv docs:
        #     nread might be 0, which does not indicate an error or EOF.
        #     This is equivalent to EAGAIN or EWOULDBLOCK under read(2).
        return True

    if nread < 0:
        # From libuv docs:
        #     The callee is responsible for stopping/closing the stream
        #     when an error happens by calling uv_read_stop() or uv_close().
        #     Trying to read from the stream again is undefined.
        #
        # Therefore, we're closing the stream.  Since "UVHandle._close()"
        # doesn't raise exceptions unless uvloop is built with DEBUG=1,
        # we don't need try...finally here.

        if UVLOOP_DEBUG:
            loop._debug_stream_read_errors_total += 1

        if sc.__read_error_close:
            # Used for getting notified when a pipe is closed.
            # See WriteUnixTransport for the explanation.
            sc._on_eof()
            return True

        exc = convert_error(nread)
        sc._fatal_error(
            exc, False, "error status in uv_stream_t.read callback")
        return True

    return False
# Body of the non-buffered read callback: hands the first `nread` bytes
# of the loop's receive buffer to the protocol's `data_received`
# (through `sc._protocol_data_received`).  Any exception raised by the
# protocol is routed to `sc._fatal_error`.
cdef inline void __uv_stream_on_read_impl(
    uv.uv_stream_t* stream,
    ssize_t nread,
    const uv.uv_buf_t* buf,
):
    cdef:
        UVStream sc = <UVStream>stream.data
        Loop loop = sc._loop

    # It's OK to free the buffer early, since nothing will
    # be able to touch it until this method is done.
    __loop_free_buffer(loop)

    # Closed stream / EOF / empty read / error are handled by the
    # common helper; only a positive `nread` gets past this point.
    if __uv_stream_on_read_common(sc, loop, nread):
        return

    try:
        if UVLOOP_DEBUG:
            loop._debug_stream_read_cb_total += 1

        run_in_context1(
            sc.context,
            sc._protocol_data_received,
            loop._recv_buffer[:nread],
        )
    except BaseException as exc:
        if UVLOOP_DEBUG:
            loop._debug_stream_read_cb_errors_total += 1

        sc._fatal_error(exc, False)
# Body of the `uv_write` completion callback.  `req.data` is the
# `_StreamWriteContext` that issued the request (set in
# `_StreamWriteContext.new`).
cdef inline void __uv_stream_on_write_impl(
    uv.uv_write_t* req,
    int status,
):
    cdef:
        _StreamWriteContext ctx = <_StreamWriteContext> req.data
        UVStream stream = <UVStream>ctx.stream

    # The write is finished either way: release the buffers and the
    # reference the context holds on itself.  `stream` was copied into a
    # local above, so it remains usable below.
    ctx.close()

    if stream._closed:
        # The stream was closed, there is nothing to do.
        # Even if there is an error, like EPIPE, there
        # is no reason to report it.
        return

    if status < 0:
        if UVLOOP_DEBUG:
            stream._loop._debug_stream_write_errors_total += 1

        exc = convert_error(status)
        stream._fatal_error(
            exc, False, "error status in uv_stream_t.write callback")
        return

    try:
        stream._on_write()
    except BaseException as exc:
        if UVLOOP_DEBUG:
            stream._loop._debug_stream_write_cb_errors_total += 1

        stream._fatal_error(exc, False)
  716. cdef void __uv_stream_on_read(
  717. uv.uv_stream_t* stream,
  718. ssize_t nread,
  719. const uv.uv_buf_t* buf,
  720. ) noexcept with gil:
  721. if __ensure_handle_data(<uv.uv_handle_t*>stream,
  722. "UVStream read callback") == 0:
  723. return
  724. # Don't need try-finally, __uv_stream_on_read_impl is void
  725. __uv_stream_on_read_impl(stream, nread, buf)
  726. cdef void __uv_stream_on_write(
  727. uv.uv_write_t* req,
  728. int status,
  729. ) noexcept with gil:
  730. if UVLOOP_DEBUG:
  731. if req.data is NULL:
  732. aio_logger.error(
  733. 'UVStream.write callback called with NULL req.data, status=%r',
  734. status)
  735. return
  736. # Don't need try-finally, __uv_stream_on_write_impl is void
  737. __uv_stream_on_write_impl(req, status)
# libuv alloc callback for buffered streams (registered in
# `UVStream._start_reading`).  Asks the protocol's `get_buffer` for a
# writable buffer and exposes it to libuv through `uvbuf`.
#
# On any failure `uvbuf` is set to an empty buffer (len 0, base NULL);
# per the comment below, libuv then reports UV_ENOBUFS to the read
# callback, where the error is handled.
cdef void __uv_stream_buffered_alloc(
    uv.uv_handle_t* stream,
    size_t suggested_size,
    uv.uv_buf_t* uvbuf,
) noexcept with gil:

    if __ensure_handle_data(<uv.uv_handle_t*>stream,
                            "UVStream alloc buffer callback") == 0:
        return

    cdef:
        UVStream sc = <UVStream>stream.data
        Loop loop = sc._loop
        Py_buffer* pybuf = &sc._read_pybuf
        int got_buf = 0

    if sc._read_pybuf_acquired:
        # A previously acquired buffer has not been released yet:
        # refuse to hand out another one.
        uvbuf.len = 0
        uvbuf.base = NULL
        return

    sc._read_pybuf_acquired = 0
    try:
        buf = run_in_context1(
            sc.context,
            sc._protocol_get_buffer,
            suggested_size,
        )
        PyObject_GetBuffer(buf, pybuf, PyBUF_WRITABLE)
        got_buf = 1
    except BaseException as exc:
        # Can't call 'sc._fatal_error' or 'sc._close' here: libuv would
        # segfault.  We'll do it later in __uv_stream_buffered_on_read
        # when we receive UV_ENOBUFS.
        uvbuf.len = 0
        uvbuf.base = NULL
        return

    if not pybuf.len:
        # The protocol returned an empty buffer: treat it like a failure,
        # but release the view we have just acquired.
        uvbuf.len = 0
        uvbuf.base = NULL
        if got_buf:
            PyBuffer_Release(pybuf)
        return

    sc._read_pybuf_acquired = 1
    uvbuf.base = <char*>pybuf.buf
    uvbuf.len = pybuf.len
# libuv read callback for buffered streams (registered in
# `UVStream._start_reading`).  The data has already been written into
# the buffer obtained in `__uv_stream_buffered_alloc`; this callback
# notifies the protocol through `buffer_updated(nread)` and then
# releases that buffer.
cdef void __uv_stream_buffered_on_read(
    uv.uv_stream_t* stream,
    ssize_t nread,
    const uv.uv_buf_t* buf,
) noexcept with gil:

    if __ensure_handle_data(<uv.uv_handle_t*>stream,
                            "UVStream buffered read callback") == 0:
        return

    cdef:
        UVStream sc = <UVStream>stream.data
        Loop loop = sc._loop
        Py_buffer* pybuf = &sc._read_pybuf

    if nread == uv.UV_ENOBUFS:
        # The alloc callback handed libuv an empty buffer; this is where
        # that failure is finally reported.
        sc._fatal_error(
            RuntimeError(
                'unhandled error (or an empty buffer) in get_buffer()'),
            False)
        return

    try:
        if nread > 0 and not sc._read_pybuf_acquired:
            # From libuv docs:
            #     nread is > 0 if there is data available or < 0 on error.
            #     When we've reached EOF, nread will be set to UV_EOF. When
            #     nread < 0, the buf parameter might not point to a valid
            #     buffer; in that case buf.len and buf.base are both set
            #     to 0.
            raise RuntimeError(
                f'no python buffer is allocated in on_read; nread={nread}')

        if nread == 0:
            # From libuv docs:
            #     nread might be 0, which does not indicate an error or EOF.
            #     This is equivalent to EAGAIN or EWOULDBLOCK under read(2).
            return

        if __uv_stream_on_read_common(sc, loop, nread):
            return

        if UVLOOP_DEBUG:
            loop._debug_stream_read_cb_total += 1

        run_in_context1(sc.context, sc._protocol_buffer_updated, nread)
    except BaseException as exc:
        if UVLOOP_DEBUG:
            loop._debug_stream_read_cb_errors_total += 1

        sc._fatal_error(exc, False)
    finally:
        # Runs on every path through the `try` above, including the early
        # returns: clear the flag and release the protocol's buffer.
        sc._read_pybuf_acquired = 0
        PyBuffer_Release(pybuf)
  815. loop._debug_stream_read_cb_total += 1
  816. run_in_context1(sc.context, sc._protocol_buffer_updated, nread)
  817. except BaseException as exc:
  818. if UVLOOP_DEBUG:
  819. loop._debug_stream_read_cb_errors_total += 1
  820. sc._fatal_error(exc, False)
  821. finally:
  822. sc._read_pybuf_acquired = 0
  823. PyBuffer_Release(pybuf)