Patchwork [03,of,14] streamclone: define first iteration of version 2 of stream format

login
register
mail settings
Submitter Boris Feld
Date Jan. 18, 2018, 11:21 a.m.
Message ID <b67de9629f6845041c84.1516274489@FB>
Download mbox | patch
Permalink /patch/26845/
State Superseded
Headers show

Comments

Boris Feld - Jan. 18, 2018, 11:21 a.m.
# HG changeset patch
# User Boris Feld <boris.feld@octobus.net>
# Date 1516232936 -3600
#      Thu Jan 18 00:48:56 2018 +0100
# Node ID b67de9629f6845041c84b9c1d868ba91c0850d79
# Parent  1262be5f656bb0597b59a7dd2139b81922829fae
# EXP-Topic b2-stream
# Available At https://bitbucket.org/octobus/mercurial-devel/
#              hg pull https://bitbucket.org/octobus/mercurial-devel/ -r b67de9629f68
streamclone: define first iteration of version 2 of stream format

(This patch is based on a first draft from Gregory Szorc, with deeper rework)

Version 1 of the stream clone format was invented many years ago and suffers
from a few deficiencies:

1) Filenames are stored in store-encoded (on filesystem) form rather than in
   their internal form. This makes future compatibility with new store
   filename encodings more difficult.
2) File entry "headers" consist of a newline of the file name followed by the
   string file size. Converting strings to integers is avoidable overhead. We
   can't store filenames with newlines (manifests have this limitation as
   well, so it isn't a major concern). But the big concern here is the
   necessity for readline(). Scanning for newlines means reading ahead and
   that means extra buffer allocations and slicing (in Python) and this makes
   performance suffer.
3) Filenames aren't compressed optimally. Filenames should be compressed well
   since there is a lot of repeated data. However, since they are scattered
   all over the stream (with revlog data in between), they typically fall
   outside the window size of the compressor and don't compress.
4) It can only exchange stored based content, being able to exchange caches
   too would be nice.
5) It is limited to a stream-based protocol and isn't suitable for an on-disk
   format for general repository reading because the offset of individual file
   entries requires scanning the entire file to find file records.

As part of enabling streaming clones to work in bundle2, #2 proved to have a
significant negative impact on performance. Since bundle2 provides the
opportunity to start fresh, Gregory Szorc figured he would take the
opportunity to invent a new streaming clone data format.

The new format devised in this series addresses #1, #2, and #4. It punts on #3
because it was complex without yielding a significant gain and on #5 because
devising a new store format that "packs" multiple revlogs into a single
"packed revlog" is massive scope bloat. However, this v2 format might be
suitable for streaming into a "packed revlog" with minimal processing. If it
works, great. If not, we can always invent stream format when it is needed.

This patch only introduces the bases of the format. We'll get it usable through
bundle2 first, then we'll extend the format in future patches to bring it to its
full potential (especially #4).

Patch

diff --git a/mercurial/streamclone.py b/mercurial/streamclone.py
--- a/mercurial/streamclone.py
+++ b/mercurial/streamclone.py
@@ -428,3 +428,115 @@  class streamcloneapplier(object):
 
     def apply(self, repo):
         return applybundlev1(repo, self._fh)
+
+def _emit(repo, entries, totalfilesize):
+    """actually emit the stream bundle"""
+    progress = repo.ui.progress
+    progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
+    vfs = repo.svfs
+    try:
+        seen = 0
+        for name, size in entries:
+            yield util.uvarintencode(len(name))
+            fp = vfs(name)
+            try:
+                yield util.uvarintencode(size)
+                yield name
+                if size <= 65536:
+                    chunks = (fp.read(size),)
+                else:
+                    chunks = util.filechunkiter(fp, limit=size)
+                for chunk in chunks:
+                    seen += len(chunk)
+                    progress(_('bundle'), seen, total=totalfilesize,
+                             unit=_('bytes'))
+                    yield chunk
+            finally:
+                fp.close()
+    finally:
+        progress(_('bundle'), None)
+
+def generatev2(repo):
+    """Emit content for version 2 of a streaming clone.
+
+    the data stream consists the following entries:
+    1) A varint containing the length of the filename
+    2) A varint containing the length of file data
+    3) N bytes containing the filename (the internal, store-agnostic form)
+    4) N bytes containing the file data
+
+    Returns a 3-tuple of (file count, file size, data iterator).
+    """
+
+    with repo.lock():
+
+        entries = []
+        totalfilesize = 0
+
+        repo.ui.debug('scanning\n')
+        for name, ename, size in _walkstreamfiles(repo):
+            if size:
+                entries.append((name, size))
+                totalfilesize += size
+
+        chunks = _emit(repo, entries, totalfilesize)
+
+    return len(entries), totalfilesize, chunks
+
+def consumev2(repo, fp, filecount, filesize):
+    """Apply the contents from a version 2 streaming clone.
+
+    Data is read from an object that only needs to provide a ``read(size)``
+    method.
+    """
+    with repo.lock():
+        repo.ui.status(_('%d files to transfer, %s of data\n') %
+                       (filecount, util.bytecount(filesize)))
+
+        start = util.timer()
+        handledbytes = 0
+        progress = repo.ui.progress
+
+        progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))
+
+        vfs = repo.svfs
+
+        with repo.transaction('clone'):
+            with vfs.backgroundclosing(repo.ui):
+                for i in range(filecount):
+                    namelen = util.uvarintdecodestream(fp)
+                    datalen = util.uvarintdecodestream(fp)
+
+                    name = fp.read(namelen)
+
+                    if repo.ui.debugflag:
+                        repo.ui.debug('adding %s (%s)\n' %
+                                      (name, util.bytecount(datalen)))
+
+                    with vfs(name, 'w') as ofp:
+                        for chunk in util.filechunkiter(fp, limit=datalen):
+                            handledbytes += len(chunk)
+                            progress(_('clone'), handledbytes, total=filesize,
+                                     unit=_('bytes'))
+                            ofp.write(chunk)
+
+            # force @filecache properties to be reloaded from
+            # streamclone-ed file at next access
+            repo.invalidate(clearfilecache=True)
+
+        elapsed = util.timer() - start
+        if elapsed <= 0:
+            elapsed = 0.001
+        progress(_('clone'), None)
+        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
+                       (util.bytecount(handledbytes), elapsed,
+                        util.bytecount(handledbytes / elapsed)))
+
+def applybundlev2(repo, fp, filecount, filesize, requirements):
+    missingreqs = [r for r in requirements if r not in repo.supported]
+    if missingreqs:
+        raise error.Abort(_('unable to apply stream clone: '
+                            'unsupported format: %s') %
+                          ', '.join(sorted(missingreqs)))
+
+    consumev2(repo, fp, filecount, filesize)