You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

988 lines
31 KiB

# Number of `uv_buf_t` / `Py_buffer` slots embedded directly in every
# `_StreamWriteContext`.  A write made of at most this many buffers uses
# the embedded arrays; larger writes allocate their arrays with
# PyMem_RawMalloc.
DEF __PREALLOCED_BUFS = 4
  2. @cython.no_gc_clear
  3. @cython.freelist(DEFAULT_FREELIST_SIZE)
  4. cdef class _StreamWriteContext:
  5. # used to hold additional write request information for uv_write
  6. cdef:
  7. uv.uv_write_t req
  8. list buffers
  9. uv.uv_buf_t uv_bufs_sml[__PREALLOCED_BUFS]
  10. Py_buffer py_bufs_sml[__PREALLOCED_BUFS]
  11. bint py_bufs_sml_inuse
  12. uv.uv_buf_t* uv_bufs
  13. Py_buffer* py_bufs
  14. size_t py_bufs_len
  15. uv.uv_buf_t* uv_bufs_start
  16. size_t uv_bufs_len
  17. UVStream stream
  18. bint closed
  19. cdef free_bufs(self):
  20. cdef size_t i
  21. if self.uv_bufs is not NULL:
  22. PyMem_RawFree(self.uv_bufs)
  23. self.uv_bufs = NULL
  24. if UVLOOP_DEBUG:
  25. if self.py_bufs_sml_inuse:
  26. raise RuntimeError(
  27. '_StreamWriteContext.close: uv_bufs != NULL and '
  28. 'py_bufs_sml_inuse is True')
  29. if self.py_bufs is not NULL:
  30. for i from 0 <= i < self.py_bufs_len:
  31. PyBuffer_Release(&self.py_bufs[i])
  32. PyMem_RawFree(self.py_bufs)
  33. self.py_bufs = NULL
  34. if UVLOOP_DEBUG:
  35. if self.py_bufs_sml_inuse:
  36. raise RuntimeError(
  37. '_StreamWriteContext.close: py_bufs != NULL and '
  38. 'py_bufs_sml_inuse is True')
  39. if self.py_bufs_sml_inuse:
  40. for i from 0 <= i < self.py_bufs_len:
  41. PyBuffer_Release(&self.py_bufs_sml[i])
  42. self.py_bufs_sml_inuse = 0
  43. self.py_bufs_len = 0
  44. self.buffers = None
  45. cdef close(self):
  46. if self.closed:
  47. return
  48. self.closed = 1
  49. self.free_bufs()
  50. Py_DECREF(self)
  51. cdef advance_uv_buf(self, size_t sent):
  52. # Advance the pointer to first uv_buf and the
  53. # pointer to first byte in that buffer.
  54. #
  55. # We do this after a "uv_try_write" call, which
  56. # sometimes sends only a portion of data.
  57. # We then call "advance_uv_buf" on the write
  58. # context, and reuse it in a "uv_write" call.
  59. cdef:
  60. uv.uv_buf_t* buf
  61. size_t idx
  62. for idx from 0 <= idx < self.uv_bufs_len:
  63. buf = &self.uv_bufs_start[idx]
  64. if buf.len > sent:
  65. buf.len -= sent
  66. buf.base = buf.base + sent
  67. self.uv_bufs_start = buf
  68. self.uv_bufs_len -= idx
  69. return
  70. else:
  71. sent -= self.uv_bufs_start[idx].len
  72. if UVLOOP_DEBUG:
  73. if sent < 0:
  74. raise RuntimeError('fatal: sent < 0 in advance_uv_buf')
  75. raise RuntimeError('fatal: Could not advance _StreamWriteContext')
  76. @staticmethod
  77. cdef _StreamWriteContext new(UVStream stream, list buffers):
  78. cdef:
  79. _StreamWriteContext ctx
  80. int uv_bufs_idx = 0
  81. size_t py_bufs_len = 0
  82. int i
  83. Py_buffer* p_pybufs
  84. uv.uv_buf_t* p_uvbufs
  85. ctx = _StreamWriteContext.__new__(_StreamWriteContext)
  86. ctx.stream = None
  87. ctx.closed = 1
  88. ctx.py_bufs_len = 0
  89. ctx.py_bufs_sml_inuse = 0
  90. ctx.uv_bufs = NULL
  91. ctx.py_bufs = NULL
  92. ctx.buffers = buffers
  93. ctx.stream = stream
  94. if len(buffers) <= __PREALLOCED_BUFS:
  95. # We've got a small number of buffers to write, don't
  96. # need to use malloc.
  97. ctx.py_bufs_sml_inuse = 1
  98. p_pybufs = <Py_buffer*>&ctx.py_bufs_sml
  99. p_uvbufs = <uv.uv_buf_t*>&ctx.uv_bufs_sml
  100. else:
  101. for buf in buffers:
  102. if UVLOOP_DEBUG:
  103. if not isinstance(buf, (bytes, bytearray, memoryview)):
  104. raise RuntimeError(
  105. 'invalid data in writebuf: an instance of '
  106. 'bytes, bytearray or memoryview was expected, '
  107. 'got {}'.format(type(buf)))
  108. if not PyBytes_CheckExact(buf):
  109. py_bufs_len += 1
  110. if py_bufs_len > 0:
  111. ctx.py_bufs = <Py_buffer*>PyMem_RawMalloc(
  112. py_bufs_len * sizeof(Py_buffer))
  113. if ctx.py_bufs is NULL:
  114. raise MemoryError()
  115. ctx.uv_bufs = <uv.uv_buf_t*>PyMem_RawMalloc(
  116. len(buffers) * sizeof(uv.uv_buf_t))
  117. if ctx.uv_bufs is NULL:
  118. raise MemoryError()
  119. p_pybufs = ctx.py_bufs
  120. p_uvbufs = ctx.uv_bufs
  121. py_bufs_len = 0
  122. for buf in buffers:
  123. if PyBytes_CheckExact(buf):
  124. # We can only use this hack for bytes since it's
  125. # immutable. For everything else it is only safe to
  126. # use buffer protocol.
  127. p_uvbufs[uv_bufs_idx].base = PyBytes_AS_STRING(buf)
  128. p_uvbufs[uv_bufs_idx].len = Py_SIZE(buf)
  129. else:
  130. try:
  131. PyObject_GetBuffer(
  132. buf, &p_pybufs[py_bufs_len], PyBUF_SIMPLE)
  133. except Exception:
  134. # This shouldn't ever happen, as `UVStream._write`
  135. # casts non-bytes objects to `memoryviews`.
  136. ctx.py_bufs_len = py_bufs_len
  137. ctx.free_bufs()
  138. raise
  139. p_uvbufs[uv_bufs_idx].base = <char*>p_pybufs[py_bufs_len].buf
  140. p_uvbufs[uv_bufs_idx].len = p_pybufs[py_bufs_len].len
  141. py_bufs_len += 1
  142. uv_bufs_idx += 1
  143. ctx.uv_bufs_start = p_uvbufs
  144. ctx.uv_bufs_len = uv_bufs_idx
  145. ctx.py_bufs_len = py_bufs_len
  146. ctx.req.data = <void*> ctx
  147. if UVLOOP_DEBUG:
  148. stream._loop._debug_stream_write_ctx_total += 1
  149. stream._loop._debug_stream_write_ctx_cnt += 1
  150. # Do incref after everything else is done.
  151. # Under no circumstances we want `ctx` to be GCed while
  152. # libuv is still working with `ctx.uv_bufs`.
  153. Py_INCREF(ctx)
  154. ctx.closed = 0
  155. return ctx
  156. def __dealloc__(self):
  157. if not self.closed:
  158. # Because we do an INCREF in _StreamWriteContext.new,
  159. # __dealloc__ shouldn't ever happen with `self.closed == 1`
  160. raise RuntimeError(
  161. 'open _StreamWriteContext is being deallocated')
  162. if UVLOOP_DEBUG:
  163. if self.stream is not None:
  164. self.stream._loop._debug_stream_write_ctx_cnt -= 1
  165. self.stream = None
  166. @cython.no_gc_clear
  167. cdef class UVStream(UVBaseTransport):
  168. def __cinit__(self):
  169. self.__shutting_down = 0
  170. self.__reading = 0
  171. self.__read_error_close = 0
  172. self.__buffered = 0
  173. self._eof = 0
  174. self._buffer = []
  175. self._buffer_size = 0
  176. self._protocol_get_buffer = None
  177. self._protocol_buffer_updated = None
  178. self._read_pybuf_acquired = False
  179. cdef _set_protocol(self, object protocol):
  180. if protocol is None:
  181. raise TypeError('protocol is required')
  182. UVBaseTransport._set_protocol(self, protocol)
  183. if (hasattr(protocol, 'get_buffer') and
  184. not isinstance(protocol, aio_Protocol)):
  185. try:
  186. self._protocol_get_buffer = protocol.get_buffer
  187. self._protocol_buffer_updated = protocol.buffer_updated
  188. self.__buffered = 1
  189. except AttributeError:
  190. pass
  191. else:
  192. self.__buffered = 0
  193. cdef _clear_protocol(self):
  194. UVBaseTransport._clear_protocol(self)
  195. self._protocol_get_buffer = None
  196. self._protocol_buffer_updated = None
  197. self.__buffered = 0
  198. cdef inline _shutdown(self):
  199. cdef int err
  200. if self.__shutting_down:
  201. return
  202. self.__shutting_down = 1
  203. self._ensure_alive()
  204. self._shutdown_req.data = <void*> self
  205. err = uv.uv_shutdown(&self._shutdown_req,
  206. <uv.uv_stream_t*> self._handle,
  207. __uv_stream_on_shutdown)
  208. if err < 0:
  209. exc = convert_error(err)
  210. self._fatal_error(exc, True)
  211. return
  212. cdef inline _accept(self, UVStream server):
  213. cdef int err
  214. self._ensure_alive()
  215. err = uv.uv_accept(<uv.uv_stream_t*>server._handle,
  216. <uv.uv_stream_t*>self._handle)
  217. if err < 0:
  218. exc = convert_error(err)
  219. self._fatal_error(exc, True)
  220. return
  221. self._on_accept()
  222. cdef inline _close_on_read_error(self):
  223. self.__read_error_close = 1
  224. cdef bint _is_reading(self):
  225. return self.__reading
  226. cdef _start_reading(self):
  227. cdef int err
  228. if self._closing:
  229. return
  230. self._ensure_alive()
  231. if self.__reading:
  232. return
  233. if self.__buffered:
  234. err = uv.uv_read_start(<uv.uv_stream_t*>self._handle,
  235. __uv_stream_buffered_alloc,
  236. __uv_stream_buffered_on_read)
  237. else:
  238. err = uv.uv_read_start(<uv.uv_stream_t*>self._handle,
  239. __loop_alloc_buffer,
  240. __uv_stream_on_read)
  241. if err < 0:
  242. exc = convert_error(err)
  243. self._fatal_error(exc, True)
  244. return
  245. else:
  246. # UVStream must live until the read callback is called
  247. self.__reading_started()
  248. cdef inline __reading_started(self):
  249. if self.__reading:
  250. return
  251. self.__reading = 1
  252. Py_INCREF(self)
  253. cdef inline __reading_stopped(self):
  254. if not self.__reading:
  255. return
  256. self.__reading = 0
  257. Py_DECREF(self)
  258. cdef _stop_reading(self):
  259. cdef int err
  260. if not self.__reading:
  261. return
  262. self._ensure_alive()
  263. # From libuv docs:
  264. # This function is idempotent and may be safely
  265. # called on a stopped stream.
  266. err = uv.uv_read_stop(<uv.uv_stream_t*>self._handle)
  267. if err < 0:
  268. exc = convert_error(err)
  269. self._fatal_error(exc, True)
  270. return
  271. else:
  272. self.__reading_stopped()
  273. cdef inline _try_write(self, object data):
  274. cdef:
  275. ssize_t written
  276. bint used_buf = 0
  277. Py_buffer py_buf
  278. void* buf
  279. size_t blen
  280. int saved_errno
  281. int fd
  282. if (<uv.uv_stream_t*>self._handle).write_queue_size != 0:
  283. raise RuntimeError(
  284. 'UVStream._try_write called with data in uv buffers')
  285. if PyBytes_CheckExact(data):
  286. # We can only use this hack for bytes since it's
  287. # immutable. For everything else it is only safe to
  288. # use buffer protocol.
  289. buf = <void*>PyBytes_AS_STRING(data)
  290. blen = Py_SIZE(data)
  291. else:
  292. PyObject_GetBuffer(data, &py_buf, PyBUF_SIMPLE)
  293. used_buf = 1
  294. buf = py_buf.buf
  295. blen = py_buf.len
  296. if blen == 0:
  297. # Empty data, do nothing.
  298. return 0
  299. fd = self._fileno()
  300. # Use `unistd.h/write` directly, it's faster than
  301. # uv_try_write -- less layers of code. The error
  302. # checking logic is copied from libuv.
  303. written = system.write(fd, buf, blen)
  304. while written == -1 and (
  305. errno.errno == errno.EINTR or
  306. (system.PLATFORM_IS_APPLE and
  307. errno.errno == errno.EPROTOTYPE)):
  308. # From libuv code (unix/stream.c):
  309. # Due to a possible kernel bug at least in OS X 10.10 "Yosemite",
  310. # EPROTOTYPE can be returned while trying to write to a socket
  311. # that is shutting down. If we retry the write, we should get
  312. # the expected EPIPE instead.
  313. written = system.write(fd, buf, blen)
  314. saved_errno = errno.errno
  315. if used_buf:
  316. PyBuffer_Release(&py_buf)
  317. if written < 0:
  318. if saved_errno == errno.EAGAIN or \
  319. saved_errno == system.EWOULDBLOCK:
  320. return -1
  321. else:
  322. exc = convert_error(-saved_errno)
  323. self._fatal_error(exc, True)
  324. return
  325. if UVLOOP_DEBUG:
  326. self._loop._debug_stream_write_tries += 1
  327. if <size_t>written == blen:
  328. return 0
  329. return written
  330. cdef inline _write(self, object data):
  331. cdef int dlen
  332. if not PyBytes_CheckExact(data):
  333. data = memoryview(data).cast('b')
  334. dlen = len(data)
  335. if not dlen:
  336. return
  337. self._buffer_size += dlen
  338. self._buffer.append(data)
  339. if (not self._protocol_paused and
  340. (<uv.uv_stream_t*>self._handle).write_queue_size == 0 and
  341. self._buffer_size > self._high_water):
  342. # Fast-path. If:
  343. # - the protocol isn't yet paused,
  344. # - there is no data in libuv buffers for this stream,
  345. # - the protocol will be paused if we continue to buffer data
  346. #
  347. # Then:
  348. # - Try to write all buffered data right now.
  349. all_sent = self._exec_write()
  350. if UVLOOP_DEBUG:
  351. if self._buffer_size != 0 or self._buffer != []:
  352. raise RuntimeError(
  353. '_buffer_size is not 0 after a successful _exec_write')
  354. # There is no need to call `_queue_write` anymore,
  355. # as `uv_write` should be called already.
  356. if not all_sent:
  357. # If not all of the data was sent successfully,
  358. # we might need to pause the protocol.
  359. self._maybe_pause_protocol()
  360. return
  361. self._maybe_pause_protocol()
  362. self._loop._queue_write(self)
  363. cdef inline _exec_write(self):
  364. cdef:
  365. int err
  366. int buf_len
  367. _StreamWriteContext ctx = None
  368. if self._closed:
  369. # If the handle is closed, just return, it's too
  370. # late to do anything.
  371. return
  372. buf_len = len(self._buffer)
  373. if not buf_len:
  374. return
  375. if (<uv.uv_stream_t*>self._handle).write_queue_size == 0:
  376. # libuv internal write buffers for this stream are empty.
  377. if buf_len == 1:
  378. # If we only have one piece of data to send, let's
  379. # use our fast implementation of try_write.
  380. data = self._buffer[0]
  381. sent = self._try_write(data)
  382. if sent is None:
  383. # A `self._fatal_error` was called.
  384. # It might not raise an exception under some
  385. # conditions.
  386. self._buffer_size = 0
  387. self._buffer.clear()
  388. if not self._closing:
  389. # This should never happen.
  390. raise RuntimeError(
  391. 'stream is open after UVStream._try_write '
  392. 'returned None')
  393. return
  394. if sent == 0:
  395. # All data was successfully written.
  396. self._buffer_size = 0
  397. self._buffer.clear()
  398. # on_write will call "maybe_resume_protocol".
  399. self._on_write()
  400. return True
  401. if sent > 0:
  402. if UVLOOP_DEBUG:
  403. if sent == len(data):
  404. raise RuntimeError(
  405. '_try_write sent all data and returned '
  406. 'non-zero')
  407. if PyBytes_CheckExact(data):
  408. # Cast bytes to memoryview to avoid copying
  409. # data that wasn't sent.
  410. data = memoryview(data)
  411. data = data[sent:]
  412. self._buffer_size -= sent
  413. self._buffer[0] = data
  414. # At this point it's either data was sent partially,
  415. # or an EAGAIN has happened.
  416. else:
  417. ctx = _StreamWriteContext.new(self, self._buffer)
  418. err = uv.uv_try_write(<uv.uv_stream_t*>self._handle,
  419. ctx.uv_bufs_start,
  420. ctx.uv_bufs_len)
  421. if err > 0:
  422. # Some data was successfully sent.
  423. if <size_t>err == self._buffer_size:
  424. # Everything was sent.
  425. ctx.close()
  426. self._buffer.clear()
  427. self._buffer_size = 0
  428. # on_write will call "maybe_resume_protocol".
  429. self._on_write()
  430. return True
  431. try:
  432. # Advance pointers to uv_bufs in `ctx`,
  433. # we will reuse it soon for a uv_write
  434. # call.
  435. ctx.advance_uv_buf(<ssize_t>err)
  436. except Exception as ex: # This should never happen.
  437. # Let's try to close the `ctx` anyways.
  438. ctx.close()
  439. self._fatal_error(ex, True)
  440. self._buffer.clear()
  441. self._buffer_size = 0
  442. return
  443. elif err != uv.UV_EAGAIN:
  444. ctx.close()
  445. exc = convert_error(err)
  446. self._fatal_error(exc, True)
  447. self._buffer.clear()
  448. self._buffer_size = 0
  449. return
  450. # fall through
  451. if ctx is None:
  452. ctx = _StreamWriteContext.new(self, self._buffer)
  453. err = uv.uv_write(&ctx.req,
  454. <uv.uv_stream_t*>self._handle,
  455. ctx.uv_bufs_start,
  456. ctx.uv_bufs_len,
  457. __uv_stream_on_write)
  458. self._buffer_size = 0
  459. # Can't use `_buffer.clear()` here: `ctx` holds a reference to
  460. # the `_buffer`.
  461. self._buffer = []
  462. if err < 0:
  463. # close write context
  464. ctx.close()
  465. exc = convert_error(err)
  466. self._fatal_error(exc, True)
  467. return
  468. self._maybe_resume_protocol()
  469. cdef size_t _get_write_buffer_size(self):
  470. if self._handle is NULL:
  471. return 0
  472. return ((<uv.uv_stream_t*>self._handle).write_queue_size +
  473. self._buffer_size)
  474. cdef _close(self):
  475. try:
  476. if self._read_pybuf_acquired:
  477. # Should never happen. libuv always calls uv_alloc/uv_read
  478. # in pairs.
  479. self._loop.call_exception_handler({
  480. 'transport': self,
  481. 'message': 'XXX: an allocated buffer in transport._close()'
  482. })
  483. self._read_pybuf_acquired = 0
  484. PyBuffer_Release(&self._read_pybuf)
  485. self._stop_reading()
  486. finally:
  487. UVSocketHandle._close(<UVHandle>self)
  488. cdef inline _on_accept(self):
  489. # Ultimately called by __uv_stream_on_listen.
  490. self._init_protocol()
  491. cdef inline _on_eof(self):
  492. # Any exception raised here will be caught in
  493. # __uv_stream_on_read.
  494. try:
  495. meth = self._protocol.eof_received
  496. except AttributeError:
  497. keep_open = False
  498. else:
  499. keep_open = meth()
  500. if keep_open:
  501. # We're keeping the connection open so the
  502. # protocol can write more, but we still can't
  503. # receive more, so remove the reader callback.
  504. self._stop_reading()
  505. else:
  506. self.close()
  507. cdef inline _on_write(self):
  508. self._maybe_resume_protocol()
  509. if not self._get_write_buffer_size():
  510. if self._closing:
  511. self._schedule_call_connection_lost(None)
  512. elif self._eof:
  513. self._shutdown()
  514. cdef inline _init(self, Loop loop, object protocol, Server server,
  515. object waiter):
  516. self._set_protocol(protocol)
  517. self._start_init(loop)
  518. if server is not None:
  519. self._set_server(server)
  520. if waiter is not None:
  521. self._set_waiter(waiter)
  522. cdef inline _on_connect(self, object exc):
  523. # Called from __tcp_connect_callback (tcp.pyx) and
  524. # __pipe_connect_callback (pipe.pyx).
  525. if exc is None:
  526. self._init_protocol()
  527. else:
  528. if self._waiter is None:
  529. self._fatal_error(exc, False, "connect failed")
  530. elif self._waiter.cancelled():
  531. # Connect call was cancelled; just close the transport
  532. # silently.
  533. self._close()
  534. elif self._waiter.done():
  535. self._fatal_error(exc, False, "connect failed")
  536. else:
  537. self._waiter.set_exception(exc)
  538. self._close()
  539. # === Public API ===
  540. def __repr__(self):
  541. return '<{} closed={} reading={} {:#x}>'.format(
  542. self.__class__.__name__,
  543. self._closed,
  544. self.__reading,
  545. id(self))
  546. def write(self, object buf):
  547. self._ensure_alive()
  548. if self._eof:
  549. raise RuntimeError('Cannot call write() after write_eof()')
  550. if not buf:
  551. return
  552. if self._conn_lost:
  553. self._conn_lost += 1
  554. return
  555. self._write(buf)
  556. def writelines(self, bufs):
  557. self._ensure_alive()
  558. if self._eof:
  559. raise RuntimeError('Cannot call writelines() after write_eof()')
  560. if self._conn_lost:
  561. self._conn_lost += 1
  562. return
  563. for buf in bufs:
  564. self._write(buf)
  565. def write_eof(self):
  566. self._ensure_alive()
  567. if self._eof:
  568. return
  569. self._eof = 1
  570. if not self._get_write_buffer_size():
  571. self._shutdown()
  572. def can_write_eof(self):
  573. return True
  574. def is_reading(self):
  575. return self._is_reading()
  576. def pause_reading(self):
  577. if self._closing or not self._is_reading():
  578. return
  579. self._stop_reading()
  580. def resume_reading(self):
  581. if self._is_reading() or self._closing:
  582. return
  583. self._start_reading()
  584. cdef void __uv_stream_on_shutdown(uv.uv_shutdown_t* req,
  585. int status) with gil:
  586. # callback for uv_shutdown
  587. if req.data is NULL:
  588. aio_logger.error(
  589. 'UVStream.shutdown callback called with NULL req.data, status=%r',
  590. status)
  591. return
  592. cdef UVStream stream = <UVStream> req.data
  593. if status < 0 and status != uv.UV_ECANCELED:
  594. # From libuv source code:
  595. # The ECANCELED error code is a lie, the shutdown(2) syscall is a
  596. # fait accompli at this point. Maybe we should revisit this in
  597. # v0.11. A possible reason for leaving it unchanged is that it
  598. # informs the callee that the handle has been destroyed.
  599. if UVLOOP_DEBUG:
  600. stream._loop._debug_stream_shutdown_errors_total += 1
  601. exc = convert_error(status)
  602. stream._fatal_error(
  603. exc, False, "error status in uv_stream_t.shutdown callback")
  604. return
