Patchwork D6475: merge: fix race that could cause wrong size in dirstate

login
register
mail settings
Submitter phabricator
Date June 3, 2019, 1:01 p.m.
Message ID <differential-rev-PHID-DREV-v4i7ofovpabyikthrkc4-req@phab.mercurial-scm.org>
Download mbox | patch
Permalink /patch/40303/
State Superseded
Headers show

Comments

phabricator - June 3, 2019, 1:01 p.m.
valentin.gatienbaron created this revision.
Herald added a reviewer: durin42.
Herald added a reviewer: martinvonz.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  The problem is that hg merge/update/etc work the following way:
  
  1. figure out what files to update
  2. apply the update to disk
  3. apply the update to in-memory dirstate
  4. write dirstate
  
  where step3 looks at the filesystem and assumes it sees the result of
  step2. If a file is changed between step2 and step3, step3 will record
  incorrect information in the dirstate.
  
  I avoid this by passing the size step3 needs directly from step2, for
  the common path (not implemented for change/delete conflicts for
  instance). The code is a bit awkward, as there doesn't seem to be a
  standard way in python to return a value at the end of a generator.
  
  I didn't fix the same race for the exec bit for now, because it's less
  likely to be problematic and I had trouble due to the fact that the
  dirstate stores the permissions differently from the manifest (st_mode
  vs '' 'l' 'x'), in combination with tests that pretend that symlinks
  are not supported.
  
  However, I moved the lstat from step3 to step2, which should tighten
  the race window markedly, both for the exec bit and for the mtime.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D6475

AFFECTED FILES
  hgext/largefiles/overrides.py
  hgext/narrow/narrowdirstate.py
  hgext/remotefilelog/__init__.py
  hgext/sparse.py
  mercurial/context.py
  mercurial/dirstate.py
  mercurial/merge.py
  mercurial/narrowspec.py
  mercurial/sparse.py
  mercurial/worker.py
  tests/test-dirstate-race2.t
  tests/test-dirstate.t

CHANGE DETAILS




To: valentin.gatienbaron, durin42, martinvonz, #hg-reviewers
Cc: mercurial-devel

Patch

diff --git a/tests/test-dirstate.t b/tests/test-dirstate.t
--- a/tests/test-dirstate.t
+++ b/tests/test-dirstate.t
@@ -73,7 +73,7 @@ 
   >   merge,
   > )
   > 
-  > def wraprecordupdates(orig, repo, actions, branchmerge):
+  > def wraprecordupdates(*args):
   >     raise error.Abort("simulated error while recording dirstateupdates")
   > 
   > def reposetup(ui, repo):
diff --git a/tests/test-dirstate-race2.t b/tests/test-dirstate-race2.t
--- a/tests/test-dirstate-race2.t
+++ b/tests/test-dirstate-race2.t
@@ -27,13 +27,11 @@ 
   > EOF
 
 Do an update where file 'a' is changed between hg writing it to disk
-and hg writing the dirstate. It results in a corrupted dirstate, which
-stores the wrong size, and thus hg status shows spuriously modified
-files.
+and hg writing the dirstate. The dirstate is correct nonetheless, and
+so hg status correctly shows a as clean.
 
   $ hg up -r 0 --config extensions.race=$TESTTMP/dirstaterace.py
   1 files updated, 0 files merged, 0 files removed, 0 files unresolved
   $ hg debugdirstate --no-dates
-  n 644          0 (set  |unset)               a (re)
+  n 644          2 (set  |unset)               a (re)
   $ echo a > a; hg status; hg diff
-  M a
diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -83,31 +83,36 @@ 
     benefit = linear - (_STARTUP_COST * workers + linear / workers)
     return benefit >= 0.15
 
