Patchwork D8051: worker: Use buffered input from the pickle stream

Submitter phabricator
Date Jan. 30, 2020, 9:29 p.m.
Message ID <differential-rev-PHID-DREV-rltdz5iozlmmjvwqjrdu-req@mercurial-scm.org>
Permalink /patch/44795/
State Superseded

Comments

phabricator - Jan. 30, 2020, 9:29 p.m.
heftig created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  On Python 3, "pickle.load" will raise an exception ("_pickle.UnpicklingError:
  pickle data was truncated") when it gets a short read, i.e. it receives fewer
  bytes than it requested.
  
  On our build machine, Mercurial seems to frequently hit this problem while
  updating a mozilla-central clone iff it gets scheduled in batch mode. It is easy
  to trigger with:
  
  1. Wipe the workdir:
  
       rm -rf *
       hg update null
  
  2. Update again, scheduled in batch mode:
  
       chrt -b 0 hg update default
  
  I've also written the following program, which demonstrates the core problem:
  
    from __future__ import print_function
    
    import io
    import os
    import pickle
    import time
    
    obj = {"a": 1, "b": 2}
    obj_data = pickle.dumps(obj)
    assert len(obj_data) > 10
    
    rfd, wfd = os.pipe()
    
    pid = os.fork()
    if pid == 0:
        # Child: repeatedly write a pickled object in two chunks, pausing
        # between them so the parent's unbuffered read comes up short.
        os.close(rfd)
    
        for _ in range(4):
            time.sleep(0.5)
            print("First write")
            os.write(wfd, obj_data[:10])
    
            time.sleep(0.5)
            print("Second write")
            os.write(wfd, obj_data[10:])
    
        os._exit(0)
    
    try:
        os.close(wfd)
    
        # Parent: read the pickles back unbuffered, as worker.py currently does.
        rfile = os.fdopen(rfd, "rb", 0)
    
        print("Reading")
        while True:
            try:
                obj_copy = pickle.load(rfile)
                assert obj == obj_copy
            except EOFError:
                break
        print("Success")
    finally:
        os.kill(pid, 15)
  
  The program reliably fails with Python 3.8 and succeeds with Python 2.7.
  
  Providing the unpickler with a buffered reader fixes the issue, so let
  "os.fdopen" create one.
  
  https://bugzilla.mozilla.org/show_bug.cgi?id=1604486

REPOSITORY
  rHG Mercurial

BRANCH
  stable

REVISION DETAIL
  https://phab.mercurial-scm.org/D8051

AFFECTED FILES
  mercurial/worker.py

CHANGE DETAILS




To: heftig, #hg-reviewers
Cc: mercurial-devel
Yuya Nishihara - Feb. 4, 2020, 12:36 p.m.
>      for rfd, wfd in pipes:
>          os.close(wfd)
> -        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
> +        selector.register(os.fdopen(rfd, 'rb'), selectors.EVENT_READ)

Using buffered I/O can cause a deadlock (until the worker process exits).
The master process expects EVENT_READ to be asserted (i.e. level-triggered)
as long as readable items are pending, but a buffered file won't trigger it,
since almost all of the readable items will have been moved into its
internal buffer.

```
import time
from mercurial import (
    ui as uimod,
    worker,
)

def some_work(n):
    # send back many items at once
    for i in range(10):
        yield (n, i)
    # and don't close() the pipe for a while
    time.sleep(10)

ui = uimod.ui()
ui.setconfig(b'worker', b'numcpus', b'2')
gen = worker._posixworker(ui, some_work, staticargs=(), args=[0, 1],
                          hasretval=False)
for x in gen:
    ui.write(b'%r\n' % (x,))
    ui.flush()
```
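
The effect is also visible with just the standard library. This is only a
sketch (the pipe and the two pickled strings are made up for illustration),
but it shows the selector going quiet while unread data is still sitting in
the BufferedReader:

```
import os
import pickle
import selectors

# Put two pickled items into a pipe, then read it through a buffered file
# object, as the proposed change would do.
rfd, wfd = os.pipe()
os.write(wfd, pickle.dumps("first") + pickle.dumps("second"))

rfile = os.fdopen(rfd, "rb")
sel = selectors.DefaultSelector()
sel.register(rfile, selectors.EVENT_READ)

print(bool(sel.select(timeout=1)))  # True: the pipe has data
print(pickle.load(rfile))           # 'first'; 'second' is pulled into the buffer too
print(bool(sel.select(timeout=1)))  # False: the pipe itself now looks empty
print(pickle.load(rfile))           # 'second' comes only from the internal buffer
```

In the worker this means the master can sit in select() while results are
already buffered, until the child finally closes its end of the pipe.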

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST ACTION
  https://phab.mercurial-scm.org/D8051/new/

REVISION DETAIL
  https://phab.mercurial-scm.org/D8051

To: heftig, #hg-reviewers
Cc: yuja, sheehan, mercurial-devel

Patch

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -226,7 +226,7 @@ 
     selector = selectors.DefaultSelector()
     for rfd, wfd in pipes:
         os.close(wfd)
-        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
+        selector.register(os.fdopen(rfd, 'rb'), selectors.EVENT_READ)
 
     def cleanup():
         signal.signal(signal.SIGINT, oldhandler)