Patchwork D3224: wireproto: client reactor support for receiving frames

login
register
mail settings
Submitter phabricator
Date April 11, 2018, 5 p.m.
Message ID <abe3f309d0fedc4cf21e6edeab55f2d6@localhost.localdomain>
Download mbox | patch
Permalink /patch/30716/
State Not Applicable
Headers show

Comments

phabricator - April 11, 2018, 5 p.m.
This revision was automatically updated to reflect the committed changes.
Closed by commit rHG55b5ba8d4e68: wireproto: client reactor support for receiving frames (authored by indygreg, committed by ).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D3224?vs=7942&id=7990

REVISION DETAIL
  https://phab.mercurial-scm.org/D3224

AFFECTED FILES
  mercurial/httppeer.py
  mercurial/wireprotoframing.py
  tests/test-wireproto-clientreactor.py

CHANGE DETAILS




To: indygreg, #hg-reviewers, durin42
Cc: mercurial-devel

Patch

diff --git a/tests/test-wireproto-clientreactor.py b/tests/test-wireproto-clientreactor.py
--- a/tests/test-wireproto-clientreactor.py
+++ b/tests/test-wireproto-clientreactor.py
@@ -7,6 +7,21 @@ 
     wireprotoframing as framing,
 )
 
+ffs = framing.makeframefromhumanstring
+
+def sendframe(reactor, frame):
+    """Send a frame bytearray to a reactor."""
+    header = framing.parseheader(frame)
+    payload = frame[framing.FRAME_HEADER_SIZE:]
+    assert len(payload) == header.length
+
+    return reactor.onframerecv(framing.frame(header.requestid,
+                                             header.streamid,
+                                             header.streamflags,
+                                             header.typeid,
+                                             header.flags,
+                                             payload))
+
 class SingleSendTests(unittest.TestCase):
     """A reactor that can only send once rejects subsequent sends."""
     def testbasic(self):
@@ -61,6 +76,35 @@ 
 
         self.assertEqual(request.state, 'sent')
 
+class BadFrameRecvTests(unittest.TestCase):
+    def testoddstream(self):
+        reactor = framing.clientreactor()
+
+        action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo'))
+        self.assertEqual(action, 'error')
+        self.assertEqual(meta['message'],
+                         'received frame with odd numbered stream ID: 1')
+
+    def testunknownstream(self):
+        reactor = framing.clientreactor()
+
+        action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo'))
+        self.assertEqual(action, 'error')
+        self.assertEqual(meta['message'],
+                         'received frame on unknown stream without beginning '
+                         'of stream flag set')
+
+    def testunhandledframetype(self):
+        reactor = framing.clientreactor(buffersends=False)
+
+        request, action, meta = reactor.callcommand(b'foo', {})
+        for frame in meta['framegen']:
+            pass
+
+        with self.assertRaisesRegexp(error.ProgrammingError,
+                                     'unhandled frame type'):
+            sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))
+
 if __name__ == '__main__':
     import silenttestrunner
     silenttestrunner.main(__name__)
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -922,6 +922,7 @@ 
         self._outgoingstream = stream(1)
         self._pendingrequests = collections.deque()
         self._activerequests = {}
+        self._incomingstreams = {}
 
     def callcommand(self, name, args, datafh=None):
         """Request that a command be executed.
@@ -1007,3 +1008,63 @@ 
             yield frame
 
         request.state = 'sent'
+
+    def onframerecv(self, frame):
+        """Process a frame that has been received off the wire.
+
+        Returns a 2-tuple of (action, meta) describing further action the
+        caller needs to take as a result of receiving this frame.
+        """
+        if frame.streamid % 2:
+            return 'error', {
+                'message': (
+                    _('received frame with odd numbered stream ID: %d') %
+                    frame.streamid),
+            }
+
+        if frame.streamid not in self._incomingstreams:
+            if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
+                return 'error', {
+                    'message': _('received frame on unknown stream '
+                                 'without beginning of stream flag set'),
+                }
+
+        if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
+            raise error.ProgrammingError('support for decoding stream '
+                                         'payloads not yet implemneted')
+
+        if frame.streamflags & STREAM_FLAG_END_STREAM:
+            del self._incomingstreams[frame.streamid]
+
+        if frame.requestid not in self._activerequests:
+            return 'error', {
+                'message': (_('received frame for inactive request ID: %d') %
+                            frame.requestid),
+            }
+
+        request = self._activerequests[frame.requestid]
+        request.state = 'receiving'
+
+        handlers = {
+            FRAME_TYPE_BYTES_RESPONSE: self._onbytesresponseframe,
+        }
+
+        meth = handlers.get(frame.typeid)
+        if not meth:
+            raise error.ProgrammingError('unhandled frame type: %d' %
+                                         frame.typeid)
+
+        return meth(request, frame)
+
+    def _onbytesresponseframe(self, request, frame):
+        if frame.flags & FLAG_BYTES_RESPONSE_EOS:
+            request.state = 'received'
+            del self._activerequests[request.requestid]
+
+        return 'responsedata', {
+            'request': request,
+            'expectmore': frame.flags & FLAG_BYTES_RESPONSE_CONTINUATION,
+            'eos': frame.flags & FLAG_BYTES_RESPONSE_EOS,
+            'cbor': frame.flags & FLAG_BYTES_RESPONSE_CBOR,
+            'data': frame.payload,
+        }
diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py
--- a/mercurial/httppeer.py
+++ b/mercurial/httppeer.py
@@ -551,18 +551,19 @@ 
 
             self.ui.note(_('received %r\n') % frame)
 
-            if frame.typeid == wireprotoframing.FRAME_TYPE_BYTES_RESPONSE:
-                if frame.flags & wireprotoframing.FLAG_BYTES_RESPONSE_CBOR:
-                    payload = util.bytesio(frame.payload)
+            action, meta = reactor.onframerecv(frame)
+
+            if action == 'responsedata':
+                if meta['cbor']:
+                    payload = util.bytesio(meta['data'])
 
                     decoder = cbor.CBORDecoder(payload)
-                    while payload.tell() + 1 < len(frame.payload):
+                    while payload.tell() + 1 < len(meta['data']):
                         results.append(decoder.decode())
                 else:
-                    results.append(frame.payload)
+                    results.append(meta['data'])
             else:
-                error.ProgrammingError('unhandled frame type: %d' %
-                                       frame.typeid)
+                error.ProgrammingError('unhandled action: %s' % action)
 
         return results