Patchwork [09,of,11] bundle2: use compressstream compression engine API

login
register
mail settings
Submitter Gregory Szorc
Date Nov. 2, 2016, 12:08 a.m.
Message ID <03555032b7e3bc7192fd.1478045322@ubuntu-vm-main>
Download mbox | patch
Permalink /patch/17252/
State Superseded
Headers show

Comments

Gregory Szorc - Nov. 2, 2016, 12:08 a.m.
# HG changeset patch
# User Gregory Szorc <gregory.szorc@gmail.com>
# Date 1477160145 25200
#      Sat Oct 22 11:15:45 2016 -0700
# Node ID 03555032b7e3bc7192fd8bebf6af3f05b1e70516
# Parent  1d4d111b644453acc4893478528a5f2ecd7ca023
bundle2: use compressstream compression engine API

Compression engines now have an API for compressing a stream of
chunks. Switch to it and make low-level compression code disappear.
Pierre-Yves David - Nov. 7, 2016, 2:13 p.m.
On 11/02/2016 01:08 AM, Gregory Szorc wrote:
> # HG changeset patch
> # User Gregory Szorc <gregory.szorc@gmail.com>
> # Date 1477160145 25200
> #      Sat Oct 22 11:15:45 2016 -0700
> # Node ID 03555032b7e3bc7192fd8bebf6af3f05b1e70516
> # Parent  1d4d111b644453acc4893478528a5f2ecd7ca023
> bundle2: use compressstream compression engine API
>
> Compression engines now have an API for compressing a stream of
> chunks. Switch to it and make low-level compression code disappear.

Do we get any performance benefit for this ? I know you have spend a lot 
of time tracking performance gain in bundle creation/application. And 
this likely have some effect.

Talking about performance, Philippe Pépiot have a patch to setup some 
official performance tracking tool, if you could help reviewing it we 
could include these operations to it and we would have an easy and 
standard way to get these number.

