Patchwork [4 of 11] httppeer: use compression engine API for decompressing responses

Submitter Gregory Szorc
Date Nov. 20, 2016, 10:23 p.m.
Message ID <da1caf5b703a641f0167.1479680621@ubuntu-vm-main>
Permalink /patch/17655/
State Accepted

Comments

Gregory Szorc - Nov. 20, 2016, 10:23 p.m.
# HG changeset patch
# User Gregory Szorc <gregory.szorc@gmail.com>
# Date 1479678953 28800
#      Sun Nov 20 13:55:53 2016 -0800
# Node ID da1caf5b703a641f0167ece15fdff167a1343ec1
# Parent  0bef0b8fb9f44ed8568df6cfeabf162aa12b211e
httppeer: use compression engine API for decompressing responses

In preparation for supporting multiple compression formats on the
wire protocol, we need all users of the wire protocol to use
compression engine APIs.

This commit ports the HTTP wire protocol client to use the
compression engine API.

The code for handling the HTTPException is a bit hacky. Essentially,
HTTPException could be thrown by any read() from the socket. However,
as part of porting the API, we no longer have a generator wrapping
the socket and we don't have a single place where we can trap the
exception. We solve this by introducing a proxy class that intercepts
read() and converts the exception appropriately.

In the future, we could introduce a new compression engine API that
supports emitting a generator of decompressed chunks. This would
eliminate the need for the proxy class. As I said when I introduced
the decompressorreader() API, I'm not fond of it and would support
transitioning to something better. This can be done as a follow-up,
preferably once all the code is using the compression engine API and
we have a better idea of the API needs of all the consumers.
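The proxy-class trick described above can be sketched standalone. The names below are illustrative, not Mercurial's: FlakyReader stands in for the HTTP response object and RuntimeError for httplib.HTTPException.

```python
import io

class FlakyReader(io.BytesIO):
    """Stand-in for the HTTP response: fails partway through reading."""
    def __init__(self, data, failat):
        super(FlakyReader, self).__init__(data)
        self.failat = failat

    def read(self, *args, **kwargs):
        # Simulate the connection dropping once we reach `failat`.
        if self.tell() >= self.failat:
            raise RuntimeError('socket dropped')
        return super(FlakyReader, self).read(*args, **kwargs)

def convertingreader(reader):
    # Same shape as decompressresponse(): capture the bound read()
    # method, then swap the instance's class for a subclass whose
    # read() translates the exception.
    origread = reader.read
    class readerproxy(reader.__class__):
        def read(self, *args, **kwargs):
            try:
                return origread(*args, **kwargs)
            except RuntimeError:
                raise IOError(None, 'connection ended unexpectedly')
    reader.__class__ = readerproxy
    return reader

r = convertingreader(FlakyReader(b'abcdef', failat=4))
assert r.read(4) == b'abcd'   # reads before the failure pass through
try:
    r.read(1)
except IOError:
    pass                      # the HTTPException-alike arrives as IOError
```

Reassigning `__class__` works here because the proxy subclasses the instance's own class and adds no new instance layout, so every attribute and method other than read() behaves exactly as before.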
Augie Fackler - Nov. 21, 2016, 10:18 p.m.
On Sun, Nov 20, 2016 at 02:23:41PM -0800, Gregory Szorc wrote:
> # HG changeset patch
> # User Gregory Szorc <gregory.szorc@gmail.com>
> # Date 1479678953 28800
> #      Sun Nov 20 13:55:53 2016 -0800
> # Node ID da1caf5b703a641f0167ece15fdff167a1343ec1
> # Parent  0bef0b8fb9f44ed8568df6cfeabf162aa12b211e
> httppeer: use compression engine API for decompressing responses
>
> In preparation for supporting multiple compression formats on the
> wire protocol, we need all users of the wire protocol to use
> compression engine APIs.
>
> This commit ports the HTTP wire protocol client to use the
> compression engine API.
>
> The code for handling the HTTPException is a bit hacky. Essentially,
> HTTPException could be thrown by any read() from the socket. However,
> as part of porting the API, we no longer have a generator wrapping
> the socket and we don't have a single place where we can trap the
> exception. We solve this by introducing a proxy class that intercepts
> read() and converts the exception appropriately.
>
> In the future, we could introduce a new compression engine API that
> supports emitting a generator of decompressed chunks. This would
> eliminate the need for the proxy class. As I said when I introduced
> the decompressorreader() API, I'm not fond of it and would support
> transitioning to something better. This can be done as a follow-up,
> preferably once all the code is using the compression engine API and
> we have a better idea of the API needs of all the consumers.
>
> diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py
> --- a/mercurial/httppeer.py
> +++ b/mercurial/httppeer.py
> @@ -12,7 +12,6 @@ import errno
>  import os
>  import socket
>  import tempfile
> -import zlib
>
>  from .i18n import _
>  from .node import nullid
> @@ -30,16 +29,26 @@ httplib = util.httplib
>  urlerr = util.urlerr
>  urlreq = util.urlreq
>
> -def zgenerator(f):
> -    zd = zlib.decompressobj()
> +# FUTURE: consider refactoring this API to use generators. This will
> +# require a compression engine API to emit generators.
> +def decompressresponse(response, engine):
>      try:
> -        for chunk in util.filechunkiter(f):
> -            while chunk:
> -                yield zd.decompress(chunk, 2**18)
> -                chunk = zd.unconsumed_tail
> +        reader = engine.decompressorreader(response)
>      except httplib.HTTPException:
>          raise IOError(None, _('connection ended unexpectedly'))
> -    yield zd.flush()
> +
> +    # We need to wrap reader.read() so HTTPException on subsequent
> +    # reads is also converted.
> +    origread = reader.read
> +    class readerproxy(reader.__class__):
> +        def read(self, *args, **kwargs):
> +            try:
> +                return origread(*args, **kwargs)

nit: I think you could use super(readerproxy, self).read() here (but
do this as a follow-up if you agree with the idea)

> +            except httplib.HTTPException:
> +                raise IOError(None, _('connection ended unexpectedly'))
> +
> +    reader.__class__ = readerproxy
> +    return reader
>
>  class httppeer(wireproto.wirepeer):
>      def __init__(self, ui, path):
> @@ -202,7 +211,7 @@ class httppeer(wireproto.wirepeer):
>                                        (safeurl, version))
>
>          if _compressible:
> -            return util.chunkbuffer(zgenerator(resp))
> +            return decompressresponse(resp, util.compengines['zlib'])
>
>          return resp
>
Gregory Szorc - Nov. 21, 2016, 10:27 p.m.
On Mon, Nov 21, 2016 at 2:18 PM, Augie Fackler <raf@durin42.com> wrote:

> On Sun, Nov 20, 2016 at 02:23:41PM -0800, Gregory Szorc wrote:
> > # HG changeset patch
> > # User Gregory Szorc <gregory.szorc@gmail.com>
> > # Date 1479678953 28800
> > #      Sun Nov 20 13:55:53 2016 -0800
> > # Node ID da1caf5b703a641f0167ece15fdff167a1343ec1
> > # Parent  0bef0b8fb9f44ed8568df6cfeabf162aa12b211e
> > httppeer: use compression engine API for decompressing responses
> >
> > In preparation for supporting multiple compression formats on the
> > wire protocol, we need all users of the wire protocol to use
> > compression engine APIs.
> >
> > This commit ports the HTTP wire protocol client to use the
> > compression engine API.
> >
> > The code for handling the HTTPException is a bit hacky. Essentially,
> > HTTPException could be thrown by any read() from the socket. However,
> > as part of porting the API, we no longer have a generator wrapping
> > the socket and we don't have a single place where we can trap the
> > exception. We solve this by introducing a proxy class that intercepts
> > read() and converts the exception appropriately.
> >
> > In the future, we could introduce a new compression engine API that
> > supports emitting a generator of decompressed chunks. This would
> > eliminate the need for the proxy class. As I said when I introduced
> > the decompressorreader() API, I'm not fond of it and would support
> > transitioning to something better. This can be done as a follow-up,
> > preferably once all the code is using the compression engine API and
> > we have a better idea of the API needs of all the consumers.
> >
> > diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py
> > --- a/mercurial/httppeer.py
> > +++ b/mercurial/httppeer.py
> > @@ -12,7 +12,6 @@ import errno
> >  import os
> >  import socket
> >  import tempfile
> > -import zlib
> >
> >  from .i18n import _
> >  from .node import nullid
> > @@ -30,16 +29,26 @@ httplib = util.httplib
> >  urlerr = util.urlerr
> >  urlreq = util.urlreq
> >
> > -def zgenerator(f):
> > -    zd = zlib.decompressobj()
> > +# FUTURE: consider refactoring this API to use generators. This will
> > +# require a compression engine API to emit generators.
> > +def decompressresponse(response, engine):
> >      try:
> > -        for chunk in util.filechunkiter(f):
> > -            while chunk:
> > -                yield zd.decompress(chunk, 2**18)
> > -                chunk = zd.unconsumed_tail
> > +        reader = engine.decompressorreader(response)
> >      except httplib.HTTPException:
> >          raise IOError(None, _('connection ended unexpectedly'))
> > -    yield zd.flush()
> > +
> > +    # We need to wrap reader.read() so HTTPException on subsequent
> > +    # reads is also converted.
> > +    origread = reader.read
> > +    class readerproxy(reader.__class__):
> > +        def read(self, *args, **kwargs):
> > +            try:
> > +                return origread(*args, **kwargs)
>
> nit: I think you could use super(readerproxy, self).read() here (but
> do this as a follow-up if you agree with the idea)
>

Fun fact: I did this initially and ran into a wonky failure about super
expecting a type not a classobj or some other esoteric error I had never
seen before. I think this was due to the built-in file object type not
behaving like a real class/type. I could reproduce and add an inline
comment on why super isn't used if you want.


>
> > +            except httplib.HTTPException:
> > +                raise IOError(None, _('connection ended unexpectedly'))
> > +
> > +    reader.__class__ = readerproxy
> > +    return reader
> >
> >  class httppeer(wireproto.wirepeer):
> >      def __init__(self, ui, path):
> > @@ -202,7 +211,7 @@ class httppeer(wireproto.wirepeer):
> >                                        (safeurl, version))
> >
> >          if _compressible:
> > -            return util.chunkbuffer(zgenerator(resp))
> > +            return decompressresponse(resp, util.compengines['zlib'])
> >
> >          return resp
> >
>
Augie Fackler - Nov. 21, 2016, 10:28 p.m.
> On Nov 21, 2016, at 17:27, Gregory Szorc <gregory.szorc@gmail.com> wrote:
> 
> nit: I think you could use super(readerproxy, self).read() here (but
> do this as a follow-up if you agree with the idea)
> 
> Fun fact: I did this initially and ran into a wonky failure about super expecting a type not a classobj or some other esoteric error I had never seen before. I think this was due to the built-in file object type not behaving like a real class/type. I could reproduce and add an inline comment on why super isn't used if you want.

Yes, please do as a follow-up once this lands (I've already got 1-5 prepped to land).
Yuya Nishihara - Nov. 26, 2016, 10:19 a.m.
On Sun, 20 Nov 2016 14:23:41 -0800, Gregory Szorc wrote:
> # HG changeset patch
> # User Gregory Szorc <gregory.szorc@gmail.com>
> # Date 1479678953 28800
> #      Sun Nov 20 13:55:53 2016 -0800
> # Node ID da1caf5b703a641f0167ece15fdff167a1343ec1
> # Parent  0bef0b8fb9f44ed8568df6cfeabf162aa12b211e
> httppeer: use compression engine API for decompressing responses

[...]

> +def decompressresponse(response, engine):
>      try:
> -        for chunk in util.filechunkiter(f):
> -            while chunk:
> -                yield zd.decompress(chunk, 2**18)
> -                chunk = zd.unconsumed_tail

65bd4b8e48bd says "decompress stream incrementally to reduce memory usage",
but util._zlibengine has no limit for the decompressed size. If the 256kB
limit is still valid, we should port it to _zlibengine.
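The 256kB cap Yuya refers to comes from zlib's bounded decompression: passing max_length to decompress() limits each output chunk, with not-yet-consumed input carried in unconsumed_tail. A minimal sketch of the deleted zgenerator() behaviour (the function name and limit mirror the old code, not any current util API):

```python
import io
import zlib

def zgenerator(f, limit=2**18):
    # Decompress incrementally: max_length bounds each output chunk to
    # `limit` bytes, and input zlib could not yet consume is carried in
    # unconsumed_tail, keeping peak memory independent of stream size.
    zd = zlib.decompressobj()
    for chunk in iter(lambda: f.read(32768), b''):
        while chunk:
            yield zd.decompress(chunk, limit)
            chunk = zd.unconsumed_tail
    yield zd.flush()

data = b'a' * (4 * 2**18)           # highly compressible 1 MiB payload
pieces = list(zgenerator(io.BytesIO(zlib.compress(data))))
assert b''.join(pieces) == data
assert max(len(p) for p in pieces) <= 2**18   # no chunk exceeds 256kB
```

Without the max_length argument, a small compressed input can expand into one arbitrarily large output buffer, which is the regression Yuya is pointing out in _zlibengine.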
Pierre-Yves David - Jan. 10, 2017, 4:56 p.m.
On 11/26/2016 11:19 AM, Yuya Nishihara wrote:
> On Sun, 20 Nov 2016 14:23:41 -0800, Gregory Szorc wrote:
>> # HG changeset patch
>> # User Gregory Szorc <gregory.szorc@gmail.com>
>> # Date 1479678953 28800
>> #      Sun Nov 20 13:55:53 2016 -0800
>> # Node ID da1caf5b703a641f0167ece15fdff167a1343ec1
>> # Parent  0bef0b8fb9f44ed8568df6cfeabf162aa12b211e
>> httppeer: use compression engine API for decompressing responses
>
> [...]
>
>> +def decompressresponse(response, engine):
>>      try:
>> -        for chunk in util.filechunkiter(f):
>> -            while chunk:
>> -                yield zd.decompress(chunk, 2**18)
>> -                chunk = zd.unconsumed_tail
>
> 65bd4b8e48bd says "decompress stream incrementally to reduce memory usage",
> but util._zlibengine has no limit for the decompressed size. If the 256kB
> limit is still valid, we should port it to _zlibengine.

It does not look like this ever got addressed (I might of course be 
wrong). Should we create a ticket to track it?

Cheers,
Gregory Szorc - Jan. 10, 2017, 6:37 p.m.
On Tue, Jan 10, 2017 at 8:56 AM, Pierre-Yves David <
pierre-yves.david@ens-lyon.org> wrote:

>
>
> On 11/26/2016 11:19 AM, Yuya Nishihara wrote:
>
>> On Sun, 20 Nov 2016 14:23:41 -0800, Gregory Szorc wrote:
>>
>>> # HG changeset patch
>>> # User Gregory Szorc <gregory.szorc@gmail.com>
>>> # Date 1479678953 28800
>>> #      Sun Nov 20 13:55:53 2016 -0800
>>> # Node ID da1caf5b703a641f0167ece15fdff167a1343ec1
>>> # Parent  0bef0b8fb9f44ed8568df6cfeabf162aa12b211e
>>> httppeer: use compression engine API for decompressing responses
>>>
>>
>> [...]
>>
>> +def decompressresponse(response, engine):
>>>      try:
>>> -        for chunk in util.filechunkiter(f):
>>> -            while chunk:
>>> -                yield zd.decompress(chunk, 2**18)
>>> -                chunk = zd.unconsumed_tail
>>>
>>
>> 65bd4b8e48bd says "decompress stream incrementally to reduce memory
>> usage",
>> but util._zlibengine has no limit for the decompressed size. If the 256kB
>> limit is still valid, we should port it to _zlibengine.
>>
>
> It does not look like this ever got addressed (I might of course be
> wrong). Should we create a ticket to track it?
>

This was addressed in https://www.mercurial-scm.org/repo/hg/rev/98d7636c4729
Pierre-Yves David - Jan. 10, 2017, 6:46 p.m.
On 01/10/2017 07:37 PM, Gregory Szorc wrote:
> On Tue, Jan 10, 2017 at 8:56 AM, Pierre-Yves David
> <pierre-yves.david@ens-lyon.org <mailto:pierre-yves.david@ens-lyon.org>>
> wrote:
>
>
>
>     On 11/26/2016 11:19 AM, Yuya Nishihara wrote:
>
>         On Sun, 20 Nov 2016 14:23:41 -0800, Gregory Szorc wrote:
>
>             # HG changeset patch
>             # User Gregory Szorc <gregory.szorc@gmail.com
>             <mailto:gregory.szorc@gmail.com>>
>             # Date 1479678953 28800
>             #      Sun Nov 20 13:55:53 2016 -0800
>             # Node ID da1caf5b703a641f0167ece15fdff167a1343ec1
>             # Parent  0bef0b8fb9f44ed8568df6cfeabf162aa12b211e
>             httppeer: use compression engine API for decompressing responses
>
>
>         [...]
>
>             +def decompressresponse(response, engine):
>                  try:
>             -        for chunk in util.filechunkiter(f):
>             -            while chunk:
>             -                yield zd.decompress(chunk, 2**18)
>             -                chunk = zd.unconsumed_tail
>
>
>         65bd4b8e48bd says "decompress stream incrementally to reduce
>         memory usage",
>         but util._zlibengine has no limit for the decompressed size. If
>         the 256kB
>         limit is still valid, we should port it to _zlibengine.
>
>
>     It does not look like this ever got addressed (I might of course be
>     wrong). Should we create a ticket to track it?
>
>
> This was addressed in
> https://www.mercurial-scm.org/repo/hg/rev/98d7636c4729

Ha right, I looked at that code before asking, but got its meaning 
wrong. Sorry for the noise :-/

Patch

diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py
--- a/mercurial/httppeer.py
+++ b/mercurial/httppeer.py
@@ -12,7 +12,6 @@  import errno
 import os
 import socket
 import tempfile
-import zlib
 
 from .i18n import _
 from .node import nullid
@@ -30,16 +29,26 @@  httplib = util.httplib
 urlerr = util.urlerr
 urlreq = util.urlreq
 
-def zgenerator(f):
-    zd = zlib.decompressobj()
+# FUTURE: consider refactoring this API to use generators. This will
+# require a compression engine API to emit generators.
+def decompressresponse(response, engine):
     try:
-        for chunk in util.filechunkiter(f):
-            while chunk:
-                yield zd.decompress(chunk, 2**18)
-                chunk = zd.unconsumed_tail
+        reader = engine.decompressorreader(response)
     except httplib.HTTPException:
         raise IOError(None, _('connection ended unexpectedly'))
-    yield zd.flush()
+
+    # We need to wrap reader.read() so HTTPException on subsequent
+    # reads is also converted.
+    origread = reader.read
+    class readerproxy(reader.__class__):
+        def read(self, *args, **kwargs):
+            try:
+                return origread(*args, **kwargs)
+            except httplib.HTTPException:
+                raise IOError(None, _('connection ended unexpectedly'))
+
+    reader.__class__ = readerproxy
+    return reader
 
 class httppeer(wireproto.wirepeer):
     def __init__(self, ui, path):
@@ -202,7 +211,7 @@  class httppeer(wireproto.wirepeer):
                                       (safeurl, version))
 
         if _compressible:
-            return util.chunkbuffer(zgenerator(resp))
+            return decompressresponse(resp, util.compengines['zlib'])
 
         return resp