Patchwork D3297: httppeer: implement command executor for version 2 peer

login
register
mail settings
Submitter phabricator
Date April 12, 2018, 8 p.m.
Message ID <differential-rev-PHID-DREV-66emonq24c7ljbcjysoh-req@phab.mercurial-scm.org>
Download mbox | patch
Permalink /patch/30830/
State Superseded
Headers show

Comments

phabricator - April 12, 2018, 8 p.m.
indygreg created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  Now that we have a new API for issuing commands which is compatible
  with wire protocol version 2, we can start using it with wire protocol
  version 2.
  
  This commit replaces our hacky implementation of _call() with something
  a bit more robust based on the new command executor interface.
  
  We now have proper support for issuing multiple commands per HTTP
  request. Each HTTP request maintains its own client reactor.
  
  The implementation is similar to the one in the legacy wire protocol.
  We use a ThreadPoolExecutor for spinning up a thread to read the HTTP
  response in the background. This allows responses to resolve in any
  order. While not implemented on the server yet, a client could use
  concurrent.futures.as_completed() with a collection of futures and
  handle responses as they arrive from the server.
  
  The return value from issued commands is still a simple list of raw
  or decoded CBOR data. This is still super hacky. We will want a rich
  data type for representing command responses. But at least this
  commit gets us one step closer to a proper peer implementation.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D3297

AFFECTED FILES
  mercurial/httppeer.py

CHANGE DETAILS




To: indygreg, #hg-reviewers
Cc: mercurial-devel
phabricator - April 13, 2018, 7:22 p.m.
indygreg planned changes to this revision.
indygreg added a comment.


  This needs some revisions to adapt to the tweaked command executor interface semantics.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D3297

To: indygreg, #hg-reviewers
Cc: mercurial-devel

Patch

diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py
--- a/mercurial/httppeer.py
+++ b/mercurial/httppeer.py
@@ -13,7 +13,9 @@ 
 import os
 import socket
 import struct
+import sys
 import tempfile
