@@ -23,6 +23,7 @@
vfs as vfsmod,
wireprotoserver,
wireprototypes,
+ wireprotov1peer,
wireprotov2server,
)
@@ -102,6 +103,14 @@
localrepo.localpeer)
checkzobject(localrepo.localpeer(dummyrepo()))
+ ziverify.verifyClass(repository.ipeercommandexecutor,
+ localrepo.localcommandexecutor)
+ checkzobject(localrepo.localcommandexecutor(None))
+
+ ziverify.verifyClass(repository.ipeercommandexecutor,
+ wireprotov1peer.peerexecutor)
+ checkzobject(wireprotov1peer.peerexecutor(None))
+
ziverify.verifyClass(repository.ipeerbaselegacycommands,
sshpeer.sshv1peer)
checkzobject(sshpeer.sshv1peer(ui, 'ssh://localhost/foo', None, dummypipe(),
@@ -7,13 +7,17 @@
from __future__ import absolute_import
+import contextlib
import hashlib
+import sys
from .i18n import _
from .node import (
bin,
)
-
+from .thirdparty.zope import (
+ interface as zi,
+)
from . import (
bundle2,
changegroup as changegroupmod,
@@ -177,14 +181,96 @@
return ';'.join(cmds)
+@zi.implementer(repository.ipeercommandexecutor)
+class peerexecutor(object):
+ def __init__(self, peer):
+ self._peer = peer
+ self._sent = False
+ self._closed = False
+ self._calls = []
+
+ def callcommand(self, command, args):
+ if self._sent:
+ raise error.ProgrammingError('callcommand() cannot be used '
+ 'after sendcommands()')
+
+ if self._closed:
+ raise error.ProgrammingError('callcommand() cannot be used '
+ 'after close()')
+
+ # Commands typically have methods on the peer
+ fn = getattr(self._peer, pycompat.sysstr(command), None)
+
+ if fn:
+ # Not all commands are batchable. So verify we don't attempt
+ # to batch non-batchable commands.
+ isbatchable = getattr(fn, 'batchable', False)
+
+ if not isbatchable and self._calls:
+ raise error.ProgrammingError(
+ '%s is not batchable and cannot be called on a command '
+ 'executor along with other commands' % command)
+
+ if self._calls and not self._calls[-1][2]:
+ raise error.ProgrammingError(
+ '%s cannot be called on a command executor after a '
+ 'non-batchable command')
+
+ f = pycompat.futures.Future()
+
+ self._calls.append((command, args, fn, f))
+
+ return f
+
+ def sendcommands(self):
+ if self._sent:
+ return
+
+ if not self._calls:
+ return
+
+ self._sent = True
+
+ if len(self._calls) == 1:
+ command, args, fn, f = self._calls[0]
+
+ # Future was cancelled. Ignore it.
+ if not f.set_running_or_notify_cancel():
+ return
+
+ try:
+ result = fn(**pycompat.strkwargs(args))
+ except Exception:
+ f.set_exception_info(*sys.exc_info()[1:])
+ else:
+ f.set_result(result)
+
+ return
+
+ raise error.ProgrammingError('support for multiple commands not '
+ 'yet implemented')
+
+ def close(self):
+ self.sendcommands()
+
+ self._closed = True
+
class wirepeer(repository.legacypeer):
"""Client-side interface for communicating with a peer repository.
Methods commonly call wire protocol commands of the same name.
See also httppeer.py and sshpeer.py for protocol-specific
implementations of this interface.
"""
+ @contextlib.contextmanager
+ def commandexecutor(self):
+ executor = peerexecutor(self)
+ try:
+ yield executor
+ finally:
+ executor.close()
+
# Begin of ipeercommands interface.
def iterbatch(self):
@@ -228,7 +228,14 @@
% (roundtrips, len(undecided), len(sample)))
# indices between sample and externalized version must match
sample = list(sample)
- yesno = remote.known(dag.externalizeall(sample))
+
+ with remote.commandexecutor() as e:
+ fyesno = e.callcommand('known', {
+ 'nodes': dag.externalizeall(sample),
+ })
+
+ yesno = fyesno.result()
+
full = True
if sample:
@@ -268,7 +268,8 @@
being issued.
"""
-class ipeerbase(ipeerconnection, ipeercapabilities, ipeercommands):
+class ipeerbase(ipeerconnection, ipeercapabilities, ipeercommands,
+ ipeerrequests):
"""Unified interface for peer repositories.
All peer instances must conform to this interface.
@@ -7,10 +7,12 @@
from __future__ import absolute_import
+import contextlib
import errno
import hashlib
import os
import random
+import sys
import time
import weakref
@@ -167,6 +169,44 @@
resref.set(getattr(self.local, name)(*args, **opts))
yield resref.value
+@zi.implementer(repository.ipeercommandexecutor)
+class localcommandexecutor(object):
+ def __init__(self, peer):
+ self._peer = peer
+ self._sent = False
+ self._closed = False
+
+ def callcommand(self, command, args):
+ if self._sent:
+ raise error.ProgrammingError('callcommand() cannot be used after '
+ 'sendcommands()')
+
+ if self._closed:
+ raise error.ProgrammingError('callcommand() cannot be used after '
+ 'close()')
+
+ # We don't need to support anything fancy. Just call the named
+ # method on the peer and return a resolved future.
+ fn = getattr(self._peer, pycompat.sysstr(command))
+
+ f = pycompat.futures.Future()
+
+ try:
+ result = fn(**args)
+ except Exception:
+ e, tb = sys.exc_info()[1:]
+ f.set_exception_info(e, tb)
+ else:
+ f.set_result(result)
+
+ return f
+
+ def sendcommands(self):
+ self._sent = True
+
+ def close(self):
+ self._closed = True
+
class localpeer(repository.peer):
'''peer for a local repo; reflects only the most recent API'''
@@ -286,6 +326,14 @@
# Begin of peer interface.
+ @contextlib.contextmanager
+ def commandexecutor(self):
+ executor = localcommandexecutor(self)
+ try:
+ yield executor
+ finally:
+ executor.close()
+
def iterbatch(self):
return localiterbatcher(self)