Patchwork D2858: wireproto: define and implement responses in framing protocol

login
register
mail settings
Submitter phabricator
Date March 15, 2018, 1:11 a.m.
Message ID <b1e6f383d948a0737af147c92388f09b@localhost.localdomain>
Download mbox | patch
Permalink /patch/29520/
State Not Applicable
Headers show

Comments

phabricator - March 15, 2018, 1:11 a.m.
indygreg updated this revision to Diff 7051.

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D2858?vs=7034&id=7051

REVISION DETAIL
  https://phab.mercurial-scm.org/D2858

AFFECTED FILES
  mercurial/help/internals/wireprotocol.txt
  mercurial/wireprotoframing.py
  mercurial/wireprotoserver.py
  tests/test-http-api-httpv2.t
  tests/test-wireproto-serverreactor.py

CHANGE DETAILS




To: indygreg, #hg-reviewers
Cc: mercurial-devel

Patch

diff --git a/tests/test-wireproto-serverreactor.py b/tests/test-wireproto-serverreactor.py
--- a/tests/test-wireproto-serverreactor.py
+++ b/tests/test-wireproto-serverreactor.py
@@ -79,6 +79,10 @@ 
         self.assertIsInstance(res[1], dict)
         self.assertEqual(res[0], expected)
 
+    def assertframesequal(self, frames, framestrings):
+        expected = [ffs(s) for s in framestrings]
+        self.assertEqual(list(frames), expected)
+
     def test1framecommand(self):
         """Receiving a command in a single frame yields request to run it."""
         reactor = makereactor()
@@ -270,6 +274,42 @@ 
             'message': b'command data frame without flags',
         })
 
+    def testsimpleresponse(self):
+        """Bytes response to command sends result frames."""
+        reactor = makereactor()
+        list(sendcommandframes(reactor, b'mycommand', {}))
+
+        result = reactor.onbytesresponseready(b'response')
+        self.assertaction(result, 'sendframes')
+        self.assertframesequal(result[1]['framegen'], [
+            b'bytes-response eos response',
+        ])
+
+    def testmultiframeresponse(self):
+        """Bytes response spanning multiple frames is handled."""
+        first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
+        second = b'y' * 100
+
+        reactor = makereactor()
+        list(sendcommandframes(reactor, b'mycommand', {}))
+
+        result = reactor.onbytesresponseready(first + second)
+        self.assertaction(result, 'sendframes')
+        self.assertframesequal(result[1]['framegen'], [
+            b'bytes-response continuation %s' % first,
+            b'bytes-response eos %s' % second,
+        ])
+
+    def testapplicationerror(self):
+        reactor = makereactor()
+        list(sendcommandframes(reactor, b'mycommand', {}))
+
+        result = reactor.onapplicationerror(b'some message')
+        self.assertaction(result, 'sendframes')
+        self.assertframesequal(result[1]['framegen'], [
+            b'error-response application some message',
+        ])
+
 if __name__ == '__main__':
     import silenttestrunner
     silenttestrunner.main(__name__)
diff --git a/tests/test-http-api-httpv2.t b/tests/test-http-api-httpv2.t
--- a/tests/test-http-api-httpv2.t
+++ b/tests/test-http-api-httpv2.t
@@ -180,10 +180,14 @@ 
   s>     HTTP/1.1 200 OK\r\n
   s>     Server: testing stub value\r\n
   s>     Date: $HTTP_DATE$\r\n
-  s>     Content-Type: text/plain\r\n
-  s>     Content-Length: *\r\n (glob)
+  s>     Content-Type: application/mercurial-exp-framing-0001\r\n
+  s>     Transfer-Encoding: chunked\r\n
   s>     \r\n
-  s>     lookup branchmap pushkey known getbundle unbundlehash streamreqs=generaldelta,revlogv1 $USUAL_BUNDLE2_CAPS_SERVER$ unbundle=HG10GZ,HG10BZ,HG10UN
+  s>     *\r\n (glob)
+  s>     Y\x01\x00Blookup branchmap pushkey known getbundle unbundlehash streamreqs=generaldelta,revlogv1 $USUAL_BUNDLE2_CAPS_SERVER$ unbundle=HG10GZ,HG10BZ,HG10UN
+  s>     \r\n
+  s>     0\r\n
+  s>     \r\n
 
 Request to read-write command fails because server is read-only by default
 
