Patchwork [12,of,14,V3] streamclone: add support for cloning non append-only file

login
register
mail settings
Submitter Boris Feld
Date Jan. 19, 2018, 11:47 p.m.
Message ID <789f14aef1ca0b39e61a.1516405637@FB>
Download mbox | patch
Permalink /patch/26996/
State Accepted
Headers show

Comments

Boris Feld - Jan. 19, 2018, 11:47 p.m.
# HG changeset patch
# User Boris Feld <boris.feld@octobus.net>
# Date 1516233002 -3600
#      Thu Jan 18 00:50:02 2018 +0100
# Node ID 789f14aef1ca0b39e61a47d8989c18cdc6015b53
# Parent  d8a918033dcfd3dcbac8635616cc1b6c12078b18
# EXP-Topic b2-stream
# Available At https://bitbucket.org/octobus/mercurial-devel/
#              hg pull https://bitbucket.org/octobus/mercurial-devel/ -r 789f14aef1ca
streamclone: add support for cloning non append-only file

The phaseroots are stored in a non append-only file in the repository. We
include them in the stream too. Since they are not append-only, we have to
keep a copy around while we hold the lock to be able to stream them later.

Since phase get exchanged within the stream we can skip requesting them
independently.

As a side effect, this will fixes issue5648 once the feature is enabled by
default.
Yuya Nishihara - Jan. 20, 2018, 10:41 a.m.
On Sat, 20 Jan 2018 00:47:17 +0100, Boris Feld wrote:
> # HG changeset patch
> # User Boris Feld <boris.feld@octobus.net>
> # Date 1516233002 -3600
> #      Thu Jan 18 00:50:02 2018 +0100
> # Node ID 789f14aef1ca0b39e61a47d8989c18cdc6015b53
> # Parent  d8a918033dcfd3dcbac8635616cc1b6c12078b18
> # EXP-Topic b2-stream
> # Available At https://bitbucket.org/octobus/mercurial-devel/
> #              hg pull https://bitbucket.org/octobus/mercurial-devel/ -r 789f14aef1ca
> streamclone: add support for cloning non append-only file

> +# type of file to stream
> +_fileappend = 0 # append only file
> +_filefull = 1   # full snapshot file
> +
> +# This is it's own function so extensions can override it.
> +def _walkstreamfullstorefiles(repo):
> +    """list snapshot file from the store"""
> +    fnames = []
> +    if not repo.publishing():
> +        fnames.append('phaseroots')
> +    return fnames
> +
> +def _filterfull(entry, copy, vfs):
> +    """actually copy the snapshot files"""
> +    name, ftype, data = entry
> +    if ftype != _filefull:
> +        return entry
> +    return (name, ftype, copy(vfs.join(name)))
> +
> +@contextlib.contextmanager
> +def maketempcopies():
> +    """return a function to temporary copy file"""
> +    files = []
> +    try:
> +        def copy(src):
> +            fd, dst = tempfile.mkstemp()
> +            os.close(fd)
> +            files.append(dst)
> +            util.copyfiles(src, dst, hardlink=True)

Better to create a temp file near the repository if we want to take advantage
of hardlinks. /tmp may be a separate filesystem.

> +    with maketempcopies() as copy:
> +        try:
> +            # copy is delayed until we are in the try
> +            entries = [_filterfull(e, copy, vfs) for e in entries]

I'm not sure if this is better or worse, but maybe we can hold open file
handles (instead of copying them) given "full" files are atomically replaced
on write.
Yuya Nishihara - Jan. 20, 2018, 10:53 a.m.
On Sat, 20 Jan 2018 19:41:23 +0900, Yuya Nishihara wrote:
> On Sat, 20 Jan 2018 00:47:17 +0100, Boris Feld wrote:
> > # HG changeset patch
> > # User Boris Feld <boris.feld@octobus.net>
> > # Date 1516233002 -3600
> > #      Thu Jan 18 00:50:02 2018 +0100
> > # Node ID 789f14aef1ca0b39e61a47d8989c18cdc6015b53
> > # Parent  d8a918033dcfd3dcbac8635616cc1b6c12078b18
> > # EXP-Topic b2-stream
> > # Available At https://bitbucket.org/octobus/mercurial-devel/
> > #              hg pull https://bitbucket.org/octobus/mercurial-devel/ -r 789f14aef1ca
> > streamclone: add support for cloning non append-only file
> 
> > +# type of file to stream
> > +_fileappend = 0 # append only file
> > +_filefull = 1   # full snapshot file
> > +
> > +# This is it's own function so extensions can override it.
> > +def _walkstreamfullstorefiles(repo):
> > +    """list snapshot file from the store"""
> > +    fnames = []
> > +    if not repo.publishing():
> > +        fnames.append('phaseroots')
> > +    return fnames
> > +
> > +def _filterfull(entry, copy, vfs):
> > +    """actually copy the snapshot files"""
> > +    name, ftype, data = entry
> > +    if ftype != _filefull:
> > +        return entry
> > +    return (name, ftype, copy(vfs.join(name)))
> > +
> > +@contextlib.contextmanager
> > +def maketempcopies():
> > +    """return a function to temporary copy file"""
> > +    files = []
> > +    try:
> > +        def copy(src):
> > +            fd, dst = tempfile.mkstemp()
> > +            os.close(fd)
> > +            files.append(dst)
> > +            util.copyfiles(src, dst, hardlink=True)
> 
> Better to create a temp file near the repository if we want to take advantage
> of hardlinks. /tmp may be a separate filesystem.

