Patchwork [01,of,10,V2] util: create new abstraction for compression engines

login
register
mail settings
Submitter Gregory Szorc
Date Nov. 8, 2016, 3:13 a.m.
Message ID <f3c9da54ff5e23becaa4.1478574829@ubuntu-vm-main>
Download mbox | patch
Permalink /patch/17381/
State Accepted
Headers show

Comments

Gregory Szorc - Nov. 8, 2016, 3:13 a.m.
# HG changeset patch
# User Gregory Szorc <gregory.szorc@gmail.com>
# Date 1478572299 28800
#      Mon Nov 07 18:31:39 2016 -0800
# Node ID f3c9da54ff5e23becaa4d0e90a20c9de704a70ba
# Parent  0911191dc4c97cbc8334c8b83782e8134bf621f0
util: create new abstraction for compression engines

Currently, util.py has "compressors" and "decompressors" dicts
mapping compression algorithms to callables returning objects that
perform well-defined operations. In addition, revlog.py has code
for calling into a compressor or decompressor explicitly. And, there
is code in the wire protocol for performing zlib compression.

The 3rd party lz4revlog extension has demonstrated the utility of
supporting alternative compression formats for revlog storage. But
it stops short of supporting lz4 for bundles and the wire protocol.

There are also plans to support zstd as a general compression
replacement.

So, there appears to be a market for a unified API for registering
compression engines. This commit starts the process of establishing
one.

This commit establishes a base class/interface for defining
compression engines and how they will be used. A collection class
to hold references to registered compression engines has also been
introduced.

The built-in zlib, bz2, truncated bz2, and no-op compression engines
are registered with a singleton instance of the collection class.

The compression engine API will change once consumers are ported
to the new API and some common patterns can be simplified at the
engine API level. So don't get too attached to the API...
Augie Fackler - Nov. 9, 2016, 5:32 p.m.
On Mon, Nov 07, 2016 at 07:13:49PM -0800, Gregory Szorc wrote:
> # HG changeset patch
> # User Gregory Szorc <gregory.szorc@gmail.com>
> # Date 1478572299 28800
> #      Mon Nov 07 18:31:39 2016 -0800
> # Node ID f3c9da54ff5e23becaa4d0e90a20c9de704a70ba
> # Parent  0911191dc4c97cbc8334c8b83782e8134bf621f0
> util: create new abstraction for compression engines

Can this code live in (say) "compression.py"? util is already shaggy,
and I'd rather not make it more of a dumping ground.

