Patchwork [08,of,11] util: add a stream compression API to compression engines

login
register
mail settings
Submitter Gregory Szorc
Date Nov. 2, 2016, 12:08 a.m.
Message ID <1d4d111b644453acc489.1478045321@ubuntu-vm-main>
Download mbox | patch
Permalink /patch/17251/
State Superseded
Headers show

Comments

Gregory Szorc - Nov. 2, 2016, 12:08 a.m.
# HG changeset patch
# User Gregory Szorc <gregory.szorc@gmail.com>
# Date 1477159930 25200
#      Sat Oct 22 11:12:10 2016 -0700
# Node ID 1d4d111b644453acc4893478528a5f2ecd7ca023
# Parent  289da69280d95f1b983fdf9216739411a9953fb6
util: add a stream compression API to compression engines

It is a common pattern throughout the code to perform compression
on an iterator of chunks, yielding an iterator of compressed chunks.
Let's formalize that as part of the compression engine API.
Pierre-Yves David - Nov. 7, 2016, 2:09 p.m.
On 11/02/2016 01:08 AM, Gregory Szorc wrote:
> # HG changeset patch
> # User Gregory Szorc <gregory.szorc@gmail.com>
> # Date 1477159930 25200
> #      Sat Oct 22 11:12:10 2016 -0700
> # Node ID 1d4d111b644453acc4893478528a5f2ecd7ca023
> # Parent  289da69280d95f1b983fdf9216739411a9953fb6
> util: add a stream compression API to compression engines
>
> It is a common pattern throughout the code to perform compression
> on an iterator of chunks, yielding an iterator of compressed chunks.
> Let's formalize that as part of the compression engine API.

The basic implementation of stream compression will be similar for
each engine. Should we maybe have a base class for these objects?