-def worker(ui, costperarg, func, staticargs, args, threadsafe=True):
+def worker(ui, costperarg, func, staticargs, args, hasretval=False,
+           threadsafe=True):
     '''run a function, possibly in parallel in multiple worker
     processes.
 
     returns a progress iterator
 
     costperarg - cost of a single task
 
-    func - function to run
+    func - function to run. It is expected to return a progress iterator.
 
     staticargs - arguments to pass to every invocation of the function
 
     args - arguments to split into chunks, to pass to individual
     workers
 
+    hasretval - when True, func and the current function return an progress
+    iterator then a list (encoded as an iterator that yield many (False, ..)
+    then a (True, list)). The resulting list is in the natural order.
+
     threadsafe - whether work items are thread safe and can be executed using
     a thread-based worker. Should be disabled for CPU heavy tasks that don't
     release the GIL.
     '''
     enabled = ui.configbool('worker', 'enabled')
     if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
-        return _platformworker(ui, func, staticargs, args)
+        return _platformworker(ui, func, staticargs, args, hasretval)
     return func(*staticargs + (args,))
 
-def _posixworker(ui, func, staticargs, args):
+def _posixworker(ui, func, staticargs, args, hasretval):
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -157,14 +162,16 @@ 
     ui.flush()
     parentpid = os.getpid()
     pipes = []
-    for pargs in partition(args, workers):
+    retvals = []
+    for i, pargs in enumerate(partition(args, workers)):
         # Every worker gets its own pipe to send results on, so we don't have to
         # implement atomic writes larger than PIPE_BUF. Each forked process has
         # its own pipe's descriptors in the local variables, and the parent
         # process has the full list of pipe descriptors (and it doesn't really
         # care what order they're in).
         rfd, wfd = os.pipe()
         pipes.append((rfd, wfd))
+        retvals.append(None)
         # make sure we use os._exit in all worker code paths. otherwise the
         # worker may do some clean-ups which could cause surprises like
         # deadlock. see sshpeer.cleanup for example.
@@ -185,7 +192,7 @@ 
                         os.close(w)
                     os.close(rfd)
                     for result in func(*(staticargs + (pargs,))):
-                        os.write(wfd, util.pickle.dumps(result))
+                        os.write(wfd, util.pickle.dumps((i, result)))
                     return 0
 
                 ret = scmutil.callcatch(ui, workerfunc)
@@ -219,7 +226,11 @@ 
         while openpipes > 0:
             for key, events in selector.select():
                 try:
-                    yield util.pickle.load(key.fileobj)
+                    i, res = util.pickle.load(key.fileobj)
+                    if hasretval and res[0]:
+                        retvals[i] = res[1]
+                    else:
+                        yield res
                 except EOFError:
                     selector.unregister(key.fileobj)
                     key.fileobj.close()
@@ -237,6 +248,8 @@ 
         if status < 0:
             os.kill(os.getpid(), -status)
         sys.exit(status)
