Patchwork D10605: revlog: introduce a mandatory `_writing` context to update revlog content

login
register
mail settings
Submitter phabricator
Date May 3, 2021, 12:07 p.m.
Message ID <differential-rev-PHID-DREV-kbs5kkf7wgm3fcydwqzl-req@mercurial-scm.org>
Download mbox | patch
Permalink /patch/48925/
State Superseded
Headers show

Comments

phabricator - May 3, 2021, 12:07 p.m.
marmoute created this revision.
Herald added a reviewer: indygreg.
Herald added a reviewer: hg-reviewers.
Herald added a subscriber: mercurial-patches.

REVISION SUMMARY
  Before this change, various revlog methods where managing the opening and
  closing of the revlog files manually and passing the file descriptor alors the
  call path. To simplify the tracking of the write operation by a future docket,
  we need something more organised. As a result, we introduce a `revlog._writing`
  context manager that will wrap each revlog update operation. The file
  descriptor are kept in the existing `revlog._writinghandles` parameter that
  was already used by the `addgroup` logic.
  
  All this change is internal to the revlog only, the "public" interface is not
  affected. The `addrevision` and `addgroup` logic are still responsible for
  setup up this context. However this new context give us multiple benefits:
  
  - all writer use a same, unified, logic,
  - this context is programmatically enforced,
  - each write "session" as a clearly identified start and end.
  
  The post-pull sidedata update logic is still doing writing by end and will be
  adjusted in a later changesets.
  
  This change affect the concurrency checker test, because register the state of
  the file in the transaction sooner in `addrevision` (about as early as what
  `addgroup` would do), so the abort is rollbacking the other commit. I don't want
  to weaken the current main logic.

REPOSITORY
  rHG Mercurial

BRANCH
  default

REVISION DETAIL
  https://phab.mercurial-scm.org/D10605

AFFECTED FILES
  mercurial/changelog.py
  mercurial/revlog.py
  tests/test-racy-mutations.t
  tests/test-revlog-raw.py

CHANGE DETAILS




To: marmoute, indygreg, #hg-reviewers
Cc: mercurial-patches, mercurial-devel

Patch

diff --git a/tests/test-revlog-raw.py b/tests/test-revlog-raw.py
--- a/tests/test-revlog-raw.py
+++ b/tests/test-revlog-raw.py
@@ -19,6 +19,32 @@ 
     flagutil,
 )
 
+
+class _NoTransaction(object):
+    """transaction like object to update the nodemap outside a transaction"""
+
+    def __init__(self):
+        self._postclose = {}
+
+    def addpostclose(self, callback_id, callback_func):
+        self._postclose[callback_id] = callback_func
+
+    def registertmp(self, *args, **kwargs):
+        pass
+
+    def addbackup(self, *args, **kwargs):
+        pass
+
+    def add(self, *args, **kwargs):
+        pass
+
+    def addabort(self, *args, **kwargs):
+        pass
+
+    def _report(self, *args):
+        pass
+
+
 # TESTTMP is optional. This makes it convenient to run without run-tests.py
 tvfs = vfs.vfs(encoding.environ.get(b'TESTTMP', b'/tmp'))
 
@@ -201,19 +227,17 @@ 
             text = None
             cachedelta = (deltaparent, rlog.revdiff(deltaparent, r))
         flags = rlog.flags(r)
-        ifh = dfh = None
-        try:
-            ifh = dlog.opener(dlog._indexfile, b'a+')
-            if not dlog._inline:
-                dfh = dlog.opener(dlog._datafile, b'a+')
+        with dlog._writing(_NoTransaction()):
             dlog._addrevision(
-                rlog.node(r), text, tr, r, p1, p2, flags, cachedelta, ifh, dfh
+                rlog.node(r),
+                text,
+                tr,
+                r,
+                p1,
+                p2,
+                flags,
+                cachedelta,
             )
-        finally:
-            if dfh is not None:
-                dfh.close()
-            if ifh is not None:
-                ifh.close()
     return dlog
 
 
diff --git a/tests/test-racy-mutations.t b/tests/test-racy-mutations.t
--- a/tests/test-racy-mutations.t
+++ b/tests/test-racy-mutations.t
@@ -92,7 +92,7 @@ 
   $ hg debugrevlogindex -c
      rev linkrev nodeid       p1           p2
        0       0 222799e2f90b 000000000000 000000000000
-       1       1 6f124f6007a0 222799e2f90b 000000000000
+       1       1 6f124f6007a0 222799e2f90b 000000000000 (missing-correct-output !)
 And, because of transactions, there's none in the manifestlog either.
   $ hg debugrevlogindex -m
      rev linkrev nodeid       p1           p2
diff --git a/mercurial/revlog.py b/mercurial/revlog.py
--- a/mercurial/revlog.py
+++ b/mercurial/revlog.py
@@ -360,6 +360,8 @@ 
 
         # 2-tuple of file handles being used for active writing.
         self._writinghandles = None
