Patchwork D3303: cborutil: implement support for indefinite length CBOR types

Submitter phabricator
Date April 13, 2018, 6:26 a.m.
Message ID <differential-rev-PHID-DREV-vzwobvtqcptohfe2iugq-req@phab.mercurial-scm.org>
Permalink /patch/30846/
State Superseded

Comments

phabricator - April 13, 2018, 6:26 a.m.
indygreg created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  The vendored cbor2 package doesn't support streaming /
  indefinite length items when encoding. On the decoding side, it
  supports indefinite length types. However, it waits for all data to
  arrive before emitting a result. This is unfortunate because
  indefinite length items facilitate streaming without buffering.
  
  This commit implements support for encoding indefinite length
  bytestrings, arrays, and maps. It implements support for decoding
  indefinite length bytestrings.
  
  I strove to use generators for moving data around as much as
  possible because they are much more efficient than read()/write():
  no extra memory copying, allocation, concatenation, buffering,
  etc. occurs unless the producer/consumer needs it.
  This helps keep things fast.
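
As a rough illustration of the generator approach described above, the sketch below emits an indefinite length bytestring chunk by chunk without any intermediate buffer. It is not the code from this patch: the names `encode_bytestring_header` and `iter_indefinite_bytestring` are hypothetical, and the header encoding is written out by hand instead of calling into cbor2.

```python
import struct

BEGIN_INDEFINITE_BYTESTRING = b'\x5f'  # major type 2, subtype 31
BREAK = b'\xff'


def encode_bytestring_header(length):
    """Return the header of a definite length bytestring (hypothetical helper)."""
    major = 2 << 5
    if length < 24:
        return struct.pack('>B', major | length)
    if length < 1 << 8:
        return struct.pack('>BB', major | 24, length)
    if length < 1 << 16:
        return struct.pack('>BH', major | 25, length)
    if length < 1 << 32:
        return struct.pack('>BL', major | 26, length)
    return struct.pack('>BQ', major | 27, length)


def iter_indefinite_bytestring(chunks):
    """Yield the encoded pieces one at a time; nothing is concatenated here."""
    yield BEGIN_INDEFINITE_BYTESTRING
    for chunk in chunks:
        yield encode_bytestring_header(len(chunk))
        yield chunk
    yield BREAK


# The consumer decides whether to join, write to a socket, etc.
encoded = b''.join(
    iter_indefinite_bytestring([b'\xaa\xbb\xcc\xdd', b'\xee\xff\x99']))
```

The input is the RFC 7049 Section 2.2.2 example also used in tests/test-cbor.py.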

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D3303

AFFECTED FILES
  contrib/import-checker.py
  mercurial/utils/cborutil.py
  tests/test-cbor.py

CHANGE DETAILS




To: indygreg, #hg-reviewers
Cc: mercurial-devel
phabricator - April 13, 2018, 12:34 p.m.
yuja added inline comments.

INLINE COMMENTS

> cborutil.py:73
> +    beginindefinitearray(encoder)
> +    yield writeitem
> +    encoder.write(BREAK)

I don't think yielding `encoder.encode` would make much sense
because an array item can also be a nested indefinite array, in
which case, we can't use `writeitem()`.

To: indygreg, #hg-reviewers
Cc: yuja, mercurial-devel
phabricator - April 13, 2018, 8 p.m.
indygreg added inline comments.

INLINE COMMENTS

> yuja wrote in cborutil.py:73
> I don't think yielding `encoder.encode` would make much sense
> because an array item can also be a nested indefinite array, in
> which case, we can't use `writeitem()`.

Indeed.

Proper support for nesting will likely require a whole new high-level encoder API, because the state of the nesting needs to be tracked somewhere.

FWIW, the more I'm looking at the CBOR code, the more I'm thinking we will end up having to reinvent the full wheel. Not-yet-submitted commits to add wire protocol commands to do CBOR things are spending a *ton* of time in cbor2. The reason appears to be primarily driven by cbor2's insistence on using `write()`. There are a few places where we need to emit a generator of chunks. And the overhead from instantiating `io.BytesIO` instances to handle the `write()` from cbor2 only to call `getvalue()` to retrieve the data is non-trivial.

The next version of this may just invent a whole new CBOR encoder with only limited support for types. Or at least I'll change the API so a streaming array doesn't require an encoder be passed in.
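
One possible shape for "tracking the nesting state somewhere" is sketched below. This is only an assumption about what such an API could look like, not what was later implemented: `NestingTracker` and `encodenested` are hypothetical names, and items are passed in already encoded so that no `write()` or `io.BytesIO` is involved.

```python
BEGIN = {'array': b'\x9f', 'map': b'\xbf'}  # indefinite length markers
BREAK = b'\xff'


class NestingTracker(object):
    """Hypothetical helper that remembers which containers are open."""

    def __init__(self):
        self._stack = []

    def begin(self, kind):
        marker = BEGIN[kind]  # raises KeyError for unknown kinds
        self._stack.append(kind)
        return marker

    def end(self):
        if not self._stack:
            raise ValueError('no open container to end')
        self._stack.pop()
        return BREAK

    @property
    def depth(self):
        return len(self._stack)


def encodenested(tracker):
    """Emit [[b'foo', b'bar']] as a generator of chunks."""
    yield tracker.begin('array')
    yield tracker.begin('array')
    yield b'\x43' + b'foo'  # definite length bytestring of 3 bytes
    yield b'\x43' + b'bar'
    yield tracker.end()
    yield tracker.end()


tracker = NestingTracker()
encoded = b''.join(encodenested(tracker))
```

Because the tracker returns bytes instead of writing them, the caller can yield them directly from its own generator.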

To: indygreg, #hg-reviewers
Cc: yuja, mercurial-devel
phabricator - April 13, 2018, 9:35 p.m.
indygreg added a comment.


  I ended up implementing my own CBOR encoder for a starting subset of types. Profiling the wire protocol server inspired me to do this.
  
  I have a future wire protocol command that emits the fulltext data of every file in a revision. It was taking ~45s of CPU time to run. After plugging in the new generator-based CBOR encoder, that drops to ~31s, still all in pure Python.
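
For readers unfamiliar with the idea, a generator-based encoder for a small subset of types might look roughly like the sketch below. It is an illustration under assumptions, not the encoder referred to in this comment: `streamencode` and `_header` are hypothetical names, only definite length containers are produced, map keys are not sorted, and integers outside the 64-bit range are not handled.

```python
import struct


def _header(major, value):
    """Encode a major type plus an unsigned argument (hypothetical helper)."""
    major <<= 5
    if value < 24:
        return struct.pack('>B', major | value)
    if value < 1 << 8:
        return struct.pack('>BB', major | 24, value)
    if value < 1 << 16:
        return struct.pack('>BH', major | 25, value)
    if value < 1 << 32:
        return struct.pack('>BL', major | 26, value)
    return struct.pack('>BQ', major | 27, value)


def streamencode(value):
    """Yield CBOR chunks for None, bools, ints, bytes, lists and dicts."""
    if value is None:
        yield b'\xf6'
    elif value is True:
        yield b'\xf5'
    elif value is False:
        yield b'\xf4'
    elif isinstance(value, int):
        if value >= 0:
            yield _header(0, value)
        else:
            yield _header(1, -1 - value)
    elif isinstance(value, bytes):
        yield _header(2, len(value))
        yield value  # emitted as-is, no copy into a buffer
    elif isinstance(value, (list, tuple)):
        yield _header(4, len(value))
        for item in value:
            for chunk in streamencode(item):
                yield chunk
    elif isinstance(value, dict):
        yield _header(5, len(value))
        for k, v in value.items():
            for chunk in streamencode(k):
                yield chunk
            for chunk in streamencode(v):
                yield chunk
    else:
        raise ValueError('unsupported type: %r' % type(value))


encoded = b''.join(streamencode([0, True, b'foo', None]))
```

Large values such as file fulltexts pass through as a single yielded chunk, which is where a generator design avoids the extra `write()` and `getvalue()` copies.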

To: indygreg, #hg-reviewers
Cc: yuja, mercurial-devel
phabricator - April 14, 2018, 12:13 a.m.
indygreg planned changes to this revision.
indygreg added a comment.


  I'll probably give this one a little bit more love before I ask for review. We need to at least implement support for encoding floats in order to support `-Tcbor`.

To: indygreg, #hg-reviewers
Cc: yuja, mercurial-devel
phabricator - April 14, 2018, 3:18 a.m.
yuja added a comment.


  > FWIW, the more I'm looking at the CBOR code, the more I'm thinking
  >  we will end up having to reinvent the full wheel.
  
  Sounds reasonable to me.
  
  > We need to at least implement support for encoding floats in order to
  >  support -Tcbor.
  
  If it's just for date tuple, I have a patch to get rid of floating-point timestamps
  as our timestamps should be effectively ints.

To: indygreg, #hg-reviewers
Cc: yuja, mercurial-devel
phabricator - April 14, 2018, 3:15 p.m.
indygreg added a comment.


  In https://phab.mercurial-scm.org/D3303#53422, @yuja wrote:
  
  > If it's just for date tuple, I have a patch to get rid of floating-point timestamps
  >  as our timestamps should be effectively ints.
  
  
  I believe that's the only place we use floats. Or at least the only place I saw it in test failures.

To: indygreg, #hg-reviewers
Cc: yuja, mercurial-devel
phabricator - April 16, 2018, 1:06 p.m.
yuja added a comment.


  Queued, thanks.

INLINE COMMENTS

> cborutil.py:206
> +    """
> +    fn = STREAM_ENCODERS.get(v.__class__)
> +

Nit: We might have to support subtypes such as util.sortdict.
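
To make the nit concrete: a lookup keyed on the exact class misses subclasses. The sketch below shows one way a fallback could work by walking the MRO. Everything here is a stand-in: `sortdict` is a local placeholder for a dict subclass like util.sortdict, the encoder functions are dummies, and `findencoder` is a hypothetical name.

```python
import collections


class sortdict(collections.OrderedDict):
    """Local stand-in for a dict subclass such as util.sortdict."""


def encodedict(v):
    return 'map'  # dummy encoder, for illustration only


def encodelist(v):
    return 'array'  # dummy encoder, for illustration only


STREAM_ENCODERS = {
    dict: encodedict,
    list: encodelist,
}


def findencoder(v):
    """Try the exact class first, then fall back to base classes."""
    fn = STREAM_ENCODERS.get(v.__class__)
    if fn is None:
        for base in v.__class__.__mro__:
            fn = STREAM_ENCODERS.get(base)
            if fn is not None:
                break
    if fn is None:
        raise ValueError('do not know how to encode %r' % type(v))
    return fn


exact = STREAM_ENCODERS.get(sortdict)  # the exact-class lookup misses
fallback = findencoder(sortdict())     # the MRO walk finds the dict encoder
```

The exact-class lookup stays as the fast path, so common types pay nothing for the fallback.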

To: indygreg, #hg-reviewers
Cc: yuja, mercurial-devel

Patch

diff --git a/tests/test-cbor.py b/tests/test-cbor.py
new file mode 100644
--- /dev/null
+++ b/tests/test-cbor.py
@@ -0,0 +1,235 @@ 
+from __future__ import absolute_import
+
+import io
+import unittest
+
+from mercurial.thirdparty import (
+    cbor,
+)
+from mercurial.utils import (
+    cborutil,
+)
+
+class IndefiniteBytestringTests(unittest.TestCase):
+    def testitertoiter(self):
+        # This is the example from RFC 7049 Section 2.2.2.
+        source = [b'\xaa\xbb\xcc\xdd', b'\xee\xff\x99']
+
+        it = cborutil.itertoindefinitebytestring(source)
+
+        self.assertEqual(next(it), b'\x5f')
+        self.assertEqual(next(it), b'\x44')
+        self.assertEqual(next(it), b'\xaa\xbb\xcc\xdd')
+        self.assertEqual(next(it), b'\x43')
+        self.assertEqual(next(it), b'\xee\xff\x99')
+        self.assertEqual(next(it), b'\xff')
+
+        with self.assertRaises(StopIteration):
+            next(it)
+
+        dest = b''.join(cborutil.itertoindefinitebytestring(source))
+
+        self.assertEqual(cbor.loads(dest), b''.join(source))
+
+    def testreadtoiter(self):
+        source = io.BytesIO(b'\x5f\x44\xaa\xbb\xcc\xdd\x43\xee\xff\x99\xff')
+
+        it = cborutil.readindefinitebytestringtoiter(source)
+        self.assertEqual(next(it), b'\xaa\xbb\xcc\xdd')
+        self.assertEqual(next(it), b'\xee\xff\x99')
+
+        with self.assertRaises(StopIteration):
+            next(it)
+
+    def testtoiterlarge(self):
+        source = [b'a' * 16, b'b' * 128, b'c' * 1024, b'd' * 1048576]
+
+        dest = b''.join(cborutil.itertoindefinitebytestring(source))
+
+        self.assertEqual(cbor.loads(dest), b''.join(source))
+
+    def testbuffertoindefinite(self):
+        source = b'\x00\x01\x02\x03' + b'\xff' * 16384
+
+        it = cborutil.buffertoindefinitebytestring(source, chunksize=2)
+
+        self.assertEqual(next(it), b'\x5f')
+        self.assertEqual(next(it), b'\x42')
+        self.assertEqual(next(it), b'\x00\x01')
+        self.assertEqual(next(it), b'\x42')
+        self.assertEqual(next(it), b'\x02\x03')
+        self.assertEqual(next(it), b'\x42')
+        self.assertEqual(next(it), b'\xff\xff')
+
+    def testbuffertoindefiniteroundtrip(self):
+        source = b'x' * 1048576
+
+        chunks = list(cborutil.buffertoindefinitebytestring(source))
+        self.assertEqual(len(chunks), 34)
+
+        self.assertEqual(cbor.loads(b''.join(chunks)), source)
+
+class StreamArrayTests(unittest.TestCase):
+    def testempty(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        with cborutil.streamarray(encoder):
+            pass
+
+        self.assertEqual(b.getvalue(), b'\x9f\xff')
+        self.assertEqual(cbor.loads(b.getvalue()), [])
+
+    def testone(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        with cborutil.streamarray(encoder) as fn:
+            fn(b'foo')
+
+        self.assertEqual(cbor.loads(b.getvalue()), [b'foo'])
+
+    def testmultiple(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        with cborutil.streamarray(encoder) as fn:
+            fn(0)
+            fn(True)
+            fn(b'foo')
+            fn(None)
+
+        self.assertEqual(cbor.loads(b.getvalue()), [0, True, b'foo', None])
+
+    def testnested(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        with cborutil.streamarray(encoder):
+            with cborutil.streamarray(encoder) as fn:
+                fn(b'foo')
+                fn(b'bar')
+
+        self.assertEqual(cbor.loads(b.getvalue()), [[b'foo', b'bar']])
+
+    def testitemslist(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        orig = [b'foo', b'bar', None, True, 42]
+
+        cborutil.streamarrayitems(encoder, orig)
+        self.assertEqual(cbor.loads(b.getvalue()), orig)
+
+    def testitemsgen(self):
+        def makeitems():
+            yield b'foo'
+            yield b'bar'
+            yield None
+            yield 42
+
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        cborutil.streamarrayitems(encoder, makeitems())
+        self.assertEqual(cbor.loads(b.getvalue()), [b'foo', b'bar', None, 42])
+
+class StreamMapTests(unittest.TestCase):
+    def testempty(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        with cborutil.streammap(encoder):
+            pass
+
+        self.assertEqual(b.getvalue(), b'\xbf\xff')
+        self.assertEqual(cbor.loads(b.getvalue()), {})
+
+    def testone(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        with cborutil.streammap(encoder) as fn:
+            fn(b'key1', b'value1')
+
+        self.assertEqual(cbor.loads(b.getvalue()), {b'key1': b'value1'})
+
+    def testmultiple(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        with cborutil.streammap(encoder) as fn:
+            fn(0, 1)
+            fn(b'key1', b'value1')
+            fn(True, None)
+
+        self.assertEqual(cbor.loads(b.getvalue()), {
+            0: 1,
+            b'key1': b'value1',
+            True: None,
+        })
+
+    def testcomplex(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        with cborutil.streammap(encoder) as fn:
+            fn(b'key1', b'value1')
+            fn(b'map', {b'inner1key': b'inner1value'})
+            fn(b'array', [0, 1, 2])
+
+        self.assertEqual(cbor.loads(b.getvalue()), {
+            b'key1': b'value1',
+            b'map': {b'inner1key': b'inner1value'},
+            b'array': [0, 1, 2],
+        })
+
+    def testnested(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        with cborutil.streammap(encoder):
+            encoder.encode(b'streamkey')
+            with cborutil.streammap(encoder) as fn2:
+                fn2(b'inner1key', b'inner1value')
+                fn2(0, 1)
+
+        self.assertEqual(cbor.loads(b.getvalue()), {
+            b'streamkey': {
+                b'inner1key': b'inner1value',
+                0: 1,
+            },
+        })
+
+    def testitemsdict(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        orig = [
+            (b'foo', b'bar'),
+            (42, 19),
+            (None, True),
+        ]
+
+        cborutil.streammapitems(encoder, orig)
+        self.assertEqual(cbor.loads(b.getvalue()), dict(orig))
+
+    def testitemsgen(self):
+        def makeitems():
+            yield b'foo', b'bar'
+            yield None, True
+            yield 42, 19
+
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        cborutil.streammapitems(encoder, makeitems())
+        self.assertEqual(cbor.loads(b.getvalue()), {
+            b'foo': b'bar',
+            None: True,
+            42: 19,
+        })
+
+if __name__ == '__main__':
+    import silenttestrunner
+    silenttestrunner.main(__name__)
diff --git a/mercurial/utils/cborutil.py b/mercurial/utils/cborutil.py
new file mode 100644
--- /dev/null
+++ b/mercurial/utils/cborutil.py
@@ -0,0 +1,206 @@ 
+# cborutil.py - CBOR extensions
+#
+# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+from __future__ import absolute_import
+
+import contextlib
+import struct
+
+from ..thirdparty.cbor.cbor2 import (
+    decoder as decodermod,
+    encoder as encodermod,
+)
+
+# Very short version of RFC 7049...
+#
+# Each item begins with a byte. The 3 high bits of that byte denote the
+# "major type." The lower 5 bits denote the "subtype." Each major type
+# has its own encoding mechanism.
+#
+# Most types have lengths. However, bytestring, string, array, and map
+# can be indefinite length. These are denoted by a subtype with value 31.
+# Sub-components of those types then come afterwards and are terminated
+# by a "break" byte.
+
+MAJOR_TYPE_UINT = 0
+MAJOR_TYPE_NEGINT = 1
+MAJOR_TYPE_BYTESTRING = 2
+MAJOR_TYPE_STRING = 3
+MAJOR_TYPE_ARRAY = 4
+MAJOR_TYPE_MAP = 5
+MAJOR_TYPE_SEMANTIC = 6
+MAJOR_TYPE_SPECIAL = 7
+
+SUBTYPE_MASK = 0b00011111
+
+SUBTYPE_INDEFINITE = 31
+
+# Indefinite types begin with their major type ORed with subtype value 31.
+BEGIN_INDEFINITE_BYTESTRING = struct.pack(
+    r'>B', MAJOR_TYPE_BYTESTRING << 5 | SUBTYPE_INDEFINITE)
+BEGIN_INDEFINITE_ARRAY = struct.pack(
+    r'>B', MAJOR_TYPE_ARRAY << 5 | SUBTYPE_INDEFINITE)
+BEGIN_INDEFINITE_MAP = struct.pack(
+    r'>B', MAJOR_TYPE_MAP << 5 | SUBTYPE_INDEFINITE)
+
+# The break ends an indefinite length item.
+BREAK = b'\xff'
+BREAK_INT = 255
+
+def beginindefinitearray(encoder):
+    encoder.write(BEGIN_INDEFINITE_ARRAY)
+
+def beginindefinitemap(encoder):
+    encoder.write(BEGIN_INDEFINITE_MAP)
+
+@contextlib.contextmanager
+def streamarray(encoder):
+    """Write an array in a streaming manner.
+
+    Used as a context manager, the context manager resolves to a function
+    that should be called for each item to write to the array.
+
+    When the context manager exits, the indefinite length array is ended.
+    """
+    def writeitem(value):
+        encoder.encode(value)
+
+    beginindefinitearray(encoder)
+    yield writeitem
+    encoder.write(BREAK)
+
+def streamarrayitems(encoder, items):
+    """Write out an iterable of items to a streaming array."""
+    with streamarray(encoder) as fn:
+        for value in items:
+            fn(value)
+
+@contextlib.contextmanager
+def streammap(encoder):
+    """Write a map in a streaming manner.
+
+    Used as a context manager, the context manager resolves to a function
+    that should be called with a key and value of each map item to write.
+
+    When the context manager exits, the indefinite length map is ended.
+
+    It is possible to nest streaming data structures. If the caller writes
+    out 2 values, the first value will be interpreted as a key and the second
+    a value. So a caller could do something like::
+
+        with streammap(encoder):
+            encoder.encode(b'mykey')
+            with streammap(encoder) as fn:
+                fn(b'innerkey', b'value')
+
+    This would decode to ``{b'mykey': {b'innerkey': b'value'}}``.
+    """
+    def writeitem(key, value):
+        encoder.encode(key)
+        encoder.encode(value)
+
+    beginindefinitemap(encoder)
+    yield writeitem
+    encoder.write(BREAK)
+
+def streammapitems(encoder, items):
+    """Write out an iterable of (key, value) items to a streaming map."""
+    with streammap(encoder) as fn:
+        for key, value in items:
+            fn(key, value)
+
+def itertoindefinitebytestring(it):
+    """Convert an iterator of chunks to an indefinite bytestring.
+
+    Given an input that is iterable and each element in the iterator is
+    representable as bytes, emit an indefinite length bytestring.
+    """
+    # Alias for performance.
+    encodelen = encodermod.encode_length
+    bytestringmajor = MAJOR_TYPE_BYTESTRING << 5
+
+    yield BEGIN_INDEFINITE_BYTESTRING
+
+    for chunk in it:
+        yield encodelen(bytestringmajor, len(chunk))
+        yield chunk
+
+    yield BREAK
+
+def buffertoindefinitebytestring(source, chunksize=65536):
+    """Given a large source buffer, emit as an indefinite length bytestring.
+
+    This is a generator of chunks constituting the encoded CBOR data.
+    """
+    # Alias for performance.
+    encodelen = encodermod.encode_length
+    bytestringmajor = MAJOR_TYPE_BYTESTRING << 5
+
+    yield BEGIN_INDEFINITE_BYTESTRING
+
+    i = 0
+    l = len(source)
+
+    while True:
+        chunk = source[i:i + chunksize]
+        i += len(chunk)
+
+        yield encodelen(bytestringmajor, len(chunk))
+        yield chunk
+
+        if i >= l:
+            break
+
+    yield BREAK
+
+def readindefinitebytestringtoiter(fh, expectheader=True):
+    """Read an indefinite bytestring to a generator.
+
+    Receives an object with a ``read(N)`` method to read N bytes.
+
+    If ``expectheader`` is True, it is expected that the first byte read
+    will represent an indefinite length bytestring. Otherwise, we
+    expect the first byte to be part of the first bytestring chunk.
+    """
+    read = fh.read
+    decodeuint = decodermod.decode_uint
+    byteasinteger = decodermod.byte_as_integer
+
+    if expectheader:
+        initial = byteasinteger(read(1))
+
+        majortype = initial >> 5
+        subtype = initial & SUBTYPE_MASK
+
+        if majortype != MAJOR_TYPE_BYTESTRING:
+            raise decodermod.CBORDecodeError(
+                'expected major type %d; got %d' % (MAJOR_TYPE_BYTESTRING,
+                                                    majortype))
+
+        if subtype != SUBTYPE_INDEFINITE:
+            raise decodermod.CBORDecodeError(
+                'expected indefinite subtype; got %d' % subtype)
+
+    # The indefinite bytestring is composed of chunks of normal bytestrings.
+    # Read chunks until we hit a BREAK byte.
+
+    while True:
+        # We need to sniff for the BREAK byte.
+        initial = byteasinteger(read(1))
+
+        if initial == BREAK_INT:
+            break
+
+        length = decodeuint(fh, initial & SUBTYPE_MASK)
+        chunk = read(length)
+
+        if len(chunk) != length:
+            raise decodermod.CBORDecodeError(
+                'failed to read bytestring chunk: got %d bytes; expected %d' % (
+                    len(chunk), length))
+
+        yield chunk
diff --git a/contrib/import-checker.py b/contrib/import-checker.py
--- a/contrib/import-checker.py
+++ b/contrib/import-checker.py
@@ -36,6 +36,8 @@ 
     'mercurial.pure.parsers',
     # third-party imports should be directly imported
     'mercurial.thirdparty',
+    'mercurial.thirdparty.cbor',
+    'mercurial.thirdparty.cbor.cbor2',
     'mercurial.thirdparty.zope',
     'mercurial.thirdparty.zope.interface',
 )