>
> diff --git a/mercurial/util.py b/mercurial/util.py
> --- a/mercurial/util.py
> +++ b/mercurial/util.py
> @@ -2890,16 +2890,22 @@ class compressormanager(object):
>            format as used by bundles.
>
>          * compressorobj -- Method returning an object with ``compress(data)``
>            and ``flush()`` methods. This object and these methods are used to
>            incrementally feed data (presumably uncompressed) chunks into a
>            compressor. Calls to these methods return compressed bytes, which
>            may be 0-length if there is no output for the operation.
>
> +        * compressstream -- Compress an iterator of chunks and return an
> +          iterator of compressed chunks.
> +
> +          Optionally accepts an argument defining how to perform compression.
> +          Each engine treats this argument differently.
> +
>          * decompressorreader -- Method that is used to perform decompression
>            on a file object. Argument is an object with a ``read(size)`` method
>            that returns compressed data. Return value is an object with a
>            ``read(size)`` that returns uncompressed data.
>          """
>          bundletype = getattr(engine, 'bundletype', None)
>          if bundletype and bundletype in self._bundletypes:
>              raise error.Abort(_('bundle type %s is already registered') %
> @@ -2925,16 +2931,29 @@ compressionengines = compressormanager()
>  class _zlibengine(object):
>      @property
>      def bundletype(self):
>          return 'GZ'
>
>      def compressorobj(self):
>          return zlib.compressobj()
>
> +    def compressstream(self, it, opts=None):
> +        opts = opts or {}
> +
> +        z = zlib.compressobj(opts.get('level', -1))
> +        for chunk in it:
> +            data = z.compress(chunk)
> +            # Not all calls to compress emit data. It is cheaper to inspect
> +            # here than to feed empty chunks through generator.
> +            if data:
> +                yield data
> +
> +        yield z.flush()
> +
>      def decompressorreader(self, fh):
>          def gen():
>              d = zlib.decompressobj()
>              for chunk in filechunkiter(fh):
>                  yield d.decompress(chunk)
>
>          return chunkbuffer(gen())
>
> @@ -2943,16 +2962,26 @@ compressionengines.register('zlib', _zli
>  class _bz2engine(object):
>      @property
>      def bundletype(self):
>          return 'BZ'
>
>      def compressorobj(self):
>          return bz2.BZ2Compressor()
>
> +    def compressstream(self, it, opts=None):
> +        opts = opts or {}
> +        z = bz2.BZ2Compressor(opts.get('level', 9))
> +        for chunk in it:
> +            data = z.compress(chunk)
> +            if data:
> +                yield data
> +
> +        yield z.flush()
> +
>      def decompressorreader(self, fh):
>          def gen():
>              d = bz2.BZ2Decompressor()
>              for chunk in filechunkiter(fh):
>                  yield d.decompress(chunk)
>
>          return chunkbuffer(gen())
>
> @@ -2987,15 +3016,18 @@ class nocompress(object):
>  class _noopengine(object):
>      @property
>      def bundletype(self):
>          return 'UN'
>
>      def compressorobj(self):
>          return nocompress()
>
> +    def compressstream(self, it, opts=None):
> +        return it
> +
>      def decompressorreader(self, fh):
>          return fh
>
>  compressionengines.register('none', _noopengine())
>
>  # convenient shortcut
>  dst = debugstacktrace
> _______________________________________________
> Mercurial-devel mailing list
> Mercurial-devel@mercurial-scm.org
> https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel
>

Patch

diff --git a/mercurial/util.py b/mercurial/util.py
--- a/mercurial/util.py
+++ b/mercurial/util.py
@@ -2890,16 +2890,22 @@  class compressormanager(object):
           format as used by bundles.
 
         * compressorobj -- Method returning an object with ``compress(data)``
           and ``flush()`` methods. This object and these methods are used to
           incrementally feed data (presumably uncompressed) chunks into a
           compressor. Calls to these methods return compressed bytes, which
           may be 0-length if there is no output for the operation.
 
+        * compressstream -- Compress an iterator of chunks and return an
+          iterator of compressed chunks.
+
+          Optionally accepts an argument defining how to perform compression.
+          Each engine treats this argument differently.
+
         * decompressorreader -- Method that is used to perform decompression
           on a file object. Argument is an object with a ``read(size)`` method
           that returns compressed data. Return value is an object with a
           ``read(size)`` that returns uncompressed data.
         """
         bundletype = getattr(engine, 'bundletype', None)
         if bundletype and bundletype in self._bundletypes:
             raise error.Abort(_('bundle type %s is already registered') %
@@ -2925,16 +2931,29 @@  compressionengines = compressormanager()
 class _zlibengine(object):
     @property
     def bundletype(self):
         return 'GZ'
 
     def compressorobj(self):
         return zlib.compressobj()
 
+    def compressstream(self, it, opts=None):
+        opts = opts or {}
+
+        z = zlib.compressobj(opts.get('level', -1))
+        for chunk in it:
+            data = z.compress(chunk)
+            # Not all calls to compress emit data. It is cheaper to inspect
+            # here than to feed empty chunks through generator.
+            if data:
+                yield data
+
+        yield z.flush()
+
     def decompressorreader(self, fh):
         def gen():
             d = zlib.decompressobj()
             for chunk in filechunkiter(fh):
                 yield d.decompress(chunk)
 
         return chunkbuffer(gen())
 
@@ -2943,16 +2962,26 @@  compressionengines.register('zlib', _zli
 class _bz2engine(object):
     @property
     def bundletype(self):
         return 'BZ'
 
     def compressorobj(self):
         return bz2.BZ2Compressor()
 
+    def compressstream(self, it, opts=None):
+        opts = opts or {}
+        z = bz2.BZ2Compressor(opts.get('level', 9))
+        for chunk in it:
+            data = z.compress(chunk)
+            if data:
+                yield data
+
+        yield z.flush()
+
     def decompressorreader(self, fh):
         def gen():
             d = bz2.BZ2Decompressor()
             for chunk in filechunkiter(fh):
                 yield d.decompress(chunk)
 
         return chunkbuffer(gen())
 
@@ -2987,15 +3016,18 @@  class nocompress(object):
 class _noopengine(object):
     @property
     def bundletype(self):
         return 'UN'
 
     def compressorobj(self):
         return nocompress()
 
+    def compressstream(self, it, opts=None):
+        return it
+
     def decompressorreader(self, fh):
         return fh
 
 compressionengines.register('none', _noopengine())
 
 # convenient shortcut
 dst = debugstacktrace