Submitter: Gregory Szorc
Date: Nov. 20, 2016, 10:23 p.m.
Message ID: <da1caf5b703a641f0167.1479680621@ubuntu-vm-main>
Permalink: /patch/17655/
State: Accepted
Comments
On Sun, Nov 20, 2016 at 02:23:41PM -0800, Gregory Szorc wrote:
> # HG changeset patch
> # User Gregory Szorc <gregory.szorc@gmail.com>
> # Date 1479678953 28800
> #      Sun Nov 20 13:55:53 2016 -0800
> # Node ID da1caf5b703a641f0167ece15fdff167a1343ec1
> # Parent  0bef0b8fb9f44ed8568df6cfeabf162aa12b211e
> httppeer: use compression engine API for decompressing responses
>
> In preparation for supporting multiple compression formats on the
> wire protocol, we need all users of the wire protocol to use
> compression engine APIs.
>
> This commit ports the HTTP wire protocol client to use the
> compression engine API.
>
> The code for handling the HTTPException is a bit hacky. Essentially,
> HTTPException could be thrown by any read() from the socket. However,
> as part of porting the API, we no longer have a generator wrapping
> the socket and we don't have a single place where we can trap the
> exception. We solve this by introducing a proxy class that intercepts
> read() and converts the exception appropriately.
>
> In the future, we could introduce a new compression engine API that
> supports emitting a generator of decompressed chunks. This would
> eliminate the need for the proxy class. As I said when I introduced
> the decompressorreader() API, I'm not fond of it and would support
> transitioning to something better. This can be done as a follow-up,
> preferably once all the code is using the compression engine API and
> we have a better idea of the API needs of all the consumers.

[...]

> +    origread = reader.read
> +    class readerproxy(reader.__class__):
> +        def read(self, *args, **kwargs):
> +            try:
> +                return origread(*args, **kwargs)

nit: I think you could use super(readerproxy, self).read() here (but
do this as a follow-up if you agree with the idea)
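The `__class__`-swap trick the patch uses can be illustrated outside of Mercurial. Below is a minimal, self-contained sketch; the `FlakyStream` and `FakeHTTPException` names are invented for the example (stand-ins for an HTTP response object and httplib.HTTPException), but the `wrapreader()` body mirrors the patch's `decompressresponse()`:

```python
import io

class FakeHTTPException(Exception):
    """Stand-in for httplib.HTTPException in this sketch."""

class FlakyStream:
    """A file-like object whose read() starts failing partway through,
    like a socket whose peer hung up mid-response."""
    def __init__(self, data, fail_after):
        self._buf = io.BytesIO(data)
        self._reads = 0
        self._fail_after = fail_after

    def read(self, n=-1):
        self._reads += 1
        if self._reads > self._fail_after:
            raise FakeHTTPException('remote hung up')
        return self._buf.read(n)

def wrapreader(reader):
    """Mirror the patch's readerproxy: capture the original bound read(),
    then swap the instance's class for a subclass whose read() translates
    the HTTP-level exception into IOError."""
    origread = reader.read
    class readerproxy(reader.__class__):
        def read(self, *args, **kwargs):
            try:
                return origread(*args, **kwargs)
            except FakeHTTPException:
                raise IOError(None, 'connection ended unexpectedly')
    reader.__class__ = readerproxy
    return reader

stream = wrapreader(FlakyStream(b'abcdef', fail_after=1))
first = stream.read(3)      # first read succeeds: b'abc'
try:
    stream.read(3)          # second read raises; the proxy converts it
    converted = False
except IOError:
    converted = True
```

Because only the instance's `__class__` is reassigned, every later `read()` on that object, from any caller, goes through the converting wrapper, which is exactly the property the patch needs once there is no longer a single generator wrapping the socket.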
On Mon, Nov 21, 2016 at 2:18 PM, Augie Fackler <raf@durin42.com> wrote:

[...]

> > +    origread = reader.read
> > +    class readerproxy(reader.__class__):
> > +        def read(self, *args, **kwargs):
> > +            try:
> > +                return origread(*args, **kwargs)
>
> nit: I think you could use super(readerproxy, self).read() here (but
> do this as a follow-up if you agree with the idea)

Fun fact: I did this initially and ran into a wonky failure about super
expecting a type, not a classobj, or some other esoteric error I had
never seen before. I think this was due to the built-in file object type
not behaving like a real class/type. I could reproduce it and add an
inline comment on why super isn't used, if you want.
> On Nov 21, 2016, at 17:27, Gregory Szorc <gregory.szorc@gmail.com> wrote:
>
> > nit: I think you could use super(readerproxy, self).read() here (but
> > do this as a follow-up if you agree with the idea)
>
> Fun fact: I did this initially and ran into a wonky failure about super
> expecting a type, not a classobj, or some other esoteric error I had
> never seen before. I think this was due to the built-in file object type
> not behaving like a real class/type. I could reproduce it and add an
> inline comment on why super isn't used, if you want.

Yes, please do as a follow-up once this lands (I've already got 1-5
prepped to land).
On Sun, 20 Nov 2016 14:23:41 -0800, Gregory Szorc wrote:
> # HG changeset patch
> # User Gregory Szorc <gregory.szorc@gmail.com>
> # Date 1479678953 28800
> #      Sun Nov 20 13:55:53 2016 -0800
> # Node ID da1caf5b703a641f0167ece15fdff167a1343ec1
> # Parent  0bef0b8fb9f44ed8568df6cfeabf162aa12b211e
> httppeer: use compression engine API for decompressing responses

[...]

> +def decompressresponse(response, engine):
>      try:
> -        for chunk in util.filechunkiter(f):
> -            while chunk:
> -                yield zd.decompress(chunk, 2**18)
> -                chunk = zd.unconsumed_tail

65bd4b8e48bd says "decompress stream incrementally to reduce memory usage",
but util._zlibengine has no limit for the decompressed size. If the 256kB
limit is still valid, we should port it to _zlibengine.
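The 256kB limit Yuya refers to is the `max_length` argument in the removed `zgenerator()`: `decompressobj.decompress(chunk, 2**18)` returns at most 256kB of output per call and parks any unconsumed input in `unconsumed_tail`, so a small, highly compressed chunk can't balloon memory in a single call. A self-contained sketch of that removed code (the driver values are invented for the example):

```python
import zlib

def zgenerator_sketch(chunks):
    """Sketch of the removed zgenerator: decompress incrementally,
    capping each decompress() call at 256kB of output."""
    zd = zlib.decompressobj()
    for chunk in chunks:
        while chunk:
            # at most 2**18 bytes of output per call
            yield zd.decompress(chunk, 2**18)
            # input bytes not yet consumed because of the output cap
            chunk = zd.unconsumed_tail
    yield zd.flush()

# 1 MB of 'x' compresses to ~1 kB but still comes out in bounded pieces
payload = zlib.compress(b'x' * 1_000_000)
pieces = list(zgenerator_sketch([payload]))
out = b''.join(pieces)
```

Without the `max_length` cap, a single `decompress()` call on that payload would materialize the full 1 MB at once, which is the regression Yuya is pointing at in `util._zlibengine`.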
On 11/26/2016 11:19 AM, Yuya Nishihara wrote:

[...]

> 65bd4b8e48bd says "decompress stream incrementally to reduce memory usage",
> but util._zlibengine has no limit for the decompressed size. If the 256kB
> limit is still valid, we should port it to _zlibengine.

It does not look like this ever got addressed (I might, of course, be
wrong). Should we create a ticket to track it?

Cheers,
On Tue, Jan 10, 2017 at 8:56 AM, Pierre-Yves David
<pierre-yves.david@ens-lyon.org> wrote:

[...]

> It does not look like this ever got addressed (I might, of course, be
> wrong). Should we create a ticket to track it?

This was addressed in
https://www.mercurial-scm.org/repo/hg/rev/98d7636c4729
On 01/10/2017 07:37 PM, Gregory Szorc wrote:

[...]

> This was addressed in
> https://www.mercurial-scm.org/repo/hg/rev/98d7636c4729

Ha right, I looked at that code before asking, but got its meaning
wrong. Sorry for the noise :-/
Patch
diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py
--- a/mercurial/httppeer.py
+++ b/mercurial/httppeer.py
@@ -12,7 +12,6 @@ import errno
 import os
 import socket
 import tempfile
-import zlib
 
 from .i18n import _
 from .node import nullid
@@ -30,16 +29,26 @@ httplib = util.httplib
 urlerr = util.urlerr
 urlreq = util.urlreq
 
-def zgenerator(f):
-    zd = zlib.decompressobj()
+# FUTURE: consider refactoring this API to use generators. This will
+# require a compression engine API to emit generators.
+def decompressresponse(response, engine):
     try:
-        for chunk in util.filechunkiter(f):
-            while chunk:
-                yield zd.decompress(chunk, 2**18)
-                chunk = zd.unconsumed_tail
+        reader = engine.decompressorreader(response)
     except httplib.HTTPException:
         raise IOError(None, _('connection ended unexpectedly'))
-    yield zd.flush()
+
+    # We need to wrap reader.read() so HTTPException on subsequent
+    # reads is also converted.
+    origread = reader.read
+    class readerproxy(reader.__class__):
+        def read(self, *args, **kwargs):
+            try:
+                return origread(*args, **kwargs)
+            except httplib.HTTPException:
+                raise IOError(None, _('connection ended unexpectedly'))
+
+    reader.__class__ = readerproxy
+    return reader
 
 class httppeer(wireproto.wirepeer):
     def __init__(self, ui, path):
@@ -202,7 +211,7 @@ class httppeer(wireproto.wirepeer):
                           (safeurl, version))
 
         if _compressible:
-            return util.chunkbuffer(zgenerator(resp))
+            return decompressresponse(resp, util.compengines['zlib'])
 
         return resp
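The patch assumes each compression engine exposes a `decompressorreader()` that takes a file-like object of compressed bytes and hands back another file-like object yielding decompressed bytes. A toy sketch of that shape, loosely modeled on the API the patch calls (the class and helper names here are illustrative, not Mercurial's real implementation):

```python
import io
import zlib

class zlibengine:
    """Toy compression engine exposing a decompressorreader() whose
    return value supports read(), like the reader the patch wraps."""
    def decompressorreader(self, fh):
        def chunks():
            zd = zlib.decompressobj()
            while True:
                data = fh.read(65536)
                if not data:
                    break
                yield zd.decompress(data)
            yield zd.flush()
        return _generatorreader(chunks())

class _generatorreader:
    """Adapt an iterator of byte chunks to a read(n) interface."""
    def __init__(self, it):
        self._it = it
        self._buf = b''

    def read(self, n=-1):
        # pull chunks until we can satisfy the request (or run out)
        while n < 0 or len(self._buf) < n:
            try:
                self._buf += next(self._it)
            except StopIteration:
                break
        if n < 0:
            data, self._buf = self._buf, b''
        else:
            data, self._buf = self._buf[:n], self._buf[n:]
        return data

compressed = io.BytesIO(zlib.compress(b'hello, wire protocol'))
reader = zlibengine().decompressorreader(compressed)
```

Because the reader only exposes `read()`, each call can touch the underlying socket, which is why the patch has to convert HTTPException at every `read()` rather than in one place, as the old generator did.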