Ah, but hardlink will fail anyway since destination file exists.
Boris Feld - Jan. 25, 2018, 8:35 a.m.
On Sat, 2018-01-20 at 19:53 +0900, Yuya Nishihara wrote:
> On Sat, 20 Jan 2018 19:41:23 +0900, Yuya Nishihara wrote:
> > On Sat, 20 Jan 2018 00:47:17 +0100, Boris Feld wrote:
> > > # HG changeset patch
> > > # User Boris Feld <boris.feld@octobus.net>
> > > # Date 1516233002 -3600
> > > #      Thu Jan 18 00:50:02 2018 +0100
> > > # Node ID 789f14aef1ca0b39e61a47d8989c18cdc6015b53
> > > # Parent  d8a918033dcfd3dcbac8635616cc1b6c12078b18
> > > # EXP-Topic b2-stream
> > > # Available At https://bitbucket.org/octobus/mercurial-devel/
> > > #              hg pull https://bitbucket.org/octobus/mercurial-de
> > > vel/ -r 789f14aef1ca
> > > streamclone: add support for cloning non append-only file
> > > +# type of file to stream
> > > +_fileappend = 0 # append only file
> > > +_filefull = 1   # full snapshot file
> > > +
> > > +# This is it's own function so extensions can override it.
> > > +def _walkstreamfullstorefiles(repo):
> > > +    """list snapshot file from the store"""
> > > +    fnames = []
> > > +    if not repo.publishing():
> > > +        fnames.append('phaseroots')
> > > +    return fnames
> > > +
> > > +def _filterfull(entry, copy, vfs):
> > > +    """actually copy the snapshot files"""
> > > +    name, ftype, data = entry
> > > +    if ftype != _filefull:
> > > +        return entry
> > > +    return (name, ftype, copy(vfs.join(name)))
> > > +
> > > +@contextlib.contextmanager
> > > +def maketempcopies():
> > > +    """return a function to temporary copy file"""
> > > +    files = []
> > > +    try:
> > > +        def copy(src):
> > > +            fd, dst = tempfile.mkstemp()
> > > +            os.close(fd)
> > > +            files.append(dst)
> > > +            util.copyfiles(src, dst, hardlink=True)
> > 
> > Better to create a temp file near the repository if we want to take
> > advantage
> > of hardlinks. /tmp may be a separate filesystem.
> 
> Ah, but hardlink will fail anyway since destination file exists.

Hardlinks are an option since we create the temporary file just for the
streaming. We might use a temporary directory inside the 
repository for this purpose.

It would be an improvement, but it adds some complexity and requires
having a clear distinction between files we can hardlink and those we
can't.

We think it would be better if we can ship the current version in 4.5,
so we have the next cycle to improve it properly.