@@ -287,10 +291,14 @@ 
   s>     HTTP/1.1 200 OK\r\n
   s>     Server: testing stub value\r\n
   s>     Date: $HTTP_DATE$\r\n
-  s>     Content-Type: text/plain\r\n
-  s>     Content-Length: *\r\n (glob)
+  s>     Content-Type: application/mercurial-exp-framing-0001\r\n
+  s>     Transfer-Encoding: chunked\r\n
   s>     \r\n
-  s>     lookup branchmap pushkey known getbundle unbundlehash streamreqs=generaldelta,revlogv1 $USUAL_BUNDLE2_CAPS_SERVER$ unbundle=HG10GZ,HG10BZ,HG10UN
+  s>     *\r\n (glob)
+  s>     Y\x01\x00Blookup branchmap pushkey known getbundle unbundlehash streamreqs=generaldelta,revlogv1 $USUAL_BUNDLE2_CAPS_SERVER$ unbundle=HG10GZ,HG10BZ,HG10UN
+  s>     \r\n
+  s>     0\r\n
+  s>     \r\n
 
 Authorized request for unknown command is rejected
 
diff --git a/mercurial/wireprotoserver.py b/mercurial/wireprotoserver.py
--- a/mercurial/wireprotoserver.py
+++ b/mercurial/wireprotoserver.py
@@ -493,15 +493,20 @@ 
 
     rsp = wireproto.dispatch(repo, proto, command['command'])
 
-    # TODO use proper response format.
     res.status = b'200 OK'
-    res.headers[b'Content-Type'] = b'text/plain'
+    res.headers[b'Content-Type'] = FRAMINGTYPE
 
     if isinstance(rsp, wireprototypes.bytesresponse):
-        res.setbodybytes(rsp.data)
+        action, meta = reactor.onbytesresponseready(rsp.data)
     else:
-        res.setbodybytes(b'unhandled response type from wire proto '
-                         'command')
+        action, meta = reactor.onapplicationerror(
+            _('unhandled response type from wire proto command'))
+
+    if action == 'sendframes':
+        res.setbodygen(meta['framegen'])
+    else:
+        raise error.ProgrammingError('unhandled event from reactor: %s' %
+                                     action)
 
 # Maps API name to metadata so custom API can be registered.
 API_HANDLERS = {
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -25,11 +25,15 @@ 
 FRAME_TYPE_COMMAND_NAME = 0x01
 FRAME_TYPE_COMMAND_ARGUMENT = 0x02
 FRAME_TYPE_COMMAND_DATA = 0x03
+FRAME_TYPE_BYTES_RESPONSE = 0x04
+FRAME_TYPE_ERROR_RESPONSE = 0x05
 
 FRAME_TYPES = {
     b'command-name': FRAME_TYPE_COMMAND_NAME,
     b'command-argument': FRAME_TYPE_COMMAND_ARGUMENT,
     b'command-data': FRAME_TYPE_COMMAND_DATA,
+    b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
+    b'error-response': FRAME_TYPE_ERROR_RESPONSE,
 }
 
 FLAG_COMMAND_NAME_EOS = 0x01
@@ -58,11 +62,29 @@ 
     b'eos': FLAG_COMMAND_DATA_EOS,
 }
 
+FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
+FLAG_BYTES_RESPONSE_EOS = 0x02
+
+FLAGS_BYTES_RESPONSE = {
+    b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
+    b'eos': FLAG_BYTES_RESPONSE_EOS,
+}
+
+FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
+FLAG_ERROR_RESPONSE_APPLICATION = 0x02
+
+FLAGS_ERROR_RESPONSE = {
+    b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
+    b'application': FLAG_ERROR_RESPONSE_APPLICATION,
+}
+
 # Maps frame types to their available flags.
 FRAME_TYPE_FLAGS = {
     FRAME_TYPE_COMMAND_NAME: FLAGS_COMMAND,
     FRAME_TYPE_COMMAND_ARGUMENT: FLAGS_COMMAND_ARGUMENT,
     FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
+    FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
+    FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
 }
 
 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