+    if hasretval:
+        yield True, sum(retvals, [])
 
 def _posixexitstatus(code):
     '''convert a posix exit status into the same form returned by
@@ -248,7 +261,7 @@ 
     elif os.WIFSIGNALED(code):
         return -os.WTERMSIG(code)
 
-def _windowsworker(ui, func, staticargs, args):
+def _windowsworker(ui, func, staticargs, args, hasretval):
     class Worker(threading.Thread):
         def __init__(self, taskqueue, resultqueue, func, staticargs, *args,
                      **kwargs):
@@ -268,9 +281,9 @@ 
             try:
                 while not self._taskqueue.empty():
                     try:
-                        args = self._taskqueue.get_nowait()
+                        i, args = self._taskqueue.get_nowait()
                         for res in self._func(*self._staticargs + (args,)):
-                            self._resultqueue.put(res)
+                            self._resultqueue.put((i, res))
                             # threading doesn't provide a native way to
                             # interrupt execution. handle it manually at every
                             # iteration.
@@ -305,18 +318,24 @@ 
     workers = _numworkers(ui)
     resultqueue = pycompat.queue.Queue()
     taskqueue = pycompat.queue.Queue()
+    retvals = []
     # partition work to more pieces than workers to minimize the chance
     # of uneven distribution of large tasks between the workers
-    for pargs in partition(args, workers * 20):
+    for pargs in enumerate(partition(args, workers * 20)):
+        retvals.append(None)
         taskqueue.put(pargs)
     for _i in range(workers):
         t = Worker(taskqueue, resultqueue, func, staticargs)
         threads.append(t)
         t.start()
     try:
         while len(threads) > 0:
             while not resultqueue.empty():
-                yield resultqueue.get()
+                (i, res) = resultqueue.get()
+                if hasretval and res[0]:
+                    retvals[i] = res[1]
+                else:
+                    yield res
             threads[0].join(0.05)
             finishedthreads = [_t for _t in threads if not _t.is_alive()]
             for t in finishedthreads:
@@ -327,7 +346,13 @@ 
         trykillworkers()
         raise
     while not resultqueue.empty():
-        yield resultqueue.get()
+        (i, res) = resultqueue.get()
+        if hasretval and res[0]:
+            retvals[i] = res[1]
+        else:
+            yield res
+    if hasretval:
+        yield True, sum(retvals, [])
 
 if pycompat.iswindows:
     _platformworker = _windowsworker
diff --git a/mercurial/sparse.py b/mercurial/sparse.py
--- a/mercurial/sparse.py
+++ b/mercurial/sparse.py
@@ -248,7 +248,8 @@ 
 
     typeactions = mergemod.emptyactions()
     typeactions['r'] = actions
-    mergemod.applyupdates(repo, typeactions, repo[None], repo['.'], False)
+    mergemod.applyupdates(repo, typeactions, repo[None], repo['.'], False,
+                          wantfiledata=False)
 
     # Fix dirstate
     for file in dropped:
@@ -382,7 +383,7 @@ 
         typeactions = mergemod.emptyactions()
         typeactions['g'] = actions
         mergemod.applyupdates(repo, typeactions, repo[None], repo['.'],
-                              False)
+                              False, wantfiledata=False)
 
         dirstate = repo.dirstate
         for file, flags, msg in actions:
@@ -486,7 +487,8 @@ 
     for f, (m, args, msg) in actions.iteritems():
         typeactions[m].append((f, args, msg))
 
-    mergemod.applyupdates(repo, typeactions, repo[None], repo['.'], False)
+    mergemod.applyupdates(repo, typeactions, repo[None], repo['.'], False,
+                          wantfiledata=False)
 
     # Fix dirstate
     for file in added:
diff --git a/mercurial/narrowspec.py b/mercurial/narrowspec.py
--- a/mercurial/narrowspec.py
+++ b/mercurial/narrowspec.py
@@ -259,7 +259,7 @@ 
         if not repo.wvfs.exists(f):
             addgaction((f, (mf.flags(f), False), "narrowspec updated"))
     merge.applyupdates(repo, actions, wctx=repo[None],
-                       mctx=repo['.'], overwrite=False)
+                       mctx=repo['.'], overwrite=False, wantfiledata=False)
 
 def checkworkingcopynarrowspec(repo):
     storespec = repo.svfs.tryread(FILENAME)
diff --git a/mercurial/merge.py b/mercurial/merge.py
--- a/mercurial/merge.py
+++ b/mercurial/merge.py
@@ -10,6 +10,7 @@ 
 import errno
 import hashlib
 import shutil
+import stat
 import struct
 
 from .i18n import _
@@ -683,7 +684,7 @@ 
     def recordactions(self):
         """record remove/add/get actions in the dirstate"""
         branchmerge = self._repo.dirstate.p2() != nullid
-        recordupdates(self._repo, self.actions(), branchmerge)
+        recordupdates(self._repo, self.actions(), branchmerge, None)
 
     def queueremove(self, f):
         """queues a file to be removed from the dirstate
@@ -1464,13 +1465,17 @@ 
         repo.ui.warn(_("current directory was removed\n"
                        "(consider changing to repo root: %s)\n") % repo.root)
 
-def batchget(repo, mctx, wctx, actions):
+def batchget(repo, mctx, wctx, wantfiledata, actions):
     """apply gets to the working directory
 
     mctx is the context to get from
 
-    yields tuples for progress updates
+    Yields arbitrarily many (False, tuple) for progress updates, followed by
+    exactly one (True, filedata). When wantfiledata is false, filedata is an
+    empty list. When wantfiledata is true, filedata[i] is a triple (mode, size,
+    mtime) of the file written for action[i].
     """
+    filedata = []
     verbose = repo.ui.verbose
     fctx = mctx.filectx
     ui = repo.ui
@@ -1494,16 +1499,24 @@ 
                 if repo.wvfs.lexists(conflicting):
                     orig = scmutil.backuppath(ui, repo, conflicting)
                     util.rename(repo.wjoin(conflicting), orig)
-            wctx[f].clearunknown()
+            wfctx = wctx[f]
+            wfctx.clearunknown()
             atomictemp = ui.configbool("experimental", "update.atomic-file")
-            wctx[f].write(fctx(f).data(), flags, backgroundclose=True,
-                          atomictemp=atomictemp)
+            size = wfctx.write(fctx(f).data(), flags,
+                               backgroundclose=True,
+                               atomictemp=atomictemp)
+            if wantfiledata:
+                s = wfctx.lstat()
+                mode = s.st_mode
+                mtime = s[stat.ST_MTIME]
+                filedata.append((mode, size, mtime)) # for dirstate.normal
             if i == 100:
-                yield i, f
+                yield False, (i, f)
                 i = 0
             i += 1
     if i > 0:
-        yield i, f
+        yield False, (i, f)
+    yield True, filedata
 
 def _prefetchfiles(repo, ctx, actions):
     """Invoke ``scmutil.prefetchfiles()`` for the files relevant to the dict
@@ -1550,14 +1563,17 @@ 
                     ACTION_PATH_CONFLICT,
                     ACTION_PATH_CONFLICT_RESOLVE))
 
-def applyupdates(repo, actions, wctx, mctx, overwrite, labels=None):
+def applyupdates(repo, actions, wctx, mctx, overwrite, wantfiledata,
+                 labels=None):
     """apply the merge action list to the working directory
 
     wctx is the working copy context
     mctx is the context to be merged into the working copy
 
-    Return a tuple of counts (updated, merged, removed, unresolved) that
-    describes how many files were affected by the update.
+    Return a tuple of (counts, filedata), where counts is a tuple
+    (updated, merged, removed, unresolved) that describes how many
+    files were affected by the update, and filedata is as described in
+    batchget.
     """
 
     _prefetchfiles(repo, mctx, actions)
@@ -1649,11 +1665,18 @@ 
     # get in parallel.
     threadsafe = repo.ui.configbool('experimental',
                                     'worker.wdir-get-thread-safe')
-    prog = worker.worker(repo.ui, cost, batchget, (repo, mctx, wctx),
+    prog = worker.worker(repo.ui, cost, batchget,
+                         (repo, mctx, wctx, wantfiledata),
                          actions[ACTION_GET],
-                         threadsafe=threadsafe)
-    for i, item in prog:
-        progress.increment(step=i, item=item)
+                         threadsafe=threadsafe,
+                         hasretval=True)
+    getfiledata = []
+    for final, res in prog:
+        if final:
+            getfiledata = res
+        else:
+            i, item = res
+            progress.increment(step=i, item=item)
     updated = len(actions[ACTION_GET])
 
     if [a for a in actions[ACTION_GET] if a[0] == '.hgsubstate']:
@@ -1778,6 +1801,8 @@ 
         mfiles = set(a[0] for a in actions[ACTION_MERGE])
         for k, acts in extraactions.iteritems():
             actions[k].extend(acts)
+            if k == ACTION_GET and wantfiledata:
+                getfiledata.extend([None] * len(acts))
             # Remove these files from actions[ACTION_MERGE] as well. This is
             # important because in recordupdates, files in actions[ACTION_MERGE]
             # are processed after files in other actions, and the merge driver
@@ -1800,9 +1825,10 @@ 
                                  if a[0] in mfiles]
 
     progress.complete()
-    return updateresult(updated, merged, removed, unresolved)
+    assert len(getfiledata) == (len(actions[ACTION_GET]) if wantfiledata else 0)
+    return updateresult(updated, merged, removed, unresolved), getfiledata
 
-def recordupdates(repo, actions, branchmerge):
+def recordupdates(repo, actions, branchmerge, getfiledata):
     "record merge actions to the dirstate"
     # remove (must come first)
     for f, args, msg in actions.get(ACTION_REMOVE, []):
@@ -1846,11 +1872,12 @@ 
         pass
 
     # get
-    for f, args, msg in actions.get(ACTION_GET, []):
+    for i, (f, args, msg) in enumerate(actions.get(ACTION_GET, [])):
         if branchmerge:
             repo.dirstate.otherparent(f)
         else:
-            repo.dirstate.normal(f)
+            parentfiledata = getfiledata[i] if getfiledata else None
+            repo.dirstate.normal(f, parentfiledata=parentfiledata)
 
     # merge
     for f, args, msg in actions.get(ACTION_MERGE, []):
@@ -2169,12 +2196,15 @@ 
                   'fsmonitor enabled; enable fsmonitor to improve performance; '
                   'see "hg help -e fsmonitor")\n'))
 
-        stats = applyupdates(repo, actions, wc, p2, overwrite, labels=labels)
+        updatedirstate = not partial and not wc.isinmemory()
+        wantfiledata = updatedirstate and not branchmerge
+        stats, getfiledata = applyupdates(repo, actions, wc, p2, overwrite,
+                                          wantfiledata, labels=labels)
 
-        if not partial and not wc.isinmemory():
+        if updatedirstate:
             with repo.dirstate.parentchange():
                 repo.setparents(fp1, fp2)
-                recordupdates(repo, actions, branchmerge)
+                recordupdates(repo, actions, branchmerge, getfiledata)
                 # update completed, clear state
                 util.unlink(repo.vfs.join('updatestate'))
 
diff --git a/mercurial/dirstate.py b/mercurial/dirstate.py
--- a/mercurial/dirstate.py
+++ b/mercurial/dirstate.py
@@ -396,12 +396,24 @@ 
         self._updatedfiles.add(f)
         self._map.addfile(f, oldstate, state, mode, size, mtime)
 
-    def normal(self, f):
-        '''Mark a file normal and clean.'''
-        s = os.lstat(self._join(f))
-        mtime = s[stat.ST_MTIME]
-        self._addpath(f, 'n', s.st_mode,
-                      s.st_size & _rangemask, mtime & _rangemask)
+    def normal(self, f, parentfiledata=None):
+        '''Mark a file normal and clean.
+
+        parentfiledata: (mode, size, mtime) of the clean file
+
+        parentfiledata should be computed from memory (for mode,
+        size), as or close as possible from the point where we
+        determined the file was clean, to limit the risk of the
+        file having been changed by an external process between the
+        moment where the file was determined to be clean and now.'''
+        if parentfiledata:
+            (mode, size, mtime) = parentfiledata
+        else:
+            s = os.lstat(self._join(f))
+            mode = s.st_mode
+            size = s.st_size
+            mtime = s[stat.ST_MTIME]
+        self._addpath(f, 'n', mode, size & _rangemask, mtime & _rangemask)
         self._map.copymap.pop(f, None)
         if f in self._map.nonnormalset:
             self._map.nonnormalset.remove(f)
diff --git a/mercurial/context.py b/mercurial/context.py
--- a/mercurial/context.py
+++ b/mercurial/context.py
@@ -1766,6 +1766,8 @@ 
 
     def size(self):
         return self._repo.wvfs.lstat(self._path).st_size
+    def lstat(self):
+        return self._repo.wvfs.lstat(self._path)
     def date(self):
         t, tz = self._changectx.date()
         try:
@@ -1801,9 +1803,9 @@ 
 
     def write(self, data, flags, backgroundclose=False, **kwargs):
         """wraps repo.wwrite"""
-        self._repo.wwrite(self._path, data, flags,
-                          backgroundclose=backgroundclose,
-                          **kwargs)
+        return self._repo.wwrite(self._path, data, flags,
+                                 backgroundclose=backgroundclose,
+                                 **kwargs)
 
     def markcopied(self, src):
         """marks this file a copy of `src`"""
diff --git a/hgext/sparse.py b/hgext/sparse.py
--- a/hgext/sparse.py
+++ b/hgext/sparse.py
@@ -228,16 +228,16 @@ 
     hint = _('include file with `hg debugsparse --include <pattern>` or use ' +
              '`hg add -s <file>` to include file directory while adding')
     for func in editfuncs:
-        def _wrapper(orig, self, *args):
+        def _wrapper(orig, self, *args, **kwargs):
             sparsematch = self._sparsematcher
             if not sparsematch.always():
                 for f in args:
                     if (f is not None and not sparsematch(f) and
                         f not in self):
                         raise error.Abort(_("cannot add '%s' - it is outside "
                                             "the sparse checkout") % f,
                                           hint=hint)
-            return orig(self, *args)
+            return orig(self, *args, **kwargs)
         extensions.wrapfunction(dirstate.dirstate, func, _wrapper)
 
 @command('debugsparse', [
diff --git a/hgext/remotefilelog/__init__.py b/hgext/remotefilelog/__init__.py
--- a/hgext/remotefilelog/__init__.py
+++ b/hgext/remotefilelog/__init__.py
@@ -442,15 +442,17 @@ 
     return s
 
 # prefetch files before update
-def applyupdates(orig, repo, actions, wctx, mctx, overwrite, labels=None):
+def applyupdates(orig, repo, actions, wctx, mctx, overwrite, wantfiledata,
+                 labels=None):
     if isenabled(repo):
         manifest = mctx.manifest()
         files = []
         for f, args, msg in actions['g']:
             files.append((f, hex(manifest[f])))
         # batch fetch the needed files from the server
         repo.fileservice.prefetch(files)
-    return orig(repo, actions, wctx, mctx, overwrite, labels=labels)
+    return orig(repo, actions, wctx, mctx, overwrite, wantfiledata,
+                labels=labels)
 
 # Prefetch merge checkunknownfiles
 def checkunknownfiles(orig, repo, wctx, mctx, force, actions,
diff --git a/hgext/narrow/narrowdirstate.py b/hgext/narrow/narrowdirstate.py
--- a/hgext/narrow/narrowdirstate.py
+++ b/hgext/narrow/narrowdirstate.py
@@ -16,21 +16,21 @@ 
     """Add narrow spec dirstate ignore, block changes outside narrow spec."""
 
     def _editfunc(fn):
-        def _wrapper(self, *args):
+        def _wrapper(self, *args, **kwargs):
             narrowmatch = repo.narrowmatch()
             for f in args:
                 if f is not None and not narrowmatch(f) and f not in self:
                     raise error.Abort(_("cannot track '%s' - it is outside " +
                         "the narrow clone") % f)
-            return fn(self, *args)
+            return fn(self, *args, **kwargs)
         return _wrapper
 
     class narrowdirstate(dirstate.__class__):
         # Prevent adding/editing/copying/deleting files that are outside the
         # sparse checkout
         @_editfunc
-        def normal(self, *args):
-            return super(narrowdirstate, self).normal(*args)
+        def normal(self, *args, **kwargs):
+            return super(narrowdirstate, self).normal(*args, **kwargs)
 
         @_editfunc
         def add(self, *args):
diff --git a/hgext/largefiles/overrides.py b/hgext/largefiles/overrides.py
--- a/hgext/largefiles/overrides.py
+++ b/hgext/largefiles/overrides.py
@@ -515,7 +515,7 @@ 
     return actions, diverge, renamedelete
 
 @eh.wrapfunction(merge, 'recordupdates')
-def mergerecordupdates(orig, repo, actions, branchmerge):
+def mergerecordupdates(orig, repo, actions, branchmerge, getfiledata):
     if 'lfmr' in actions:
         lfdirstate = lfutil.openlfdirstate(repo.ui, repo)
         for lfile, args, msg in actions['lfmr']:
@@ -526,7 +526,7 @@ 
             lfdirstate.add(lfile)
         lfdirstate.write()
 
-    return orig(repo, actions, branchmerge)
+    return orig(repo, actions, branchmerge, getfiledata)
 
 # Override filemerge to prompt the user about how they wish to merge
 # largefiles. This will handle identical edits without prompting the user.