Patchwork D2852: wireproto: implement basic frame reading and processing

login
register
mail settings
Submitter phabricator
Date March 20, 2018, 12:01 a.m.
Message ID <e4be8c82242789107a6fbb4415b20c61@localhost.localdomain>
Download mbox | patch
Permalink /patch/29654/
State Not Applicable
Headers show

Comments

phabricator - March 20, 2018, 12:01 a.m.
indygreg updated this revision to Diff 7140.

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D2852?vs=7048&id=7140

REVISION DETAIL
  https://phab.mercurial-scm.org/D2852

AFFECTED FILES
  mercurial/configitems.py
  mercurial/util.py
  mercurial/wireprotoframing.py
  mercurial/wireprotoserver.py
  tests/test-http-api-httpv2.t
  tests/test-wireproto-serverreactor.py

CHANGE DETAILS




To: indygreg, #hg-reviewers
Cc: mercurial-devel

Patch

diff --git a/tests/test-wireproto-serverreactor.py b/tests/test-wireproto-serverreactor.py
new file mode 100644
--- /dev/null
+++ b/tests/test-wireproto-serverreactor.py
@@ -0,0 +1,275 @@ 
+from __future__ import absolute_import, print_function
+
+import unittest
+
+from mercurial import (
+    util,
+    wireprotoframing as framing,
+)
+
+ffs = framing.makeframefromhumanstring
+
+def makereactor():
+    return framing.serverreactor()
+
+def sendframes(reactor, gen):
+    """Send a generator of frame bytearray to a reactor.
+
+    Emits a generator of results from ``onframerecv()`` calls.
+    """
+    for frame in gen:
+        frametype, frameflags, framelength = framing.parseheader(frame)
+        payload = frame[framing.FRAME_HEADER_SIZE:]
+        assert len(payload) == framelength
+
+        yield reactor.onframerecv(frametype, frameflags, payload)
+
+def sendcommandframes(reactor, cmd, args, datafh=None):
+    """Generate frames to run a command and send them to a reactor."""
+    return sendframes(reactor, framing.createcommandframes(cmd, args, datafh))
+
+class FrameTests(unittest.TestCase):
+    def testdataexactframesize(self):
+        data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
+
+        frames = list(framing.createcommandframes(b'command', {}, data))
+        self.assertEqual(frames, [
+            ffs(b'command-name have-data command'),
+            ffs(b'command-data continuation %s' % data.getvalue()),
+            ffs(b'command-data eos ')
+        ])
+
+    def testdatamultipleframes(self):
+        data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
+        frames = list(framing.createcommandframes(b'command', {}, data))
+        self.assertEqual(frames, [
+            ffs(b'command-name have-data command'),
+            ffs(b'command-data continuation %s' % (
+                b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
+            ffs(b'command-data eos x'),
+        ])
+
+    def testargsanddata(self):
+        data = util.bytesio(b'x' * 100)
+
+        frames = list(framing.createcommandframes(b'command', {
+            b'key1': b'key1value',
+            b'key2': b'key2value',
+            b'key3': b'key3value',
+        }, data))
+
+        self.assertEqual(frames, [
+            ffs(b'command-name have-args|have-data command'),
+            ffs(br'command-argument 0 \x04\x00\x09\x00key1key1value'),
+            ffs(br'command-argument 0 \x04\x00\x09\x00key2key2value'),
+            ffs(br'command-argument eoa \x04\x00\x09\x00key3key3value'),
+            ffs(b'command-data eos %s' % data.getvalue()),
+        ])
+
+class ServerReactorTests(unittest.TestCase):
+    def _sendsingleframe(self, reactor, s):
+        results = list(sendframes(reactor, [ffs(s)]))
+        self.assertEqual(len(results), 1)
+
+        return results[0]
+
+    def assertaction(self, res, expected):
+        self.assertIsInstance(res, tuple)
+        self.assertEqual(len(res), 2)
+        self.assertIsInstance(res[1], dict)
+        self.assertEqual(res[0], expected)
+
+    def test1framecommand(self):
+        """Receiving a command in a single frame yields request to run it."""
+        reactor = makereactor()
+        results = list(sendcommandframes(reactor, b'mycommand', {}))
+        self.assertEqual(len(results), 1)
+        self.assertaction(results[0], 'runcommand')
+        self.assertEqual(results[0][1], {
+            'command': b'mycommand',
+            'args': {},
+            'data': None,
+        })
+
+    def test1argument(self):
+        reactor = makereactor()
+        results = list(sendcommandframes(reactor, b'mycommand',
+                                         {b'foo': b'bar'}))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'runcommand')
+        self.assertEqual(results[1][1], {
+            'command': b'mycommand',
+            'args': {b'foo': b'bar'},
+            'data': None,
+        })
+
+    def testmultiarguments(self):
+        reactor = makereactor()
+        results = list(sendcommandframes(reactor, b'mycommand',
+                                         {b'foo': b'bar', b'biz': b'baz'}))
+        self.assertEqual(len(results), 3)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'wantframe')
+        self.assertaction(results[2], 'runcommand')
+        self.assertEqual(results[2][1], {
+            'command': b'mycommand',
+            'args': {b'foo': b'bar', b'biz': b'baz'},
+            'data': None,
+        })
+
+    def testsimplecommanddata(self):
+        reactor = makereactor()
+        results = list(sendcommandframes(reactor, b'mycommand', {},
+                                         util.bytesio(b'data!')))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'runcommand')
+        self.assertEqual(results[1][1], {
+            'command': b'mycommand',
+            'args': {},
+            'data': b'data!',
+        })
+
+    def testmultipledataframes(self):
+        frames = [
+            ffs(b'command-name have-data mycommand'),
+            ffs(b'command-data continuation data1'),
+            ffs(b'command-data continuation data2'),
+            ffs(b'command-data eos data3'),
+        ]
+
+        reactor = makereactor()
+        results = list(sendframes(reactor, frames))
+        self.assertEqual(len(results), 4)
+        for i in range(3):
+            self.assertaction(results[i], 'wantframe')
+        self.assertaction(results[3], 'runcommand')
+        self.assertEqual(results[3][1], {
+            'command': b'mycommand',
+            'args': {},
+            'data': b'data1data2data3',
+        })
+
+    def testargumentanddata(self):
+        frames = [
+            ffs(b'command-name have-args|have-data command'),
+            ffs(br'command-argument 0 \x03\x00\x03\x00keyval'),
+            ffs(br'command-argument eoa \x03\x00\x03\x00foobar'),
+            ffs(b'command-data continuation value1'),
+            ffs(b'command-data eos value2'),
+        ]
+
+        reactor = makereactor()
+        results = list(sendframes(reactor, frames))
+
+        self.assertaction(results[-1], 'runcommand')
+        self.assertEqual(results[-1][1], {
+            'command': b'command',
+            'args': {
+                b'key': b'val',
+                b'foo': b'bar',
+            },
+            'data': b'value1value2',
+        })
+
+    def testunexpectedcommandargument(self):
+        """Command argument frame when not running a command is an error."""
+        result = self._sendsingleframe(makereactor(),
+                                       b'command-argument 0 ignored')
+        self.assertaction(result, 'error')
+        self.assertEqual(result[1], {
+            'message': b'expected command frame; got 2',
+        })
+
+    def testunexpectedcommanddata(self):
+        """Command argument frame when not running a command is an error."""
+        result = self._sendsingleframe(makereactor(),
+                                       b'command-data 0 ignored')
+        self.assertaction(result, 'error')
+        self.assertEqual(result[1], {
+            'message': b'expected command frame; got 3',
+        })
+
+    def testmissingcommandframeflags(self):
+        """Command name frame must have flags set."""
+        result = self._sendsingleframe(makereactor(),
+                                       b'command-name 0 command')
+        self.assertaction(result, 'error')
+        self.assertEqual(result[1], {
+            'message': b'missing frame flags on command frame',
+        })
+
+    def testmissingargumentframe(self):
+        frames = [
+            ffs(b'command-name have-args command'),
+            ffs(b'command-name 0 ignored'),
+        ]
+
+        results = list(sendframes(makereactor(), frames))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': b'expected command argument frame; got 1',
+        })
+
+    def testincompleteargumentname(self):
+        """Argument frame with incomplete name."""
+        frames = [
+            ffs(b'command-name have-args command1'),
+            ffs(br'command-argument eoa \x04\x00\xde\xadfoo'),
+        ]
+
+        results = list(sendframes(makereactor(), frames))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': b'malformed argument frame: partial argument name',
+        })
+
+    def testincompleteargumentvalue(self):
+        """Argument frame with incomplete value."""
+        frames = [
+            ffs(b'command-name have-args command'),
+            ffs(br'command-argument eoa \x03\x00\xaa\xaafoopartialvalue'),
+        ]
+
+        results = list(sendframes(makereactor(), frames))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': b'malformed argument frame: partial argument value',
+        })
+
+    def testmissingcommanddataframe(self):
+        frames = [
+            ffs(b'command-name have-data command1'),
+            ffs(b'command-name eos command2'),
+        ]
+        results = list(sendframes(makereactor(), frames))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': b'expected command data frame; got 1',
+        })
+
+    def testmissingcommanddataframeflags(self):
+        frames = [
+            ffs(b'command-name have-data command1'),
+            ffs(b'command-data 0 data'),
+        ]
+        results = list(sendframes(makereactor(), frames))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': b'command data frame without flags',
+        })
+
+if __name__ == '__main__':
+    import silenttestrunner
+    silenttestrunner.main(__name__)
diff --git a/tests/test-http-api-httpv2.t b/tests/test-http-api-httpv2.t
--- a/tests/test-http-api-httpv2.t
+++ b/tests/test-http-api-httpv2.t
@@ -276,7 +276,7 @@ 
   > allow-push = *
   > EOF
 
