|
- from .._receivebuffer import ReceiveBuffer
-
- def test_receivebuffer():
- b = ReceiveBuffer()
- assert not b
- assert len(b) == 0
- assert bytes(b) == b""
-
- b += b"123"
- assert b
- assert len(b) == 3
- assert bytes(b) == b"123"
-
- b.compress()
- assert bytes(b) == b"123"
-
- assert b.maybe_extract_at_most(2) == b"12"
- assert b
- assert len(b) == 1
- assert bytes(b) == b"3"
-
- b.compress()
- assert bytes(b) == b"3"
-
- assert b.maybe_extract_at_most(10) == b"3"
- assert bytes(b) == b""
-
- assert b.maybe_extract_at_most(10) is None
- assert not b
-
- ################################################################
- # maybe_extract_until_next
- ################################################################
-
- b += b"12345a6789aa"
-
- assert b.maybe_extract_until_next(b"a") == b"12345a"
- assert bytes(b) == b"6789aa"
-
- assert b.maybe_extract_until_next(b"aaa") is None
- assert bytes(b) == b"6789aa"
-
- b += b"a12"
- assert b.maybe_extract_until_next(b"aaa") == b"6789aaa"
- assert bytes(b) == b"12"
-
- # check repeated searches for the same needle, triggering the
- # pickup-where-we-left-off logic
- b += b"345"
- assert b.maybe_extract_until_next(b"aaa") is None
-
- b += b"6789aaa123"
- assert b.maybe_extract_until_next(b"aaa") == b"123456789aaa"
- assert bytes(b) == b"123"
-
- ################################################################
- # maybe_extract_lines
- ################################################################
-
- b += b"\r\na: b\r\nfoo:bar\r\n\r\ntrailing"
- lines = b.maybe_extract_lines()
- assert lines == [b"123", b"a: b", b"foo:bar"]
- assert bytes(b) == b"trailing"
-
- assert b.maybe_extract_lines() is None
-
- b += b"\r\n\r"
- assert b.maybe_extract_lines() is None
-
- assert b.maybe_extract_at_most(100) == b"trailing\r\n\r"
- assert not b
-
- # Empty body case (as happens at the end of chunked encoding if there are
- # no trailing headers, e.g.)
- b += b"\r\ntrailing"
- assert b.maybe_extract_lines() == []
- assert bytes(b) == b"trailing"
|