Patchwork [3,of,8,zstd-revlogs] util: compression APIs to support revlog compression

login
register
mail settings
Submitter Gregory Szorc
Date Jan. 2, 2017, 11:57 p.m.
Message ID <edeb0e3c69621e638bfd.1483401474@ubuntu-vm-main>
Download mbox | patch
Permalink /patch/18077/
State Accepted
Headers show

Comments

Gregory Szorc - Jan. 2, 2017, 11:57 p.m.
# HG changeset patch
# User Gregory Szorc <gregory.szorc@gmail.com>
# Date 1483389543 28800
#      Mon Jan 02 12:39:03 2017 -0800
# Node ID edeb0e3c69621e638bfd4a0a18a8e0d080637f2e
# Parent  346b798126c521eb44fe480ddd25e2779df1b39b
util: compression APIs to support revlog compression

As part of "zstd all of the things," we need to teach revlogs to
use non-zlib compression formats. Because we're routing all compression
via the "compression manager" and "compression engine" APIs, we need to
introduction functionality there for performing revlog operations.

Ideally, revlog compression and decompression operations would be
implemented in terms of simple "compress" and "decompress" primitives.
However, there are a few considerations that make us want to have a
specialized primitive for handling revlogs:

1) Performance. Revlogs tend to do compression and especially
   decompression operations in batches. Any overhead for e.g.
   instantiating a "context" for performing an operation can be
   noticed. For this reason, our "revlog compressor" primitive is
   reusable. For zstd, we reuse the same compression "context" for
   multiple operations. I've measured this to have a performance
   impact versus constructing new contexts for each operation.

2) Specialization. By having a primitive dedicated to revlog use,
   we can make revlog-specific choices and leave the door open for
   more functionality in the future. For example, the zstd revlog
   compressor may one day make use of dictionary compression.

A future patch will introduce a decompress() on the compressor
object.

The code for the zlib compressor is basically copied from
revlog.compress(). Although it doesn't handle the empty input
case, the null first byte case, and the 'u' prefix case. These
cases will continue to be handled in revlog.py once that code is
ported to use this API.

Patch

diff --git a/mercurial/util.py b/mercurial/util.py
--- a/mercurial/util.py
+++ b/mercurial/util.py
@@ -3101,6 +3101,19 @@  class compressionengine(object):
         """
         raise NotImplementedError()
 
+    def revlogcompressor(self, opts=None):
+        """Obtain an object that can be used to compress revlog entries.
+
+        The object has a ``compress(data)`` method that compresses binary
+        data. This method returns compressed binary data or ``None`` if
+        the data could not be compressed (too small, not compressible, etc).
+        The returned data should have a header uniquely identifying this
+        compression format so decompression can be routed to this engine.
+
+        The object is reusable but is not thread safe.
+        """
+        raise NotImplementedError()
+
 class _zlibengine(compressionengine):
     def name(self):
         return 'zlib'
@@ -3132,6 +3145,41 @@  class _zlibengine(compressionengine):
 
         return chunkbuffer(gen())
 
+    class zlibrevlogcompressor(object):
+        def compress(self, data):
+            insize = len(data)
+            # Caller handles empty input case.
+            assert insize > 0
+
+            if insize < 44:
+                return None
+
+            elif insize <= 1000000:
+                compressed = zlib.compress(data)
+                if len(compressed) < insize:
+                    return compressed
+                return None
+
+            # zlib makes an internal copy of the input buffer, doubling
+            # memory usage for large inputs. So do streaming compression
+            # on large inputs.
+            else:
+                z = zlib.compressobj()
+                parts = []
+                pos = 0
+                while pos < insize:
+                    pos2 = pos + 2**20
+                    parts.append(z.compress(data[pos:pos2]))
+                    pos = pos2
+                parts.append(z.flush())
+
+                if sum(map(len, parts)) < insize:
+                    return ''.join(parts)
+                return None
+
+    def revlogcompressor(self, opts=None):
+        return self.zlibrevlogcompressor()
+
 compengines.register(_zlibengine())
 
 class _bz2engine(compressionengine):
@@ -3195,6 +3243,13 @@  class _noopengine(compressionengine):
     def decompressorreader(self, fh):
         return fh
 
+    class nooprevlogcompressor(object):
+        def compress(self, data):
+            return None
+
+    def revlogcompressor(self, opts=None):
+        return self.nooprevlogcompressor()
+
 compengines.register(_noopengine())
 
 class _zstdengine(compressionengine):
@@ -3240,6 +3295,49 @@  class _zstdengine(compressionengine):
         dctx = zstd.ZstdDecompressor()
         return chunkbuffer(dctx.read_from(fh))
 
+    class zstdrevlogcompressor(object):
+        def __init__(self, zstd, level=3):
+            # Writing the content size adds a few bytes to the output. However,
+            # it allows decompression to be more optimal since we can
+            # pre-allocate a buffer to hold the result.
+            self._cctx = zstd.ZstdCompressor(level=level,
+                                             write_content_size=True)
+            self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE
+
+        def compress(self, data):
+            insize = len(data)
+            # Caller handles empty input case.
+            assert insize > 0
+
+            if insize < 50:
+                return None
+
+            elif insize <= 1000000:
+                compressed = self._cctx.compress(data)
+                if len(compressed) < insize:
+                    return compressed
+                return None
+            else:
+                z = self._cctx.compressobj()
+                chunks = []
+                pos = 0
+                while pos < insize:
+                    pos2 = pos + self._compinsize
+                    chunk = z.compress(data[pos:pos2])
+                    if chunk:
+                        chunks.append(chunk)
+                    pos = pos2
+                chunks.append(z.flush())
+
+                if sum(map(len, chunks)) < insize:
+                    return ''.join(chunks)
+                return None
+
+    def revlogcompressor(self, opts=None):
+        opts = opts or {}
+        return self.zstdrevlogcompressor(self._module,
+                                         level=opts.get('level', 3))
+
 compengines.register(_zstdengine())
 
 # convenient shortcut