Patchwork [7,of,8] bundle2: lazy unbundle of part payload

login
register
mail settings
Submitter Pierre-Yves David
Date April 12, 2014, 10:08 p.m.
Message ID <192cace034eaafb08e53.1397340526@marginatus.alto.octopoid.net>
Download mbox | patch
Permalink /patch/4309/
State Accepted
Headers show

Comments

Pierre-Yves David - April 12, 2014, 10:08 p.m.
# HG changeset patch
# User Pierre-Yves David <pierre-yves.david@fb.com>
# Date 1397246722 14400
#      Fri Apr 11 16:05:22 2014 -0400
# Node ID 192cace034eaafb08e536afd113f0b10ff0cf0e8
# Parent  01a7df5372c9498424670dfe049fa69af0b1a905
bundle2: lazy unbundle of part payload

The `unbundle` part gain a `read` method to retrieve content of the payload.
This method behave as a python file like read method.

The process bundle code is updated to make sure a part is fully consumed before
before another one is extracted.

Test output change because the debug output is even more interleaved now.

Patch

diff --git a/mercurial/bundle2.py b/mercurial/bundle2.py
--- a/mercurial/bundle2.py
+++ b/mercurial/bundle2.py
@@ -286,10 +286,11 @@  def processbundle(repo, unbundler, trans
     # todo:
     # - replace this is a init function soon.
     # - exception catching
     unbundler.params
     iterparts = iter(unbundler)
+    part = None
     try:
         for part in iterparts:
             parttype = part.type
             # part key are matched lower case
             key = parttype.lower()
@@ -300,22 +301,27 @@  def processbundle(repo, unbundler, trans
                 if key != parttype: # mandatory parts
                     # todo:
                     # - use a more precise exception
                     raise
                 op.ui.debug('ignoring unknown advisory part %r\n' % key)
-                # todo:
-                # - consume the part once we use streaming
+                # consuming the part
+                part.read()
                 continue
 
             # handler is called outside the above try block so that we don't
             # risk catching KeyErrors from anything other than the
             # parthandlermapping lookup (any KeyError raised by handler()
             # itself represents a defect of a different variety).
             handler(op, part)
+            part.read()
     except Exception:
+        if part is not None:
+            # consume the bundle content
+            part.read()
         for part in iterparts:
-            pass # consume the bundle content
+            # consume the bundle content
+            part.read()
         raise
     return op
 
 class bundle20(object):
     """represent an outgoing bundle2 container
@@ -542,36 +548,36 @@  class unbundlepart(unpackermixing):
         super(unbundlepart, self).__init__(fp)
         self.ui = ui
         # unbundle state attr
         self._headerdata = header
         self._headeroffset = 0
+        self._initialized = False
+        self.consumed = False
         # part data
         self.id = None
         self.type = None
         self.mandatoryparams = None
         self.advisoryparams = None
-        self.data = None
-        self._readdata()
+        self._payloadstream = None
+        self._readheader()
 
     def _fromheader(self, size):
         """return the next <size> byte from the header"""
         offset = self._headeroffset
         data = self._headerdata[offset:(offset + size)]
-        self._headeroffset += size
+        self._headeroffset = offset + size
         return data
 
     def _unpackheader(self, format):
         """read given format from header
 
         This automatically compute the size of the format to read."""
         data = self._fromheader(struct.calcsize(format))
         return _unpack(format, data)
 
-    def _readdata(self):
+    def _readheader(self):
         """read the header and setup the object"""
-        # some utility to help reading from the header block
-
         typesize = self._unpackheader(_fparttypesize)[0]
         self.type = self._fromheader(typesize)
         self.ui.debug('part type: "%s"\n' % self.type)
         self.id = self._unpackheader(_fpartid)[0]
         self.ui.debug('part id: "%s"\n' % self.id)
@@ -595,18 +601,33 @@  class unbundlepart(unpackermixing):
         for key, value in advsizes:
             advparams.append((self._fromheader(key), self._fromheader(value)))
         self.mandatoryparams = manparams
         self.advisoryparams  = advparams
         ## part payload
-        payload = []
-        payloadsize = self._unpack(_fpayloadsize)[0]
-        self.ui.debug('payload chunk size: %i\n' % payloadsize)
-        while payloadsize:
-            payload.append(self._readexact(payloadsize))
+        def payloadchunks():
             payloadsize = self._unpack(_fpayloadsize)[0]
             self.ui.debug('payload chunk size: %i\n' % payloadsize)
-        self.data = ''.join(payload)
+            while payloadsize:
+                yield self._readexact(payloadsize)
+                payloadsize = self._unpack(_fpayloadsize)[0]
+                self.ui.debug('payload chunk size: %i\n' % payloadsize)
+        self._payloadstream = util.chunkbuffer(payloadchunks())
+        # we read the data, tell it
+        self._initialized = True
+
+    def read(self, size=None):
+        """read payload data"""
+        if not self._initialized:
+            self._readheader()
+        if size is None:
+            data = self._payloadstream.read()
+        else:
+            data = self._payloadstream.read(size)
+        if size is None or len(data) < size:
+            self.consumed = True
+        return data
+
 
 @parthandler('changegroup')
 def handlechangegroup(op, inpart):
     """apply a changegroup part on the repo
 
@@ -617,11 +638,11 @@  def handlechangegroup(op, inpart):
     #
     # The addchangegroup function will get a transaction object by itself, but
     # we need to make sure we trigger the creation of a transaction object used
     # for the whole processing scope.
     op.gettransaction()
-    data = StringIO.StringIO(inpart.data)
+    data = StringIO.StringIO(inpart.read())
     data.seek(0)
     cg = changegroup.readbundle(data, 'bundle2part')
     ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
     op.records.add('changegroup', {'return': ret})
     if op.reply is not None:
@@ -629,10 +650,11 @@  def handlechangegroup(op, inpart):
         # return. But one need to start somewhere.
         part = bundlepart('reply:changegroup', (),
                            [('in-reply-to', str(inpart.id)),
                             ('return', '%i' % ret)])
         op.reply.addpart(part)
+    assert not inpart.read()
 
 @parthandler('reply:changegroup')
 def handlechangegroup(op, inpart):
     p = dict(inpart.advisoryparams)
     ret = int(p['return'])
diff --git a/tests/test-bundle2.t b/tests/test-bundle2.t
--- a/tests/test-bundle2.t
+++ b/tests/test-bundle2.t
@@ -26,11 +26,11 @@  Create an extension to test bundle2 API
   > @bundle2.parthandler('test:song')
   > def songhandler(op, part):
   >     """handle a "test:song" bundle2 part, printing the lyrics on stdin"""
   >     op.ui.write('The choir starts singing:\n')
   >     verses = 0
-  >     for line in part.data.split('\n'):
+  >     for line in part.read().split('\n'):
   >         op.ui.write('    %s\n' % line)
   >         verses += 1
   >     op.records.add('song', {'verses': verses})
   > 
   > @bundle2.parthandler('test:ping')
@@ -150,11 +150,11 @@  Create an extension to test bundle2 API
   >     for p in unbundler:
   >         count += 1
   >         ui.write('  :%s:\n' % p.type)
   >         ui.write('    mandatory: %i\n' % len(p.mandatoryparams))
   >         ui.write('    advisory: %i\n' % len(p.advisoryparams))
-  >         ui.write('    payload: %i bytes\n' % len(p.data))
+  >         ui.write('    payload: %i bytes\n' % len(p.read()))
   >     ui.write('parts count:   %i\n' % count)
   > EOF
   $ cat >> $HGRCPATH << EOF
   > [extensions]
   > bundle2=$TESTTMP/bundle2.py
@@ -376,52 +376,52 @@  Test part
   start extraction of bundle2 parts
   part header size: 17
   part type: "test:empty"
   part id: "0"
   part parameters: 0
-  payload chunk size: 0
     :test:empty:
       mandatory: 0
       advisory: 0
+  payload chunk size: 0
       payload: 0 bytes
   part header size: 17
   part type: "test:empty"
   part id: "1"
   part parameters: 0
-  payload chunk size: 0
     :test:empty:
       mandatory: 0
       advisory: 0
+  payload chunk size: 0
       payload: 0 bytes
   part header size: 16
   part type: "test:song"
   part id: "2"
   part parameters: 0
-  payload chunk size: 178
-  payload chunk size: 0
     :test:song:
       mandatory: 0
       advisory: 0
+  payload chunk size: 178
+  payload chunk size: 0
       payload: 178 bytes
   part header size: 43
   part type: "test:math"
   part id: "3"
   part parameters: 3
-  payload chunk size: 2
-  payload chunk size: 0
     :test:math:
       mandatory: 2
       advisory: 1
+  payload chunk size: 2
+  payload chunk size: 0
       payload: 2 bytes
   part header size: 16
   part type: "test:ping"
   part id: "4"
   part parameters: 0
-  payload chunk size: 0
     :test:ping:
       mandatory: 0
       advisory: 0
+  payload chunk size: 0
       payload: 0 bytes
   part header size: 0
   end of bundle2 stream
   parts count:   5
 
@@ -436,43 +436,43 @@  Process the bundle
   start extraction of bundle2 parts
   part header size: 17
   part type: "test:empty"
   part id: "0"
   part parameters: 0
+  ignoring unknown advisory part 'test:empty'
   payload chunk size: 0
-  ignoring unknown advisory part 'test:empty'
   part header size: 17
   part type: "test:empty"
   part id: "1"
   part parameters: 0
+  ignoring unknown advisory part 'test:empty'
   payload chunk size: 0
-  ignoring unknown advisory part 'test:empty'
   part header size: 16
   part type: "test:song"
   part id: "2"
   part parameters: 0
+  found a handler for part 'test:song'
+  The choir starts singing:
   payload chunk size: 178
   payload chunk size: 0
-  found a handler for part 'test:song'
-  The choir starts singing:
       Patali Dirapata, Cromda Cromda Ripalo, Pata Pata, Ko Ko Ko
       Bokoro Dipoulito, Rondi Rondi Pepino, Pata Pata, Ko Ko Ko
       Emana Karassoli, Loucra Loucra Ponponto, Pata Pata, Ko Ko Ko.
   part header size: 43
   part type: "test:math"
   part id: "3"
   part parameters: 3
+  ignoring unknown advisory part 'test:math'
   payload chunk size: 2
   payload chunk size: 0
-  ignoring unknown advisory part 'test:math'
   part header size: 16
   part type: "test:ping"
   part id: "4"
   part parameters: 0
-  payload chunk size: 0
   found a handler for part 'test:ping'
   received ping request (id 4)
+  payload chunk size: 0
   part header size: 0
   end of bundle2 stream
   0 unread bytes
   3 total verses sung