-  $ hg -R server serve -p $HGPORT -d --pid-file hg.pid
+  $ hg -R server serve -p $HGPORT -d --pid-file hg.pid -E error.log
   $ cat hg.pid > $DAEMON_PIDS
 
 Authorized request for valid read-write command works
@@ -329,3 +329,78 @@ 
   s>     Content-Length: 42\r\n
   s>     \r\n
   s>     unknown wire protocol command: badcommand\n
+
+debugreflect isn't enabled by default
+
+  $ send << EOF
+  > httprequest POST api/$HTTPV2/ro/debugreflect
+  >     user-agent: test
+  > EOF
+  using raw connection to peer
+  s>     POST /api/exp-http-v2-0001/ro/debugreflect HTTP/1.1\r\n
+  s>     Accept-Encoding: identity\r\n
+  s>     user-agent: test\r\n
+  s>     host: $LOCALIP:$HGPORT\r\n (glob)
+  s>     \r\n
+  s> makefile('rb', None)
+  s>     HTTP/1.1 404 Not Found\r\n
+  s>     Server: testing stub value\r\n
+  s>     Date: $HTTP_DATE$\r\n
+  s>     Content-Type: text/plain\r\n
+  s>     Content-Length: 34\r\n
+  s>     \r\n
+  s>     debugreflect service not available
+
+Restart server to get debugreflect endpoint
+
+  $ killdaemons.py
+  $ cat > server/.hg/hgrc << EOF
+  > [experimental]
+  > web.apiserver = true
+  > web.api.debugreflect = true
+  > web.api.http-v2 = true
+  > [web]
+  > push_ssl = false
+  > allow-push = *
+  > EOF
+
+  $ hg -R server serve -p $HGPORT -d --pid-file hg.pid -E error.log
+  $ cat hg.pid > $DAEMON_PIDS
+
+Command frames can be reflected via debugreflect
+
+  $ send << EOF
+  > httprequest POST api/$HTTPV2/ro/debugreflect
+  >     accept: $MEDIATYPE
+  >     content-type: $MEDIATYPE
+  >     user-agent: test
+  >     frame command-name have-args command1
+  >     frame command-argument 0 \x03\x00\x04\x00fooval1
+  >     frame command-argument eoa \x04\x00\x03\x00bar1val
+  > EOF
+  using raw connection to peer
+  s>     POST /api/exp-http-v2-0001/ro/debugreflect HTTP/1.1\r\n
+  s>     Accept-Encoding: identity\r\n
+  s>     accept: application/mercurial-exp-framing-0001\r\n
+  s>     content-type: application/mercurial-exp-framing-0001\r\n
+  s>     user-agent: test\r\n
+  s>     content-length: 42\r\n
+  s>     host: $LOCALIP:$HGPORT\r\n (glob)
+  s>     \r\n
+  s>     \x08\x00\x00\x12command1\x0b\x00\x00 \x03\x00\x04\x00fooval1\x0b\x00\x00"\x04\x00\x03\x00bar1val
+  s> makefile('rb', None)
+  s>     HTTP/1.1 200 OK\r\n
+  s>     Server: testing stub value\r\n
+  s>     Date: $HTTP_DATE$\r\n
+  s>     Content-Type: text/plain\r\n
+  s>     Content-Length: 291\r\n
+  s>     \r\n
+  s>     received: 1 2 command1\n
+  s>     ["wantframe", {"state": "command-receiving-args"}]\n
+  s>     received: 2 0 \x03\x00\x04\x00fooval1\n
+  s>     ["wantframe", {"state": "command-receiving-args"}]\n
+  s>     received: 2 2 \x04\x00\x03\x00bar1val\n
+  s>     ["runcommand", {"args": {"bar1": "val", "foo": "val1"}, "command": "command1", "data": null}]\n
+  s>     received: <no frame>
+
+  $ cat error.log
diff --git a/mercurial/wireprotoserver.py b/mercurial/wireprotoserver.py
--- a/mercurial/wireprotoserver.py
+++ b/mercurial/wireprotoserver.py
@@ -19,6 +19,7 @@ 
     pycompat,
     util,
     wireproto,
