Patchwork [4,of,5,modernize-streamclone] streamclone: move code out of exchange.py

login
register
mail settings
Submitter Gregory Szorc
Date Oct. 2, 2015, 11:34 p.m.
Message ID <3b427581580d347b861b.1443828844@gps-mbp.local>
Download mbox | patch
Permalink /patch/10751/
State Accepted
Headers show

Comments

Gregory Szorc - Oct. 2, 2015, 11:34 p.m.
# HG changeset patch
# User Gregory Szorc <gregory.szorc@gmail.com>
# Date 1443827152 25200
#      Fri Oct 02 16:05:52 2015 -0700
# Node ID 3b427581580d347b861b99a6cfef37d56d5a3808
# Parent  dcafb3b5c5aeda449f1491c08a3050b5f6b5caea
streamclone: move code out of exchange.py

We bulk move functions from exchange.py related to streaming clones.

Function names were renamed slightly to drop a component redundant with
the module name. Docstrings and comments referencing old names and
locations were updated accordingly.

Patch

diff --git a/mercurial/exchange.py b/mercurial/exchange.py
--- a/mercurial/exchange.py
+++ b/mercurial/exchange.py
@@ -4,13 +4,12 @@ 
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
-import time
 from i18n import _
 from node import hex, nullid
 import errno, urllib
-import util, scmutil, changegroup, base85, error, store
+import util, scmutil, changegroup, base85, error
 import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
 import lock as lockmod
 import tags
 
@@ -1458,132 +1457,4 @@  def unbundle(repo, cg, heads, source, ur
         lockmod.release(tr, lock, wlock)
         if recordout is not None:
             recordout(repo.ui.popbuffer())
     return r
-
-# This is it's own function so extensions can override it.
-def _walkstreamfiles(repo):
-    return repo.store.walk()
-
-def generatestreamclone(repo):
-    """Emit content for a streaming clone.
-
-    This is a generator of raw chunks that constitute a streaming clone.
-
-    The stream begins with a line of 2 space-delimited integers containing the
-    number of entries and total bytes size.
-
-    Next, are N entries for each file being transferred. Each file entry starts
-    as a line with the file name and integer size delimited by a null byte.
-    The raw file data follows. Following the raw file data is the next file
-    entry, or EOF.
-
-    When used on the wire protocol, an additional line indicating protocol
-    success will be prepended to the stream. This function is not responsible
-    for adding it.
-
-    This function will obtain a repository lock to ensure a consistent view of
-    the store is captured. It therefore may raise LockError.
-    """
-    entries = []
-    total_bytes = 0
-    # Get consistent snapshot of repo, lock during scan.
-    lock = repo.lock()
-    try:
-        repo.ui.debug('scanning\n')
-        for name, ename, size in _walkstreamfiles(repo):
-            if size:
-                entries.append((name, size))
-                total_bytes += size
-    finally:
-            lock.release()
-
-    repo.ui.debug('%d files, %d bytes to transfer\n' %
-                  (len(entries), total_bytes))
-    yield '%d %d\n' % (len(entries), total_bytes)
-
-    svfs = repo.svfs
-    oldaudit = svfs.mustaudit
-    debugflag = repo.ui.debugflag
-    svfs.mustaudit = False
-
-    try:
-        for name, size in entries:
-            if debugflag:
-                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
-            # partially encode name over the wire for backwards compat
-            yield '%s\0%d\n' % (store.encodedir(name), size)
-            if size <= 65536:
-                fp = svfs(name)
-                try:
-                    data = fp.read(size)
-                finally:
-                    fp.close()
-                yield data
-            else:
-                for chunk in util.filechunkiter(svfs(name), limit=size):
-                    yield chunk
-    finally:
-        svfs.mustaudit = oldaudit
-
-def consumestreamclone(repo, fp):
-    """Apply the contents from a streaming clone file.
-
-    This takes the output from "streamout" and applies it to the specified
-    repository.
-
-    Like "streamout," the status line added by the wire protocol is not handled
-    by this function.
-    """
-    lock = repo.lock()
-    try:
-        repo.ui.status(_('streaming all changes\n'))
-        l = fp.readline()
-        try:
-            total_files, total_bytes = map(int, l.split(' ', 1))
-        except (ValueError, TypeError):
-            raise error.ResponseError(
-                _('unexpected response from remote server:'), l)
-        repo.ui.status(_('%d files to transfer, %s of data\n') %
-                       (total_files, util.bytecount(total_bytes)))
-        handled_bytes = 0
-        repo.ui.progress(_('clone'), 0, total=total_bytes)
-        start = time.time()
-
-        tr = repo.transaction(_('clone'))
-        try:
-            for i in xrange(total_files):
-                # XXX doesn't support '\n' or '\r' in filenames
-                l = fp.readline()
-                try:
-                    name, size = l.split('\0', 1)
-                    size = int(size)
-                except (ValueError, TypeError):
-                    raise error.ResponseError(
-                        _('unexpected response from remote server:'), l)
-                if repo.ui.debugflag:
-                    repo.ui.debug('adding %s (%s)\n' %
-                                  (name, util.bytecount(size)))
-                # for backwards compat, name was partially encoded
-                ofp = repo.svfs(store.decodedir(name), 'w')
-                for chunk in util.filechunkiter(fp, limit=size):
-                    handled_bytes += len(chunk)
-                    repo.ui.progress(_('clone'), handled_bytes,
-                                     total=total_bytes)
-                    ofp.write(chunk)
-                ofp.close()
-            tr.close()
-        finally:
-            tr.release()
-
-        # Writing straight to files circumvented the inmemory caches
-        repo.invalidate()
-
-        elapsed = time.time() - start
-        if elapsed <= 0:
-            elapsed = 0.001
-        repo.ui.progress(_('clone'), None)
-        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
-                       (util.bytecount(total_bytes), elapsed,
-                        util.bytecount(total_bytes / elapsed)))
-    finally:
-        lock.release()
diff --git a/mercurial/streamclone.py b/mercurial/streamclone.py
--- a/mercurial/streamclone.py
+++ b/mercurial/streamclone.py
@@ -6,16 +6,146 @@ 
 # GNU General Public License version 2 or any later version.
 
 from __future__ import absolute_import
 
