| Submitter | Jun Wu |
|---|---|
| Date | July 22, 2016, 7:50 p.m. |
| Message ID | `<74d799c4781183b539ae.1469217048@x1c>` |
| Permalink | /patch/15973/ |
| State | Changes Requested |
Comments
Jun Wu <quark@fb.com> writes:

> # HG changeset patch
> # User Jun Wu <quark@fb.com>
> # Date 1469216267 -3600
> # Fri Jul 22 20:37:47 2016 +0100
> # Node ID 74d799c4781183b539aedd93530f0e5fa06839cc
> # Parent d3df009ab1175a6792549b51ae66486dd98f398b
> # Available At https://bitbucket.org/quark-zju/hg-draft
> # hg pull https://bitbucket.org/quark-zju/hg-draft -r 74d799c47811
> worker: wait worker pid explicitly
>
> Before this patch, waitforworkers uses os.wait() to collect child workers, and
> only wait len(pids) processes. This can have serious issues if other code
> spawns new processes and does not reap them: 1. worker.py may get wrong exit
> code and kill innocent workers. 2. worker.py may continue without waiting for
> all workers to complete.
>
> This patch fixes the issue by using waitpid to wait worker pid explicitly.

I think this is a good fix but Yuya knows more about this than I do.
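The failure mode described in the commit message is easy to demonstrate outside Mercurial. Below is a minimal sketch of the patched loop; the scaffolding around `waitforworkers` is invented for illustration and only the `os.waitpid(pid, 0)` call mirrors the patch. Waiting on each worker pid explicitly means exit statuses of unrelated children can never be misattributed to a worker.

```python
import os

def waitforworkers(pids, problem):
    """Reap exactly the given worker pids (sketch of the patched loop).

    os.waitpid(pid, 0) blocks until *that* pid exits, so exit statuses
    from unrelated children spawned elsewhere in the process are never
    misattributed to a worker, unlike a bare os.wait(), which returns
    whichever child happens to exit first.
    """
    for pid in pids:
        st = os.waitpid(pid, 0)[1]        # (pid, status); keep the status
        if os.WIFEXITED(st):
            code = os.WEXITSTATUS(st)
        else:
            code = -os.WTERMSIG(st)       # worker killed by a signal
        if code and not problem[0]:
            problem[0] = code             # remember the first failure
```

Forking children that exit with 0, 7 and 0 leaves `problem == [7]` afterwards, no matter what other children the process spawns or reaps in the meantime (POSIX only, since this relies on `os.fork`).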
On Fri, 22 Jul 2016 20:50:48 +0100, Jun Wu wrote:
> # HG changeset patch
> # User Jun Wu <quark@fb.com>
> # Date 1469216267 -3600
> # Fri Jul 22 20:37:47 2016 +0100
> # Node ID 74d799c4781183b539aedd93530f0e5fa06839cc
> # Parent d3df009ab1175a6792549b51ae66486dd98f398b
> # Available At https://bitbucket.org/quark-zju/hg-draft
> # hg pull https://bitbucket.org/quark-zju/hg-draft -r 74d799c47811
> worker: wait worker pid explicitly
>
> Before this patch, waitforworkers uses os.wait() to collect child workers, and
> only wait len(pids) processes. This can have serious issues if other code
> spawns new processes and does not reap them: 1. worker.py may get wrong exit
> code and kill innocent workers. 2. worker.py may continue without waiting for
> all workers to complete.
>
> This patch fixes the issue by using waitpid to wait worker pid explicitly.
>
> diff --git a/mercurial/worker.py b/mercurial/worker.py
> --- a/mercurial/worker.py
> +++ b/mercurial/worker.py
> @@ -112,8 +112,8 @@ def _posixworker(ui, func, staticargs, a
>                  if err.errno != errno.ESRCH:
>                      raise
>      def waitforworkers():
> -        for _pid in pids:
> -            st = _exitstatus(os.wait()[1])
> +        for pid in pids:
> +            st = _exitstatus(os.waitpid(pid, 0)[1])

New implementation can't detect failures ASAP, which is what 9955fc5ee24b
tried to solve.

https://selenic.com/repo/hg/rev/9955fc5ee24b

Somewhat related, maybe we can handle this issue by worker-to-master pipe?

https://bz.mercurial-scm.org/show_bug.cgi?id=4929#c9
Excerpts from Yuya Nishihara's message of 2016-07-23 23:39:54 +0900:
> New implementation can't detect failures ASAP, which is what 9955fc5ee24b
> tried to solve.
>
> https://selenic.com/repo/hg/rev/9955fc5ee24b

It seems SIGCHLD handler + waitpid WNOHANG is the solution then.

> Somewhat related, maybe we can handle this issue by worker-to-master pipe?
>
> https://bz.mercurial-scm.org/show_bug.cgi?id=4929#c9

That sounds like a complex solution with a downside: the workers can be
blocked if the master process does not read fast enough.

Maybe we can just disable stdio buffer or make it flush at '\n'. It does not
work for long line exceeding page size tho.
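A SIGCHLD + WNOHANG reaper along those lines might look roughly like this. This is only a sketch of the idea, not the patch that was eventually written, and all names are invented. Because the handler drains children without blocking, a failing worker is noticed as soon as it dies, which restores the detect-failures-ASAP property of 9955fc5ee24b.

```python
import errno
import os
import signal

def reapworkers(pids, on_problem):
    """Drain every already-exited child without blocking (sketch).

    WNOHANG makes os.waitpid() return immediately, so this is safe to
    call from a SIGCHLD handler; children spawned by unrelated code are
    reaped here but ignored rather than misattributed to a worker.
    """
    while pids:
        try:
            pid, st = os.waitpid(-1, os.WNOHANG)
        except OSError as err:
            if err.errno == errno.ECHILD:   # no children exist any more
                return
            raise
        if pid == 0:                        # children alive, none exited yet
            return
        if pid in pids:
            pids.discard(pid)
            if os.WIFEXITED(st) and os.WEXITSTATUS(st):
                on_problem(os.WEXITSTATUS(st))  # report the failure ASAP

def install_sigchld_reaper(pids, on_problem):
    """Run reapworkers whenever a child terminates."""
    signal.signal(signal.SIGCHLD,
                  lambda signum, frame: reapworkers(pids, on_problem))
```

The same `reapworkers` can also be called one last time after all work is dispatched, to catch anything that exited before the handler was installed.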
On Sat, 23 Jul 2016 19:34:36 +0100, Jun Wu wrote:
> Excerpts from Yuya Nishihara's message of 2016-07-23 23:39:54 +0900:
> > New implementation can't detect failures ASAP, which is what 9955fc5ee24b
> > tried to solve.
> >
> > https://selenic.com/repo/hg/rev/9955fc5ee24b
>
> It seems SIGCHLD handler + waitpid WNOHANG is the solution then.

or wait until all children collected or ECHILD. I prefer SIGCHLD handler over
threading, but that change wouldn't be trivial enough for stable.

> > Somewhat related, maybe we can handle this issue by worker-to-master pipe?
> >
> > https://bz.mercurial-scm.org/show_bug.cgi?id=4929#c9
>
> That sounds like a complex solution with a downside: the workers can be
> blocked if the master process does not read fast enough.

I think writing status/debug messages would be cheaper than the I/O jobs
running in workers. Currently the master process does nothing other than
yielding progress messages, so the read loop should run fast.

https://selenic.com/repo/hg/file/3.9-rc/mercurial/worker.py#l131

> Maybe we can just disable stdio buffer or make it flush at '\n'. It does not
> work for long line exceeding page size tho.

Also it doesn't work under vanilla commandserver which replaces ui.fout by
a pseudo file object.
Excerpts from Yuya Nishihara's message of 2016-07-24 15:32:10 +0900:
> but that change wouldn't be trivial enough for stable.

Yes. I think so and didn't work on V2.

> I think writing status/debug messages would be cheaper than the I/O jobs
> running in workers. Currently the master process does nothing other than
> yielding progress messages, so the read loop should run fast.

I was aware of that pipe.

> Also it doesn't work under vanilla commandserver which replaces ui.fout by
> a pseudo file object.

I think commandserver needs some lock mechanism to protect its I/O. flock
is a good choice wherever supported. The problem is not only limited to
workers but also any other places using threads / processes.
On Mon, 25 Jul 2016 10:27:17 +0100, Jun Wu wrote:
> Excerpts from Yuya Nishihara's message of 2016-07-24 15:32:10 +0900:
> > Also it doesn't work under vanilla commandserver which replaces ui.fout by
> > a pseudo file object.
>
> I think commandserver needs some lock mechanism to protect its I/O. flock
> is a good choice wherever supported. The problem is not only limited to
> workers but also any other places using threads / processes.

No idea how flock() will work for duplicated fds. Are we going to make a
temporary lock file?

As long as underlying fwrite() is thread-safe, the issue can be avoided
by writing header + data by single write() call. That can't be true for
fork(), but IIRC worker.py is the only place we have to care.
Excerpts from Yuya Nishihara's message of 2016-07-25 21:49:35 +0900:
> On Mon, 25 Jul 2016 10:27:17 +0100, Jun Wu wrote:
> > Excerpts from Yuya Nishihara's message of 2016-07-24 15:32:10 +0900:
> > > Also it doesn't work under vanilla commandserver which replaces ui.fout by
> > > a pseudo file object.
> >
> > I think commandserver needs some lock mechanism to protect its I/O. flock
> > is a good choice wherever supported. The problem is not only limited to
> > workers but also any other places using threads / processes.
>
> No idea how flock() will work for duplicated fds. Are we going to make a
> temporary lock file?

TIL flock and fcntl(F_SETLKW) are not the same. The latter works in this
case without an extra file.

> As long as underlying fwrite() is thread-safe, the issue can be avoided
> by writing header + data by single write() call. That can't be true for
> fork(), but IIRC worker.py is the only place we have to care.

Ideally, workers are threads. But I understand we use processes to
workaround Python's GIL. It feels like reinventing stdio because of the GIL.

The main thing I dislike about the pipe plan is unnecessary memory copy /
CPU usage. If the files being updated are many, their paths can be a lot of
bytes.

Assuming status lines are less than 4KB, make sure write uses a string
ending with "\n" (aka. line buffered, stdbuf -oL) should solve this issue
without any locks.
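For reference, Python exposes POSIX record locks through `fcntl.lockf()`, which issues `F_SETLKW` when asked to block. A hypothetical `locked_write` helper for serializing output across processes could look like this; it assumes the fd refers to a regular file (record locks do not generally apply to pipes or sockets), and note the caveat that record locks are owned per process, so this serializes forked workers but not threads within one process.

```python
import fcntl
import os

def locked_write(fd, data):
    """Serialize writes to a shared fd across processes (sketch).

    fcntl.lockf() takes a POSIX record lock (F_SETLKW under the hood),
    which, unlike flock(), needs no separate lock file: the lock is
    taken on the output fd itself, and fds inherited or duplicated into
    other processes contend on the same underlying file.
    """
    fcntl.lockf(fd, fcntl.LOCK_EX)      # F_SETLKW: block until granted
    try:
        os.write(fd, data)
    finally:
        fcntl.lockf(fd, fcntl.LOCK_UN)  # release so others may write
```

A forked worker would call `locked_write(ui.fout.fileno(), msg)` style code instead of writing directly, so concurrent messages cannot interleave mid-line.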
On Mon, 25 Jul 2016 14:45:41 +0100, Jun Wu wrote:
> Excerpts from Yuya Nishihara's message of 2016-07-25 21:49:35 +0900:
> > On Mon, 25 Jul 2016 10:27:17 +0100, Jun Wu wrote:
> > > Excerpts from Yuya Nishihara's message of 2016-07-24 15:32:10 +0900:
> > > > Also it doesn't work under vanilla commandserver which replaces ui.fout by
> > > > a pseudo file object.
> > >
> > > I think commandserver needs some lock mechanism to protect its I/O. flock
> > > is a good choice wherever supported. The problem is not only limited to
> > > workers but also any other places using threads / processes.
> >
> > No idea how flock() will work for duplicated fds. Are we going to make a
> > temporary lock file?
>
> TIL flock and fcntl(F_SETLKW) are not the same. The latter works in this
> case without an extra file.

That seems getting more complicated and platform-dependent. But thanks for
the tip, I had no knowledge about F_GET/SETLK.

> > As long as underlying fwrite() is thread-safe, the issue can be avoided
> > by writing header + data by single write() call. That can't be true for
> > fork(), but IIRC worker.py is the only place we have to care.
>
> Ideally, workers are threads. But I understand we use processes to
> workaround Python's GIL. It feels like reinventing stdio because of the GIL.
>
> The main thing I dislike about the pipe plan is unnecessary memory copy /
> CPU usage. If the files being updated are many, their paths can be a lot of
> bytes.

IMHO that's okay because they explicitly request a bunch of outputs by
--verbose. (I haven't measured how slow that would be, so I might be wrong.)

> Assuming status lines are less than 4KB, make sure write uses a string
> ending with "\n" (aka. line buffered, stdbuf -oL) should solve this issue
> without any locks.

and line-buffered output would increase the cost of write() syscalls anyway.
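On the line-buffering idea: POSIX does guarantee that a single write() of at most PIPE_BUF bytes to a pipe is atomic, which is exactly why the scheme works for short status lines and silently breaks for long ones. A sketch (`writeline` is a hypothetical helper, not Mercurial code):

```python
import os
import select

# POSIX guarantees PIPE_BUF >= 512; Linux uses 4096.
PIPE_BUF = getattr(select, 'PIPE_BUF', 512)

def writeline(fd, line):
    """Emit one status line with a single write() call (sketch).

    Writes of at most PIPE_BUF bytes to a pipe are atomic, so lines
    from concurrent workers cannot interleave mid-line; for longer
    lines this guarantee is lost, the caveat raised in the thread.
    """
    data = line if line.endswith(b'\n') else line + b'\n'
    assert len(data) <= PIPE_BUF, 'line too long to write atomically'
    os.write(fd, data)
```

This avoids any locking, at the cost of one syscall per line, which is the overhead mentioned above.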
On Sat, 2016-07-23 at 19:34 +0100, Jun Wu wrote:
> Excerpts from Yuya Nishihara's message of 2016-07-23 23:39:54 +0900:
> > New implementation can't detect failures ASAP, which is what 9955fc5ee24b
> > tried to solve.
> >
> > https://selenic.com/repo/hg/rev/9955fc5ee24b
>
> It seems SIGCHLD handler + waitpid WNOHANG is the solution then.

Can you take a stab at this variant?

We could also have a waitpid thread per child process.. which we already do
elsewhere to handle timeouts (see run-tests:Popen4).

--
Mathematics is the supreme nostalgia of our time.
Excerpts from Matt Mackall's message of 2016-07-25 16:18:02 -0500:
> On Sat, 2016-07-23 at 19:34 +0100, Jun Wu wrote:
> > Excerpts from Yuya Nishihara's message of 2016-07-23 23:39:54 +0900:
> > > New implementation can't detect failures ASAP, which is what 9955fc5ee24b
> > > tried to solve.
> > >
> > > https://selenic.com/repo/hg/rev/9955fc5ee24b
> > It seems SIGCHLD handler + waitpid WNOHANG is the solution then.
>
> Can you take a stab at this variant?

Will send the patch after freeze.
On Wed, 2016-07-27 at 11:09 +0100, Jun Wu wrote:
> Excerpts from Matt Mackall's message of 2016-07-25 16:18:02 -0500:
> > On Sat, 2016-07-23 at 19:34 +0100, Jun Wu wrote:
> > > Excerpts from Yuya Nishihara's message of 2016-07-23 23:39:54 +0900:
> > > > New implementation can't detect failures ASAP, which is what
> > > > 9955fc5ee24b tried to solve.
> > > >
> > > > https://selenic.com/repo/hg/rev/9955fc5ee24b
> > > It seems SIGCHLD handler + waitpid WNOHANG is the solution then.
> > Can you take a stab at this variant?
> Will send the patch after freeze.

I would say this constitutes a freeze-worthy fix, if you can fit it in.
On Wed, 27 Jul 2016 14:43:17 -0500, Matt Mackall wrote:
> On Wed, 2016-07-27 at 11:09 +0100, Jun Wu wrote:
> > Excerpts from Matt Mackall's message of 2016-07-25 16:18:02 -0500:
> > > On Sat, 2016-07-23 at 19:34 +0100, Jun Wu wrote:
> > > > Excerpts from Yuya Nishihara's message of 2016-07-23 23:39:54 +0900:
> > > > > New implementation can't detect failures ASAP, which is what
> > > > > 9955fc5ee24b tried to solve.
> > > > >
> > > > > https://selenic.com/repo/hg/rev/9955fc5ee24b
> > > > It seems SIGCHLD handler + waitpid WNOHANG is the solution then.
> > > Can you take a stab at this variant?
> > Will send the patch after freeze.
>
> I would say this constitutes a freeze-worthy fix, if you can fit it in.

Perhaps the minimal patch would be to wait until all pids are reaped,
something like this:

    while knownpids:
        try:
            pid, st = os.wait()
        except ...
            # catch ECHILD
        if pid not in knownpids:
            continue
        knownpids.remove(pid)
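Filled out, that minimal sketch might read as below. The `except` clause follows the `# catch ECHILD` comment in the sketch; collecting statuses into a dict is an added illustration, not part of the proposal.

```python
import errno
import os

def waituntilreaped(knownpids):
    """Wait until every known worker pid has been collected (sketch).

    os.wait() is called in a loop; statuses of children spawned by
    unrelated code are skipped rather than misattributed to a worker,
    and the loop ends once all workers are reaped or no children remain.
    """
    statuses = {}
    while knownpids:
        try:
            pid, st = os.wait()
        except OSError as err:
            if err.errno == errno.ECHILD:   # no children left at all
                break
            raise
        if pid not in knownpids:
            continue                        # someone else's child
        knownpids.remove(pid)
        statuses[pid] = st                  # raw status for _exitstatus()
    return statuses
```

Note that this variant still reaps (and discards) children belonging to other code, which is the objection raised in the follow-up.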
On Thu, 2016-07-28 at 22:01 +0900, Yuya Nishihara wrote:
> On Wed, 27 Jul 2016 14:43:17 -0500, Matt Mackall wrote:
> > On Wed, 2016-07-27 at 11:09 +0100, Jun Wu wrote:
> > > Excerpts from Matt Mackall's message of 2016-07-25 16:18:02 -0500:
> > > > On Sat, 2016-07-23 at 19:34 +0100, Jun Wu wrote:
> > > > > Excerpts from Yuya Nishihara's message of 2016-07-23 23:39:54 +0900:
> > > > > > New implementation can't detect failures ASAP, which is what
> > > > > > 9955fc5ee24b tried to solve.
> > > > > >
> > > > > > https://selenic.com/repo/hg/rev/9955fc5ee24b
> > > > > It seems SIGCHLD handler + waitpid WNOHANG is the solution then.
> > > > Can you take a stab at this variant?
> > > Will send the patch after freeze.
> > I would say this constitutes a freeze-worthy fix, if you can fit it in.
> Perhaps the minimal patch would be to wait until all pids are reaped,
> something like this:
>
>     while knownpids:
>         try:
>             pid, st = os.wait()
>         except ...
>             # catch ECHILD
>         if pid not in knownpids:
>             continue
>         knownpids.remove(pid)

Sounds great.
I actually considered this before submitting the V1. This may "eat" return
values useful for other code. I will send the SIGCHLD patch.

Excerpts from Yuya Nishihara's message of 2016-07-28 22:01:14 +0900:
> Perhaps the minimal patch would be to wait until all pids are reaped, something
> like this:
>
>     while knownpids:
>         try:
>             pid, st = os.wait()
>         except ...
>             # catch ECHILD
>         if pid not in knownpids:
>             continue
>         knownpids.remove(pid)
On Thu, 28 Jul 2016 17:53:02 +0100, Jun Wu wrote:
> I actually considered this before submitting the V1. This may "eat" return
> values useful for other code. I will send the SIGCHLD patch.

Yep, so my proposal (for default) was to use pipe.

    for line in fp:
        if line is progress message:
            l = line.split(' ', 1)
            yield int(l[0]), l[1][:-1]
        elif line is result code:
            save problem
            killworkers()
            waitworkers()

But I found forked workers could propagate exceptions to dispatch(), so we
can't simply switch to this model.
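Fleshing out that pseudo-code, a master-side read loop could look like the following. The `P`/`E` line tags, the helper names, and the control flow are all invented for illustration; Mercurial's real worker protocol differs and, as noted, exception propagation makes a simple switch to this model impossible.

```python
def readresults(fp, problem, killworkers, waitforworkers):
    """Master-side read loop for a worker-to-master pipe (sketch).

    Each line is either 'P <count> <item>' (a progress message, relayed
    to the caller as a (count, item) pair) or 'E <status>' (a worker's
    failure code, which triggers killing the remaining workers).
    """
    for line in fp:
        kind, rest = line.split(' ', 1)
        if kind == 'P':                     # progress: yield (count, item)
            count, item = rest.split(' ', 1)
            yield int(count), item.rstrip('\n')
        elif kind == 'E':                   # failure reported by a worker
            problem[0] = int(rest)
            killworkers()
            break
    waitforworkers()                        # reap the children either way
```

Because the generator only yields parsed progress tuples, the caller's loop stays identical to the current `_posixworker` consumer, which is presumably the appeal of this design.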
Patch
diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -112,8 +112,8 @@ def _posixworker(ui, func, staticargs, a
                 if err.errno != errno.ESRCH:
                     raise
     def waitforworkers():
-        for _pid in pids:
-            st = _exitstatus(os.wait()[1])
+        for pid in pids:
+            st = _exitstatus(os.waitpid(pid, 0)[1])
             if st and not problem[0]:
                 problem[0] = st
                 killworkers()