Patchwork [12,of,14] streamclone: add support for cloning non append-only file

login
register
mail settings
Submitter Boris Feld
Date Jan. 18, 2018, 11:21 a.m.
Message ID <504368f80f4213a76f72.1516274498@FB>
Download mbox | patch
Permalink /patch/26855/
State Superseded
Headers show

Comments

Boris Feld - Jan. 18, 2018, 11:21 a.m.
# HG changeset patch
# User Boris Feld <boris.feld@octobus.net>
# Date 1516233002 -3600
#      Thu Jan 18 00:50:02 2018 +0100
# Node ID 504368f80f4213a76f72b0e0b1e316186004d20c
# Parent  d3bfe88b1b56dc7bae6fbd302e5cf345f4e43bfa
# EXP-Topic b2-stream
# Available At https://bitbucket.org/octobus/mercurial-devel/
#              hg pull https://bitbucket.org/octobus/mercurial-devel/ -r 504368f80f42
streamclone: add support for cloning non append-only file

The phaseroots are stored in a non append-only file in the repository. We
include them in the stream too. Since they are not append-only, we have to
keep a copy around while we hold the lock to be able to stream them later.

Since phase get exchanged within the stream we can skip requesting them
independently.

As a side effect, this will fixes issue5648 once the feature is enabled by
default.

Patch

diff --git a/mercurial/exchange.py b/mercurial/exchange.py
--- a/mercurial/exchange.py
+++ b/mercurial/exchange.py
@@ -1465,6 +1465,7 @@  def _pullbundle2(pullop):
         kwargs['cg'] = False
         kwargs['stream'] = True
         pullop.stepsdone.add('changegroup')
+        pullop.stepsdone.add('phases')
 
     else:
         # pulling changegroup
@@ -1472,15 +1473,15 @@  def _pullbundle2(pullop):
 
         kwargs['cg'] = pullop.fetch
 
-    legacyphase = 'phases' in ui.configlist('devel', 'legacy.exchange')
-    hasbinaryphase = 'heads' in pullop.remotebundle2caps.get('phases', ())
-    if (not legacyphase and hasbinaryphase):
-        kwargs['phases'] = True
-        pullop.stepsdone.add('phases')
+        legacyphase = 'phases' in ui.configlist('devel', 'legacy.exchange')
+        hasbinaryphase = 'heads' in pullop.remotebundle2caps.get('phases', ())
+        if (not legacyphase and hasbinaryphase):
+            kwargs['phases'] = True
+            pullop.stepsdone.add('phases')
 
-    if 'listkeys' in pullop.remotebundle2caps:
-        if 'phases' not in pullop.stepsdone:
-            kwargs['listkeys'] = ['phases']
+        if 'listkeys' in pullop.remotebundle2caps:
+            if 'phases' not in pullop.stepsdone:
+                kwargs['listkeys'] = ['phases']
 
     bookmarksrequested = False
     legacybookmark = 'bookmarks' in ui.configlist('devel', 'legacy.exchange')
diff --git a/mercurial/streamclone.py b/mercurial/streamclone.py
--- a/mercurial/streamclone.py
+++ b/mercurial/streamclone.py
@@ -7,7 +7,10 @@ 
 
 from __future__ import absolute_import
 
+import contextlib
+import os
 import struct
