@@ -246,3 +246,6 @@ class UnsupportedBundleSpecification(Exc
class CorruptedState(Exception):
"""error raised when a command is not able to read its state from file"""
+
+class MissingRequiredKey(Exception):
+ """error raised when simple key-value file misses a required key"""
@@ -1571,3 +1571,51 @@ class checkambigatclosing(closewrapbase)
def close(self):
self._origfh.close()
self._checkambig()
+
+class simplekeyvaluefile(object):
+ """A simple file with key=value lines
+
+ Keys must be alphanumerics and start with a letter, values must not
+ contain '\n' characters"""
+
+ # if KEYS is non-empty, read values are validated against it:
+ # each key is a tuple (keyname, required)
+ KEYS = []
+
+ def __init__(self, vfs, path):
+ self.vfs = vfs
+ self.path = path
+
+ def validate(self, d):
+ for key, req in self.KEYS:
+ if req and key not in d:
+ e = "missing a required key: '%s'" % key
+ raise error.MissingRequiredKey(e)
+
+ def read(self):
+ lines = self.vfs.readlines(self.path)
+ try:
+ d = dict(line[:-1].split('=', 1) for line in lines if line)
+ except ValueError as e:
+ raise error.CorruptedState(str(e))
+ self.validate(d)
+ return d
+
+ def write(self, data):
+ """Write key=>value mapping to a file
+ data is a dict. Keys must be alphanumerical and start with a letter.
+ Values must not contain newline characters."""
+ lines = []
+ for k, v in data.items():
+ if not k[0].isalpha():
+ e = "keys must start with a letter in a key-value file"
+ raise error.ProgrammingError(e)
+ if not k.isalnum():
+ e = "invalid key name in a simple key-value file"
+ raise error.ProgrammingError(e)
+ if '\n' in v:
+ e = "invalid value in a simple key-value file"
+ raise error.ProgrammingError(e)
+ lines.append("%s=%s\n" % (k, v))
+ with self.vfs(self.path, mode='wb', atomictemp=True) as fp:
+ fp.write(''.join(lines))
new file mode 100644
@@ -0,0 +1,87 @@
+from __future__ import absolute_import
+
+import unittest
+import silenttestrunner
+
+from mercurial import (
+ error,
+ scmutil,
+)
+
+contents = {}
+
+class fileobj(object):
+ def __init__(self, name):
+ self.name = name
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args, **kwargs):
+ pass
+
+ def write(self, text):
+ contents[self.name] = text
+
+ def read(self):
+ return contents[self.name]
+
+class mockvfs(object):
+ def read(self, path):
+ return fileobj(path).read()
+
+ def readlines(self, path):
+ return fileobj(path).read().split('\n')
+
+ def __call__(self, path, mode, atomictemp):
+ return fileobj(path)
+
+class testsimplekeyvaluefile(unittest.TestCase):
+ def setUp(self):
+ self.vfs = mockvfs()
+
+ def testbasicwriting(self):
+ d = {'key1': 'value1', 'Key2': 'value2'}
+ scmutil.simplekeyvaluefile(self.vfs, 'kvfile').write(d)
+ self.assertEqual(sorted(self.vfs.read('kvfile').split('\n')),
+ ['', 'Key2=value2', 'key1=value1'])
+
+ def testinvalidkeys(self):
+ d = {'0key1': 'value1', 'Key2': 'value2'}
+ with self.assertRaisesRegexp(error.ProgrammingError,
+ "keys must start with a letter.*"):
+ scmutil.simplekeyvaluefile(self.vfs, 'kvfile').write(d)
+ d = {'key1@': 'value1', 'Key2': 'value2'}
+ with self.assertRaisesRegexp(error.ProgrammingError, "invalid key.*"):
+ scmutil.simplekeyvaluefile(self.vfs, 'kvfile').write(d)
+
+ def testinvalidvalues(self):
+ d = {'key1': 'value1', 'Key2': 'value2\n'}
+ with self.assertRaisesRegexp(error.ProgrammingError, "invalid val.*"):
+ scmutil.simplekeyvaluefile(self.vfs, 'kvfile').write(d)
+
+ def testrequiredkeys(self):
+ d = {'key1': 'value1', 'Key2': 'value2'}
+ scmutil.simplekeyvaluefile(self.vfs, 'allkeyshere').write(d)
+
+ class kvf(scmutil.simplekeyvaluefile):
+ KEYS = [('key3', False), ('Key2', True)]
+ self.assertEqual(sorted(kvf(self.vfs, 'allkeyshere').read().items()),
+ [('Key2', 'value'), ('key1', 'value')])
+
+ d = {'key1': 'value1', 'Key3': 'value2'}
+ scmutil.simplekeyvaluefile(self.vfs, 'missingkeys').write(d)
+
+ class kvf(scmutil.simplekeyvaluefile):
+ KEYS = [('key3', False), ('Key2', True)]
+ with self.assertRaisesRegexp(error.MissingRequiredKey, "missing a.*"):
+ kvf(self.vfs, 'missingkeys').read()
+
+ def testcorruptedfile(self):
+ contents['badfile'] = 'ababagalamaga\n'
+ with self.assertRaisesRegexp(error.CorruptedState,
+ "dictionary.*element.*"):
+ scmutil.simplekeyvaluefile(self.vfs, 'badfile').read()
+
+if __name__ == "__main__":
+ silenttestrunner.main(__name__)