Patchwork [15,of,15] streamclone: also stream caches to the client

login
register
mail settings
Submitter Boris Feld
Date Jan. 19, 2018, 8:08 p.m.
Message ID <cc93d342d0a692565edc.1516392539@FB>
Download mbox | patch
Permalink /patch/26969/
State Deferred, archived
Headers show

Comments

Boris Feld - Jan. 19, 2018, 8:08 p.m.
# HG changeset patch
# User Boris Feld <boris.feld@octobus.net>
# Date 1516233012 -3600
#      Thu Jan 18 00:50:12 2018 +0100
# Node ID cc93d342d0a692565edc6a1c8cf8acdea36a0980
# Parent  fec6950ccabdd6d93484732b341ae06697954890
# EXP-Topic b2-stream
# Available At https://bitbucket.org/octobus/mercurial-devel/
#              hg pull https://bitbucket.org/octobus/mercurial-devel/ -r cc93d342d0a6
streamclone: also stream caches to the client

When stream clone is used over bundle2, relevant cache files are also streamed.
This is expected to be a massive performance win for clone since no important
cache will have to be recomputed.
Augie Fackler - Jan. 19, 2018, 8:39 p.m.
On Fri, Jan 19, 2018 at 09:08:59PM +0100, Boris Feld wrote:
> # HG changeset patch
> # User Boris Feld <boris.feld@octobus.net>
> # Date 1516233012 -3600
> #      Thu Jan 18 00:50:12 2018 +0100
> # Node ID cc93d342d0a692565edc6a1c8cf8acdea36a0980
> # Parent  fec6950ccabdd6d93484732b341ae06697954890
> # EXP-Topic b2-stream
> # Available At https://bitbucket.org/octobus/mercurial-devel/
> #              hg pull https://bitbucket.org/octobus/mercurial-devel/ -r cc93d342d0a6
> streamclone: also stream caches to the client
>
> When stream clone is used over bundle2, relevant cache files are also streamed.
> This is expected to be a massive performance win for clone since no important
> cache will have to be recomputed.

Some numbers here would be nice.

