@@ -275,6 +275,7 @@
b'known',
b'getbundle',
b'unbundlehash',
+ b'iblt-changelog',
]
@@ -426,6 +427,15 @@
continue
return None
@wireprotocommand(b'getestimator', b'name', permission=b'pull')
def getestimator(repo, proto, name):
    """Wire protocol command: serve the strata estimator called *name*.

    Delegates to the local peer's getestimator() and returns the
    serialized estimator bytes (estimator.dump()) as a bytes response.
    """
    estimator = repo.peer().getestimator(name)
    return wireprototypes.bytesresponse(estimator.dump())
+
@wireprotocommand(b'getiblt', b'name size seed', permission=b'pull')
def getiblt(repo, proto, name, size, seed):
    """Wire protocol command: serve the IBLT called *name*.

    *size* and *seed* arrive as bytes over the wire and are converted to
    int before delegating to the local peer's getiblt().  The serialized
    table (iblt.dump()) is returned as a bytes response.
    """
    inst = repo.peer().getiblt(name, int(size), int(seed))
    return wireprototypes.bytesresponse(inst.dump())
@wireprotocommand(b'getbundle', b'*', permission=b'pull')
def getbundle(repo, proto, others):
@@ -21,6 +21,7 @@
changegroup as changegroupmod,
encoding,
error,
+ iblt,
pushkey as pushkeymod,
pycompat,
util,
@@ -503,6 +504,14 @@
ret = bundle2.getunbundler(self.ui, stream)
return ret
    def getestimator(self, name):
        """Fetch the remote strata estimator *name* and deserialize it.

        Issues the b"getestimator" wire command and decodes the reply
        with iblt.estimator.load().
        """
        d = self._call(b"getestimator", name=name)
        return iblt.estimator.load(d)
+
    def getiblt(self, name, size, seed):
        """Fetch the remote IBLT *name* with *size* cells and hash *seed*.

        Issues the b"getiblt" wire command (ints encoded as decimal
        bytes).  iblt.iblt.load() returns (instance, bytes-consumed);
        only the instance is returned.
        """
        d = self._call(b"getiblt", name=name, size=b'%d' % size, seed=b'%d' % seed)
        return iblt.iblt.load(d)[0]
+
# End of ipeercommands interface.
# Begin of ipeerlegacycommands interface.
@@ -279,6 +279,70 @@
'discovery', member='PartialDiscovery', default=partialdiscovery
)
import math

# Ladder of permitted IBLT sizes: powers of two from 2**5 to 2**30,
# interleaved with sqrt(2) intermediate steps so consecutive entries grow
# by roughly 41% instead of doubling.
iblt_sizes = [(1 << i) for i in range(5, 31)]
iblt_sizes += [math.trunc(math.sqrt(2) * s) for s in iblt_sizes]
iblt_sizes.sort()

def round_iblt_size(size):
    """Round an expected set-difference *size* up to a ladder entry.

    25% headroom is added first, since IBLT decoding needs noticeably
    more cells than encoded differences.

    BUG FIX: the original fell off the end and implicitly returned None
    when the padded size exceeded the largest ladder entry, which would
    crash later (e.g. in b'%d' % size).  We now clamp to the largest
    available size instead.
    """
    size = size + size // 4
    for s in iblt_sizes:
        if s >= size:
            return s
    # Padded size exceeds every precomputed entry: clamp to the largest.
    return iblt_sizes[-1]