+import time
+
 from .i18n import _
 from . import (
     branchmap,
     error,
-    exchange,
+    store,
     util,
 )
 
+# This is it's own function so extensions can override it.
+def _walkstreamfiles(repo):
+    return repo.store.walk()
+
+def generatev1(repo):
+    """Emit content for version 1 of a streaming clone.
+
+    This is a generator of raw chunks that constitute a streaming clone.
+
+    The stream begins with a line of 2 space-delimited integers containing the
+    number of entries and total bytes size.
+
+    Next, are N entries for each file being transferred. Each file entry starts
+    as a line with the file name and integer size delimited by a null byte.
+    The raw file data follows. Following the raw file data is the next file
+    entry, or EOF.
+
+    When used on the wire protocol, an additional line indicating protocol
+    success will be prepended to the stream. This function is not responsible
+    for adding it.
+
+    This function will obtain a repository lock to ensure a consistent view of
+    the store is captured. It therefore may raise LockError.
+    """
+    entries = []
+    total_bytes = 0
+    # Get consistent snapshot of repo, lock during scan.
+    lock = repo.lock()
+    try:
+        repo.ui.debug('scanning\n')
+        for name, ename, size in _walkstreamfiles(repo):
+            if size:
+                entries.append((name, size))
+                total_bytes += size
+    finally:
+            lock.release()
+
+    repo.ui.debug('%d files, %d bytes to transfer\n' %
+                  (len(entries), total_bytes))
+    yield '%d %d\n' % (len(entries), total_bytes)
+
+    svfs = repo.svfs
+    oldaudit = svfs.mustaudit
+    debugflag = repo.ui.debugflag
+    svfs.mustaudit = False
+
+    try:
+        for name, size in entries:
+            if debugflag:
+                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
+            # partially encode name over the wire for backwards compat
+            yield '%s\0%d\n' % (store.encodedir(name), size)
+            if size <= 65536:
+                fp = svfs(name)
+                try:
+                    data = fp.read(size)
+                finally:
+                    fp.close()
+                yield data
+            else:
+                for chunk in util.filechunkiter(svfs(name), limit=size):
+                    yield chunk
+    finally:
+        svfs.mustaudit = oldaudit
+
+def consumev1(repo, fp):
+    """Apply the contents from version 1 of a streaming clone file handle.
+
+    This takes the output from "streamout" and applies it to the specified
+    repository.
+
+    Like "streamout," the status line added by the wire protocol is not handled
+    by this function.
+    """
+    lock = repo.lock()
+    try:
+        repo.ui.status(_('streaming all changes\n'))
+        l = fp.readline()
+        try:
+            total_files, total_bytes = map(int, l.split(' ', 1))
+        except (ValueError, TypeError):
+            raise error.ResponseError(
+                _('unexpected response from remote server:'), l)
+        repo.ui.status(_('%d files to transfer, %s of data\n') %
+                       (total_files, util.bytecount(total_bytes)))
+        handled_bytes = 0
+        repo.ui.progress(_('clone'), 0, total=total_bytes)
+        start = time.time()
+
+        tr = repo.transaction(_('clone'))
+        try:
+            for i in xrange(total_files):
+                # XXX doesn't support '\n' or '\r' in filenames
+                l = fp.readline()
+                try:
+                    name, size = l.split('\0', 1)
+                    size = int(size)
+                except (ValueError, TypeError):
+                    raise error.ResponseError(
+                        _('unexpected response from remote server:'), l)
+                if repo.ui.debugflag:
+                    repo.ui.debug('adding %s (%s)\n' %
+                                  (name, util.bytecount(size)))
+                # for backwards compat, name was partially encoded
+                ofp = repo.svfs(store.decodedir(name), 'w')
+                for chunk in util.filechunkiter(fp, limit=size):
+                    handled_bytes += len(chunk)
+                    repo.ui.progress(_('clone'), handled_bytes,
+                                     total=total_bytes)
+                    ofp.write(chunk)
+                ofp.close()
+            tr.close()
+        finally:
+            tr.release()
+
+        # Writing straight to files circumvented the inmemory caches
+        repo.invalidate()
+
+        elapsed = time.time() - start
+        if elapsed <= 0:
+            elapsed = 0.001
+        repo.ui.progress(_('clone'), None)
+        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
+                       (util.bytecount(total_bytes), elapsed,
+                        util.bytecount(total_bytes / elapsed)))
+    finally:
+        lock.release()
+
 def streamin(repo, remote, remotereqs):
     # Save remote branchmap. We will use it later
     # to speed up branchcache creation
     rbranchmap = None
@@ -45,13 +175,13 @@  def applyremotedata(repo, remotereqs, re
     "remotereqs" is a set of requirements to handle the incoming data.
     "remotebranchmap" is the result of a branchmap lookup on the remote. It
     can be None.
     "fp" is a file object containing the raw stream data, suitable for
-    feeding into exchange.consumestreamclone.
+    feeding into consumev1().
     """
     lock = repo.lock()
     try:
-        exchange.consumestreamclone(repo, fp)
+        consumev1(repo, fp)
 
         # new requirements = old non-format requirements +
         #                    new format-related remote requirements
         # requirements from the streamed-in repository
diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py
--- a/mercurial/wireproto.py
+++ b/mercurial/wireproto.py
@@ -25,8 +25,9 @@  from . import (
     error,
     exchange,
     peer,
     pushkey as pushkeymod,
+    streamclone,
     util,
 )
 
 class abstractserverproto(object):
@@ -719,9 +720,9 @@  def stream(repo, proto):
 
     try:
         # LockError may be raised before the first result is yielded. Don't
         # emit output until we're sure we got the lock successfully.
-        it = exchange.generatestreamclone(repo)
+        it = streamclone.generatev1(repo)
         return streamres(getstream(it))
     except error.LockError:
         return '2\n'