+import weakref
 
 from .i18n import _
 from .thirdparty import (
@@ -31,7 +33,6 @@ 
     statichttprepo,
     url as urlmod,
     util,
-    wireproto,
     wireprotoframing,
     wireprototypes,
     wireprotov1peer,
@@ -517,8 +518,237 @@ 
     def _abort(self, exception):
         raise exception
 
+def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
+    reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
+                                             buffersends=True)
+
+    url = '%s/%s' % (apiurl, permission)
+
+    if len(requests) > 1:
+        url += '/multirequest'
+    else:
+        url += '/%s' % requests[0][0]
+
+    # Maps each issued request ID to its (request, future) pair.
+    requestmap = {}
+
+    for command, args, f in requests:
+        request, action, meta = reactor.callcommand(command, args)
+        assert action == 'noop'
+
+        requestmap[request.requestid] = (request, f)
+
+    action, meta = reactor.flushcommands()
+    assert action == 'sendframes'
+
+    # TODO stream this instead of buffering every frame into one body.
+    body = b''.join(map(bytes, meta['framegen']))
+
+    # TODO modify user-agent to reflect v2
+    headers = {
+        r'Accept': wireprotov2server.FRAMINGTYPE,
+        r'Content-Type': wireprotov2server.FRAMINGTYPE,
+    }
+
+    req = requestbuilder(pycompat.strurl(url), body, headers)
+    req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
+
+    try:
+        res = opener.open(req)
+    except urlerr.httperror as e:
+        if e.code == 401:
+            raise error.Abort(_('authorization failed'))
+
+        raise
+    except httplib.HTTPException as e:
+        ui.traceback()
+        raise IOError(None, e)
+
+    return reactor, requestmap, res
+
+@zi.implementer(repository.ipeercommandexecutor)
+class httpv2executor(object):
+    def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
+        self._ui = ui
+        self._opener = opener
+        self._requestbuilder = requestbuilder
+        self._apiurl = apiurl
+        self._descriptor = descriptor
+        self._sent = False
+        self._closed = False
+        self._neededpermissions = set()
+        self._calls = []
+        self._futures = weakref.WeakSet()
+        self._responseexecutor = None
+        self._responsef = None
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exctype, excvalue, exctb):
+        self.close()
+
+    def callcommand(self, command, args):
+        if self._sent:
+            raise error.ProgrammingError('callcommand() cannot be used after '
+                                         'sendcommands()')
+
+        if self._closed:
+            raise error.ProgrammingError('callcommand() cannot be used after '
+                                         'close()')
+
+        # The API descriptor advertises the available commands and their
+        # arguments, so unknown commands or arguments can be rejected here,
+        # before anything is sent.
+        if command not in self._descriptor['commands']:
+            raise error.ProgrammingError(
+                'wire protocol command %s is not available' % command)
+
+        cmdinfo = self._descriptor['commands'][command]
+        unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
+
+        if unknownargs:
+            raise error.ProgrammingError(
+                'wire protocol command %s does not accept argument: %s' % (
+                    command, ', '.join(sorted(unknownargs))))
+
+        self._neededpermissions |= set(cmdinfo['permissions'])
+
+        # TODO we /could/ also validate types here, since the API descriptor
+        # includes types...
+
+        f = pycompat.futures.Future()
+        self._futures.add(f)
+
+        self._calls.append((command, args, f))
+
+        return f
+
+    def sendcommands(self):
+        if self._sent:
+            return
+
+        if not self._calls:
+            return
+
+        self._sent = True
+
+        # Mark each future as running; drop calls that were cancelled.
+        self._calls = [(command, args, f)
+                       for command, args, f in self._calls
+                       if f.set_running_or_notify_cancel()]
+
+        if not self._calls:
+            return
+
+        permissions = set(self._neededpermissions)
+
+        if 'push' in permissions and 'pull' in permissions:
+            permissions.remove('pull')
+
+        if len(permissions) > 1:
+            raise error.RepoError(_('cannot make request requiring multiple '
+                                    'permissions: %s') %
+                                  _(', ').join(sorted(permissions)))
+
+        permission = {
+            'push': 'rw',
+            'pull': 'ro',
+        }[permissions.pop()]
+
+        reactor, requests, resp = sendv2request(
+            self._ui, self._opener, self._requestbuilder, self._apiurl,
+            permission, self._calls)
+
+        # Drop our strong references; self._futures only holds weak ones.
+        self._calls = None
+
+        # TODO we probably want to validate the HTTP code, media type, etc.
+
+        self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
+        self._responsef = self._responseexecutor.submit(self._handleresponse,
+                                                        reactor,
+                                                        requests,
+                                                        resp)
+
+    def close(self):
+        if self._closed:
+            return
+
+        self.sendcommands()
+
+        self._closed = True
+
+        if not self._responsef:
+            return
+
+        try:
+            self._responsef.result()
+        finally:
+            self._responseexecutor.shutdown(wait=True)
+            self._responsef = None
+            self._responseexecutor = None
+
+            # If any of our futures are still in progress, mark them as
+            # errored, otherwise a result() could wait indefinitely.
+            for f in self._futures:
+                if not f.done():
+                    f.set_exception(error.ResponseError(
+                        _('unfulfilled command response')))
+
+            self._futures = None
+
+    def _handleresponse(self, reactor, requests, resp):
+        # Runs on the _responseexecutor thread (see sendcommands()).
+
+        results = {k: [] for k in requests}
+
+        while True:
+            frame = wireprotoframing.readframe(resp)
+            if frame is None:
+                break
+
+            self._ui.note(_('received %r\n') % frame)
+
+            # Frames for a request ID we didn't issue yield (None, None).
+            # NOTE(review): the 'responsedata' branch does not check for that.
+            request, f = requests.get(frame.requestid, [None, None])
+
+            action, meta = reactor.onframerecv(frame)
+
+            if action == 'responsedata':
+                assert request.requestid == meta['request'].requestid
+
+                result = results[request.requestid]
+
+                if meta['cbor']:
+                    payload = util.bytesio(meta['data'])
+
+                    decoder = cbor.CBORDecoder(payload)
+                    while payload.tell() + 1 < len(meta['data']):
+                        try:
+                            result.append(decoder.decode())
+                        except Exception:
+                            f.set_exception_info(*sys.exc_info()[1:])
+                            continue
+                else:
+                    result.append(meta['data'])
+
+                if meta['eos']:
+                    f.set_result(result)
+                    del results[request.requestid]
+
+            else:
+                e = error.ProgrammingError('unhandled action: %s' % action)
+
+                if f:
+                    f.set_exception(e)
+                else:
+                    raise e
+
 # TODO implement interface for version 2 peers
