Patchwork D2470: wireproto: allow direct stream processing for unbundle

login
register
mail settings
Submitter phabricator
Date Feb. 27, 2018, 1:39 a.m.
Message ID <differential-rev-PHID-DREV-souq2ocdtfjnwnw7o57h-req@phab.mercurial-scm.org>
Download mbox | patch
Permalink /patch/28435/
State Superseded
Headers show

Comments

phabricator - Feb. 27, 2018, 1:39 a.m.
joerg.sonnenberger created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  Introduce a new option server.streamunbundle which starts a transaction
  immediately to apply a bundle instead of writing it to a temporary file
  first. This side steps the need for a large tmp directory at the cost of
  preventing concurrent pushes. This is a reasonable trade-off for many
  setups as concurrent pushes for the main branch at least are disallowed
  anyway. The option defaults to off to preserve existing behavior.
  
  Change the wireproto interface to provide a generator for reading the
  payload and make callers responsible for consuming all data.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D2470

AFFECTED FILES
  hgext/largefiles/proto.py
  mercurial/configitems.py
  mercurial/help/config.txt
  mercurial/wireproto.py
  mercurial/wireprotoserver.py
  mercurial/wireprototypes.py

CHANGE DETAILS




To: joerg.sonnenberger, #hg-reviewers
Cc: mercurial-devel
phabricator - Feb. 27, 2018, 10:56 a.m.
lothiraldan added inline comments.

INLINE COMMENTS

> wireproto.py:980
> +                    def cleanup():
> +                        for p in payload:
> +                            pass

It's not perfectly clear why do we need to do this loop? Is it to consume the end of the payload before ending? Would it be possible to add a comment here?

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D2470

To: joerg.sonnenberger, #hg-reviewers
Cc: lothiraldan, mercurial-devel
phabricator - April 6, 2018, 11:07 p.m.
indygreg accepted this revision.
indygreg added a comment.
This revision is now accepted and ready to land.


  LGTM. Thanks for the feature!

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D2470

To: joerg.sonnenberger, #hg-reviewers, lothiraldan, indygreg
Cc: indygreg, lothiraldan, mercurial-devel

Patch

diff --git a/mercurial/wireprototypes.py b/mercurial/wireprototypes.py
--- a/mercurial/wireprototypes.py
+++ b/mercurial/wireprototypes.py
@@ -92,10 +92,11 @@ 
         returns a list of values (same order as <args>)"""
 
     @abc.abstractmethod
-    def forwardpayload(self, fp):
-        """Read the raw payload and forward to a file.
+    def getpayload(self):
+        """Provide a generator for the raw payload.
 
-        The payload is read in full before the function returns.
+        The caller is responsible for ensuring that the full payload is
+        processed.
         """
 
     @abc.abstractmethod
diff --git a/mercurial/wireprotoserver.py b/mercurial/wireprotoserver.py
--- a/mercurial/wireprotoserver.py
+++ b/mercurial/wireprotoserver.py
@@ -91,16 +91,15 @@ 
         args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
         return args
 
-    def forwardpayload(self, fp):
+    def getpayload(self):
         if r'HTTP_CONTENT_LENGTH' in self._req.env:
             length = int(self._req.env[r'HTTP_CONTENT_LENGTH'])
         else:
             length = int(self._req.env[r'CONTENT_LENGTH'])
         # If httppostargs is used, we need to read Content-Length
         # minus the amount that was consumed by args.
         length -= int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
-        for s in util.filechunkiter(self._req, limit=length):
-            fp.write(s)
+        return util.filechunkiter(self._req, limit=length)
 
     @contextlib.contextmanager
     def mayberedirectstdio(self):
@@ -346,7 +345,7 @@ 
                 data[arg] = val
         return [data[k] for k in keys]
 
-    def forwardpayload(self, fpout):
+    def getpayload(self):
         # We initially send an empty response. This tells the client it is
         # OK to start sending data. If a client sees any other response, it
         # interprets it as an error.
@@ -359,7 +358,7 @@ 
         # 0\n
         count = int(self._fin.readline())
         while count:
-            fpout.write(self._fin.read(count))
+            yield self._fin.read(count)
             count = int(self._fin.readline())
 
     @contextlib.contextmanager
diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py
--- a/mercurial/wireproto.py
+++ b/mercurial/wireproto.py
@@ -972,14 +972,29 @@ 
     with proto.mayberedirectstdio() as output:
         try:
             exchange.check_heads(repo, their_heads, 'preparing changes')
+            cleanup = lambda: None
+            try:
+                payload = proto.getpayload()
+                if repo.ui.configbool('server', 'streamunbundle'):
+                    def cleanup():
+                        for p in payload:
+                            pass
+                    fp = util.chunkbuffer(payload)
+                else:
+                    # write bundle data to temporary file because it can be big
+                    fp, tempname = None, None
+                    def cleanup():
+                        if fp:
+                            fp.close()
+                        if tempname:
+                            os.unlink(tempname)
+                    fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
+                    fp = os.fdopen(fd, pycompat.sysstr('wb+'))
+                    r = 0
+                    for p in payload:
+                        fp.write(p)
+                    fp.seek(0)
 
-            # write bundle data to temporary file because it can be big
-            fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
-            fp = os.fdopen(fd, pycompat.sysstr('wb+'))
-            r = 0
-            try:
-                proto.forwardpayload(fp)
-                fp.seek(0)
                 gen = exchange.readbundle(repo.ui, fp, None)
                 if (isinstance(gen, changegroupmod.cg1unpacker)
                     and not bundle1allowed(repo, 'push')):
@@ -1001,8 +1016,7 @@ 
                 return pushres(r, output.getvalue() if output else '')
 
             finally:
-                fp.close()
-                os.unlink(tempname)
+                cleanup()
 
         except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
             # handle non-bundle2 case first
diff --git a/mercurial/help/config.txt b/mercurial/help/config.txt
--- a/mercurial/help/config.txt
+++ b/mercurial/help/config.txt
@@ -1792,6 +1792,11 @@ 
     are highly recommended. Partial clones will still be allowed.
     (default: False)
 
+``streamunbundle``
+    When set, servers will apply data sent from the client directly,
+    otherwise it will be written to a temporary file first. This option
+    effectively prevents concurrent pushes.
+
 ``concurrent-push-mode``
     Level of allowed race condition between two pushing clients.
 
diff --git a/mercurial/configitems.py b/mercurial/configitems.py
--- a/mercurial/configitems.py
+++ b/mercurial/configitems.py
@@ -902,6 +902,9 @@ 
 coreconfigitem('server', 'disablefullbundle',
     default=False,
 )
+coreconfigitem('server', 'streamunbundle',
+    default=False,
+)
 coreconfigitem('server', 'maxhttpheaderlen',
     default=1024,
 )
diff --git a/hgext/largefiles/proto.py b/hgext/largefiles/proto.py
--- a/hgext/largefiles/proto.py
+++ b/hgext/largefiles/proto.py
@@ -41,7 +41,8 @@ 
         tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
 
         try:
-            proto.forwardpayload(tmpfp)
+            for p in proto.getpayload():
+                tmpfp.write(p)
             tmpfp._fp.seek(0)
             if sha != lfutil.hexsha1(tmpfp._fp):
                 raise IOError(0, _('largefile contents do not match hash'))