Patchwork [07,of,10,V2] util: add a stream compression API to compression engines

Submitter Gregory Szorc
Date Nov. 8, 2016, 3:13 a.m.
Message ID <fa24595b79b603ff7be6.1478574835@ubuntu-vm-main>
Permalink /patch/17390/
State Accepted

Comments

Gregory Szorc - Nov. 8, 2016, 3:13 a.m.
# HG changeset patch
# User Gregory Szorc <gregory.szorc@gmail.com>
# Date 1478573827 28800
#      Mon Nov 07 18:57:07 2016 -0800
# Node ID fa24595b79b603ff7be6f32b849c07ddfdee3da4
# Parent  8672777162085c92b836ce1e97ca254734b0fae0
util: add a stream compression API to compression engines

It is a common pattern throughout the code to perform compression
on an iterator of chunks, yielding an iterator of compressed chunks.
Let's formalize that as part of the compression engine API.

The zlib and bzip2 implementations accept an optional "level" option
to control the compression level. The defaults match those of the
underlying Python modules (-1 for zlib, 9 for bz2). This option will
be used in subsequent patches.
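
For illustration only, here is a minimal standalone sketch of the pattern
described above, written as plain functions rather than as methods on the
compression engine classes. The function names compressstream_zlib and
compressstream_bz2 are hypothetical and exist only for this example; the
bodies mirror the engine methods in the diff below, and the round trip at
the end just checks that the joined output is a valid compressed stream.

```python
import bz2
import zlib


def compressstream_zlib(it, opts=None):
    """Hypothetical standalone version of the zlib engine's compressstream."""
    opts = opts or {}
    # -1 asks zlib to use its own default compression level.
    z = zlib.compressobj(opts.get('level', -1))
    for chunk in it:
        data = z.compress(chunk)
        # compress() may buffer input and return an empty string.
        if data:
            yield data
    # flush() emits whatever is still buffered and finishes the stream.
    yield z.flush()


def compressstream_bz2(it, opts=None):
    """Hypothetical standalone version of the bz2 engine's compressstream."""
    opts = opts or {}
    z = bz2.BZ2Compressor(opts.get('level', 9))
    for chunk in it:
        data = z.compress(chunk)
        if data:
            yield data
    yield z.flush()


# Feed an iterator of chunks in, join the compressed chunks that come out.
chunks = [b'hello ', b'world\n'] * 1000
original = b''.join(chunks)

zcompressed = b''.join(compressstream_zlib(iter(chunks), {'level': 9}))
bcompressed = b''.join(compressstream_bz2(iter(chunks)))

# The joined output decompresses back to the original bytes.
assert zlib.decompress(zcompressed) == original
assert bz2.decompress(bcompressed) == original
```

Because both functions are generators, no compression work happens until
the caller starts iterating, and the final flush() chunk is always yielded
even when the input iterator is empty.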

Patch

diff --git a/mercurial/util.py b/mercurial/util.py
--- a/mercurial/util.py
+++ b/mercurial/util.py
@@ -2966,10 +2966,22 @@  class compressionengine(object):
         exclude the name from external usage, set the first element to ``None``.
 
         If bundle compression is supported, the class must also implement
-        ``compressorobj`` and `decompressorreader``.
+        ``compressstream``, ``compressorobj`` and ``decompressorreader``.
         """
         return None
 
+    def compressstream(self, it, opts=None):
+        """Compress an iterator of chunks.
+
+        The method receives an iterator (ideally a generator) of chunks of
+        bytes to be compressed. It returns an iterator (ideally a generator)
+        of chunks of bytes representing the compressed output.
+
+        Optionally accepts an argument defining how to perform compression.
+        Each engine treats this argument differently.
+        """
+        raise NotImplementedError()
+
     def compressorobj(self):
         """(Temporary) Obtain an object used for compression.
 
@@ -2997,6 +3009,19 @@  class _zlibengine(compressionengine):
     def compressorobj(self):
         return zlib.compressobj()
 
+    def compressstream(self, it, opts=None):
+        opts = opts or {}
+
+        z = zlib.compressobj(opts.get('level', -1))
+        for chunk in it:
+            data = z.compress(chunk)
+            # Not all calls to compress emit data. It is cheaper to inspect
+            # here than to feed empty chunks through the generator.
+            if data:
+                yield data
+
+        yield z.flush()
+
     def decompressorreader(self, fh):
         def gen():
             d = zlib.decompressobj()
@@ -3017,6 +3042,16 @@  class _bz2engine(compressionengine):
     def compressorobj(self):
         return bz2.BZ2Compressor()
 
+    def compressstream(self, it, opts=None):
+        opts = opts or {}
+        z = bz2.BZ2Compressor(opts.get('level', 9))
+        for chunk in it:
+            data = z.compress(chunk)
+            if data:
+                yield data
+
+        yield z.flush()
+
     def decompressorreader(self, fh):
         def gen():
             d = bz2.BZ2Decompressor()
@@ -3065,6 +3100,9 @@  class _noopengine(compressionengine):
     def compressorobj(self):
         return nocompress()
 
+    def compressstream(self, it, opts=None):
+        return it
+
     def decompressorreader(self, fh):
         return fh