+        # prevent nesting of addgroup
+        self._adding_group = None
 
         self._loadindex()
 
@@ -1955,7 +1957,7 @@ 
                 raise error.CensoredNodeError(self.display_id, node, text)
             raise
 
-    def _enforceinlinesize(self, tr, fp=None):
+    def _enforceinlinesize(self, tr):
         """Check if the revlog is too big for inline and convert if so.
 
         This should be called after revisions are added to the revlog. If the
@@ -1975,21 +1977,27 @@ 
         trindex = 0
         tr.add(self._datafile, 0)
 
-        if fp:
+        existing_handles = False
+        if self._writinghandles is not None:
+            existing_handles = True
+            fp = self._writinghandles[0]
             fp.flush()
             fp.close()
             # We can't use the cached file handle after close(). So prevent
             # its usage.
             self._writinghandles = None
 
-        if True:
-            with self._indexfp(b'r') as ifh, self._datafp(b'w') as dfh:
+        new_dfh = self._datafp(b'w+')
+        new_dfh.truncate(0)  # drop any potentially existing data
+        try:
+            with self._indexfp(b'r') as read_ifh:
                 for r in self:
-                    dfh.write(self._getsegmentforrevs(r, r, df=ifh)[1])
+                    new_dfh.write(self._getsegmentforrevs(r, r, df=read_ifh)[1])
                     if troffset <= self.start(r):
                         trindex = r
-
-            with self._indexfp(b'w') as fp:
+                new_dfh.flush()
+
+            with self.opener(self._indexfile, mode=b'w', atomictemp=True) as fp:
                 self._format_flags &= ~FLAG_INLINE_DATA
                 self._inline = False
                 for i in self:
@@ -1999,7 +2007,6 @@ 
                         header = self.index.pack_header(header)
                         e = header + e
                     fp.write(e)
-
                 # the temp file replace the real index when we exit the context
                 # manager
 
@@ -2007,9 +2014,50 @@ 
             nodemaputil.setup_persistent_nodemap(tr, self)
             self._chunkclear()
 
+            if existing_handles:
+                # switched from inline to conventional reopen the index
+                ifh = self._indexfp(b"a+")
+                self._writinghandles = (ifh, new_dfh)
+                new_dfh = None
+        finally:
+            if new_dfh is not None:
+                new_dfh.close()
+
     def _nodeduplicatecallback(self, transaction, node):
         """called when trying to add a node already stored."""
 
+    @contextlib.contextmanager
+    def _writing(self, transaction):
+        if self._writinghandles is not None:
+            yield
+        else:
+            r = len(self)
+            dsize = 0
+            if r:
+                dsize = self.end(r - 1)
+            dfh = None
+            if not self._inline:
+                dfh = self._datafp(b"a+")
+                transaction.add(self._datafile, dsize)
+            try:
+                isize = r * self.index.entry_size
+                ifh = self._indexfp(b"a+")
+                if self._inline:
+                    transaction.add(self._indexfile, dsize + isize)
+                else:
+                    transaction.add(self._indexfile, isize)
+                try:
+                    self._writinghandles = (ifh, dfh)
+                    try:
+                        yield
+                    finally:
+                        self._writinghandles = None
+                finally:
+                    ifh.close()
+            finally:
+                if dfh is not None:
+                    dfh.close()
+
     def addrevision(
         self,
         text,
@@ -2105,11 +2153,7 @@ 
         useful when reusing a revision not stored in this revlog (ex: received
         over wire, or read from an external bundle).
         """
-        dfh = None
-        if not self._inline:
-            dfh = self._datafp(b"a+")
-        ifh = self._indexfp(b"a+")
-        try:
+        with self._writing(transaction):
             return self._addrevision(
                 node,
                 rawtext,
@@ -2119,15 +2163,9 @@ 
                 p2,
                 flags,
                 cachedelta,
-                ifh,
-                dfh,
                 deltacomputer=deltacomputer,
                 sidedata=sidedata,
             )
-        finally:
-            if dfh:
-                dfh.close()
-            ifh.close()
 
     def compress(self, data):
         """Generate a possibly-compressed representation of data."""
@@ -2214,8 +2252,6 @@ 
         p2,
         flags,
         cachedelta,
-        ifh,
-        dfh,
         alwayscache=False,
         deltacomputer=None,
         sidedata=None,
@@ -2244,11 +2280,14 @@ 
             raise error.RevlogError(
                 _(b"%s: attempt to add wdir revision") % self.display_id
             )
+        if self._writinghandles is None:
+            msg = b'adding revision outside `revlog._writing` context'
+            raise error.ProgrammingError(msg)
 
         if self._inline:
-            fh = ifh
+            fh = self._writinghandles[0]
         else:
-            fh = dfh
+            fh = self._writinghandles[1]
 
         btext = [rawtext]
 
@@ -2258,6 +2297,7 @@ 
         offset = self._get_data_offset(prev)
 
         if self._concurrencychecker:
