Submitter | Boris Feld |
---|---|
Date | Jan. 19, 2018, 11:47 p.m. |
Message ID | <789f14aef1ca0b39e61a.1516405637@FB> |
Permalink | /patch/26996/ |
State | Accepted |
Comments
On Sat, 20 Jan 2018 00:47:17 +0100, Boris Feld wrote:
> # HG changeset patch
> # User Boris Feld <boris.feld@octobus.net>
> # Date 1516233002 -3600
> #      Thu Jan 18 00:50:02 2018 +0100
> # Node ID 789f14aef1ca0b39e61a47d8989c18cdc6015b53
> # Parent  d8a918033dcfd3dcbac8635616cc1b6c12078b18
> # EXP-Topic b2-stream
> # Available At https://bitbucket.org/octobus/mercurial-devel/
> #              hg pull https://bitbucket.org/octobus/mercurial-devel/ -r 789f14aef1ca
> streamclone: add support for cloning non append-only file

> +# type of file to stream
> +_fileappend = 0 # append only file
> +_filefull = 1 # full snapshot file
> +
> +# This is it's own function so extensions can override it.
> +def _walkstreamfullstorefiles(repo):
> +    """list snapshot file from the store"""
> +    fnames = []
> +    if not repo.publishing():
> +        fnames.append('phaseroots')
> +    return fnames
> +
> +def _filterfull(entry, copy, vfs):
> +    """actually copy the snapshot files"""
> +    name, ftype, data = entry
> +    if ftype != _filefull:
> +        return entry
> +    return (name, ftype, copy(vfs.join(name)))
> +
> +@contextlib.contextmanager
> +def maketempcopies():
> +    """return a function to temporary copy file"""
> +    files = []
> +    try:
> +        def copy(src):
> +            fd, dst = tempfile.mkstemp()
> +            os.close(fd)
> +            files.append(dst)
> +            util.copyfiles(src, dst, hardlink=True)

Better to create a temp file near the repository if we want to take advantage
of hardlinks. /tmp may be a separate filesystem.

> +    with maketempcopies() as copy:
> +        try:
> +            # copy is delayed until we are in the try
> +            entries = [_filterfull(e, copy, vfs) for e in entries]

I'm not sure if this is better or worse, but maybe we can hold open file
handles (instead of copying them) given "full" files are atomically replaced
on write.
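The open-file-handle idea from the last paragraph can be illustrated with a small standalone sketch. It uses only the Python 3 standard library, not Mercurial's `vfs` or `util` helpers, and the function name `read_snapshot_after_replace` is made up for illustration. It assumes POSIX semantics, where a handle opened before a rename-over keeps pointing at the old file content; on Windows, replacing a file that is still open is expected to fail instead.

```python
import os
import tempfile


def read_snapshot_after_replace():
    """Open a file, atomically replace it, then read through the old handle.

    Returns (bytes seen through the old handle, bytes now on disk).
    Hypothetical helper for illustration; assumes a POSIX filesystem.
    """
    workdir = tempfile.mkdtemp()
    target = os.path.join(workdir, 'phaseroots')
    with open(target, 'wb') as f:
        f.write(b'old content\n')

    # "Snapshot" the file by keeping a handle open instead of copying it.
    snapshot = open(target, 'rb')
    try:
        # Writer side: write the new version next to the target, then
        # rename it over the target in a single atomic step.
        replacement = target + '.new'
        with open(replacement, 'wb') as f:
            f.write(b'new content\n')
        os.replace(replacement, target)

        with open(target, 'rb') as f:
            current = f.read()
        return snapshot.read(), current
    finally:
        snapshot.close()
        os.unlink(target)
        os.rmdir(workdir)
```

The trade-off against temporary copies is that one file descriptor stays open per "full" file for the whole duration of the stream.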
On Sat, 20 Jan 2018 19:41:23 +0900, Yuya Nishihara wrote:
> On Sat, 20 Jan 2018 00:47:17 +0100, Boris Feld wrote:
> > +@contextlib.contextmanager
> > +def maketempcopies():
> > +    """return a function to temporary copy file"""
> > +    files = []
> > +    try:
> > +        def copy(src):
> > +            fd, dst = tempfile.mkstemp()
> > +            os.close(fd)
> > +            files.append(dst)
> > +            util.copyfiles(src, dst, hardlink=True)
>
> Better to create a temp file near the repository if we want to take advantage
> of hardlinks. /tmp may be a separate filesystem.

Ah, but hardlink will fail anyway since destination file exists.
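The failure mode described here can be reproduced with the standard library alone. This is a sketch under the assumption that the hardlink attempt ultimately becomes a plain `os.link(src, dst)` call; it does not inspect what `util.copyfiles` actually does internally, and `link_onto_mkstemp_target` is a hypothetical name.

```python
import os
import tempfile


def link_onto_mkstemp_target():
    """Try to hardlink onto a path that mkstemp() has already created.

    Returns True if the link succeeded, False if the existing
    destination made it fail.
    """
    workdir = tempfile.mkdtemp()
    src = os.path.join(workdir, 'src')
    with open(src, 'wb') as f:
        f.write(b'data')

    # mkstemp() creates the destination file, so the name is taken.
    fd, dst = tempfile.mkstemp(dir=workdir)
    os.close(fd)
    try:
        os.link(src, dst)
        linked = True
    except FileExistsError:
        # link() refuses to overwrite an existing destination.
        linked = False
    finally:
        os.unlink(src)
        os.unlink(dst)
        os.rmdir(workdir)
    return linked
```

Source and destination share a directory here, so the failure comes from the existing destination and not from crossing filesystems.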
On Sat, 2018-01-20 at 19:53 +0900, Yuya Nishihara wrote:
> On Sat, 20 Jan 2018 19:41:23 +0900, Yuya Nishihara wrote:
> > On Sat, 20 Jan 2018 00:47:17 +0100, Boris Feld wrote:
> > > +@contextlib.contextmanager
> > > +def maketempcopies():
> > > +    """return a function to temporary copy file"""
> > > +    files = []
> > > +    try:
> > > +        def copy(src):
> > > +            fd, dst = tempfile.mkstemp()
> > > +            os.close(fd)
> > > +            files.append(dst)
> > > +            util.copyfiles(src, dst, hardlink=True)
> >
> > Better to create a temp file near the repository if we want to take
> > advantage of hardlinks. /tmp may be a separate filesystem.
>
> Ah, but hardlink will fail anyway since destination file exists.

Hardlinks are an option since we create the temporary file just for the
streaming. We might use a temporary directory inside the repository for this
purpose.

It would be an improvement, but it adds some complexity and requires having a
clear distinction between files we can hardlink and those we can't.

We think it would be better if we can ship the current version in 4.5, so we
have the next cycle to improve it properly.

Thank you for the feedback.
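One possible shape for the "temporary directory inside the repository" idea is sketched below. It is not the code that was shipped. The helper name `tempcopies_near`, the `stream-tmp-` prefix, and the fallback to `shutil.copyfile` are assumptions made for illustration, and only the standard library is used.

```python
import contextlib
import os
import shutil
import tempfile


@contextlib.contextmanager
def tempcopies_near(basedir):
    """Yield copy(src); copies live in a temp dir created under basedir.

    Hypothetical variant of maketempcopies(): the temp dir sits on the
    same filesystem as basedir, and each destination name is fresh, so
    a hardlink has a chance to succeed.
    """
    tmpdir = tempfile.mkdtemp(prefix='stream-tmp-', dir=basedir)
    counter = [0]

    def copy(src):
        counter[0] += 1
        name = '%d-%s' % (counter[0], os.path.basename(src))
        dst = os.path.join(tmpdir, name)
        try:
            os.link(src, dst)  # cheap when the filesystem allows it
        except OSError:
            shutil.copyfile(src, dst)  # fall back to a real copy
        return dst

    try:
        yield copy
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
```

A hardlink shares data with the source, so this is only a valid snapshot for files that are replaced atomically and never modified in place. That is the distinction between linkable and non-linkable files mentioned above.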
On Thu, 25 Jan 2018 09:35:38 +0100, Boris Feld wrote:
> On Sat, 2018-01-20 at 19:53 +0900, Yuya Nishihara wrote:
> > On Sat, 20 Jan 2018 19:41:23 +0900, Yuya Nishihara wrote:
> > > On Sat, 20 Jan 2018 00:47:17 +0100, Boris Feld wrote:
> > > > +@contextlib.contextmanager
> > > > +def maketempcopies():
> > > > +    """return a function to temporary copy file"""
> > > > +    files = []
> > > > +    try:
> > > > +        def copy(src):
> > > > +            fd, dst = tempfile.mkstemp()
> > > > +            os.close(fd)
> > > > +            files.append(dst)
> > > > +            util.copyfiles(src, dst, hardlink=True)
> > >
> > > Better to create a temp file near the repository if we want to take
> > > advantage of hardlinks. /tmp may be a separate filesystem.
> >
> > Ah, but hardlink will fail anyway since destination file exists.
>
> Hardlinks are an option since we create the temporary file just for the
> streaming. We might use a temporary directory inside the
> repository for this purpose.
>
> It would be an improvement, but it adds some complexity and requires
> having a clear distinction between files we can hardlink and those we
> can't.
>
> We think it would be better if we can ship the current version in 4.5,
> so we have the next cycle to improve it properly.

So maybe we should just turn off the hardlink flag for clarity? It doesn't
work if "dst" exists, and the "dst" must be created by mkstemp().
Patch
diff --git a/mercurial/exchange.py b/mercurial/exchange.py
--- a/mercurial/exchange.py
+++ b/mercurial/exchange.py
@@ -1465,6 +1465,7 @@ def _pullbundle2(pullop):
         kwargs['cg'] = False
         kwargs['stream'] = True
         pullop.stepsdone.add('changegroup')
+        pullop.stepsdone.add('phases')
 
     else:
         # pulling changegroup
@@ -1472,15 +1473,15 @@ def _pullbundle2(pullop):
 
         kwargs['cg'] = pullop.fetch
 
-    legacyphase = 'phases' in ui.configlist('devel', 'legacy.exchange')
-    hasbinaryphase = 'heads' in pullop.remotebundle2caps.get('phases', ())
-    if (not legacyphase and hasbinaryphase):
-        kwargs['phases'] = True
-        pullop.stepsdone.add('phases')
+        legacyphase = 'phases' in ui.configlist('devel', 'legacy.exchange')
+        hasbinaryphase = 'heads' in pullop.remotebundle2caps.get('phases', ())
+        if (not legacyphase and hasbinaryphase):
+            kwargs['phases'] = True
+            pullop.stepsdone.add('phases')
 
-    if 'listkeys' in pullop.remotebundle2caps:
-        if 'phases' not in pullop.stepsdone:
-            kwargs['listkeys'] = ['phases']
+        if 'listkeys' in pullop.remotebundle2caps:
+            if 'phases' not in pullop.stepsdone:
+                kwargs['listkeys'] = ['phases']
 
     bookmarksrequested = False
     legacybookmark = 'bookmarks' in ui.configlist('devel', 'legacy.exchange')
diff --git a/mercurial/streamclone.py b/mercurial/streamclone.py
--- a/mercurial/streamclone.py
+++ b/mercurial/streamclone.py
@@ -7,7 +7,10 @@
 
 from __future__ import absolute_import
 
+import contextlib
+import os
 import struct
+import tempfile
 
 from .i18n import _
 from . import (
@@ -428,32 +431,77 @@ class streamcloneapplier(object):
     def apply(self, repo):
         return applybundlev1(repo, self._fh)
 
+# type of file to stream
+_fileappend = 0 # append only file
+_filefull = 1 # full snapshot file
+
+# This is it's own function so extensions can override it.
+def _walkstreamfullstorefiles(repo):
+    """list snapshot file from the store"""
+    fnames = []
+    if not repo.publishing():
+        fnames.append('phaseroots')
+    return fnames
+
+def _filterfull(entry, copy, vfs):
+    """actually copy the snapshot files"""
+    name, ftype, data = entry
+    if ftype != _filefull:
+        return entry
+    return (name, ftype, copy(vfs.join(name)))
+
+@contextlib.contextmanager
+def maketempcopies():
+    """return a function to temporary copy file"""
+    files = []
+    try:
+        def copy(src):
+            fd, dst = tempfile.mkstemp()
+            os.close(fd)
+            files.append(dst)
+            util.copyfiles(src, dst, hardlink=True)
+            return dst
+        yield copy
+    finally:
+        for tmp in files:
+            util.tryunlink(tmp)
+
 def _emit(repo, entries, totalfilesize):
     """actually emit the stream bundle"""
+    vfs = repo.svfs
     progress = repo.ui.progress
     progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
-    vfs = repo.svfs
-    try:
-        seen = 0
-        for name, size in entries:
-            yield util.uvarintencode(len(name))
-            fp = vfs(name)
-            try:
-                yield util.uvarintencode(size)
-                yield name
-                if size <= 65536:
-                    chunks = (fp.read(size),)
-                else:
-                    chunks = util.filechunkiter(fp, limit=size)
-                for chunk in chunks:
-                    seen += len(chunk)
-                    progress(_('bundle'), seen, total=totalfilesize,
-                             unit=_('bytes'))
-                    yield chunk
-            finally:
-                fp.close()
-    finally:
-        progress(_('bundle'), None)
+    with maketempcopies() as copy:
+        try:
+            # copy is delayed until we are in the try
+            entries = [_filterfull(e, copy, vfs) for e in entries]
+            yield None # this release the lock on the repository
+            seen = 0
+
+            for name, ftype, data in entries:
+                yield util.uvarintencode(len(name))
+                if ftype == _fileappend:
+                    fp = vfs(name)
+                    size = data
+                elif ftype == _filefull:
+                    fp = open(data, 'rb')
+                    size = util.fstat(fp).st_size
+                try:
+                    yield util.uvarintencode(size)
+                    yield name
+                    if size <= 65536:
+                        chunks = (fp.read(size),)
+                    else:
+                        chunks = util.filechunkiter(fp, limit=size)
+                    for chunk in chunks:
+                        seen += len(chunk)
+                        progress(_('bundle'), seen, total=totalfilesize,
+                                 unit=_('bytes'))
+                        yield chunk
+                finally:
+                    fp.close()
+        finally:
+            progress(_('bundle'), None)
 
 def generatev2(repo):
     """Emit content for version 2 of a streaming clone.
@@ -475,10 +523,16 @@ def generatev2(repo):
         repo.ui.debug('scanning\n')
         for name, ename, size in _walkstreamfiles(repo):
             if size:
-                entries.append((name, size))
+                entries.append((name, _fileappend, size))
                 totalfilesize += size
+        for name in _walkstreamfullstorefiles(repo):
+            if repo.svfs.exists(name):
+                totalfilesize += repo.svfs.lstat(name).st_size
+                entries.append((name, _filefull, None))
 
         chunks = _emit(repo, entries, totalfilesize)
+        first = next(chunks)
+        assert first is None
 
     return len(entries), totalfilesize, chunks
 
diff --git a/tests/test-clone-uncompressed.t b/tests/test-clone-uncompressed.t
--- a/tests/test-clone-uncompressed.t
+++ b/tests/test-clone-uncompressed.t
@@ -100,9 +100,7 @@ Clone with background file closing enabl
   transferred 96.3 KB in * seconds (* */sec) (glob)
   bundle2-input-part: total payload size 110887
   bundle2-input-part: "listkeys" (params: 1 mandatory) supported
-  bundle2-input-part: "phase-heads" supported
-  bundle2-input-part: total payload size 24
-  bundle2-input-bundle: 2 parts total
+  bundle2-input-bundle: 1 parts total
   checking for updated bookmarks
 #endif
 
@@ -320,13 +318,13 @@ Clone as non publishing
 #if stream-bundle2
   $ hg clone --stream http://localhost:$HGPORT phase-no-publish
   streaming all changes
-  1027 files to transfer, 96.3 KB of data
-  transferred 96.3 KB in * seconds (* */sec) (glob)
+  1028 files to transfer, 96.4 KB of data
+  transferred 96.4 KB in * seconds (* */sec) (glob)
   updating to branch default
   1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
   $ hg -R phase-no-publish phase -r 'all()'
-  0: public
-  1: public
+  0: draft
+  1: draft
 #endif
 
   $ killdaemons.py