@@ -2963,6 +2963,8 @@ class compressormanager(object):
self._bundlenames = {}
# Internal bundle identifier to engine name.
self._bundletypes = {}
+ # Revlog header to engine name.
+ self._revlogheaders = {}
def __getitem__(self, key):
return self._engines[key]
@@ -3004,6 +3006,14 @@ class compressormanager(object):
self._bundletypes[bundletype] = name
+ revlogheader = engine.revlogheader()
+ if revlogheader and revlogheader in self._revlogheaders:
+ raise error.Abort(_('revlog header %s already registered by %s') %
+ (revlogheader, self._revlogheaders[revlogheader]))
+
+ if revlogheader:
+ self._revlogheaders[revlogheader] = name
+
self._engines[name] = engine
@property
@@ -3040,6 +3050,13 @@ class compressormanager(object):
engine.name())
return engine
+ def forrevlogheader(self, header):
+ """Obtain a compression engine registered to a revlog header.
+
+ Will raise KeyError if the revlog header value isn't registered.
+ """
+ return self._engines[self._revlogheaders[header]]
+
compengines = compressormanager()
class compressionengine(object):
@@ -3080,6 +3097,16 @@ class compressionengine(object):
"""
return None
+ def revlogheader(self):
+ """Header added to revlog chunks that identifies this engine.
+
+ If this engine can be used to compress revlogs, this method should
+ return the bytes used to identify chunks compressed with this engine.
+ Else, the method should return ``None`` to indicate it does not
+ participate in revlog compression.
+ """
+ return None
+
def compressstream(self, it, opts=None):
"""Compress an iterator of chunks.
@@ -3109,6 +3136,13 @@ class compressionengine(object):
the data could not be compressed (too small, not compressible, etc).
The returned data should have a header uniquely identifying this
compression format so decompression can be routed to this engine.
+ This header should be identified by the ``revlogheader()`` return
+ value.
+
+ The object has a ``decompress(data)`` method that decompresses
+ data. The method will only be called if ``data`` begins with
+ ``revlogheader()``. The method should return the raw, uncompressed
+ data or raise a ``RevlogError``.
The object is reusable but is not thread safe.
"""
@@ -3121,6 +3155,9 @@ class _zlibengine(compressionengine):
def bundletype(self):
return 'gzip', 'GZ'
+ def revlogheader(self):
+ return 'x'
+
def compressstream(self, it, opts=None):
opts = opts or {}
@@ -3177,6 +3214,13 @@ class _zlibengine(compressionengine):
return ''.join(parts)
return None
+ def decompress(self, data):
+ try:
+ return zlib.decompress(data)
+ except zlib.error as e:
+ raise error.RevlogError(_('revlog decompress error: %s') %
+ str(e))
+
def revlogcompressor(self, opts=None):
return self.zlibrevlogcompressor()
@@ -3237,6 +3281,9 @@ class _noopengine(compressionengine):
def bundletype(self):
return 'none', 'UN'
+ # We don't implement revlogheader because it is handled specially
+ # in the revlog class.
+
def compressstream(self, it, opts=None):
return it
@@ -3274,6 +3321,9 @@ class _zstdengine(compressionengine):
def bundletype(self):
return 'zstd', 'ZS'
+ def revlogheader(self):
+ return '\x28'
+
def compressstream(self, it, opts=None):
opts = opts or {}
# zstd level 3 is almost always significantly faster than zlib
@@ -3302,7 +3352,9 @@ class _zstdengine(compressionengine):
# pre-allocate a buffer to hold the result.
self._cctx = zstd.ZstdCompressor(level=level,
write_content_size=True)
+ self._dctx = zstd.ZstdDecompressor()
self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE
+ self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE
def compress(self, data):
insize = len(data)
@@ -3333,6 +3385,28 @@ class _zstdengine(compressionengine):
return ''.join(chunks)
return None
+ def decompress(self, data):
+ insize = len(data)
+
+ try:
+ # This was measured to be faster than other streaming
+ # decompressors.
+ dobj = self._dctx.decompressobj()
+ chunks = []
+ pos = 0
+ while pos < insize:
+ pos2 = pos + self._decompinsize
+ chunk = dobj.decompress(data[pos:pos2])
+ if chunk:
+ chunks.append(chunk)
+ pos = pos2
+ # Frame should be exhausted, so no finish() API.
+
+ return ''.join(chunks)
+ except Exception as e:
+ raise error.RevlogError(_('revlog decompress error: %s') %
+ str(e))
+
def revlogcompressor(self, opts=None):
opts = opts or {}
return self.zstdrevlogcompressor(self._module,