+
def findsetdifferences(ui, local, remote):
    """Compute the changelog difference to *remote* via IBLT exchange.

    Returns a 5-tuple ``(success, my_only, them_only, commonheads,
    remoteheads)``:

    - ``success`` is False when the remote lacks the b'iblt-changelog'
      capability or the IBLT could not be decoded after three attempts
      (all lists are empty in that case).
    - ``my_only``: changelog nodes only present locally.
    - ``them_only``: decoded rows (node + p1 + p2) only present remotely.
    - ``commonheads``: locally-known boundary of the common set.
    - ``remoteheads``: remote-only nodes that are not parents of other
      remote-only nodes, i.e. the heads of the missing set.

    Raises error.Abort if the decoded remote rows are inconsistent with
    the local repository.
    """
    if not remote.capable(b'iblt-changelog'):
        ui.status(b'no iblt support: %s\n' % b' '.join(list(remote.capabilities())))
        return False, [], [], [], []
    myestimator = local.peer().getestimator(b'changelog')
    theirestimator = remote.getestimator(b'changelog')
    estimated_diff = myestimator.compare(theirestimator)
    # bail out if estimated_diff = O(len(repo)) and fallback to the classic mechanism?
    iblt_size = round_iblt_size(estimated_diff)
    ui.debug(b"expected difference is: %d, using IBLT size of %d\n" % (estimated_diff, iblt_size))

    attempt = 0
    while True:
        # seed is fixed at 0; growing the size also changes cell mapping.
        myiblt = local.peer().getiblt(b'changelog', iblt_size, 0)
        theiriblt = remote.getiblt(b'changelog', iblt_size, 0)
        theiriblt.subtract(myiblt)
        success, them_only, my_only = theiriblt.list()
        if not success:
            attempt += 1
            if attempt == 3:
                ui.debug(b'iblt extraction failed\n')
                return False, [], [], [], []
            # round_iblt_size pads by 25%, so this bumps to the next
            # ladder entry.
            iblt_size = round_iblt_size(iblt_size + 1)
            # BUG FIX: this debug message was missing its trailing
            # newline, unlike every other ui.debug/status call here.
            ui.debug(b'iblt extraction failed, retrying with size %d\n' % iblt_size)
            continue

        ui.status(b'iblt extraction worked, %d local changes and %d remote changes found\n' % (len(my_only), len(them_only)))
        break

    has_node = local.changelog.index.has_node
    nodelen = len(local.nullid)
    # Each decoded row is node + p1 + p2; keep only the node itself.
    my_only = [node[:nodelen] for node in my_only]

    # first: find all parents and nodes
    parents = set()
    nodes = set()
    for row in them_only:
        node = row[:nodelen]
        if has_node(node):
            raise error.Abort(_(b"found already known remote change: %s") % node)
        nodes.add(node)
        parents.add(row[nodelen:2*nodelen])
        parents.add(row[2*nodelen:])
    # second: remote heads are all nodes that are not also parents
    remoteheads = nodes - parents
    # third: parent nodes that are not nodes themselves are the boundary
    # of the common set. Double check that they are known locally.
    commonheadscandidates = parents - nodes
    commonheads = [node for node in commonheadscandidates if has_node(node)]
    if len(commonheads) != len(commonheadscandidates):
        raise error.Abort(_(b"found remote changes with unknown parents"))

    return True, my_only, them_only, commonheads, remoteheads
