Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.
 
 
 
 

852 řádky
29 KiB

  1. cdef _create_transport_context(server_side, server_hostname):
  2. if server_side:
  3. raise ValueError('Server side SSL needs a valid SSLContext')
  4. # Client side may pass ssl=True to use a default
  5. # context; in that case the sslcontext passed is None.
  6. # The default is secure for client connections.
  7. # Python 3.4+: use up-to-date strong settings.
  8. sslcontext = ssl_create_default_context()
  9. if not server_hostname:
  10. sslcontext.check_hostname = False
  11. return sslcontext
  12. cdef class _SSLProtocolTransport:
  13. # TODO:
  14. # _sendfile_compatible = constants._SendfileMode.FALLBACK
  15. def __cinit__(self, loop, ssl_protocol):
  16. self._loop = loop
  17. # SSLProtocol instance
  18. self._ssl_protocol = ssl_protocol
  19. self._closed = False
  20. def get_extra_info(self, name, default=None):
  21. """Get optional transport information."""
  22. return self._ssl_protocol._get_extra_info(name, default)
  23. def set_protocol(self, protocol):
  24. self._ssl_protocol._set_app_protocol(protocol)
  25. def get_protocol(self):
  26. return self._ssl_protocol._app_protocol
  27. def is_closing(self):
  28. return self._closed
  29. def close(self):
  30. """Close the transport.
  31. Buffered data will be flushed asynchronously. No more data
  32. will be received. After all buffered data is flushed, the
  33. protocol's connection_lost() method will (eventually) called
  34. with None as its argument.
  35. """
  36. self._closed = True
  37. self._ssl_protocol._start_shutdown()
  38. def __dealloc__(self):
  39. if not self._closed:
  40. self._closed = True
  41. warnings_warn(
  42. "unclosed transport <uvloop.loop._SSLProtocolTransport "
  43. "object>", ResourceWarning)
  44. def is_reading(self):
  45. return not self._ssl_protocol._app_reading_paused
  46. def pause_reading(self):
  47. """Pause the receiving end.
  48. No data will be passed to the protocol's data_received()
  49. method until resume_reading() is called.
  50. """
  51. self._ssl_protocol._pause_reading()
  52. def resume_reading(self):
  53. """Resume the receiving end.
  54. Data received will once again be passed to the protocol's
  55. data_received() method.
  56. """
  57. self._ssl_protocol._resume_reading()
  58. def set_write_buffer_limits(self, high=None, low=None):
  59. """Set the high- and low-water limits for write flow control.
  60. These two values control when to call the protocol's
  61. pause_writing() and resume_writing() methods. If specified,
  62. the low-water limit must be less than or equal to the
  63. high-water limit. Neither value can be negative.
  64. The defaults are implementation-specific. If only the
  65. high-water limit is given, the low-water limit defaults to an
  66. implementation-specific value less than or equal to the
  67. high-water limit. Setting high to zero forces low to zero as
  68. well, and causes pause_writing() to be called whenever the
  69. buffer becomes non-empty. Setting low to zero causes
  70. resume_writing() to be called only once the buffer is empty.
  71. Use of zero for either limit is generally sub-optimal as it
  72. reduces opportunities for doing I/O and computation
  73. concurrently.
  74. """
  75. self._ssl_protocol._set_write_buffer_limits(high, low)
  76. self._ssl_protocol._control_app_writing()
  77. def get_write_buffer_limits(self):
  78. return (self._ssl_protocol._outgoing_low_water,
  79. self._ssl_protocol._outgoing_high_water)
  80. def get_write_buffer_size(self):
  81. """Return the current size of the write buffers."""
  82. return self._ssl_protocol._get_write_buffer_size()
  83. def set_read_buffer_limits(self, high=None, low=None):
  84. """Set the high- and low-water limits for read flow control.
  85. These two values control when to call the upstream transport's
  86. pause_reading() and resume_reading() methods. If specified,
  87. the low-water limit must be less than or equal to the
  88. high-water limit. Neither value can be negative.
  89. The defaults are implementation-specific. If only the
  90. high-water limit is given, the low-water limit defaults to an
  91. implementation-specific value less than or equal to the
  92. high-water limit. Setting high to zero forces low to zero as
  93. well, and causes pause_reading() to be called whenever the
  94. buffer becomes non-empty. Setting low to zero causes
  95. resume_reading() to be called only once the buffer is empty.
  96. Use of zero for either limit is generally sub-optimal as it
  97. reduces opportunities for doing I/O and computation
  98. concurrently.
  99. """
  100. self._ssl_protocol._set_read_buffer_limits(high, low)
  101. self._ssl_protocol._control_ssl_reading()
  102. def get_read_buffer_limits(self):
  103. return (self._ssl_protocol._incoming_low_water,
  104. self._ssl_protocol._incoming_high_water)
  105. def get_read_buffer_size(self):
  106. """Return the current size of the read buffer."""
  107. return self._ssl_protocol._get_read_buffer_size()
  108. @property
  109. def _protocol_paused(self):
  110. # Required for sendfile fallback pause_writing/resume_writing logic
  111. return self._ssl_protocol._app_writing_paused
  112. def write(self, data):
  113. """Write some data bytes to the transport.
  114. This does not block; it buffers the data and arranges for it
  115. to be sent out asynchronously.
  116. """
  117. if not isinstance(data, (bytes, bytearray, memoryview)):
  118. raise TypeError(f"data: expecting a bytes-like instance, "
  119. f"got {type(data).__name__}")
  120. if not data:
  121. return
  122. self._ssl_protocol._write_appdata((data,))
  123. def writelines(self, list_of_data):
  124. """Write a list (or any iterable) of data bytes to the transport.
  125. The default implementation concatenates the arguments and
  126. calls write() on the result.
  127. """
  128. self._ssl_protocol._write_appdata(list_of_data)
  129. def write_eof(self):
  130. """Close the write end after flushing buffered data.
  131. This raises :exc:`NotImplementedError` right now.
  132. """
  133. raise NotImplementedError
  134. def can_write_eof(self):
  135. """Return True if this transport supports write_eof(), False if not."""
  136. return False
  137. def abort(self):
  138. """Close the transport immediately.
  139. Buffered data will be lost. No more data will be received.
  140. The protocol's connection_lost() method will (eventually) be
  141. called with None as its argument.
  142. """
  143. self._force_close(None)
  144. def _force_close(self, exc):
  145. self._closed = True
  146. self._ssl_protocol._abort(exc)
  147. def _test__append_write_backlog(self, data):
  148. # for test only
  149. self._ssl_protocol._write_backlog.append(data)
  150. self._ssl_protocol._write_buffer_size += len(data)
  151. cdef class SSLProtocol:
  152. """SSL protocol.
  153. Implementation of SSL on top of a socket using incoming and outgoing
  154. buffers which are ssl.MemoryBIO objects.
  155. """
  156. def __cinit__(self, *args, **kwargs):
  157. self._ssl_buffer_len = SSL_READ_MAX_SIZE
  158. self._ssl_buffer = <char*>PyMem_RawMalloc(self._ssl_buffer_len)
  159. if not self._ssl_buffer:
  160. raise MemoryError()
  161. self._ssl_buffer_view = PyMemoryView_FromMemory(
  162. self._ssl_buffer, self._ssl_buffer_len, PyBUF_WRITE)
  163. def __dealloc__(self):
  164. self._ssl_buffer_view = None
  165. PyMem_RawFree(self._ssl_buffer)
  166. self._ssl_buffer = NULL
  167. self._ssl_buffer_len = 0
  168. def __init__(self, loop, app_protocol, sslcontext, waiter,
  169. server_side=False, server_hostname=None,
  170. call_connection_made=True,
  171. ssl_handshake_timeout=None,
  172. ssl_shutdown_timeout=None):
  173. if ssl_handshake_timeout is None:
  174. ssl_handshake_timeout = SSL_HANDSHAKE_TIMEOUT
  175. elif ssl_handshake_timeout <= 0:
  176. raise ValueError(
  177. f"ssl_handshake_timeout should be a positive number, "
  178. f"got {ssl_handshake_timeout}")
  179. if ssl_shutdown_timeout is None:
  180. ssl_shutdown_timeout = SSL_SHUTDOWN_TIMEOUT
  181. elif ssl_shutdown_timeout <= 0:
  182. raise ValueError(
  183. f"ssl_shutdown_timeout should be a positive number, "
  184. f"got {ssl_shutdown_timeout}")
  185. if not sslcontext:
  186. sslcontext = _create_transport_context(
  187. server_side, server_hostname)
  188. self._server_side = server_side
  189. if server_hostname and not server_side:
  190. self._server_hostname = server_hostname
  191. else:
  192. self._server_hostname = None
  193. self._sslcontext = sslcontext
  194. # SSL-specific extra info. More info are set when the handshake
  195. # completes.
  196. self._extra = dict(sslcontext=sslcontext)
  197. # App data write buffering
  198. self._write_backlog = col_deque()
  199. self._write_buffer_size = 0
  200. self._waiter = waiter
  201. self._loop = loop
  202. self._set_app_protocol(app_protocol)
  203. self._app_transport = None
  204. self._app_transport_created = False
  205. # transport, ex: SelectorSocketTransport
  206. self._transport = None
  207. self._call_connection_made = call_connection_made
  208. self._ssl_handshake_timeout = ssl_handshake_timeout
  209. self._ssl_shutdown_timeout = ssl_shutdown_timeout
  210. # SSL and state machine
  211. self._sslobj = None
  212. self._incoming = ssl_MemoryBIO()
  213. self._incoming_write = self._incoming.write
  214. self._outgoing = ssl_MemoryBIO()
  215. self._outgoing_read = self._outgoing.read
  216. self._state = UNWRAPPED
  217. self._conn_lost = 0 # Set when connection_lost called
  218. self._eof_received = False
  219. # Flow Control
  220. self._ssl_writing_paused = False
  221. self._app_reading_paused = False
  222. self._ssl_reading_paused = False
  223. self._incoming_high_water = 0
  224. self._incoming_low_water = 0
  225. self._set_read_buffer_limits()
  226. self._app_writing_paused = False
  227. self._outgoing_high_water = 0
  228. self._outgoing_low_water = 0
  229. self._set_write_buffer_limits()
  230. cdef _set_app_protocol(self, app_protocol):
  231. self._app_protocol = app_protocol
  232. if (hasattr(app_protocol, 'get_buffer') and
  233. not isinstance(app_protocol, aio_Protocol)):
  234. self._app_protocol_get_buffer = app_protocol.get_buffer
  235. self._app_protocol_buffer_updated = app_protocol.buffer_updated
  236. self._app_protocol_is_buffer = True
  237. else:
  238. self._app_protocol_is_buffer = False
  239. cdef _wakeup_waiter(self, exc=None):
  240. if self._waiter is None:
  241. return
  242. if not self._waiter.cancelled():
  243. if exc is not None:
  244. self._waiter.set_exception(exc)
  245. else:
  246. self._waiter.set_result(None)
  247. self._waiter = None
  248. def _get_app_transport(self):
  249. if self._app_transport is None:
  250. if self._app_transport_created:
  251. raise RuntimeError('Creating _SSLProtocolTransport twice')
  252. self._app_transport = _SSLProtocolTransport(self._loop, self)
  253. self._app_transport_created = True
  254. return self._app_transport
  255. def connection_made(self, transport):
  256. """Called when the low-level connection is made.
  257. Start the SSL handshake.
  258. """
  259. self._transport = transport
  260. self._start_handshake()
  261. def connection_lost(self, exc):
  262. """Called when the low-level connection is lost or closed.
  263. The argument is an exception object or None (the latter
  264. meaning a regular EOF is received or the connection was
  265. aborted or closed).
  266. """
  267. self._write_backlog.clear()
  268. self._outgoing_read()
  269. self._conn_lost += 1
  270. # Just mark the app transport as closed so that its __dealloc__
  271. # doesn't complain.
  272. if self._app_transport is not None:
  273. self._app_transport._closed = True
  274. if self._state != DO_HANDSHAKE:
  275. self._loop.call_soon(self._app_protocol.connection_lost, exc)
  276. self._set_state(UNWRAPPED)
  277. self._transport = None
  278. self._app_transport = None
  279. self._app_protocol = None
  280. self._wakeup_waiter(exc)
  281. if self._shutdown_timeout_handle:
  282. self._shutdown_timeout_handle.cancel()
  283. if self._handshake_timeout_handle:
  284. self._handshake_timeout_handle.cancel()
  285. def get_buffer(self, n):
  286. cdef size_t want = n
  287. if want > SSL_READ_MAX_SIZE:
  288. want = SSL_READ_MAX_SIZE
  289. if self._ssl_buffer_len < want:
  290. self._ssl_buffer = <char*>PyMem_RawRealloc(self._ssl_buffer, want)
  291. if not self._ssl_buffer:
  292. raise MemoryError()
  293. self._ssl_buffer_len = want
  294. self._ssl_buffer_view = PyMemoryView_FromMemory(
  295. self._ssl_buffer, want, PyBUF_WRITE)
  296. return self._ssl_buffer_view
  297. def buffer_updated(self, nbytes):
  298. self._incoming_write(PyMemoryView_FromMemory(
  299. self._ssl_buffer, nbytes, PyBUF_WRITE))
  300. if self._state == DO_HANDSHAKE:
  301. self._do_handshake()
  302. elif self._state == WRAPPED:
  303. self._do_read()
  304. elif self._state == FLUSHING:
  305. self._do_flush()
  306. elif self._state == SHUTDOWN:
  307. self._do_shutdown()
  308. def eof_received(self):
  309. """Called when the other end of the low-level stream
  310. is half-closed.
  311. If this returns a false value (including None), the transport
  312. will close itself. If it returns a true value, closing the
  313. transport is up to the protocol.
  314. """
  315. try:
  316. if self._loop.get_debug():
  317. aio_logger.debug("%r received EOF", self)
  318. if self._state == DO_HANDSHAKE:
  319. self._on_handshake_complete(ConnectionResetError)
  320. elif self._state == WRAPPED:
  321. self._set_state(FLUSHING)
  322. self._do_write()
  323. self._set_state(SHUTDOWN)
  324. self._do_shutdown()
  325. elif self._state == FLUSHING:
  326. self._do_write()
  327. self._set_state(SHUTDOWN)
  328. self._do_shutdown()
  329. elif self._state == SHUTDOWN:
  330. self._do_shutdown()
  331. finally:
  332. self._transport.close()
  333. cdef _get_extra_info(self, name, default=None):
  334. if name in self._extra:
  335. return self._extra[name]
  336. elif self._transport is not None:
  337. return self._transport.get_extra_info(name, default)
  338. else:
  339. return default
  340. cdef _set_state(self, SSLProtocolState new_state):
  341. cdef bint allowed = False
  342. if new_state == UNWRAPPED:
  343. allowed = True
  344. elif self._state == UNWRAPPED and new_state == DO_HANDSHAKE:
  345. allowed = True
  346. elif self._state == DO_HANDSHAKE and new_state == WRAPPED:
  347. allowed = True
  348. elif self._state == WRAPPED and new_state == FLUSHING:
  349. allowed = True
  350. elif self._state == FLUSHING and new_state == SHUTDOWN:
  351. allowed = True
  352. if allowed:
  353. self._state = new_state
  354. else:
  355. raise RuntimeError(
  356. 'cannot switch state from {} to {}'.format(
  357. self._state, new_state))
  358. # Handshake flow
  359. cdef _start_handshake(self):
  360. if self._loop.get_debug():
  361. aio_logger.debug("%r starts SSL handshake", self)
  362. self._handshake_start_time = self._loop.time()
  363. else:
  364. self._handshake_start_time = None
  365. self._set_state(DO_HANDSHAKE)
  366. # start handshake timeout count down
  367. self._handshake_timeout_handle = \
  368. self._loop.call_later(self._ssl_handshake_timeout,
  369. lambda: self._check_handshake_timeout())
  370. try:
  371. self._sslobj = self._sslcontext.wrap_bio(
  372. self._incoming, self._outgoing,
  373. server_side=self._server_side,
  374. server_hostname=self._server_hostname)
  375. self._sslobj_read = self._sslobj.read
  376. self._sslobj_write = self._sslobj.write
  377. except Exception as ex:
  378. self._on_handshake_complete(ex)
  379. else:
  380. self._do_handshake()
  381. cdef _check_handshake_timeout(self):
  382. if self._state == DO_HANDSHAKE:
  383. msg = (
  384. f"SSL handshake is taking longer than "
  385. f"{self._ssl_handshake_timeout} seconds: "
  386. f"aborting the connection"
  387. )
  388. self._fatal_error(ConnectionAbortedError(msg))
  389. cdef _do_handshake(self):
  390. try:
  391. self._sslobj.do_handshake()
  392. except ssl_SSLAgainErrors as exc:
  393. self._process_outgoing()
  394. except ssl_SSLError as exc:
  395. self._on_handshake_complete(exc)
  396. else:
  397. self._on_handshake_complete(None)
  398. cdef _on_handshake_complete(self, handshake_exc):
  399. self._handshake_timeout_handle.cancel()
  400. sslobj = self._sslobj
  401. try:
  402. if handshake_exc is None:
  403. self._set_state(WRAPPED)
  404. else:
  405. raise handshake_exc
  406. peercert = sslobj.getpeercert()
  407. except Exception as exc:
  408. self._set_state(UNWRAPPED)
  409. if isinstance(exc, ssl_CertificateError):
  410. msg = 'SSL handshake failed on verifying the certificate'
  411. else:
  412. msg = 'SSL handshake failed'
  413. self._fatal_error(exc, msg)
  414. return
  415. if self._loop.get_debug():
  416. dt = self._loop.time() - self._handshake_start_time
  417. aio_logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)
  418. # Add extra info that becomes available after handshake.
  419. self._extra.update(peercert=peercert,
  420. cipher=sslobj.cipher(),
  421. compression=sslobj.compression(),
  422. ssl_object=sslobj)
  423. if self._call_connection_made:
  424. self._app_protocol.connection_made(self._get_app_transport())
  425. self._wakeup_waiter()
  426. self._do_read()
  427. # Shutdown flow
  428. cdef _start_shutdown(self):
  429. if self._state in (FLUSHING, SHUTDOWN, UNWRAPPED):
  430. return
  431. if self._app_transport is not None:
  432. self._app_transport._closed = True
  433. if self._state == DO_HANDSHAKE:
  434. self._abort(None)
  435. else:
  436. self._set_state(FLUSHING)
  437. self._shutdown_timeout_handle = \
  438. self._loop.call_later(self._ssl_shutdown_timeout,
  439. lambda: self._check_shutdown_timeout())
  440. self._do_flush()
  441. cdef _check_shutdown_timeout(self):
  442. if self._state in (FLUSHING, SHUTDOWN):
  443. self._transport._force_close(
  444. aio_TimeoutError('SSL shutdown timed out'))
  445. cdef _do_flush(self):
  446. if self._write_backlog:
  447. try:
  448. while True:
  449. # data is discarded when FLUSHING
  450. chunk_size = len(self._sslobj_read(SSL_READ_MAX_SIZE))
  451. if not chunk_size:
  452. # close_notify
  453. break
  454. except ssl_SSLAgainErrors as exc:
  455. pass
  456. except ssl_SSLError as exc:
  457. self._on_shutdown_complete(exc)
  458. return
  459. try:
  460. self._do_write()
  461. except Exception as exc:
  462. self._on_shutdown_complete(exc)
  463. return
  464. if not self._write_backlog:
  465. self._set_state(SHUTDOWN)
  466. self._do_shutdown()
  467. cdef _do_shutdown(self):
  468. try:
  469. self._sslobj.unwrap()
  470. except ssl_SSLAgainErrors as exc:
  471. self._process_outgoing()
  472. except ssl_SSLError as exc:
  473. self._on_shutdown_complete(exc)
  474. else:
  475. self._process_outgoing()
  476. self._call_eof_received()
  477. self._on_shutdown_complete(None)
  478. cdef _on_shutdown_complete(self, shutdown_exc):
  479. self._shutdown_timeout_handle.cancel()
  480. if shutdown_exc:
  481. self._fatal_error(shutdown_exc)
  482. else:
  483. self._loop.call_soon(self._transport.close)
  484. cdef _abort(self, exc):
  485. self._set_state(UNWRAPPED)
  486. if self._transport is not None:
  487. self._transport._force_close(exc)
  488. # Outgoing flow
  489. cdef _write_appdata(self, list_of_data):
  490. if self._state in (FLUSHING, SHUTDOWN, UNWRAPPED):
  491. if self._conn_lost >= LOG_THRESHOLD_FOR_CONNLOST_WRITES:
  492. aio_logger.warning('SSL connection is closed')
  493. self._conn_lost += 1
  494. return
  495. for data in list_of_data:
  496. self._write_backlog.append(data)
  497. self._write_buffer_size += len(data)
  498. try:
  499. if self._state == WRAPPED:
  500. self._do_write()
  501. except Exception as ex:
  502. self._fatal_error(ex, 'Fatal error on SSL protocol')
  503. cdef _do_write(self):
  504. cdef size_t data_len, count
  505. try:
  506. while self._write_backlog:
  507. data = self._write_backlog[0]
  508. count = self._sslobj_write(data)
  509. data_len = len(data)
  510. if count < data_len:
  511. if not PyMemoryView_Check(data):
  512. data = PyMemoryView_FromObject(data)
  513. self._write_backlog[0] = data[count:]
  514. self._write_buffer_size -= count
  515. else:
  516. del self._write_backlog[0]
  517. self._write_buffer_size -= data_len
  518. except ssl_SSLAgainErrors as exc:
  519. pass
  520. self._process_outgoing()
  521. cdef _process_outgoing(self):
  522. if not self._ssl_writing_paused:
  523. data = self._outgoing_read()
  524. if len(data):
  525. self._transport.write(data)
  526. self._control_app_writing()
  527. # Incoming flow
  528. cdef _do_read(self):
  529. if self._state != WRAPPED:
  530. return
  531. try:
  532. if not self._app_reading_paused:
  533. if self._app_protocol_is_buffer:
  534. self._do_read__buffered()
  535. else:
  536. self._do_read__copied()
  537. if self._write_backlog:
  538. self._do_write()
  539. else:
  540. self._process_outgoing()
  541. self._control_ssl_reading()
  542. except Exception as ex:
  543. self._fatal_error(ex, 'Fatal error on SSL protocol')
  544. cdef _do_read__buffered(self):
  545. cdef:
  546. Py_buffer pybuf
  547. bint pybuf_inited = False
  548. size_t wants, offset = 0
  549. int count = 1
  550. object buf
  551. buf = self._app_protocol_get_buffer(self._get_read_buffer_size())
  552. wants = len(buf)
  553. try:
  554. count = self._sslobj_read(wants, buf)
  555. if count > 0:
  556. offset = count
  557. if offset < wants:
  558. PyObject_GetBuffer(buf, &pybuf, PyBUF_WRITABLE)
  559. pybuf_inited = True
  560. while offset < wants:
  561. buf = PyMemoryView_FromMemory(
  562. (<char*>pybuf.buf) + offset,
  563. wants - offset,
  564. PyBUF_WRITE)
  565. count = self._sslobj_read(wants - offset, buf)
  566. if count > 0:
  567. offset += count
  568. else:
  569. break
  570. else:
  571. self._loop.call_soon(lambda: self._do_read())
  572. except ssl_SSLAgainErrors as exc:
  573. pass
  574. finally:
  575. if pybuf_inited:
  576. PyBuffer_Release(&pybuf)
  577. if offset > 0:
  578. self._app_protocol_buffer_updated(offset)
  579. if not count:
  580. # close_notify
  581. self._call_eof_received()
  582. self._start_shutdown()
  583. cdef _do_read__copied(self):
  584. cdef:
  585. list data
  586. bytes first, chunk = b'1'
  587. bint zero = True, one = False
  588. try:
  589. while True:
  590. chunk = self._sslobj_read(SSL_READ_MAX_SIZE)
  591. if not chunk:
  592. break
  593. if zero:
  594. zero = False
  595. one = True
  596. first = chunk
  597. elif one:
  598. one = False
  599. data = [first, chunk]
  600. else:
  601. data.append(chunk)
  602. except ssl_SSLAgainErrors as exc:
  603. pass
  604. if one:
  605. self._app_protocol.data_received(first)
  606. elif not zero:
  607. self._app_protocol.data_received(b''.join(data))
  608. if not chunk:
  609. # close_notify
  610. self._call_eof_received()
  611. self._start_shutdown()
  612. cdef _call_eof_received(self):
  613. try:
  614. if not self._eof_received:
  615. self._eof_received = True
  616. keep_open = self._app_protocol.eof_received()
  617. if keep_open:
  618. aio_logger.warning('returning true from eof_received() '
  619. 'has no effect when using ssl')
  620. except Exception as ex:
  621. self._fatal_error(ex, 'Error calling eof_received()')
  622. # Flow control for writes from APP socket
  623. cdef _control_app_writing(self):
  624. cdef size_t size = self._get_write_buffer_size()
  625. if size >= self._outgoing_high_water and not self._app_writing_paused:
  626. self._app_writing_paused = True
  627. try:
  628. self._app_protocol.pause_writing()
  629. except Exception as exc:
  630. self._loop.call_exception_handler({
  631. 'message': 'protocol.pause_writing() failed',
  632. 'exception': exc,
  633. 'transport': self._app_transport,
  634. 'protocol': self,
  635. })
  636. elif size <= self._outgoing_low_water and self._app_writing_paused:
  637. self._app_writing_paused = False
  638. try:
  639. self._app_protocol.resume_writing()
  640. except Exception as exc:
  641. self._loop.call_exception_handler({
  642. 'message': 'protocol.resume_writing() failed',
  643. 'exception': exc,
  644. 'transport': self._app_transport,
  645. 'protocol': self,
  646. })
  647. cdef size_t _get_write_buffer_size(self):
  648. return self._outgoing.pending + self._write_buffer_size
  649. cdef _set_write_buffer_limits(self, high=None, low=None):
  650. high, low = add_flowcontrol_defaults(
  651. high, low, FLOW_CONTROL_HIGH_WATER_SSL_WRITE)
  652. self._outgoing_high_water = high
  653. self._outgoing_low_water = low
  654. # Flow control for reads to APP socket
  655. cdef _pause_reading(self):
  656. self._app_reading_paused = True
  657. cdef _resume_reading(self):
  658. if self._app_reading_paused:
  659. self._app_reading_paused = False
  660. def resume():
  661. if self._state == WRAPPED:
  662. self._do_read()
  663. elif self._state == FLUSHING:
  664. self._do_flush()
  665. elif self._state == SHUTDOWN:
  666. self._do_shutdown()
  667. self._loop.call_soon(resume)
  668. # Flow control for reads from SSL socket
  669. cdef _control_ssl_reading(self):
  670. cdef size_t size = self._get_read_buffer_size()
  671. if size >= self._incoming_high_water and not self._ssl_reading_paused:
  672. self._ssl_reading_paused = True
  673. self._transport.pause_reading()
  674. elif size <= self._incoming_low_water and self._ssl_reading_paused:
  675. self._ssl_reading_paused = False
  676. self._transport.resume_reading()
  677. cdef _set_read_buffer_limits(self, high=None, low=None):
  678. high, low = add_flowcontrol_defaults(
  679. high, low, FLOW_CONTROL_HIGH_WATER_SSL_READ)
  680. self._incoming_high_water = high
  681. self._incoming_low_water = low
  682. cdef size_t _get_read_buffer_size(self):
  683. return self._incoming.pending
  684. # Flow control for writes to SSL socket
  685. def pause_writing(self):
  686. """Called when the low-level transport's buffer goes over
  687. the high-water mark.
  688. """
  689. assert not self._ssl_writing_paused
  690. self._ssl_writing_paused = True
  691. def resume_writing(self):
  692. """Called when the low-level transport's buffer drains below
  693. the low-water mark.
  694. """
  695. assert self._ssl_writing_paused
  696. self._ssl_writing_paused = False
  697. self._process_outgoing()
  698. cdef _fatal_error(self, exc, message='Fatal error on transport'):
  699. if self._transport:
  700. self._transport._force_close(exc)
  701. if isinstance(exc, OSError):
  702. if self._loop.get_debug():
  703. aio_logger.debug("%r: %s", self, message, exc_info=True)
  704. elif not isinstance(exc, aio_CancelledError):
  705. self._loop.call_exception_handler({
  706. 'message': message,
  707. 'exception': exc,
  708. 'transport': self._transport,
  709. 'protocol': self,
  710. })