> diff --git a/mercurial/bundle2.py b/mercurial/bundle2.py
> --- a/mercurial/bundle2.py
> +++ b/mercurial/bundle2.py
> @@ -566,23 +566,18 @@ class bundle20(object):
>              self.ui.debug(''.join(msg))
>          outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
>          yield self._magicstring
>          param = self._paramchunk()
>          outdebug(self.ui, 'bundle parameter: %s' % param)
>          yield _pack(_fstreamparamsize, len(param))
>          if param:
>              yield param
> -        # starting compression
> -        compressor = self._compengine.compressorobj()
> -        for chunk in self._getcorechunk():
> -            data = compressor.compress(chunk)
> -            if data:
> -                yield data
> -        yield compressor.flush()
> +        for chunk in self._compengine.compressstream(self._getcorechunk()):
> +            yield chunk
>
>      def _paramchunk(self):
>          """return a encoded version of all stream parameters"""
>          blocks = []
>          for par, value in self._params:
>              par = urlreq.quote(par)
>              if value is not None:
>                  value = urlreq.quote(value)
> @@ -1318,25 +1313,20 @@ def writebundle(ui, cg, filename, bundle
>          if cg.version != '01':
>              raise error.Abort(_('old bundle types only supports v1 '
>                                  'changegroups'))
>          header, comp = bundletypes[bundletype]
>          if comp not in util.compressionengines.supportedbundletypes:
>              raise error.Abort(_('unknown stream compression type: %s')
>                                % comp)
>          compengine = util.compressionengines.forbundletype(comp)
> -        compressor = compengine.compressorobj()
> -        subchunkiter = cg.getchunks()
>          def chunkiter():
>              yield header
> -            for chunk in subchunkiter:
> -                data = compressor.compress(chunk)
> -                if data:
> -                    yield data
> -            yield compressor.flush()
> +            for chunk in compengine.compressstream(cg.getchunks()):
> +                yield chunk
>          chunkiter = chunkiter()
>
>      # parse the changegroup data, otherwise we will block
>      # in case of sshrepo because we don't know the end of the stream
>      return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
>
>  @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
>  def handlechangegroup(op, inpart):
Gregory Szorc - Nov. 8, 2016, 2:53 a.m.
On Mon, Nov 7, 2016 at 6:13 AM, Pierre-Yves David <
pierre-yves.david@ens-lyon.org> wrote:

>
>
> On 11/02/2016 01:08 AM, Gregory Szorc wrote:
>
>> # HG changeset patch
>> # User Gregory Szorc <gregory.szorc@gmail.com>
>> # Date 1477160145 25200
>> #      Sat Oct 22 11:15:45 2016 -0700
>> # Node ID 03555032b7e3bc7192fd8bebf6af3f05b1e70516
>> # Parent  1d4d111b644453acc4893478528a5f2ecd7ca023
>> bundle2: use compressstream compression engine API
>>
>> Compression engines now have an API for compressing a stream of
>> chunks. Switch to it and make low-level compression code disappear.
>>
>
> Do we get any performance benefit for this ? I know you have spend a lot
> of time tracking performance gain in bundle creation/application. And this
> likely have some effect.
>
> Talking about performance, Philippe Pépiot have a patch to setup some
> official performance tracking tool, if you could help reviewing it we could
> include these operations to it and we would have an easy and standard way
> to get these number.


From this patch, most likely not. The reason is because the code is nearly
identical and I expect any performance changes due to how functions are
called to be dwarfed by the time spent inside the compressor.


>
>
> diff --git a/mercurial/bundle2.py b/mercurial/bundle2.py
>> --- a/mercurial/bundle2.py
>> +++ b/mercurial/bundle2.py
>> @@ -566,23 +566,18 @@ class bundle20(object):
>>              self.ui.debug(''.join(msg))
>>          outdebug(self.ui, 'start emission of %s stream' %
>> self._magicstring)
>>          yield self._magicstring
>>          param = self._paramchunk()
>>          outdebug(self.ui, 'bundle parameter: %s' % param)
>>          yield _pack(_fstreamparamsize, len(param))
>>          if param:
>>              yield param
>> -        # starting compression
>> -        compressor = self._compengine.compressorobj()
>> -        for chunk in self._getcorechunk():
>> -            data = compressor.compress(chunk)
>> -            if data:
>> -                yield data
>> -        yield compressor.flush()
>> +        for chunk in self._compengine.compressstrea
>> m(self._getcorechunk()):
>> +            yield chunk
>>
>>      def _paramchunk(self):
>>          """return a encoded version of all stream parameters"""
>>          blocks = []
>>          for par, value in self._params:
>>              par = urlreq.quote(par)
>>              if value is not None:
>>                  value = urlreq.quote(value)
>> @@ -1318,25 +1313,20 @@ def writebundle(ui, cg, filename, bundle
>>          if cg.version != '01':
>>              raise error.Abort(_('old bundle types only supports v1 '
>>                                  'changegroups'))
>>          header, comp = bundletypes[bundletype]
>>          if comp not in util.compressionengines.supportedbundletypes:
>>              raise error.Abort(_('unknown stream compression type: %s')
>>                                % comp)
>>          compengine = util.compressionengines.forbundletype(comp)
>> -        compressor = compengine.compressorobj()
>> -        subchunkiter = cg.getchunks()
>>          def chunkiter():
>>              yield header
>> -            for chunk in subchunkiter:
>> -                data = compressor.compress(chunk)
>> -                if data:
>> -                    yield data
>> -            yield compressor.flush()
>> +            for chunk in compengine.compressstream(cg.getchunks()):
>> +                yield chunk
>>          chunkiter = chunkiter()
>>
>>      # parse the changegroup data, otherwise we will block
>>      # in case of sshrepo because we don't know the end of the stream
>>      return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
>>
>>  @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
>>  def handlechangegroup(op, inpart):
>>
>
>
> --
> Pierre-Yves David
>

Patch

diff --git a/mercurial/bundle2.py b/mercurial/bundle2.py
--- a/mercurial/bundle2.py
+++ b/mercurial/bundle2.py
@@ -566,23 +566,18 @@  class bundle20(object):
             self.ui.debug(''.join(msg))
         outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
         yield self._magicstring
         param = self._paramchunk()
         outdebug(self.ui, 'bundle parameter: %s' % param)
         yield _pack(_fstreamparamsize, len(param))
         if param:
             yield param
-        # starting compression
-        compressor = self._compengine.compressorobj()
-        for chunk in self._getcorechunk():
-            data = compressor.compress(chunk)
-            if data:
-                yield data
-        yield compressor.flush()
+        for chunk in self._compengine.compressstream(self._getcorechunk()):
+            yield chunk
 
     def _paramchunk(self):
         """return a encoded version of all stream parameters"""
         blocks = []
         for par, value in self._params:
             par = urlreq.quote(par)
             if value is not None:
                 value = urlreq.quote(value)
@@ -1318,25 +1313,20 @@  def writebundle(ui, cg, filename, bundle
         if cg.version != '01':
             raise error.Abort(_('old bundle types only supports v1 '
                                 'changegroups'))
         header, comp = bundletypes[bundletype]
         if comp not in util.compressionengines.supportedbundletypes:
             raise error.Abort(_('unknown stream compression type: %s')
                               % comp)
         compengine = util.compressionengines.forbundletype(comp)
-        compressor = compengine.compressorobj()
-        subchunkiter = cg.getchunks()
         def chunkiter():
             yield header
-            for chunk in subchunkiter:
-                data = compressor.compress(chunk)
-                if data:
-                    yield data
-            yield compressor.flush()
+            for chunk in compengine.compressstream(cg.getchunks()):
+                yield chunk
         chunkiter = chunkiter()
 
     # parse the changegroup data, otherwise we will block
     # in case of sshrepo because we don't know the end of the stream
     return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
 
 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
 def handlechangegroup(op, inpart):