-@zi.implementer(repository.ipeerconnection, repository.ipeercapabilities)
+@zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,
+                repository.ipeerrequests)
 class httpv2peer(object):
     def __init__(self, ui, repourl, apipath, opener, requestbuilder,
                  apidescriptor):
@@ -529,6 +759,7 @@ 
 
         self._url = repourl
         self._apipath = apipath
+        self._apiurl = '%s/%s' % (repourl, apipath)
         self._opener = opener
         self._requestbuilder = requestbuilder
         self._descriptor = apidescriptor
@@ -580,85 +811,15 @@ 
 
     # End of ipeercapabilities.
 
-    # TODO require to be part of a batched primitive, use futures.
     def _call(self, name, **args):
-        """Call a wire protocol command with arguments."""
-
-        # Having this early has a side-effect of importing wireprotov2server,
-        # which has the side-effect of ensuring commands are registered.
-
-        # TODO modify user-agent to reflect v2.
-        headers = {
-            r'Accept': wireprotov2server.FRAMINGTYPE,
-            r'Content-Type': wireprotov2server.FRAMINGTYPE,
-        }
-
-        # TODO permissions should come from capabilities results.
-        permission = wireproto.commandsv2[name].permission
-        if permission not in ('push', 'pull'):
-            raise error.ProgrammingError('unknown permission type: %s' %
-                                         permission)
-
-        permission = {
-            'push': 'rw',
-            'pull': 'ro',
-        }[permission]
-
-        url = '%s/%s/%s/%s' % (self._url, self._apipath, permission, name)
-
-        # TODO this should be part of a generic peer for the frame-based
-        # protocol.
-        reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
-                                                 buffersends=True)
-
-        request, action, meta = reactor.callcommand(name, args)
-        assert action == 'noop'
-
-        action, meta = reactor.flushcommands()
-        assert action == 'sendframes'
+        with self.commandexecutor() as e:
+            f = e.callcommand(name, args)
 
-        body = b''.join(map(bytes, meta['framegen']))
-        req = self._requestbuilder(pycompat.strurl(url), body, headers)
-        req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
-
-        # TODO unify this code with httppeer.
-        try:
-            res = self._opener.open(req)
-        except urlerr.httperror as e:
-            if e.code == 401:
-                raise error.Abort(_('authorization failed'))
-
-            raise
-        except httplib.HTTPException as e:
-            self.ui.traceback()
-            raise IOError(None, e)
-
-        # TODO validate response type, wrap response to handle I/O errors.
-        # TODO more robust frame receiver.
-        results = []
+        return f.result()
 
-        while True:
-            frame = wireprotoframing.readframe(res)
-            if frame is None:
-                break
-
-            self.ui.note(_('received %r\n') % frame)
-
-            action, meta = reactor.onframerecv(frame)
-
-            if action == 'responsedata':
-                if meta['cbor']:
-                    payload = util.bytesio(meta['data'])
-
-                    decoder = cbor.CBORDecoder(payload)
-                    while payload.tell() + 1 < len(meta['data']):
-                        results.append(decoder.decode())
-                else:
-                    results.append(meta['data'])
-            else:
-                error.ProgrammingError('unhandled action: %s' % action)
-
-        return results
+    def commandexecutor(self):
+        return httpv2executor(self.ui, self._opener, self._requestbuilder,
+                              self._apiurl, self._descriptor)
 
 # Registry of API service names to metadata about peers that handle it.
 #