Patchwork D2901: wireproto: explicitly track which requests are active

login
register
mail settings
Submitter phabricator
Date March 19, 2018, 9:42 p.m.
Message ID <differential-rev-PHID-DREV-z2ha4qkvddgr5kfyvl4d-req@phab.mercurial-scm.org>
Download mbox | patch
Permalink /patch/29646/
State Superseded
Headers show

Comments

phabricator - March 19, 2018, 9:42 p.m.
indygreg created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  We previously only tracked which requests are receiving. A
  misbehaving client could accidentally have multiple requests with
  the same ID in flight.
  
  We now explicitly track which request IDs are currently active.
  We make it illegal to receive a frame associated with a request
  ID that has already been dispatched.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D2901

AFFECTED FILES
  mercurial/wireprotoframing.py
  tests/test-wireproto-serverreactor.py

CHANGE DETAILS




To: indygreg, #hg-reviewers
Cc: mercurial-devel

Patch

diff --git a/tests/test-wireproto-serverreactor.py b/tests/test-wireproto-serverreactor.py
--- a/tests/test-wireproto-serverreactor.py
+++ b/tests/test-wireproto-serverreactor.py
@@ -478,11 +478,11 @@ 
         results = list(sendframes(makereactor(), [
             ffs(b'1 command-name eos command1'),
             ffs(b'3 command-name have-data command3'),
-            ffs(b'1 command-argument eoa ignored'),
+            ffs(b'5 command-argument eoa ignored'),
         ]))
         self.assertaction(results[2], 'error')
         self.assertEqual(results[2][1], {
-            'message': b'received frame for request that is not receiving: 1',
+            'message': b'received frame for request that is not receiving: 5',
         })
 
     def testsimpleresponse(self):
@@ -571,6 +571,56 @@ 
             b'5 bytes-response eos response5',
         ])
 
+    def testduplicaterequestonactivecommand(self):
+        """Receiving a request ID that matches a request that isn't finished."""
+        reactor = makereactor()
+        list(sendcommandframes(reactor, 1, b'command1', {}))
+        results = list(sendcommandframes(reactor, 1, b'command1', {}))
+
+        self.assertaction(results[0], 'error')
+        self.assertEqual(results[0][1], {
+            'message': b'request with ID 1 is already active',
+        })
+
+    def testduplicaterequestonactivecommandnosend(self):
+        """Same as above but we've registered a response but haven't sent it."""
+        reactor = makereactor()
+        list(sendcommandframes(reactor, 1, b'command1', {}))
+        reactor.onbytesresponseready(1, b'response')
+
+        # We've registered the response but haven't sent it. From the
+        # perspective of the reactor, the command is still active.
+
+        results = list(sendcommandframes(reactor, 1, b'command1', {}))
+        self.assertaction(results[0], 'error')
+        self.assertEqual(results[0][1], {
+            'message': b'request with ID 1 is already active',
+        })
+
+    def testduplicaterequestargumentframe(self):
+        """Variant on above except we sent an argument frame instead of name."""
+        reactor = makereactor()
+        list(sendcommandframes(reactor, 1, b'command', {}))
+        results = list(sendframes(reactor, [
+            ffs(b'3 command-name have-args command'),
+            ffs(b'1 command-argument 0 ignored'),
+        ]))
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': 'received frame for request that is still active: 1',
+        })
+
+    def testduplicaterequestaftersend(self):
+        """We can use a duplicate request ID after we've sent the response."""
+        reactor = makereactor()
+        list(sendcommandframes(reactor, 1, b'command1', {}))
+        res = reactor.onbytesresponseready(1, b'response')
+        list(res[1]['framegen'])
+
+        results = list(sendcommandframes(reactor, 1, b'command1', {}))
+        self.assertaction(results[0], 'runcommand')
+
 if __name__ == '__main__':
     import silenttestrunner
     silenttestrunner.main(__name__)
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -472,6 +472,10 @@ 
         self._bufferedframegens = []
         # request id -> dict of commands that are actively being received.
         self._receivingcommands = {}
+        # Request IDs that have been received and are actively being processed.
+        # Once all output for a request has been sent, it is removed from this
+        # set.
+        self._activecommands = set()
 
     def onframerecv(self, frame):
         """Process a frame that has been received off the wire.
@@ -496,14 +500,20 @@ 
 
         The raw bytes response is passed as an argument.
         """
-        framegen = createbytesresponseframesfrombytes(requestid, data)
+        def sendframes():
+            for frame in createbytesresponseframesfrombytes(requestid, data):
+                yield frame
+
+            self._activecommands.remove(requestid)
+
+        result = sendframes()
 
         if self._deferoutput:
-            self._bufferedframegens.append(framegen)
+            self._bufferedframegens.append(result)
             return 'noop', {}
         else:
             return 'sendframes', {
-                'framegen': framegen,
+                'framegen': result,
             }
 
     def oninputeof(self):
@@ -546,6 +556,9 @@ 
         else:
             self._state = 'idle'
 
+        assert requestid not in self._activecommands
+        self._activecommands.add(requestid)
+
         return 'runcommand', {
             'requestid': requestid,
             'command': entry['command'],
@@ -571,6 +584,11 @@ 
             return self._makeerrorresult(
                 _('request with ID %d already received') % frame.requestid)
 
+        if frame.requestid in self._activecommands:
+            self._state = 'errored'
+            return self._makeerrorresult((
+                _('request with ID %d is already active') % frame.requestid))
+
         expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS)
         expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA)
 
@@ -599,7 +617,13 @@ 
             return self._onframeidle(frame)
 
         # All other frames should be related to a command that is currently
-        # receiving.
+        # receiving but is not active.
+        if frame.requestid in self._activecommands:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('received frame for request that is still active: %d') %
+                frame.requestid)
+
         if frame.requestid not in self._receivingcommands:
             self._state = 'errored'
             return self._makeerrorresult(