>
> diff --git a/mercurial/streamclone.py b/mercurial/streamclone.py
> --- a/mercurial/streamclone.py
> +++ b/mercurial/streamclone.py
> @@ -11,10 +11,12 @@ import contextlib
>  import os
>  import struct
>  import tempfile
> +import warnings
>
>  from .i18n import _
>  from . import (
>      branchmap,
> +    cacheutil,
>      error,
>      phases,
>      store,
> @@ -435,6 +437,10 @@ class streamcloneapplier(object):
>  _fileappend = 0 # append only file
>  _filefull = 1   # full snapshot file
>
> +# Source of the file
> +_srcstore = 's' # store (svfs)
> +_srccache = 'c' # cache (cache)
> +
> +# This is its own function so extensions can override it.
>  def _walkstreamfullstorefiles(repo):
>      """list snapshot file from the store"""
> @@ -443,12 +449,12 @@ def _walkstreamfullstorefiles(repo):
>          fnames.append('phaseroots')
>      return fnames
>
> -def _filterfull(entry, copy, vfs):
> +def _filterfull(entry, copy, vfsmap):
>      """actually copy the snapshot files"""
> -    name, ftype, data = entry
> +    src, name, ftype, data = entry
>      if ftype != _filefull:
>          return entry
> -    return (name, ftype, copy(vfs.join(name)))
> +    return (src, name, ftype, copy(vfsmap[src].join(name)))
>
>  @contextlib.contextmanager
>  def maketempcopies():
> @@ -466,19 +472,33 @@ def maketempcopies():
>          for tmp in files:
>              util.tryunlink(tmp)
>
> +def _makemap(repo):
> +    """make a (src -> vfs) map for the repo"""
> +    vfsmap = {
> +        _srcstore: repo.svfs,
> +        _srccache: repo.cachevfs,
> +    }
> +    # we keep repo.vfs out of the map on purpose, there are too many dangers there
> +    # (eg: .hg/hgrc)
> +    assert repo.vfs not in vfsmap.values()
> +
> +    return vfsmap
> +
>  def _emit(repo, entries, totalfilesize):
>      """actually emit the stream bundle"""
> -    vfs = repo.svfs
> +    vfsmap = _makemap(repo)
>      progress = repo.ui.progress
>      progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
>      with maketempcopies() as copy:
>          try:
>              # copy is delayed until we are in the try
> -            entries = [_filterfull(e, copy, vfs) for e in entries]
> +            entries = [_filterfull(e, copy, vfsmap) for e in entries]
>              yield None # this release the lock on the repository
>              seen = 0
>
> -            for name, ftype, data in entries:
> +            for src, name, ftype, data in entries:
> +                vfs = vfsmap[src]
> +                yield src
>                  yield util.uvarintencode(len(name))
>                  if ftype == _fileappend:
>                      fp = vfs(name)
> @@ -507,10 +527,11 @@ def generatev2(repo):
>      """Emit content for version 2 of a streaming clone.
>
>      the data stream consists the following entries:
> -    1) A varint containing the length of the filename
> -    2) A varint containing the length of file data
> -    3) N bytes containing the filename (the internal, store-agnostic form)
> -    4) N bytes containing the file data
> +    1) A char representing the file destination (eg: store or cache)
> +    2) A varint containing the length of the filename
> +    3) A varint containing the length of file data
> +    4) N bytes containing the filename (the internal, store-agnostic form)
> +    5) N bytes containing the file data
>
>      Returns a 3-tuple of (file count, file size, data iterator).
>      """
> @@ -523,12 +544,16 @@ def generatev2(repo):
>          repo.ui.debug('scanning\n')
>          for name, ename, size in _walkstreamfiles(repo):
>              if size:
> -                entries.append((name, _fileappend, size))
> +                entries.append((_srcstore, name, _fileappend, size))
>                  totalfilesize += size
>          for name in _walkstreamfullstorefiles(repo):
>              if repo.svfs.exists(name):
>                  totalfilesize += repo.svfs.lstat(name).st_size
> -                entries.append((name, _filefull, None))
> +                entries.append((_srcstore, name, _filefull, None))
> +        for name in cacheutil.cachetocopy(repo):
> +            if repo.cachevfs.exists(name):
> +                totalfilesize += repo.cachevfs.lstat(name).st_size
> +                entries.append((_srccache, name, _filefull, None))
>
>          chunks = _emit(repo, entries, totalfilesize)
>          first = next(chunks)
> @@ -536,6 +561,16 @@ def generatev2(repo):
>
>      return len(entries), totalfilesize, chunks
>
> +@contextlib.contextmanager
> +def nested(*ctxs):
> +    with warnings.catch_warnings():
> +        # For some reason, Python decided 'nested' was deprecated without
> +        # replacement. They officially advised filtering the deprecation
> +        # warning for people who actually need the feature.
> +        warnings.filterwarnings("ignore",category=DeprecationWarning)
> +        with contextlib.nested(*ctxs):
> +            yield
> +
>  def consumev2(repo, fp, filecount, filesize):
>      """Apply the contents from a version 2 streaming clone.
>
> @@ -552,19 +587,23 @@ def consumev2(repo, fp, filecount, files
>
>          progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))
>
> -        vfs = repo.svfs
> +        vfsmap = _makemap(repo)
>
>          with repo.transaction('clone'):
> -            with vfs.backgroundclosing(repo.ui):
> +            ctxs = (vfs.backgroundclosing(repo.ui)
> +                    for vfs in vfsmap.values())
> +            with nested(*ctxs):
>                  for i in range(filecount):
> +                    src = fp.read(1)
> +                    vfs = vfsmap[src]
>                      namelen = util.uvarintdecodestream(fp)
>                      datalen = util.uvarintdecodestream(fp)
>
>                      name = fp.read(namelen)
>
>                      if repo.ui.debugflag:
> -                        repo.ui.debug('adding %s (%s)\n' %
> -                                      (name, util.bytecount(datalen)))
> +                        repo.ui.debug('adding [%s] %s (%s)\n' %
> +                                      (src, name, util.bytecount(datalen)))
>
>                      with vfs(name, 'w') as ofp:
>                          for chunk in util.filechunkiter(fp, limit=datalen):
> diff --git a/tests/test-clone-uncompressed.t b/tests/test-clone-uncompressed.t
> --- a/tests/test-clone-uncompressed.t
> +++ b/tests/test-clone-uncompressed.t
> @@ -38,8 +38,13 @@ Basic clone
>  #if stream-bundle2
>    $ hg clone --stream -U http://localhost:$HGPORT clone1
>    streaming all changes
> -  1027 files to transfer, 96.3 KB of data
> -  transferred 96.3 KB in * seconds (* */sec) (glob)
> +  1030 files to transfer, 96.4 KB of data
> +  transferred 96.4 KB in * seconds (* */sec) (glob)
> +
> +  $ ls -1 clone1/.hg/cache
> +  branch2-served
> +  rbc-names-v1
> +  rbc-revs-v1
>  #endif
>
>  --uncompressed is an alias to --stream
> @@ -55,8 +60,8 @@ Basic clone
>  #if stream-bundle2
>    $ hg clone --uncompressed -U http://localhost:$HGPORT clone1-uncompressed
>    streaming all changes
> -  1027 files to transfer, 96.3 KB of data
> -  transferred 96.3 KB in * seconds (* */sec) (glob)
> +  1030 files to transfer, 96.4 KB of data
> +  transferred 96.4 KB in * seconds (* */sec) (glob)
>  #endif
>
>  Clone with background file closing enabled
> @@ -95,10 +100,11 @@ Clone with background file closing enabl
>    bundle2-input-bundle: with-transaction
>    bundle2-input-part: "stream" (params: 4 mandatory) supported
>    applying stream bundle
> -  1027 files to transfer, 96.3 KB of data
> +  1030 files to transfer, 96.4 KB of data
> +  starting 4 threads for background file closing
>    starting 4 threads for background file closing
> -  transferred 96.3 KB in * seconds (* */sec) (glob)
> -  bundle2-input-part: total payload size 110887
> +  transferred 96.4 KB in * seconds (* */sec) (glob)
> +  bundle2-input-part: total payload size 112077
>    bundle2-input-part: "listkeys" (params: 1 mandatory) supported
>    bundle2-input-bundle: 1 parts total
>    checking for updated bookmarks
> @@ -136,8 +142,8 @@ Streaming of secrets can be overridden b
>  #if stream-bundle2
>    $ hg clone --stream -U http://localhost:$HGPORT secret-allowed
>    streaming all changes
> -  1027 files to transfer, 96.3 KB of data
> -  transferred 96.3 KB in * seconds (* */sec) (glob)
> +  1030 files to transfer, 96.4 KB of data
> +  transferred 96.4 KB in * seconds (* */sec) (glob)
>  #endif
>
>    $ killdaemons.py
> @@ -253,8 +259,8 @@ clone it
>  #if stream-bundle2
>    $ hg clone --stream http://localhost:$HGPORT with-bookmarks
>    streaming all changes
> -  1027 files to transfer, 96.3 KB of data
> -  transferred 96.3 KB in * seconds (* */sec) (glob)
> +  1033 files to transfer, 96.6 KB of data
> +  transferred 96.6 KB in * seconds (* */sec) (glob)
>    updating to branch default
>    1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
>  #endif
> @@ -283,8 +289,8 @@ Clone as publishing
>  #if stream-bundle2
>    $ hg clone --stream http://localhost:$HGPORT phase-publish
>    streaming all changes
> -  1027 files to transfer, 96.3 KB of data
> -  transferred 96.3 KB in * seconds (* */sec) (glob)
> +  1033 files to transfer, 96.6 KB of data
> +  transferred 96.6 KB in * seconds (* */sec) (glob)
>    updating to branch default
>    1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
>  #endif
> @@ -318,8 +324,8 @@ Clone as non publishing
>  #if stream-bundle2
>    $ hg clone --stream http://localhost:$HGPORT phase-no-publish
>    streaming all changes
> -  1028 files to transfer, 96.4 KB of data
> -  transferred 96.4 KB in * seconds (* */sec) (glob)
> +  1034 files to transfer, 96.7 KB of data
> +  transferred 96.7 KB in * seconds (* */sec) (glob)
>    updating to branch default
>    1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
>    $ hg -R phase-no-publish phase -r 'all()'
> _______________________________________________
> Mercurial-devel mailing list
> Mercurial-devel@mercurial-scm.org
> https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel

Patch

diff --git a/mercurial/streamclone.py b/mercurial/streamclone.py
--- a/mercurial/streamclone.py
+++ b/mercurial/streamclone.py
@@ -11,10 +11,12 @@  import contextlib
 import os
 import struct
 import tempfile
+import warnings
 
 from .i18n import _
 from . import (
     branchmap,
+    cacheutil,
     error,
     phases,
     store,
@@ -435,6 +437,10 @@  class streamcloneapplier(object):
 _fileappend = 0 # append only file
 _filefull = 1   # full snapshot file
 
+# Source of the file
+_srcstore = 's' # store (svfs)
+_srccache = 'c' # cache (cache)
+
+# This is its own function so extensions can override it.
 def _walkstreamfullstorefiles(repo):
     """list snapshot file from the store"""
@@ -443,12 +449,12 @@  def _walkstreamfullstorefiles(repo):
         fnames.append('phaseroots')
     return fnames
 
-def _filterfull(entry, copy, vfs):
+def _filterfull(entry, copy, vfsmap):
     """actually copy the snapshot files"""
-    name, ftype, data = entry
+    src, name, ftype, data = entry
     if ftype != _filefull:
         return entry
-    return (name, ftype, copy(vfs.join(name)))
+    return (src, name, ftype, copy(vfsmap[src].join(name)))
 
 @contextlib.contextmanager
 def maketempcopies():
@@ -466,19 +472,33 @@  def maketempcopies():
         for tmp in files:
             util.tryunlink(tmp)
 
+def _makemap(repo):
+    """make a (src -> vfs) map for the repo"""
+    vfsmap = {
+        _srcstore: repo.svfs,
+        _srccache: repo.cachevfs,
+    }
+    # we keep repo.vfs out of the map on purpose, there are too many dangers there
+    # (eg: .hg/hgrc)
+    assert repo.vfs not in vfsmap.values()
+
+    return vfsmap
+
 def _emit(repo, entries, totalfilesize):
     """actually emit the stream bundle"""
-    vfs = repo.svfs
+    vfsmap = _makemap(repo)
     progress = repo.ui.progress
     progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
     with maketempcopies() as copy:
         try:
             # copy is delayed until we are in the try
-            entries = [_filterfull(e, copy, vfs) for e in entries]
+            entries = [_filterfull(e, copy, vfsmap) for e in entries]
             yield None # this release the lock on the repository
             seen = 0
 
-            for name, ftype, data in entries:
+            for src, name, ftype, data in entries:
+                vfs = vfsmap[src]
+                yield src
                 yield util.uvarintencode(len(name))
                 if ftype == _fileappend:
                     fp = vfs(name)
@@ -507,10 +527,11 @@  def generatev2(repo):
     """Emit content for version 2 of a streaming clone.
 
     the data stream consists the following entries:
-    1) A varint containing the length of the filename
-    2) A varint containing the length of file data
-    3) N bytes containing the filename (the internal, store-agnostic form)
-    4) N bytes containing the file data
+    1) A char representing the file destination (eg: store or cache)
+    2) A varint containing the length of the filename
+    3) A varint containing the length of file data
+    4) N bytes containing the filename (the internal, store-agnostic form)
+    5) N bytes containing the file data
 
     Returns a 3-tuple of (file count, file size, data iterator).
     """
@@ -523,12 +544,16 @@  def generatev2(repo):
         repo.ui.debug('scanning\n')
         for name, ename, size in _walkstreamfiles(repo):
             if size:
-                entries.append((name, _fileappend, size))
+                entries.append((_srcstore, name, _fileappend, size))
                 totalfilesize += size
         for name in _walkstreamfullstorefiles(repo):
             if repo.svfs.exists(name):
                 totalfilesize += repo.svfs.lstat(name).st_size
-                entries.append((name, _filefull, None))
+                entries.append((_srcstore, name, _filefull, None))
+        for name in cacheutil.cachetocopy(repo):
+            if repo.cachevfs.exists(name):
+                totalfilesize += repo.cachevfs.lstat(name).st_size
+                entries.append((_srccache, name, _filefull, None))
 
         chunks = _emit(repo, entries, totalfilesize)
         first = next(chunks)
@@ -536,6 +561,16 @@  def generatev2(repo):
 
     return len(entries), totalfilesize, chunks
 
+@contextlib.contextmanager
+def nested(*ctxs):
+    with warnings.catch_warnings():
+        # For some reason, Python decided 'nested' was deprecated without
+        # replacement. They officially advised filtering the deprecation
+        # warning for people who actually need the feature.
+        warnings.filterwarnings("ignore",category=DeprecationWarning)
+        with contextlib.nested(*ctxs):
+            yield
+
 def consumev2(repo, fp, filecount, filesize):
     """Apply the contents from a version 2 streaming clone.
 
