Patchwork D1387: bundle2: implement generic part payload decoder

login
register
mail settings
Submitter phabricator
Date Nov. 20, 2017, 11:50 p.m.
Message ID <698348eab25fea8808e826998ac969e2@localhost.localdomain>
Download mbox | patch
Permalink /patch/25689/
State Not Applicable
Headers show

Comments

phabricator - Nov. 20, 2017, 11:50 p.m.
This revision was automatically updated to reflect the committed changes.
Closed by commit rHG8aa43ff9c12c: bundle2: implement generic part payload decoder (authored by indygreg, committed by ).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D1387?vs=3465&id=3691

REVISION DETAIL
  https://phab.mercurial-scm.org/D1387

AFFECTED FILES
  mercurial/bundle2.py

CHANGE DETAILS




To: indygreg, #hg-reviewers, durin42
Cc: mercurial-devel

Patch

diff --git a/mercurial/bundle2.py b/mercurial/bundle2.py
--- a/mercurial/bundle2.py
+++ b/mercurial/bundle2.py
@@ -1187,6 +1187,32 @@ 
     def gettransaction(self):
         raise TransactionUnavailable('no repo access from stream interruption')
 
+def decodepayloadchunks(ui, fh):
+    """Reads bundle2 part payload data into chunks.
+
+    Part payload data consists of framed chunks. This function takes
+    a file handle and emits those chunks.
+    """
+    headersize = struct.calcsize(_fpayloadsize)
+    readexactly = changegroup.readexactly
+
+    chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0]
+    indebug(ui, 'payload chunk size: %i' % chunksize)
+
+    while chunksize:
+        if chunksize >= 0:
+            yield readexactly(fh, chunksize)
+        elif chunksize == flaginterrupt:
+            # Interrupt "signal" detected. The regular stream is interrupted
+            # and a bundle2 part follows. Consume it.
+            interrupthandler(ui, fh)()
+        else:
+            raise error.BundleValueError(
+                'negative payload chunk size: %s' % chunksize)
+
+        chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0]
+        indebug(ui, 'payload chunk size: %i' % chunksize)
+
 class unbundlepart(unpackermixin):
     """a bundle part read from a bundle"""
 
@@ -1270,6 +1296,10 @@ 
         # we read the data, tell it
         self._initialized = True
 
+    def _payloadchunks(self):
+        """Generator of decoded chunks in the payload."""
+        return decodepayloadchunks(self.ui, self._fp)
+
     def read(self, size=None):
         """read payload data"""
         if not self._initialized:
@@ -1320,25 +1350,14 @@ 
             self._seekfp(self._chunkindex[chunknum][1])
 
         pos = self._chunkindex[chunknum][0]
-        payloadsize = self._unpack(_fpayloadsize)[0]
-        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
-        while payloadsize:
-            if payloadsize == flaginterrupt:
-                # interruption detection, the handler will now read a
-                # single part and process it.
-                interrupthandler(self.ui, self._fp)()
-            elif payloadsize < 0:
-                msg = 'negative payload chunk size: %i' %  payloadsize
-                raise error.BundleValueError(msg)
-            else:
-                result = self._readexact(payloadsize)
-                chunknum += 1
-                pos += payloadsize
-                if chunknum == len(self._chunkindex):
-                    self._chunkindex.append((pos, self._tellfp()))
-                yield result
-            payloadsize = self._unpack(_fpayloadsize)[0]
-            indebug(self.ui, 'payload chunk size: %i' % payloadsize)
+
+        for chunk in decodepayloadchunks(self.ui, self._fp):
+            chunknum += 1
+            pos += len(chunk)
+            if chunknum == len(self._chunkindex):
+                self._chunkindex.append((pos, self._tellfp()))
+
+            yield chunk
 
     def _findchunk(self, pos):
         '''for a given payload position, return a chunk number and offset'''