+import tempfile
 
 from .i18n import _
 from . import (
@@ -428,32 +431,77 @@  class streamcloneapplier(object):
     def apply(self, repo):
         return applybundlev1(repo, self._fh)
 
+# type of file to stream
+_fileappend = 0 # append only file
+_filefull = 1   # full snapshot file
+
+# This is it's own function so extensions can override it.
+def _walkstreamfullstorefiles(repo):
+    """list snapshot file from the store"""
+    fnames = []
+    if not repo.publishing():
+        fnames.append('phaseroots')
+    return fnames
+
+def _filterfull(entry, copy, vfs):
+    """actually copy the snapshot files"""
+    name, ftype, data = entry
+    if ftype != _filefull:
+        return entry
+    return (name, ftype, copy(vfs.join(name)))
+
+@contextlib.contextmanager
+def maketempcopies():
+    """return a function to temporary copy file"""
+    files = []
+    try:
+        def copy(src):
+            fd, dst = tempfile.mkstemp()
+            os.close(fd)
+            files.append(dst)
+            util.copyfiles(src, dst, hardlink=True)
+            return dst
+        yield copy
+    finally:
+        for tmp in files:
+            util.tryunlink(tmp)
+
 def _emit(repo, entries, totalfilesize):
     """actually emit the stream bundle"""
+    vfs = repo.svfs
     progress = repo.ui.progress
     progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
-    vfs = repo.svfs
-    try:
-        seen = 0
-        for name, size in entries:
-            yield util.uvarintencode(len(name))
-            fp = vfs(name)
-            try:
-                yield util.uvarintencode(size)
-                yield name
-                if size <= 65536:
-                    chunks = (fp.read(size),)
-                else:
-                    chunks = util.filechunkiter(fp, limit=size)
-                for chunk in chunks:
-                    seen += len(chunk)
-                    progress(_('bundle'), seen, total=totalfilesize,
-                             unit=_('bytes'))
-                    yield chunk
-            finally:
-                fp.close()
-    finally:
-        progress(_('bundle'), None)
+    with maketempcopies() as copy:
+        try:
+            # copy is delayed until we are in the try
+            entries = [_filterfull(e, copy, vfs) for e in entries]
+            yield None # this release the lock on the repository
+            seen = 0
+
+            for name, ftype, data in entries:
+                yield util.uvarintencode(len(name))
+                if ftype == _fileappend:
+                    fp = vfs(name)
+                    size = data
+                elif ftype == _filefull:
+                    fp = open(data, 'rb')
+                    size = util.fstat(fp).st_size
+                try:
+                    yield util.uvarintencode(size)
+                    yield name
+                    if size <= 65536:
+                        chunks = (fp.read(size),)
+                    else:
+                        chunks = util.filechunkiter(fp, limit=size)
+                    for chunk in chunks:
+                        seen += len(chunk)
+                        progress(_('bundle'), seen, total=totalfilesize,
+                                 unit=_('bytes'))
+                        yield chunk
+                finally:
+                    fp.close()
+        finally:
+            progress(_('bundle'), None)
 
 def generatev2(repo):
     """Emit content for version 2 of a streaming clone.
@@ -475,10 +523,16 @@  def generatev2(repo):
         repo.ui.debug('scanning\n')
         for name, ename, size in _walkstreamfiles(repo):
             if size:
-                entries.append((name, size))
+                entries.append((name, _fileappend, size))
                 totalfilesize += size
+        for name in _walkstreamfullstorefiles(repo):
+            if repo.svfs.exists(name):
+                totalfilesize += repo.svfs.lstat(name).st_size
+                entries.append((name, _filefull, None))
 
         chunks = _emit(repo, entries, totalfilesize)
+        first = next(chunks)
+        assert first is None
 
     return len(entries), totalfilesize, chunks
 
diff --git a/tests/test-clone-uncompressed.t b/tests/test-clone-uncompressed.t
--- a/tests/test-clone-uncompressed.t
+++ b/tests/test-clone-uncompressed.t
@@ -100,9 +100,7 @@  Clone with background file closing enabl
   transferred 96.3 KB in * seconds (* */sec) (glob)
   bundle2-input-part: total payload size 110887
   bundle2-input-part: "listkeys" (params: 1 mandatory) supported
-  bundle2-input-part: "phase-heads" supported
-  bundle2-input-part: total payload size 24
-  bundle2-input-bundle: 2 parts total
+  bundle2-input-bundle: 1 parts total
   checking for updated bookmarks
 #endif
 
@@ -316,11 +314,11 @@  Clone as non publishing
 #if stream-bundle2
   $ hg clone --stream http://localhost:$HGPORT phase-no-publish
   streaming all changes
-  1027 files to transfer, 96.3 KB of data
-  transferred 96.3 KB in * seconds (* */sec) (glob)
+  1028 files to transfer, 96.4 KB of data
+  transferred 96.4 KB in * seconds (* */sec) (glob)
   updating to branch default
   1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
   $ hg -R phase-no-publish phase -r 'all()'
-  0: public
-  1: public
+  0: draft
+  1: draft
 #endif