@@ -552,19 +587,23 @@  def consumev2(repo, fp, filecount, files
 
         progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))
 
-        vfs = repo.svfs
+        vfsmap = _makemap(repo)
 
         with repo.transaction('clone'):
-            with vfs.backgroundclosing(repo.ui):
+            ctxs = (vfs.backgroundclosing(repo.ui)
+                    for vfs in vfsmap.values())
+            with nested(*ctxs):
                 for i in range(filecount):
+                    src = fp.read(1)
+                    vfs = vfsmap[src]
                     namelen = util.uvarintdecodestream(fp)
                     datalen = util.uvarintdecodestream(fp)
 
                     name = fp.read(namelen)
 
                     if repo.ui.debugflag:
-                        repo.ui.debug('adding %s (%s)\n' %
-                                      (name, util.bytecount(datalen)))
+                        repo.ui.debug('adding [%s] %s (%s)\n' %
+                                      (src, name, util.bytecount(datalen)))
 
                     with vfs(name, 'w') as ofp:
                         for chunk in util.filechunkiter(fp, limit=datalen):
diff --git a/tests/test-clone-uncompressed.t b/tests/test-clone-uncompressed.t
--- a/tests/test-clone-uncompressed.t
+++ b/tests/test-clone-uncompressed.t
@@ -38,8 +38,13 @@  Basic clone
 #if stream-bundle2
   $ hg clone --stream -U http://localhost:$HGPORT clone1
   streaming all changes