+    wireprotoframing,
     wireprototypes,
 )
 
@@ -319,6 +320,11 @@ 
         res.setbodybytes('permission denied')
         return
 
+    # We have a special endpoint to reflect the request back at the client.
+    if command == b'debugreflect':
+        _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
+        return
+
     if command not in wireproto.commands:
         res.status = b'404 Not Found'
         res.headers[b'Content-Type'] = b'text/plain'
@@ -343,8 +349,7 @@ 
                            % FRAMINGTYPE)
         return
 
-    if (b'Content-Type' in req.headers
-        and req.headers[b'Content-Type'] != FRAMINGTYPE):
+    if req.headers.get(b'Content-Type') != FRAMINGTYPE:
         res.status = b'415 Unsupported Media Type'
         # TODO we should send a response with appropriate media type,
         # since client does Accept it.
@@ -358,6 +363,49 @@ 
     res.headers[b'Content-Type'] = b'text/plain'
     res.setbodybytes(b'/'.join(urlparts) + b'\n')
 
+def _processhttpv2reflectrequest(ui, repo, req, res):
+    """Reads unified frame protocol request and dumps out state to client.
+
+    This special endpoint can be used to help debug the wire protocol.
+
+    Instead of routing the request through the normal dispatch mechanism,
+    we instead read all frames, decode them, and feed them into our state
+    tracker. We then dump the log of all that activity back out to the
+    client.
+    """
+    import json
+
+    # Reflection APIs have a history of being abused, accidentally disclosing
+    # sensitive data, etc. So we have a config knob.
+    if not ui.configbool('experimental', 'web.api.debugreflect'):
+        res.status = b'404 Not Found'
+        res.headers[b'Content-Type'] = b'text/plain'
+        res.setbodybytes(_('debugreflect service not available'))
+        return
+
+    # We assume we have a unified framing protocol request body.
+
+    reactor = wireprotoframing.serverreactor()
+    states = []
+
+    while True:
+        frame = wireprotoframing.readframe(req.bodyfh)
+
+        if not frame:
+            states.append(b'received: <no frame>')
+            break
+
+        frametype, frameflags, payload = frame
+        states.append(b'received: %d %d %s' % (frametype, frameflags, payload))
+
+        action, meta = reactor.onframerecv(frametype, frameflags, payload)
+        states.append(json.dumps((action, meta), sort_keys=True,
+                                 separators=(', ', ': ')))
+
+    res.status = b'200 OK'
+    res.headers[b'Content-Type'] = b'text/plain'
+    res.setbodybytes(b'\n'.join(states))
+
 # Maps API name to metadata so custom API can be registered.
 API_HANDLERS = {
     HTTPV2: {
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -13,7 +13,9 @@ 
 
 import struct
 
+from .i18n import _
 from . import (
+    error,
     util,
 )
 
@@ -105,6 +107,51 @@ 
 
     return makeframe(frametype, finalflags, payload)
 
+def parseheader(data):
+    """Parse a unified framing protocol frame header from a buffer.
+
+    The header is expected to be in the buffer at offset 0 and the
+    buffer is expected to be large enough to hold a full header.
+    """
+    # 24 bits payload length (little endian)
+    # 4 bits frame type
+    # 4 bits frame flags
+    # ... payload
+    framelength = data[0] + 256 * data[1] + 16384 * data[2]
+    typeflags = data[3]
+
+    frametype = (typeflags & 0xf0) >> 4
+    frameflags = typeflags & 0x0f
+
+    return frametype, frameflags, framelength
+
+def readframe(fh):
+    """Read a unified framing protocol frame from a file object.
+
+    Returns a 3-tuple of (type, flags, payload) for the decoded frame or
+    None if no frame is available. May raise if a malformed frame is
+    seen.
+    """
+    header = bytearray(FRAME_HEADER_SIZE)
+
+    readcount = fh.readinto(header)
+
+    if readcount == 0:
+        return None
+
+    if readcount != FRAME_HEADER_SIZE:
+        raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
+                          (readcount, header))
+
+    frametype, frameflags, framelength = parseheader(header)
+
+    payload = fh.read(framelength)
+    if len(payload) != framelength:
+        raise error.Abort(_('frame length error: expected %d; got %d') %
+                          (framelength, len(payload)))
+
+    return frametype, frameflags, payload
+
 def createcommandframes(cmd, args, datafh=None):
     """Create frames necessary to transmit a request to run a command.
 
@@ -154,3 +201,195 @@ 
 
             if done:
                 break
+
+class serverreactor(object):
+    """Holds state of a server handling frame-based protocol requests.
+
+    This class is the "brain" of the unified frame-based protocol server
+    component. While the protocol is stateless from the perspective of
+    requests/commands, something needs to track which frames have been
+    received, what frames to expect, etc. This class is that thing.
+
+    Instances are modeled as a state machine of sorts. Instances are also
+    reactionary to external events. The point of this class is to encapsulate
+    the state of the connection and the exchange of frames, not to perform
+    work. Instead, callers tell this class when something occurs, like a
+    frame arriving. If that activity is worthy of a follow-up action (say
+    *run a command*), the return value of that handler will say so.
+
+    I/O and CPU intensive operations are purposefully delegated outside of
+    this class.
+
+    Consumers are expected to tell instances when events occur. They do so by
+    calling the various ``on*`` methods. These methods return a 2-tuple
+    describing any follow-up action(s) to take. The first element is the
+    name of an action to perform. The second is a data structure (usually
+    a dict) specific to that action that contains more information. e.g.
+    if the server wants to send frames back to the client, the data structure
+    will contain a reference to those frames.
+
+    Valid actions that consumers can be instructed to take are:
+
+    error
+       Indicates that an error occurred. Consumer should probably abort.
+
+    runcommand
+       Indicates that the consumer should run a wire protocol command. Details
+       of the command to run are given in the data structure.
+
+    wantframe
+       Indicates that nothing of interest happened and the server is waiting on
+       more frames from the client before anything interesting can be done.
+    """
+
+    def __init__(self):
+        self._state = 'idle'
+        self._activecommand = None
+        self._activeargs = None
+        self._activedata = None
+        self._expectingargs = None
+        self._expectingdata = None
+        self._activeargname = None
+        self._activeargchunks = None
+
+    def onframerecv(self, frametype, frameflags, payload):
+        """Process a frame that has been received off the wire.
+
+        Returns a dict with an ``action`` key that details what action,
+        if any, the consumer should take next.
+        """
+        handlers = {
+            'idle': self._onframeidle,
+            'command-receiving-args': self._onframereceivingargs,
+            'command-receiving-data': self._onframereceivingdata,
+            'errored': self._onframeerrored,
+        }
+
+        meth = handlers.get(self._state)
+        if not meth:
+            raise error.ProgrammingError('unhandled state: %s' % self._state)
+
+        return meth(frametype, frameflags, payload)
+
+    def _makeerrorresult(self, msg):
+        return 'error', {
+            'message': msg,
+        }
+
+    def _makeruncommandresult(self):
+        return 'runcommand', {
+            'command': self._activecommand,
+            'args': self._activeargs,
+            'data': self._activedata.getvalue() if self._activedata else None,
+        }
+
+    def _makewantframeresult(self):
+        return 'wantframe', {
+            'state': self._state,
+        }
+
+    def _onframeidle(self, frametype, frameflags, payload):
+        # The only frame type that should be received in this state is a
+        # command request.
+        if frametype != FRAME_TYPE_COMMAND_NAME:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('expected command frame; got %d') % frametype)
+
+        self._activecommand = payload
+        self._activeargs = {}
+        self._activedata = None
+
+        if frameflags & FLAG_COMMAND_NAME_EOS:
+            return self._makeruncommandresult()
+
+        self._expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS)
+        self._expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA)
+
+        if self._expectingargs:
+            self._state = 'command-receiving-args'
+            return self._makewantframeresult()
+        elif self._expectingdata:
+            self._activedata = util.bytesio()
+            self._state = 'command-receiving-data'
+            return self._makewantframeresult()
+        else:
+            self._state = 'errored'
+            return self._makeerrorresult(_('missing frame flags on '
+                                           'command frame'))
+
+    def _onframereceivingargs(self, frametype, frameflags, payload):
+        if frametype != FRAME_TYPE_COMMAND_ARGUMENT:
+            self._state = 'errored'
+            return self._makeerrorresult(_('expected command argument '
+                                           'frame; got %d') % frametype)
+
+        offset = 0
+        namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload)
+        offset += ARGUMENT_FRAME_HEADER.size
+
+        # The argument name MUST fit inside the frame.
+        argname = bytes(payload[offset:offset + namesize])
+        offset += namesize
+
+        if len(argname) != namesize:
+            self._state = 'errored'
+            return self._makeerrorresult(_('malformed argument frame: '
+                                           'partial argument name'))
+
+        argvalue = bytes(payload[offset:])
+
+        # Argument value spans multiple frames. Record our active state
+        # and wait for the next frame.
+        if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
+            raise error.ProgrammingError('not yet implemented')
+            self._activeargname = argname
+            self._activeargchunks = [argvalue]
+            self._state = 'command-arg-continuation'
+            return self._makewantframeresult()
+
+        # Common case: the argument value is completely contained in this
+        # frame.
+
+        if len(argvalue) != valuesize:
+            self._state = 'errored'
+            return self._makeerrorresult(_('malformed argument frame: '
+                                           'partial argument value'))
+
+        self._activeargs[argname] = argvalue
+
+        if frameflags & FLAG_COMMAND_ARGUMENT_EOA:
+            if self._expectingdata:
+                self._state = 'command-receiving-data'
+                self._activedata = util.bytesio()
+                # TODO signal request to run a command once we don't
+                # buffer data frames.
+                return self._makewantframeresult()
+            else:
+                self._state = 'waiting'
+                return self._makeruncommandresult()
+        else:
+            return self._makewantframeresult()
+
+    def _onframereceivingdata(self, frametype, frameflags, payload):
+        if frametype != FRAME_TYPE_COMMAND_DATA:
+            self._state = 'errored'
+            return self._makeerrorresult(_('expected command data frame; '
+                                           'got %d') % frametype)
+
+        # TODO support streaming data instead of buffering it.
+        self._activedata.write(payload)
+
+        if frameflags & FLAG_COMMAND_DATA_CONTINUATION:
+            return self._makewantframeresult()
+        elif frameflags & FLAG_COMMAND_DATA_EOS:
+            self._activedata.seek(0)
+            self._state = 'idle'
+            return self._makeruncommandresult()
+        else:
+            self._state = 'errored'
+            return self._makeerrorresult(_('command data frame without '
+                                           'flags'))
+
+    def _onframeerrored(self, frametype, frameflags, payload):
+        return self._makeerrorresult(_('server already errored'))
diff --git a/mercurial/util.py b/mercurial/util.py
--- a/mercurial/util.py
+++ b/mercurial/util.py
@@ -2561,6 +2561,14 @@ 
 
         return data
 
+    def readinto(self, b):
+        res = self.read(len(b))
+        if res is None:
+            return None
+
+        b[0:len(res)] = res
+        return len(res)
+
 def stringmatcher(pattern, casesensitive=True):
     """
     accepts a string, possibly starting with 're:' or 'literal:' prefix.
diff --git a/mercurial/configitems.py b/mercurial/configitems.py
--- a/mercurial/configitems.py
+++ b/mercurial/configitems.py
@@ -583,6 +583,9 @@ 
 coreconfigitem('experimental', 'web.api.http-v2',
     default=False,
 )
+coreconfigitem('experimental', 'web.api.debugreflect',
+    default=False,
+)
 coreconfigitem('experimental', 'xdiff',
     default=False,
 )