Patchwork [20,of,21,V2] speedy: persistence for indices using leveldb

login
register
mail settings
Submitter Tomasz Kleczek
Date Dec. 14, 2012, 2:52 a.m.
Message ID <91413197d908e7db75c8.1355453552@dev408.prn1.facebook.com>
Download mbox | patch
Permalink /patch/102/
State Deferred, archived
Headers show

Comments

Tomasz Kleczek - Dec. 14, 2012, 2:52 a.m.
# HG changeset patch
# User Tomasz Kleczek <tkleczek at fb.com>
# Date 1355434831 28800
# Node ID 91413197d908e7db75c89f1dd23f41d37a0d02eb
# Parent  0d2191815306cddea7852060925cc5f3def44ed0
speedy: persistence for indices using leveldb

* Indices no longer reside in memory.
* Each index has its own leveldb instance.
* During server startup, indices are updated to the current
  tip of the server repository.
* The previously introduced dict-like interface to the leveldb bindings
  is used to transparently fetch values from and store values to disk.

A recovery mechanism for update failures will be added in subsequent
patches.

Patch

diff --git a/hgext/speedy/serialize.py b/hgext/speedy/serialize.py
new file mode 100644
--- /dev/null
+++ b/hgext/speedy/serialize.py
@@ -0,0 +1,28 @@ 
+# Copyright 2012 Facebook
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+"""Contains (de)serialization functions for different objects."""
+
+from mercurial.node import nullid
+
+_nodeidlen = len(nullid)
+
+def serializenodes(nodes):
+    """Concatenate `nodes` into one binary string
+
+    nodes: iterable with node ids (20-byte strings).
+    """
+    return ''.join(nodes)
+
+def deserializenodes(enodes):
+    """The inverse of `serializenodes`."""
+    return [ enodes[i*_nodeidlen:(i+1)*_nodeidlen]
+            for i in xrange(0, len(enodes)/_nodeidlen) ]
+
+serializedate = str
+deserializedate = float
+
+def identity(value):
+    return value
diff --git a/hgext/speedy/server.py b/hgext/speedy/server.py
--- a/hgext/speedy/server.py
+++ b/hgext/speedy/server.py
@@ -9,15 +9,19 @@ 
 """
 
 import collections
+import os
 from mercurial import revset
 from mercurial import encoding
 from mercurial import cmdutil
 from mercurial.i18n import _
 from mercurial import util
 from mercurial import match as matchmod
+from mercurial import scmutil
 import index
 import protocol
 import tcptransport
+import serialize
+import store
 
 cmdtable = {}
 command = cmdutil.command(cmdtable)
@@ -122,17 +126,40 @@ 
         return self._literalpath(matchingfiles)
 
 indicecfg = {
-    'userchgs': index.userchgsentries,
-    'chgdate': index.chgdateentries,
-    'filechgs': index.filechgsentries,
-    'files': index.filesentries,
+    'userchgs': {
+        'makeentries': index.userchgsentries,
+        'serialize': serialize.serializenodes,
+        'deerialize': serialize.deserializenodes,
+        },
+    'chgdate': {
+        'makeentries': index.chgdateentries,
+        'serialize': serialize.serializedate,
+        'deerialize': serialize.deserializedate,
+        },
+    'filechgs': {
+        'makeentries': index.filechgsentries,
+        'serialize': serialize.serializenodes,
+        'deerialize': serialize.deserializenodes,
+        },
+    'files': {
+        'makeentries': index.filesentries,
+        'serialize': serialize.identity,
+        'deerialize': serialize.identity,
+        },
 }
+def makeserver(repo):
+    """Return an initialized metaserver instance.
 
-def makeserver(repo):
-    """Return an initialized metaserver instance."""
-    ctxs = [repo[r] for r in xrange(0, len(repo))]
-    indices = dict([(name, dict(newentries({}, ctxs))) for name, newentries in
-        indicecfg.iteritems()])
+    Updates all indices to the last revision along the way.
+    """
+    opener = scmutil.opener(os.path.join(repo.path, 'hgext/speedyserver'))
+    opener.makedirs()
+    lastrev = len(repo) - 1
+    indices = {}
+    for name, cfg in indicecfg.iteritems():
+        istore = store.indexstore(name, opener, **cfg)
+        istore.update(repo, lastrev)
+        indices[name] = istore.view()
     return metaserver(repo, indices)
 
 @command('metaserve', [], _(''))
diff --git a/hgext/speedy/store.py b/hgext/speedy/store.py
new file mode 100644
--- /dev/null
+++ b/hgext/speedy/store.py
@@ -0,0 +1,59 @@ 
+# Copyright 2012 Facebook
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+import os
+import errno
+from mercurial.node import nullrev
+from mercurial import util
+import leveldbdict
+
+class indexstore(object):
+    """Persistent storage for an index.
+
+    Data is stored on disk in a leveldb instance (each index has its own
+    instance).
+
+    Each create/update/delete operation also updates the version and stores
+    it in the version file.
+    A version is simply the revision number of the tip of the server
+    repository to which the index was last updated.
+    """
+
+    def __init__(self, name, opener, makeentries, serialize, deerialize):
+        self.name = name
+        self.opener = opener
+        self.makeentries = makeentries
+        self.versionfn = name + '.version'
+        try:
+            self.currentrev = int(self.opener.read(self.versionfn))
+        except IOError, e:
+            if e.errno != errno.ENOENT:
+                raise
+            self.currentrev = nullrev
+            self.opener.write(self.versionfn, str(nullrev))
+        self._dict = leveldbdict.leveldbdict(
+                os.path.join(opener.base, name), serialize, deerialize)
+
+    def update(self, repo, destrev):
+        """Update index data and the version to the `destrev` revision."""
+
+        if self.currentrev > destrev:
+             util.Abort('cannot update to rev %d: current rev is greater (%d)' %
+                    (destrev, self.currentrev))
+        ctxs = [repo[rev] for rev in xrange(self.currentrev + 1, destrev + 1)]
+        entries = self.makeentries(self._dict, ctxs)
+        self._dict.update(entries)
+        self.currentrev = destrev
+        self.opener.write(self.versionfn, str(destrev))
+
+    def clear(self):
+        """Delete all entries from the index."""
+        self.currentrev = nullrev
+        self._dict.clear()
+        self.opener.write(self.versionfn, str(nullrev))
+
+    def view(self):
+        """Return a dict-like object for read access to the index elements."""
+        return self._dict
diff --git a/tests/hghave.py b/tests/hghave.py
--- a/tests/hghave.py
+++ b/tests/hghave.py
@@ -277,6 +277,13 @@ 
     except (ImportError, AttributeError):
         return False
 
+def has_leveldb():
+    try:
+        import leveldb
+        return True
+    except ImportError:
+        return False
+
 checks = {
     "true": (lambda: True, "yak shaving"),
     "false": (lambda: False, "nail clipper"),
@@ -317,4 +324,5 @@ 
     "unix-permissions": (has_unix_permissions, "unix-style permissions"),
     "windows": (has_windows, "Windows"),
     "msys": (has_msys, "Windows with MSYS"),
+    "leveldb": (has_leveldb, "python leveldb bindings"),
 }
diff --git a/tests/test-speedy.t b/tests/test-speedy.t
--- a/tests/test-speedy.t
+++ b/tests/test-speedy.t
@@ -1,3 +1,5 @@ 
+  $ "$TESTDIR/hghave" leveldb || exit 80
+
 Global config file
   $ cat >> $HGRCPATH <<EOF_END
   > [ui]