-  1027 files to transfer, 96.3 KB of data
-  transferred 96.3 KB in * seconds (* */sec) (glob)
+  1030 files to transfer, 96.4 KB of data
+  transferred 96.4 KB in * seconds (* */sec) (glob)
+
+  $ ls -1 clone1/.hg/cache
+  branch2-served
+  rbc-names-v1
+  rbc-revs-v1
 #endif
 
 --uncompressed is an alias to --stream
@@ -55,8 +60,8 @@  Basic clone
 #if stream-bundle2
   $ hg clone --uncompressed -U http://localhost:$HGPORT clone1-uncompressed
   streaming all changes
-  1027 files to transfer, 96.3 KB of data
-  transferred 96.3 KB in * seconds (* */sec) (glob)
+  1030 files to transfer, 96.4 KB of data
+  transferred 96.4 KB in * seconds (* */sec) (glob)
 #endif
 
 Clone with background file closing enabled
@@ -95,10 +100,11 @@  Clone with background file closing enabl
   bundle2-input-bundle: with-transaction
   bundle2-input-part: "stream" (params: 4 mandatory) supported
   applying stream bundle
-  1027 files to transfer, 96.3 KB of data
+  1030 files to transfer, 96.4 KB of data
+  starting 4 threads for background file closing
   starting 4 threads for background file closing