Thank you for the feedback.
Yuya Nishihara - Jan. 25, 2018, 11:49 a.m.
On Thu, 25 Jan 2018 09:35:38 +0100, Boris Feld wrote:
> On Sat, 2018-01-20 at 19:53 +0900, Yuya Nishihara wrote:
> > On Sat, 20 Jan 2018 19:41:23 +0900, Yuya Nishihara wrote:
> > > On Sat, 20 Jan 2018 00:47:17 +0100, Boris Feld wrote:
> > > > # HG changeset patch
> > > > # User Boris Feld <boris.feld@octobus.net>
> > > > # Date 1516233002 -3600
> > > > #      Thu Jan 18 00:50:02 2018 +0100
> > > > # Node ID 789f14aef1ca0b39e61a47d8989c18cdc6015b53
> > > > # Parent  d8a918033dcfd3dcbac8635616cc1b6c12078b18
> > > > # EXP-Topic b2-stream
> > > > # Available At https://bitbucket.org/octobus/mercurial-devel/
> > > > #              hg pull https://bitbucket.org/octobus/mercurial-de
> > > > vel/ -r 789f14aef1ca
> > > > streamclone: add support for cloning non append-only file
> > > > +# type of file to stream
> > > > +_fileappend = 0 # append only file
> > > > +_filefull = 1   # full snapshot file
> > > > +
> > > > +# This is it's own function so extensions can override it.
> > > > +def _walkstreamfullstorefiles(repo):
> > > > +    """list snapshot file from the store"""
> > > > +    fnames = []
> > > > +    if not repo.publishing():
> > > > +        fnames.append('phaseroots')
> > > > +    return fnames
> > > > +
> > > > +def _filterfull(entry, copy, vfs):
> > > > +    """actually copy the snapshot files"""
> > > > +    name, ftype, data = entry
> > > > +    if ftype != _filefull:
> > > > +        return entry
> > > > +    return (name, ftype, copy(vfs.join(name)))
> > > > +
> > > > +@contextlib.contextmanager
> > > > +def maketempcopies():
> > > > +    """return a function to temporary copy file"""
> > > > +    files = []
> > > > +    try:
> > > > +        def copy(src):
> > > > +            fd, dst = tempfile.mkstemp()
> > > > +            os.close(fd)
> > > > +            files.append(dst)
> > > > +            util.copyfiles(src, dst, hardlink=True)
> > > 
> > > Better to create a temp file near the repository if we want to take
> > > advantage
> > > of hardlinks. /tmp may be a separate filesystem.
> > 
> > Ah, but hardlink will fail anyway since destination file exists.
> 
> Hardlinks are an option since we create the temporary file just for the
> streaming. We might use a temporary directory inside the 
> repository for this purpose.
> 
> It would be an improvement, but it adds some complexity and requires
> having a clear distinction between files we can hardlink and those we
> can't.
> 
> We think it would be better if we can ship the current version in 4.5,
> so we have the next cycle to improve it properly.

So maybe we should just turning off the hardlink flag for clarity?
It doesn't work if "dst" exists, and the "dst" must be created by mkstemp().

Patch

diff --git a/mercurial/exchange.py b/mercurial/exchange.py
--- a/mercurial/exchange.py
+++ b/mercurial/exchange.py
@@ -1465,6 +1465,7 @@  def _pullbundle2(pullop):
         kwargs['cg'] = False
         kwargs['stream'] = True
         pullop.stepsdone.add('changegroup')
+        pullop.stepsdone.add('phases')
 
     else:
         # pulling changegroup
@@ -1472,15 +1473,15 @@  def _pullbundle2(pullop):
 
         kwargs['cg'] = pullop.fetch
 
-    legacyphase = 'phases' in ui.configlist('devel', 'legacy.exchange')
-    hasbinaryphase = 'heads' in pullop.remotebundle2caps.get('phases', ())
-    if (not legacyphase and hasbinaryphase):
-        kwargs['phases'] = True
-        pullop.stepsdone.add('phases')
+        legacyphase = 'phases' in ui.configlist('devel', 'legacy.exchange')
+        hasbinaryphase = 'heads' in pullop.remotebundle2caps.get('phases', ())
+        if (not legacyphase and hasbinaryphase):
+            kwargs['phases'] = True
+            pullop.stepsdone.add('phases')
 
-    if 'listkeys' in pullop.remotebundle2caps:
-        if 'phases' not in pullop.stepsdone:
-            kwargs['listkeys'] = ['phases']
+        if 'listkeys' in pullop.remotebundle2caps:
+            if 'phases' not in pullop.stepsdone:
+                kwargs['listkeys'] = ['phases']
 
     bookmarksrequested = False
     legacybookmark = 'bookmarks' in ui.configlist('devel', 'legacy.exchange')
diff --git a/mercurial/streamclone.py b/mercurial/streamclone.py
--- a/mercurial/streamclone.py
+++ b/mercurial/streamclone.py
@@ -7,7 +7,10 @@ 
 
 from __future__ import absolute_import
 
+import contextlib
+import os
 import struct