cdef inline bint __uv_stream_on_read_common(UVStream sc, Loop loop,
                                            ssize_t nread):
    # Shared front part of the read callbacks.
    #
    # Handles the cases in which there is no data to deliver: closed
    # stream, EOF, `nread == 0` and read errors.  Returns True when the
    # event was fully handled here and False when `nread` bytes of data
    # still have to be passed to the protocol by the caller.

    if sc._closed:
        # The stream was closed, there is no reason to
        # do any work now.
        sc.__reading_stopped()  # Just in case.
        return True

    if nread == uv.UV_EOF:
        # From libuv docs:
        #     The callee is responsible for stopping/closing the stream
        #     when an error happens by calling uv_read_stop() or uv_close().
        #     Trying to read from the stream again is undefined.
        try:
            if UVLOOP_DEBUG:
                loop._debug_stream_read_eof_total += 1

            sc._stop_reading()
            sc._on_eof()
        except BaseException as ex:
            if UVLOOP_DEBUG:
                loop._debug_stream_read_eof_cb_errors_total += 1

            sc._fatal_error(ex, False)
        finally:
            # Returning from `finally` also discards any exception still
            # propagating at this point (e.g. one raised by
            # `_fatal_error` above).
            return True

    if nread == 0:
        # From libuv docs:
        #     nread might be 0, which does not indicate an error or EOF.
        #     This is equivalent to EAGAIN or EWOULDBLOCK under read(2).
        return True

    if nread < 0:
        # From libuv docs:
        #     The callee is responsible for stopping/closing the stream
        #     when an error happens by calling uv_read_stop() or uv_close().
        #     Trying to read from the stream again is undefined.
        #
        # Therefore, we're closing the stream.  Since "UVHandle._close()"
        # doesn't raise exceptions unless uvloop is built with DEBUG=1,
        # we don't need try...finally here.

        if UVLOOP_DEBUG:
            loop._debug_stream_read_errors_total += 1

        if sc.__read_error_close:
            # Used for getting notified when a pipe is closed.
            # See WriteUnixTransport for the explanation.
            sc._on_eof()
            return True

        exc = convert_error(nread)
        sc._fatal_error(
            exc, False, "error status in uv_stream_t.read callback")
        return True

    # nread > 0: there is data for the caller to deliver.
    return False
  654. cdef inline void __uv_stream_on_read_impl(uv.uv_stream_t* stream,
  655. ssize_t nread,
  656. const uv.uv_buf_t* buf):
  657. cdef:
  658. UVStream sc = <UVStream>stream.data
  659. Loop loop = sc._loop
  660. # It's OK to free the buffer early, since nothing will
  661. # be able to touch it until this method is done.
  662. __loop_free_buffer(loop)
  663. if __uv_stream_on_read_common(sc, loop, nread):
  664. return
  665. try:
  666. if UVLOOP_DEBUG:
  667. loop._debug_stream_read_cb_total += 1
  668. sc._protocol_data_received(loop._recv_buffer[:nread])
  669. except BaseException as exc:
  670. if UVLOOP_DEBUG:
  671. loop._debug_stream_read_cb_errors_total += 1
  672. sc._fatal_error(exc, False)
  673. cdef inline void __uv_stream_on_write_impl(uv.uv_write_t* req, int status):
  674. cdef:
  675. _StreamWriteContext ctx = <_StreamWriteContext> req.data
  676. UVStream stream = <UVStream>ctx.stream
  677. ctx.close()
  678. if stream._closed:
  679. # The stream was closed, there is nothing to do.
  680. # Even if there is an error, like EPIPE, there
  681. # is no reason to report it.
  682. return
  683. if status < 0:
  684. if UVLOOP_DEBUG:
  685. stream._loop._debug_stream_write_errors_total += 1
  686. exc = convert_error(status)
  687. stream._fatal_error(
  688. exc, False, "error status in uv_stream_t.write callback")
  689. return
  690. try:
  691. stream._on_write()
  692. except BaseException as exc:
  693. if UVLOOP_DEBUG:
  694. stream._loop._debug_stream_write_cb_errors_total += 1
  695. stream._fatal_error(exc, False)
  696. cdef void __uv_stream_on_read(uv.uv_stream_t* stream,
  697. ssize_t nread,
  698. const uv.uv_buf_t* buf) with gil:
  699. if __ensure_handle_data(<uv.uv_handle_t*>stream,
  700. "UVStream read callback") == 0:
  701. return
  702. # Don't need try-finally, __uv_stream_on_read_impl is void
  703. __uv_stream_on_read_impl(stream, nread, buf)
  704. cdef void __uv_stream_on_write(uv.uv_write_t* req, int status) with gil:
  705. if UVLOOP_DEBUG:
  706. if req.data is NULL:
  707. aio_logger.error(
  708. 'UVStream.write callback called with NULL req.data, status=%r',
  709. status)
  710. return
  711. # Don't need try-finally, __uv_stream_on_write_impl is void
  712. __uv_stream_on_write_impl(req, status)
  713. cdef void __uv_stream_buffered_alloc(uv.uv_handle_t* stream,
  714. size_t suggested_size,
  715. uv.uv_buf_t* uvbuf) with gil:
  716. if __ensure_handle_data(<uv.uv_handle_t*>stream,
  717. "UVStream alloc buffer callback") == 0:
  718. return
  719. cdef:
  720. UVStream sc = <UVStream>stream.data
  721. Loop loop = sc._loop
  722. Py_buffer* pybuf = &sc._read_pybuf
  723. int got_buf = 0
  724. if sc._read_pybuf_acquired:
  725. uvbuf.len = 0
  726. uvbuf.base = NULL
  727. return
  728. sc._read_pybuf_acquired = 0
  729. try:
  730. buf = sc._protocol_get_buffer(suggested_size)
  731. PyObject_GetBuffer(buf, pybuf, PyBUF_WRITABLE)
  732. got_buf = 1
  733. except BaseException as exc:
  734. # Can't call 'sc._fatal_error' or 'sc._close', libuv will SF.
  735. # We'll do it later in __uv_stream_buffered_on_read when we
  736. # receive UV_ENOBUFS.
  737. uvbuf.len = 0
  738. uvbuf.base = NULL
  739. return
  740. if not pybuf.len:
  741. uvbuf.len = 0
  742. uvbuf.base = NULL
  743. if got_buf:
  744. PyBuffer_Release(pybuf)
  745. return
  746. sc._read_pybuf_acquired = 1
  747. uvbuf.base = <char*>pybuf.buf
  748. uvbuf.len = pybuf.len