def findcommonheads(
ui,
@@ -295,7 +359,6 @@
will be updated with extra data about the discovery, this is useful for
debug.
"""
-
samplegrowth = float(ui.config(b'devel', b'discovery.grow-sample.rate'))
if audit is not None:
@@ -46,6 +46,7 @@
extensions,
filelog,
hook,
+ iblt,
lock as lockmod,
match as matchmod,
mergestate as mergestatemod,
@@ -246,6 +247,7 @@
b'known',
b'getbundle',
b'unbundle',
+ b'iblt-changelog',
}
legacycaps = moderncaps.union({b'changegroupsubset'})
@@ -344,6 +346,58 @@
    def clonebundles(self):
        """Return the raw clone-bundles manifest via a best-effort read."""
        return self._repo.tryread(bundlecaches.CB_MANIFEST_FILE)
+ def getestimator(self, name):
+ if name == b'changelog':
+ repo = self.local()
+ cachename = b'estimator-changelog.%d' % repo.changelog.tiprev()
+ try:
+ data = repo.cachevfs.read(cachename)
+ except (IOError, OSError):
+ data = None
+ if data:
+ return iblt.estimator.load(data)
+
+ estimator = iblt.estimator(32)
+ tonode = repo.unfiltered().changelog.node
+ for rev in repo.revs('all()'):
+ estimator.insert(tonode(rev))
+ try:
+ with repo.cachevfs.open(cachename, b'wb') as f:
+ f.write(estimator.dump())
+ except (IOError, OSError) as inst:
+ pass
+ else:
+ raise KeyError(b'unknown getestimator key %s' % name)
+ return estimator
+
+ def getiblt(self, name, size, seed):
+ if seed != 0:
+ raise KeyError(b'unsupport getiblt seed: %s' % seed)
+ if name == b'changelog':
+ repo = self.local()
+ cachename = b'iblt-changelog.%d-%d' % (repo.changelog.tiprev(), size)
+ try:
+ data = repo.cachevfs.read(cachename)
+ except (IOError, OSError):
+ data = None
+ if data:
+ return iblt.iblt.load(data)[0]
+
+ tonode = repo.unfiltered().changelog.node
+ parents = repo.unfiltered().changelog.parents
+ inst = iblt.iblt(size, 3, 3 * len(repo.nullid))
+ for rev in repo.revs('all()'):
+ node = tonode(rev)
+ inst.insert(node + b''.join(parents(node)))
+ try:
+ with repo.cachevfs.open(cachename, b'wb') as f:
+ f.write(inst.dump())
+ except (IOError, OSError) as inst:
+ pass
+ else:
+ raise KeyError(b'unknown getiblt key %s' % name)
+ return inst
+
def debugwireargs(self, one, two, three=None, four=None, five=None):
"""Used to test argument passing over the wire"""
return b"%s %s %s %s %s" % (
@@ -199,6 +199,12 @@
Returns a generator of bundle data.
"""
    def getestimator(name):
        """Obtain the strata estimator called *name*.

        Returns an ``iblt.estimator`` instance.
        """
        pass

    def getiblt(name, size, seed):
        """Obtain the IBLT called *name* with *size* cells and hash *seed*.

        Returns an ``iblt.iblt`` instance.
        """
        pass
+
def heads():
"""Determine all known head revisions in the peer.
new file mode 100644
@@ -0,0 +1,205 @@
+import copy
+import hashlib
+import struct
+
def range_size(upper):
    """Return the minimum number of bytes (1..4) needed to store *upper*.

    Values below 256 fit in one byte, below 65536 in two, below 2**24 in
    three; anything larger is stored in four bytes.
    """
    for nbytes, limit in enumerate((256, 65536, 16777216), start=1):
        if upper < limit:
            return nbytes
    return 4
+
class iblt:
    """Invertible Bloom Lookup Table over fixed-size byte keys.

    Each of the *m* cells stores the XOR of the keys hashed into it, the
    XOR of their check hashes, and a signed count.  Subtracting one table
    from another leaves a table encoding the symmetric difference, which
    list() then tries to peel apart key by key.
    """

    def __init__(self, m, k, key_size):
        # m: number of cells; k: hash positions per key;
        # key_size: key length in bytes.
        self.m = m
        self.k = k
        self.key_size = key_size
        self.key_xors = [0] * m        # per-cell XOR of inserted keys (ints)
        self.key_hash_xors = [0] * m   # per-cell XOR of the check hashes
        self.key_hash_size = 4
        self.counts = [0] * m          # per-cell signed insert/remove balance

    def insert(self, key):
        """Add *key* (bytes of length key_size) to the table."""
        self._change(key, 1)

    def remove(self, key):
        """Remove *key* from the table (counts may go negative)."""
        self._change(key, -1)

    def __hash(self, key):
        """Return (cell indices, check hash) for *key*.

        One blake2b digest supplies k 4-byte cell indices plus the
        key_hash_size-byte check hash.

        NOTE(review): the de-duplication below indexes values[1] and
        values[2], so it assumes k == 3 — confirm before using other k.
        """
        hashes = hashlib.blake2b(key, digest_size=4*self.k + self.key_hash_size).digest()
        values = [int.from_bytes(hashes[4*i:4*i+4], 'big') % self.m for i in range(self.k)]
        # Fudge indices if they are not unique. This avoids the most common
        # reason for the (implicit) peeling process to fail.
        if values[0] == values[1]:
            values[1] ^= 1
        if values[0] == values[2] or values[1] == values[2]:
            values[2] ^= 1
        if values[0] == values[2] or values[1] == values[2]:
            values[2] ^= 2
        return values, int.from_bytes(hashes[4 * self.k:], 'big')

    def _change(self, key, count):
        # Apply an insert (+1) or remove (-1) of *key* to its k cells.
        indices, keyhash = self.__hash(key)
        numkey = int.from_bytes(key, 'big')
        for i in indices:
            self.key_xors[i] ^= numkey
            self.key_hash_xors[i] ^= keyhash
            self.counts[i] += count

    def list(self):
        """Destructively decode the table ("peeling").

        Returns (success, left, right): keys with a net count of +1 end
        up in *left*, net -1 in *right*.  *success* is True only if every
        cell is empty afterwards.  The table contents are consumed either
        way.
        """
        left = []
        right = []
        queue = []
        # Seed the queue with cells that currently hold exactly one key.
        for i in range(self.m):
            if self.counts[i] in (1, -1):
                queue.append(i)
        while queue:
            i = queue.pop()
            c = self.counts[i]
            if c not in (1, -1):
                continue
            intkey = self.key_xors[i]
            key = intkey.to_bytes(length = self.key_size, byteorder='big')
            indices, keyhash = self.__hash(key)
            # A mismatched check hash means this cell actually holds a
            # mixture of keys; skip it until peeling simplifies it.
            if self.key_hash_xors[i] != keyhash:
                continue

            # Remove the recovered key from all of its cells; any cell
            # that drops to a single key becomes decodable in turn.
            for k in indices:
                self.key_xors[k] ^= intkey
                self.key_hash_xors[k] ^= keyhash
                self.counts[k] -= c
                if self.counts[k] in (1, -1):
                    queue.append(k)

            if c == 1:
                left.append(key)
            else:
                right.append(key)
        # Decoding succeeded only if nothing is left in any cell.
        for i in range(self.m):
            if self.key_xors[i] or self.key_hash_xors[i] or self.counts[i]:
                return False, left, right
        return True, left, right

    def subtract(self, other):
        """Subtract *other* cell-wise, leaving the symmetric difference.

        NOTE(review): key_hash_size equality is not asserted here, unlike
        in compatible() — confirm all producers use the default of 4.
        """
        assert self.m == other.m
        assert self.k == other.k
        assert self.key_size == other.key_size

        for i in range(self.m):
            self.key_xors[i] ^= other.key_xors[i]
            self.key_hash_xors[i] ^= other.key_hash_xors[i]
            self.counts[i] -= other.counts[i]

    def dump(self):
        """Serialize to bytes; inverse of load().

        Counts are stored as unsigned offsets from min_count, using the
        smallest byte width that spans the observed count range.
        """
        min_count = min(self.counts)
        max_count = max(self.counts)
        count_size = range_size(max_count - min_count)
        data = []
        # Header: m (4), k (1), key_size (1), key_hash_size (1),
        # count_size (1), signed min_count (4).
        data.extend(self.m.to_bytes(4, 'big'))
        data.extend(self.k.to_bytes(1, 'big'))
        data.extend(self.key_size.to_bytes(1, 'big'))
        data.extend(self.key_hash_size.to_bytes(1, 'big'))
        data.extend(count_size.to_bytes(1, 'big'))
        data.extend(min_count.to_bytes(4, 'big', signed = True))
        for i in range(self.m):
            data.extend((self.counts[i] - min_count).to_bytes(count_size, 'big'))
            data.extend(self.key_hash_xors[i].to_bytes(self.key_hash_size, 'big'))
            data.extend(self.key_xors[i].to_bytes(self.key_size, 'big'))
        return bytes(data)

    @classmethod
    def load(cls, data):
        """Deserialize an iblt from *data*.

        Returns (instance, bytes consumed); the byte count lets callers
        parse concatenated dumps (as estimator.load does).
        """
        self = cls.__new__(cls)
        self.m = int.from_bytes(data[:4], 'big')
        self.k = int.from_bytes(data[4:5], 'big')
        self.key_size = int.from_bytes(data[5:6], 'big')
        self.key_hash_size = int.from_bytes(data[6:7], 'big')
        count_size = int.from_bytes(data[7:8], 'big')
        min_count = int.from_bytes(data[8:12], 'big', signed = True)
        pos = 12
        self.counts = []
        self.key_hash_xors = []
        self.key_xors = []
        for i in range(self.m):
            self.counts.append(min_count + int.from_bytes(data[pos:pos+count_size], 'big'))
            pos += count_size
            self.key_hash_xors.append(int.from_bytes(data[pos:pos+self.key_hash_size], 'big'))
            pos += self.key_hash_size
            self.key_xors.append(int.from_bytes(data[pos:pos+self.key_size], 'big'))
            pos += self.key_size
        return self, pos

    def compatible(self, other):
        """True if *other* has identical geometry (m, k, key sizes)."""
        return self.m == other.m and self.k == other.k and self.key_size == other.key_size and self.key_hash_size == other.key_hash_size

    def __eq__(self, other):
        # Equal iff geometry and every cell match.
        if not self.compatible(other):
            return False
        return self.counts == other.counts and self.key_xors == other.key_xors and self.key_hash_xors == other.key_hash_xors
+
def ffs(x):
    """Return the index of the least-significant set bit of *x*.

    Returns -1 when *x* is 0 (matching the "no bit set" convention).
    """
    if x == 0:
        return -1
    return (x ^ (x - 1)).bit_length() - 1
+
class estimator:
    """Strata estimator for the size of a set difference.

    Keys are hashed and routed to one of *stratas* small IBLTs according
    to the index of the lowest set bit of the hash, so stratum i receives
    roughly a 2**-(i+1) fraction of the keys.  compare() decodes strata
    from the sparsest down and extrapolates once a stratum fails to
    decode.
    """

    def __init__(self, stratas = 32):
        self.stratas = stratas
        # One bit of hash per stratum, i.e. stratas/8 bytes of digest.
        # NOTE(review): assumes stratas is a multiple of 8 — confirm for
        # non-default values.
        self.key_size = self.stratas // 8
        assert self.stratas <= 256
        self.strata_size = 120
        self.k = 3
        self.iblts = [iblt(self.strata_size, self.k, self.key_size) for n in range(self.stratas)]

    def insert(self, key):
        """Add *key* to its stratum."""
        self._change(key, 1)

    def remove(self, key):
        """Remove *key* from its stratum."""
        self._change(key, -1)

    def _change(self, key, count):
        h = self.__hash(key)
        # Stratum = index of the lowest set bit; an all-zero hash goes to
        # the last (sparsest) stratum.
        lowest = ffs(h) if h else self.stratas - 1
        self.iblts[lowest]._change(bytes(h.to_bytes(self.key_size, 'big')), count)

    def __hash(self, key):
        # Fixed-width blake2b digest of the key, as an integer.
        return int.from_bytes(hashlib.blake2b(key, digest_size=self.key_size).digest(), 'big')

    def dump(self):
        """Serialize to bytes; inverse of load()."""
        data = []
        data.extend(self.stratas.to_bytes(2, 'big'))
        data.extend(self.strata_size.to_bytes(2, 'big'))
        for i in range(self.stratas):
            data.extend(self.iblts[i].dump())
        return bytes(data)

    @classmethod
    def load(cls, data):
        """Deserialize an estimator produced by dump()."""
        self = cls.__new__(cls)
        self.stratas = int.from_bytes(data[:2], 'big')
        self.strata_size = int.from_bytes(data[2:4], 'big')
        self.key_size = self.stratas // 8
        assert self.stratas <= 256
        # NOTE(review): k is assumed to be 3 rather than taken from the
        # embedded IBLTs — confirm producers never use a different k.
        self.k = 3
        data = data[4:]
        self.iblts = []
        for i in range(self.stratas):
            inst, pos = iblt.load(data)
            self.iblts.append(inst)
            data = data[pos:]
        return self

    def compare(self, other):
        """Estimate the size of the symmetric difference with *other*.

        Fully decoded strata contribute their exact difference counts;
        the first stratum that fails to decode scales the running total
        by 2**(i+1) to account for the unsampled fraction of keys.
        """
        assert self.stratas == other.stratas and self.strata_size == other.strata_size and self.k == other.k
        estimate = 0
        for i in reversed(range(self.stratas)):
            # Decode a copy: iblt.list() is destructive.  (FIX: the local
            # was named `iblt`, shadowing the module-level class; a stale
            # commented-out alternative line was also removed.)
            strata = copy.deepcopy(self.iblts[i])
            strata.subtract(other.iblts[i])
            success, ours, theirs = strata.list()
            if success:
                estimate += len(ours) + len(theirs)
            else:
                estimate <<= i + 1
                break
        return estimate
@@ -1748,6 +1748,17 @@
Current handle changeset discovery only, will change handle all discovery
at some point."""
+
+ if not pullop.heads:
+ from . import setdiscovery
+ success, mychanges, theirchanges, common, rheads = setdiscovery.findsetdifferences(pullop.repo.ui, pullop.repo, pullop.remote)
+ if success:
+ tonode = pullop.repo.unfiltered().changelog.node
+ pullop.common = common
+ pullop.rheads = rheads
+ pullop.fetch = bool(theirchanges)
+ return
+
tmp = discovery.findcommonincoming(
pullop.repo, pullop.remote, heads=pullop.heads, force=pullop.force
)
@@ -1001,7 +1001,7 @@
@command(
b'debugdiscovery',
[
- (b'', b'old', None, _(b'use old-style discovery')),
+ (b'', b'protocol', b'set', _(b'use given discovery protocol')),
(
b'',
b'nonheads',
@@ -1116,7 +1116,7 @@
repo = repo.filtered(b'debug-discovery-local-filter')
data = {}
- if opts.get(b'old'):
+ if opts[b'protocol'] == b'tree':
def doit(pushedrevs, remoteheads, remote=remote):
if not util.safehasattr(remote, b'branches'):
@@ -1137,7 +1137,7 @@
common = {clnode(r) for r in common}
return common, hds
- else:
+ elif opts[b'protocol'] == b'set':
def doit(pushedrevs, remoteheads, remote=remote):
nodes = None
@@ -1149,6 +1149,19 @@
)
return common, hds
+ elif opts[b'protocol'] == b'iblt':
+
+ def doit(pushedrevs, remoteheads, remote=remote):
+ success, _, _, common, rheads = setdiscovery.findsetdifferences(ui, repo, remote)
+ assert success
+ return common, rheads
+
+ else:
+ raise error.InputError(
+ _(b"Unknown or unsupported discovery protocol")
+ )
+
+
remoterevs, _checkout = hg.addbranchrevs(repo, remote, branches, revs=None)
localrevs = opts[b'rev']
@@ -1220,6 +1233,8 @@
fm.data(**pycompat.strkwargs(data))
# display discovery summary
fm.plain(b"elapsed time: %(elapsed)f seconds\n" % data)
+ fm.end()
+ return
fm.plain(b"round-trips: %(total-roundtrips)9d\n" % data)
fm.plain(b"queries: %(total-queries)9d\n" % data)
fm.plain(b"heads summary:\n")
@@ -583,18 +583,31 @@
the changes; it closes both the original "peer" and the one returned
here.
"""
- tmp = discovery.findcommonincoming(repo, peer, heads=onlyheads, force=force)
- common, incoming, rheads = tmp
- if not incoming:
- try:
- if bundlename:
- os.unlink(bundlename)
- except OSError:
- pass
- return repo, [], peer.close
+ success = False
+ if not onlyheads:
+ from . import setdiscovery
+ success, mychanges, theirchanges, common, rheads = setdiscovery.findsetdifferences(ui, repo, peer)
+ if success and not theirchanges:
+ try:
+ if bundlename:
+ os.unlink(bundlename)
+ except OSError:
+ pass
+ return repo, [], peer.close
- commonset = set(common)
- rheads = [x for x in rheads if x not in commonset]
+ if not success:
+ tmp = discovery.findcommonincoming(repo, peer, heads=onlyheads, force=force)
+ common, incoming, rheads = tmp
+ if not incoming:
+ try:
+ if bundlename:
+ os.unlink(bundlename)
+ except OSError:
+ pass
+ return repo, [], peer.close
+
+ commonset = set(common)
+ rheads = [x for x in rheads if x not in commonset]
bundle = None
bundlerepo = None