-  transferred 96.3 KB in * seconds (* */sec) (glob)
-  bundle2-input-part: total payload size 110887
+  transferred 96.4 KB in * seconds (* */sec) (glob)
+  bundle2-input-part: total payload size 112077
   bundle2-input-part: "listkeys" (params: 1 mandatory) supported
   bundle2-input-bundle: 1 parts total
   checking for updated bookmarks
@@ -136,8 +142,8 @@  Streaming of secrets can be overridden b
 #if stream-bundle2
   $ hg clone --stream -U http://localhost:$HGPORT secret-allowed
   streaming all changes
-  1027 files to transfer, 96.3 KB of data
-  transferred 96.3 KB in * seconds (* */sec) (glob)
+  1030 files to transfer, 96.4 KB of data
+  transferred 96.4 KB in * seconds (* */sec) (glob)
 #endif
 
   $ killdaemons.py
@@ -253,8 +259,8 @@  clone it
 #if stream-bundle2
   $ hg clone --stream http://localhost:$HGPORT with-bookmarks
   streaming all changes
-  1027 files to transfer, 96.3 KB of data
-  transferred 96.3 KB in * seconds (* */sec) (glob)
+  1033 files to transfer, 96.6 KB of data
+  transferred 96.6 KB in * seconds (* */sec) (glob)
   updating to branch default
   1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
 #endif
@@ -283,8 +289,8 @@  Clone as publishing
 #if stream-bundle2
   $ hg clone --stream http://localhost:$HGPORT phase-publish
   streaming all changes
-  1027 files to transfer, 96.3 KB of data
-  transferred 96.3 KB in * seconds (* */sec) (glob)
+  1033 files to transfer, 96.6 KB of data
+  transferred 96.6 KB in * seconds (* */sec) (glob)
   updating to branch default
   1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
 #endif
@@ -318,8 +324,8 @@  Clone as non publishing
 #if stream-bundle2
   $ hg clone --stream http://localhost:$HGPORT phase-no-publish
   streaming all changes
-  1028 files to transfer, 96.4 KB of data
-  transferred 96.4 KB in * seconds (* */sec) (glob)
+  1034 files to transfer, 96.7 KB of data
+  transferred 96.7 KB in * seconds (* */sec) (glob)
   updating to branch default
   1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
   $ hg -R phase-no-publish phase -r 'all()'