>
> Currently, util.py has "compressors" and "decompressors" dicts
> mapping compression algorithms to callables returning objects that
> perform well-defined operations. In addition, revlog.py has code
> for calling into a compressor or decompressor explicitly. And, there
> is code in the wire protocol for performing zlib compression.
>
> The 3rd party lz4revlog extension has demonstrated the utility of
> supporting alternative compression formats for revlog storage. But
> it stops short of supporting lz4 for bundles and the wire protocol.
>
> There are also plans to support zstd as a general compression
> replacement.
>
> So, there appears to be a market for a unified API for registering
> compression engines. This commit starts the process of establishing
> one.
>
> This commit establishes a base class/interface for defining
> compression engines and how they will be used. A collection class
> to hold references to registered compression engines has also been
> introduced.
>
> The built-in zlib, bz2, truncated bz2, and no-op compression engines
> are registered with a singleton instance of the collection class.
>
> The compression engine API will change once consumers are ported
> to the new API and some common patterns can be simplified at the
> engine API level. So don't get too attached to the API...
>
> diff --git a/mercurial/util.py b/mercurial/util.py
> --- a/mercurial/util.py
> +++ b/mercurial/util.py
> @@ -2856,13 +2856,219 @@ class ctxmanager(object):
>              raise exc_val
>          return received and suppressed
>
> -# compression utility
> +# compression code
> +
> +class compressormanager(object):
> +    """Holds registrations of various compression engines.
> +
> +    This class essentially abstracts the differences between compression
> +    engines to allow new compression formats to be added easily, possibly from
> +    extensions.
> +
> +    Compressors are registered against the global instance by calling its
> +    ``register()`` method.
> +    """
> +    def __init__(self):
> +        self._engines = {}
> +        # Bundle spec human name to engine name.
> +        self._bundlenames = {}
> +        # Internal bundle identifier to engine name.
> +        self._bundletypes = {}
> +
> +    def __getitem__(self, key):
> +        return self._engines[key]
> +
> +    def __contains__(self, key):
> +        return key in self._engines
> +
> +    def __iter__(self):
> +        return iter(self._engines.keys())
> +
> +    def register(self, engine):
> +        """Register a compression engine with the manager.
> +
> +        The argument must be a ``compressionengine`` instance.
> +        """
> +        if not isinstance(engine, compressionengine):
> +            raise ValueError(_('argument must be a compressionengine'))
> +
> +        name = engine.name()
> +
> +        if name in self._engines:
> +            raise error.Abort(_('compression engine %s already registered') %
> +                              name)
> +
> +        bundleinfo = engine.bundletype()
> +        if bundleinfo:
> +            bundlename, bundletype = bundleinfo
> +
> +            if bundlename in self._bundlenames:
> +                raise error.Abort(_('bundle name %s already registered') %
> +                                  bundlename)
> +            if bundletype in self._bundletypes:
> +                raise error.Abort(_('bundle type %s already registered by %s') %
> +                                  (bundletype, self._bundletypes[bundletype]))
> +
> +            # No external facing name declared.
> +            if bundlename:
> +                self._bundlenames[bundlename] = name
> +
> +            self._bundletypes[bundletype] = name
> +
> +        self._engines[name] = engine
> +
> +    @property
> +    def supportedbundlenames(self):
> +        return set(self._bundlenames.keys())
> +
> +    @property
> +    def supportedbundletypes(self):
> +        return set(self._bundletypes.keys())
> +
> +    def forbundlename(self, bundlename):
> +        """Obtain a compression engine registered to a bundle name.
> +
> +        Will raise KeyError if the bundle type isn't registered.
> +        """
> +        return self._engines[self._bundlenames[bundlename]]
> +
> +    def forbundletype(self, bundletype):
> +        """Obtain a compression engine registered to a bundle type.
> +
> +        Will raise KeyError if the bundle type isn't registered.
> +        """
> +        return self._engines[self._bundletypes[bundletype]]
> +
> +compengines = compressormanager()
> +
> +class compressionengine(object):
> +    """Base class for compression engines.
> +
> +    Compression engines must implement the interface defined by this class.
> +    """
> +    def name(self):
> +        """Returns the name of the compression engine.
> +
> +        This is the key the engine is registered under.
> +
> +        This method must be implemented.
> +        """
> +        raise NotImplementedError()
> +
> +    def bundletype(self):
> +        """Describes bundle identifiers for this engine.
> +
> +        If this compression engine isn't supported for bundles, returns None.
> +
> +        If this engine can be used for bundles, returns a 2-tuple of strings of
> +        the user-facing "bundle spec" compression name and an internal
> +        identifier used to denote the compression format within bundles. To
> +        exclude the name from external usage, set the first element to ``None``.
> +
> +        If bundle compression is supported, the class must also implement
> +        ``compressorobj`` and `decompressorreader``.
> +        """
> +        return None
> +
> +    def compressorobj(self):
> +        """(Temporary) Obtain an object used for compression.
> +
> +        The returned object has ``compress(data)`` and ``flush()`` methods.
> +        These are used to incrementally feed data chunks into a compressor.
> +        """
> +        raise NotImplementedError()
> +
> +    def decompressorreader(self, fh):
> +        """Perform decompression on a file object.
> +
> +        Argument is an object with a ``read(size)`` method that returns
> +        compressed data. Return value is an object with a ``read(size)`` that
> +        returns uncompressed data.
> +        """
> +        raise NotImplementedError()
> +
> +class _zlibengine(compressionengine):
> +    def name(self):
> +        return 'zlib'
> +
> +    def bundletype(self):
> +        return 'gzip', 'GZ'
> +
> +    def compressorobj(self):
> +        return zlib.compressobj()
> +
> +    def decompressorreader(self, fh):
> +        def gen():
> +            d = zlib.decompressobj()
> +            for chunk in filechunkiter(fh):
> +                yield d.decompress(chunk)
> +
> +        return chunkbuffer(gen())

Squinting at this, could compressionengine be a namedtuple that has
four fields? nothing ever uses self in these, so I think compressorobj
and decompressorreader could both be free functions that get tucked
into a namedtuple.

[elided rest of file that is routine]

Patch

diff --git a/mercurial/util.py b/mercurial/util.py
--- a/mercurial/util.py
+++ b/mercurial/util.py
@@ -2856,13 +2856,219 @@  class ctxmanager(object):
             raise exc_val
         return received and suppressed
 