cdef void __uv_stream_buffered_on_read(uv.uv_stream_t* stream,
                                       ssize_t nread,
                                       const uv.uv_buf_t* buf) with gil:
    # libuv read callback for buffered-mode streams.
    #
    # The data has already been written into the buffer obtained in
    # __uv_stream_buffered_alloc; this callback reports `nread` to the
    # protocol through `buffer_updated()` and then releases the buffer.

    if __ensure_handle_data(<uv.uv_handle_t*>stream,
                            "UVStream buffered read callback") == 0:
        return

    cdef:
        UVStream sc = <UVStream>stream.data
        Loop loop = sc._loop
        Py_buffer* pybuf = &sc._read_pybuf

    if nread == uv.UV_ENOBUFS:
        # The alloc callback handed libuv an empty buffer (get_buffer()
        # failed or returned a zero-length buffer); report it now.
        sc._fatal_error(
            RuntimeError(
                'unhandled error (or an empty buffer) in get_buffer()'),
            False)
        return

    try:
        if nread > 0 and not sc._read_pybuf_acquired:
            # From libuv docs:
            #     nread is > 0 if there is data available or < 0 on error. When
            #     we've reached EOF, nread will be set to UV_EOF. When
            #     nread < 0, the buf parameter might not point to a valid
            #     buffer; in that case buf.len and buf.base are both set to 0.
            raise RuntimeError(
                f'no python buffer is allocated in on_read; nread={nread}')

        if nread == 0:
            # From libuv docs:
            #     nread might be 0, which does not indicate an error or EOF.
            #     This is equivalent to EAGAIN or EWOULDBLOCK under read(2).
            return

        if __uv_stream_on_read_common(sc, loop, nread):
            return

        if UVLOOP_DEBUG:
            loop._debug_stream_read_cb_total += 1

        sc._protocol_buffer_updated(nread)
    except BaseException as exc:
        if UVLOOP_DEBUG:
            loop._debug_stream_read_cb_errors_total += 1

        sc._fatal_error(exc, False)
    finally:
        # Runs on every exit from the `try` block, including the early
        # returns above: mark the buffer as free and release it.
        sc._read_pybuf_acquired = 0
        PyBuffer_Release(pybuf)