+            ifh, dfh = self._writinghandles
             if self._inline:
                 # offset is "as if" it were in the .d file, so we need to add on
                 # the size of the entry metadata.
@@ -2323,8 +2363,6 @@ 
             entry = header + entry
         self._writeentry(
             transaction,
-            ifh,
-            dfh,
             entry,
             deltainfo.data,
             link,
@@ -2362,9 +2400,7 @@ 
             offset = max(self.end(rev), offset, sidedata_end)
         return offset
 
-    def _writeentry(
-        self, transaction, ifh, dfh, entry, data, link, offset, sidedata
-    ):
+    def _writeentry(self, transaction, entry, data, link, offset, sidedata):
         # Files opened in a+ mode have inconsistent behavior on various
         # platforms. Windows requires that a file positioning call be made
         # when the file handle transitions between reads and writes. See
@@ -2377,6 +2413,10 @@ 
         # Note: This is likely not necessary on Python 3. However, because
         # the file handle is reused for reads and may be seeked there, we need
         # to be careful before changing this.
+        if self._writinghandles is None:
+            msg = b'adding revision outside `revlog._writing` context'
+            raise error.ProgrammingError(msg)
+        ifh, dfh = self._writinghandles
         ifh.seek(0, os.SEEK_END)
         if dfh:
             dfh.seek(0, os.SEEK_END)
@@ -2399,7 +2439,7 @@ 
             ifh.write(data[1])
             if sidedata:
                 ifh.write(sidedata)
-            self._enforceinlinesize(transaction, ifh)
+            self._enforceinlinesize(transaction)
         nodemaputil.setup_persistent_nodemap(transaction, self)
 
     def addgroup(
@@ -2422,28 +2462,13 @@ 
         this revlog and the node that was added.
         """
 
-        if self._writinghandles:
+        if self._adding_group:
             raise error.ProgrammingError(b'cannot nest addgroup() calls')
 
-        r = len(self)
-        end = 0
-        if r:
-            end = self.end(r - 1)
-        ifh = self._indexfp(b"a+")
-        isize = r * self.index.entry_size
-        if self._inline:
-            transaction.add(self._indexfile, end + isize)
-            dfh = None
-        else:
-            transaction.add(self._indexfile, isize)
-            transaction.add(self._datafile, end)
-            dfh = self._datafp(b"a+")
-
-        self._writinghandles = (ifh, dfh)
+        self._adding_group = True
         empty = True
-
         try:
-            if True:
+            with self._writing(transaction):
                 deltacomputer = deltautil.deltacomputer(self)
                 # loop through our set of deltas
                 for data in deltas:
@@ -2514,8 +2539,6 @@ 
                         p2,
                         flags,
                         (baserev, delta),
-                        ifh,
-                        dfh,
                         alwayscache=alwayscache,
                         deltacomputer=deltacomputer,
                         sidedata=sidedata,
@@ -2524,20 +2547,8 @@ 
                     if addrevisioncb:
                         addrevisioncb(self, rev)
                     empty = False
-
-                    if not dfh and not self._inline:
-                        # addrevision switched from inline to conventional
-                        # reopen the index
-                        ifh.close()
-                        dfh = self._datafp(b"a+")
-                        ifh = self._indexfp(b"a+")
-                        self._writinghandles = (ifh, dfh)
         finally:
-            self._writinghandles = None
-
-            if dfh:
-                dfh.close()
-            ifh.close()
+            self._adding_group = False
         return not empty
 
     def iscensored(self, rev):
@@ -2868,13 +2879,7 @@ 
                     )
                     flags = flags | new_flags[0] & ~new_flags[1]
 
-                ifh = destrevlog.opener(
-                    destrevlog._indexfile, b'a+', checkambig=False
-                )
-                dfh = None
-                if not destrevlog._inline:
-                    dfh = destrevlog.opener(destrevlog._datafile, b'a+')
-                try:
+                with destrevlog._writing(tr):
                     destrevlog._addrevision(
                         node,
                         rawtext,
@@ -2884,15 +2889,9 @@ 
                         p2,
                         flags,
                         cachedelta,
-                        ifh,
-                        dfh,
                         deltacomputer=deltacomputer,
                         sidedata=sidedata,
                     )
-                finally:
-                    if dfh:
-                        dfh.close()
-                    ifh.close()
 
             if addrevisioncb:
                 addrevisioncb(self, rev, node)
diff --git a/mercurial/changelog.py b/mercurial/changelog.py
--- a/mercurial/changelog.py
+++ b/mercurial/changelog.py
@@ -506,9 +506,9 @@ 
 
         return False
 
-    def _enforceinlinesize(self, tr, fp=None):
+    def _enforceinlinesize(self, tr):
         if not self._delayed:
-            revlog.revlog._enforceinlinesize(self, tr, fp)
+            revlog.revlog._enforceinlinesize(self, tr)
 
     def read(self, nodeorrev):
         """Obtain data from a parsed changelog revision.