-# compression utility
+# compression code
+
+class compressormanager(object):
+    """Holds registrations of various compression engines.
+
+    This class essentially abstracts the differences between compression
+    engines to allow new compression formats to be added easily, possibly from
+    extensions.
+
+    Compressors are registered against the global instance by calling its
+    ``register()`` method.
+    """
+    def __init__(self):
+        self._engines = {}
+        # Bundle spec human name to engine name.
+        self._bundlenames = {}
+        # Internal bundle identifier to engine name.
+        self._bundletypes = {}
+
+    def __getitem__(self, key):
+        return self._engines[key]
+
+    def __contains__(self, key):
+        return key in self._engines
+
+    def __iter__(self):
+        return iter(self._engines.keys())
+
+    def register(self, engine):
+        """Register a compression engine with the manager.
+
+        The argument must be a ``compressionengine`` instance.
+        """
+        if not isinstance(engine, compressionengine):
+            raise ValueError(_('argument must be a compressionengine'))
+
+        name = engine.name()
+
+        if name in self._engines:
+            raise error.Abort(_('compression engine %s already registered') %
+                              name)
+
+        bundleinfo = engine.bundletype()
+        if bundleinfo:
+            bundlename, bundletype = bundleinfo
+
+            if bundlename in self._bundlenames:
+                raise error.Abort(_('bundle name %s already registered') %
+                                  bundlename)
+            if bundletype in self._bundletypes:
+                raise error.Abort(_('bundle type %s already registered by %s') %
+                                  (bundletype, self._bundletypes[bundletype]))
+
+            # No external facing name declared.
+            if bundlename:
+                self._bundlenames[bundlename] = name
+
+            self._bundletypes[bundletype] = name
+
+        self._engines[name] = engine
+
+    @property
+    def supportedbundlenames(self):
+        return set(self._bundlenames.keys())
+
+    @property
+    def supportedbundletypes(self):
+        return set(self._bundletypes.keys())
+
+    def forbundlename(self, bundlename):
+        """Obtain a compression engine registered to a bundle name.
+
+        Will raise KeyError if the bundle type isn't registered.
+        """
+        return self._engines[self._bundlenames[bundlename]]
+
+    def forbundletype(self, bundletype):
+        """Obtain a compression engine registered to a bundle type.
+
+        Will raise KeyError if the bundle type isn't registered.
+        """
+        return self._engines[self._bundletypes[bundletype]]
+
+compengines = compressormanager()
+
+class compressionengine(object):
+    """Base class for compression engines.
+
+    Compression engines must implement the interface defined by this class.
+    """
+    def name(self):
+        """Returns the name of the compression engine.
+
+        This is the key the engine is registered under.
+
+        This method must be implemented.
+        """
+        raise NotImplementedError()
+
+    def bundletype(self):
+        """Describes bundle identifiers for this engine.
+
+        If this compression engine isn't supported for bundles, returns None.
+
+        If this engine can be used for bundles, returns a 2-tuple of strings of
+        the user-facing "bundle spec" compression name and an internal
+        identifier used to denote the compression format within bundles. To
+        exclude the name from external usage, set the first element to ``None``.
+
+        If bundle compression is supported, the class must also implement
+        ``compressorobj`` and `decompressorreader``.
+        """
+        return None
+
+    def compressorobj(self):
+        """(Temporary) Obtain an object used for compression.
+
+        The returned object has ``compress(data)`` and ``flush()`` methods.
+        These are used to incrementally feed data chunks into a compressor.
+        """
+        raise NotImplementedError()
+
+    def decompressorreader(self, fh):
+        """Perform decompression on a file object.
+
+        Argument is an object with a ``read(size)`` method that returns
+        compressed data. Return value is an object with a ``read(size)`` that
+        returns uncompressed data.
+        """
+        raise NotImplementedError()
+
+class _zlibengine(compressionengine):
+    def name(self):
+        return 'zlib'
+
+    def bundletype(self):
+        return 'gzip', 'GZ'
+
+    def compressorobj(self):
+        return zlib.compressobj()
+
+    def decompressorreader(self, fh):
+        def gen():
+            d = zlib.decompressobj()
+            for chunk in filechunkiter(fh):
+                yield d.decompress(chunk)
+
+        return chunkbuffer(gen())
+
+compengines.register(_zlibengine())
+
+class _bz2engine(compressionengine):
+    def name(self):
+        return 'bz2'
+
+    def bundletype(self):
+        return 'bzip2', 'BZ'
+
+    def compressorobj(self):
+        return bz2.BZ2Compressor()
+
+    def decompressorreader(self, fh):
+        def gen():
+            d = bz2.BZ2Decompressor()
+            for chunk in filechunkiter(fh):
+                yield d.decompress(chunk)
+
+        return chunkbuffer(gen())
+
+compengines.register(_bz2engine())
+
+class _truncatedbz2engine(compressionengine):
+    def name(self):
+        return 'bz2truncated'
+
+    def bundletype(self):
+        return None, '_truncatedBZ'
+
+    # We don't implement compressorobj because it is hackily handled elsewhere.
+
+    def decompressorreader(self, fh):
+        def gen():
+            # The input stream doesn't have the 'BZ' header. So add it back.
+            d = bz2.BZ2Decompressor()
+            d.decompress('BZ')
+            for chunk in filechunkiter(fh):
+                yield d.decompress(chunk)
+
+        return chunkbuffer(gen())
+
+compengines.register(_truncatedbz2engine())
 
 class nocompress(object):
     def compress(self, x):
         return x
+
     def flush(self):
-        return ""
+        return ''
+
+class _noopengine(compressionengine):
+    def name(self):
+        return 'none'
+
+    def bundletype(self):
+        return 'none', 'UN'
+
+    def compressorobj(self):
+        return nocompress()
+
+    def decompressorreader(self, fh):
+        return fh
+
+compengines.register(_noopengine())
 
 compressors = {
     None: nocompress,