+import tempfile
 
 from .i18n import _
 from . import (
@@ -428,32 +431,77 @@  class streamcloneapplier(object):
     def apply(self, repo):
         return applybundlev1(repo, self._fh)
 
+# type of file to stream
+_fileappend = 0 # append only file
+_filefull = 1   # full snapshot file
+
+# This is it's own function so extensions can override it.
+def _walkstreamfullstorefiles(repo):
+    """list snapshot file from the store"""
+    fnames = []
+    if not repo.publishing():
+        fnames.append('phaseroots')
+    return fnames
+
+def _filterfull(entry, copy, vfs):
+    """actually copy the snapshot files"""
+    name, ftype, data = entry
+    if ftype != _filefull:
+        return entry
+    return (name, ftype, copy(vfs.join(name)))
+
+@contextlib.contextmanager
+def maketempcopies():
+    """return a function to temporary copy file"""
+    files = []
+    try:
+        def copy(src):
+            fd, dst = tempfile.mkstemp()
+            os.close(fd)
+            files.append(dst)
+            util.copyfiles(src, dst, hardlink=True)
+            return dst
+        yield copy
+    finally:
+        for tmp in files:
+            util.tryunlink(tmp)
+
 def _emit(repo, entries, totalfilesize):
     """actually emit the stream bundle"""
+    vfs = repo.svfs
     progress = repo.ui.progress
     progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
-    vfs = repo.svfs
-    try:
-        seen = 0
-        for name, size in entries:
-            yield util.uvarintencode(len(name))
-            fp = vfs(name)
-            try:
-                yield util.uvarintencode(size)
-                yield name
-                if size <= 65536:
-                    chunks = (fp.read(size),)
-                else:
-                    chunks = util.filechunkiter(fp, limit=size)
-                for chunk in chunks:
-                    seen += len(chunk)
-                    progress(_('bundle'), seen, total=totalfilesize,
-                             unit=_('bytes'))
-                    yield chunk
-            finally:
-                fp.close()
-    finally:
-        progress(_('bundle'), None)
+    with maketempcopies() as copy:
+        try:
+            # copy is delayed until we are in the try
+            entries = [_filterfull(e, copy, vfs) for e in entries]
+            yield None # this release the lock on the repository
+            seen = 0
+
+            for name, ftype, data in entries:
+                yield util.uvarintencode(len(name))
+                if ftype == _fileappend:
+                    fp = vfs(name)
+                    size = data
+                elif ftype == _filefull:
+                    fp = open(data, 'rb')
+                    size = util.fstat(fp).st_size
+                try:
+                    yield util.uvarintencode(size)
+                    yield name
+                    if size <= 65536:
+                        chunks = (fp.read(size),)
+                    else:
+                        chunks = util.filechunkiter(fp, limit=size)
+                    for chunk in chunks:
+                        seen += len(chunk)
+                        progress(_('bundle'), seen, total=totalfilesize,
+                                 unit=_('bytes'))
+                        yield chunk
+                finally:
+                    fp.close()
+        finally:
+            progress(_('bundle'), None)
 
 def generatev2(repo):
     """Emit content for version 2 of a streaming clone.
@@ -475,10 +523,16 @@  def generatev2(repo):
         repo.ui.debug('scanning\n')
         for name, ename, size in _walkstreamfiles(repo):
             if size:
-                entries.append((name, size))
+                entries.append((name, _fileappend, size))
                 totalfilesize += size
+        for name in _walkstreamfullstorefiles(repo):
+            if repo.svfs.exists(name):
+                totalfilesize += repo.svfs.lstat(name).st_size
+                entries.append((name, _filefull, None))
 
         chunks = _emit(repo, entries, totalfilesize)
+        first = next(chunks)
+        assert first is None
 
     return len(entries), totalfilesize, chunks
 
diff --git a/tests/test-clone-uncompressed.t b/tests/test-clone-uncompressed.t
--- a/tests/test-clone-uncompressed.t
+++ b/tests/test-clone-uncompressed.t
@@ -100,9 +100,7 @@  Clone with background file closing enabl
   transferred 96.3 KB in * seconds (* */sec) (glob)
   bundle2-input-part: total payload size 110887
   bundle2-input-part: "listkeys" (params: 1 mandatory) supported
-  bundle2-input-part: "phase-heads" supported
-  bundle2-input-part: total payload size 24
-  bundle2-input-bundle: 2 parts total
+  bundle2-input-bundle: 1 parts total
   checking for updated bookmarks
 #endif
 
@@ -320,13 +318,13 @@  Clone as non publishing
 #if stream-bundle2
   $ hg clone --stream http://localhost:$HGPORT phase-no-publish
   streaming all changes
-  1027 files to transfer, 96.3 KB of data
-  transferred 96.3 KB in * seconds (* */sec) (glob)
+  1028 files to transfer, 96.4 KB of data
+  transferred 96.4 KB in * seconds (* */sec) (glob)
   updating to branch default
   1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
   $ hg -R phase-no-publish phase -r 'all()'
-  0: public
-  1: public
+  0: draft
+  1: draft
 #endif
 
   $ killdaemons.py