@@ -202,6 +224,47 @@ 
             if done:
                 break
 
+def createbytesresponseframesfrombytes(data,
+                                       maxframesize=DEFAULT_MAX_FRAME_SIZE):
+    """Create a raw frame to send a bytes response from static bytes input.
+
+    Returns a generator of bytearrays.
+    """
+
+    # Simple case of a single frame.
+    if len(data) <= maxframesize:
+        yield makeframe(FRAME_TYPE_BYTES_RESPONSE,
+                        FLAG_BYTES_RESPONSE_EOS, data)
+        return
+
+    offset = 0
+    while True:
+        chunk = data[offset:offset + maxframesize]
+        offset += len(chunk)
+        done = offset == len(data)
+
+        if done:
+            flags = FLAG_BYTES_RESPONSE_EOS
+        else:
+            flags = FLAG_BYTES_RESPONSE_CONTINUATION
+
+        yield makeframe(FRAME_TYPE_BYTES_RESPONSE, flags, chunk)
+
+        if done:
+            break
+
+def createerrorframe(msg, protocol=False, application=False):
+    # TODO properly handle frame size limits.
+    assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
+
+    flags = 0
+    if protocol:
+        flags |= FLAG_ERROR_RESPONSE_PROTOCOL
+    if application:
+        flags |= FLAG_ERROR_RESPONSE_APPLICATION
+
+    yield makeframe(FRAME_TYPE_ERROR_RESPONSE, flags, msg)
+
 class serverreactor(object):
     """Holds state of a server handling frame-based protocol requests.
 
@@ -230,6 +293,11 @@ 
 
     Valid actions that consumers can be instructed to take are:
 
+    sendframes
+       Indicates that frames should be sent to the client. The ``framegen``
+       key contains a generator of frames that should be sent. The server
+       assumes that all frames are sent to the client.
+
     error
        Indicates that an error occurred. Consumer should probably abort.
 
@@ -271,6 +339,20 @@ 
 
         return meth(frametype, frameflags, payload)
 
+    def onbytesresponseready(self, data):
+        """Signal that a bytes response is ready to be sent to the client.
+
+        The raw bytes response is passed as an argument.
+        """
+        return 'sendframes', {
+            'framegen': createbytesresponseframesfrombytes(data),
+        }
+
+    def onapplicationerror(self, msg):
+        return 'sendframes', {
+            'framegen': createerrorframe(msg, application=True),
+        }
+
     def _makeerrorresult(self, msg):
         return 'error', {
             'message': msg,
diff --git a/mercurial/help/internals/wireprotocol.txt b/mercurial/help/internals/wireprotocol.txt
--- a/mercurial/help/internals/wireprotocol.txt
+++ b/mercurial/help/internals/wireprotocol.txt
@@ -591,6 +591,40 @@ 
    server. The command has been fully issued and no new data for this
    command will be sent. The next frame will belong to a new command.
 
+Bytes Response Data (``0x04``)
+------------------------------
+
+This frame contains raw bytes response data to an issued command.
+
+The following flag values are defined for this type:
+
+0x01
+   Data continuation. When set, an additional frame containing raw
+   response data will follow.
+0x02
+   End of data. When sent, the response data has been fully sent and
+   no additional frames for this response will be sent.
+
+The ``0x01`` flag is mutually exclusive with the ``0x02`` flag.
+
+Error Response (``0x05``)
+-------------------------
+
+An error occurred when processing a request. This could indicate
+a protocol-level failure or an application level failure depending
+on the flags for this message type.
+
+The payload for this type is an error message that should be
+displayed to the user.
+
+The following flag values are defined for this type:
+
+0x01
+   The error occurred at the transport/protocol level. If set, the
+   connection should be closed.
+0x02
+   The error occurred at the application level. e.g. invalid command.
+
 Issuing Commands
 ----------------