@@ -92,10 +92,11 @@
returns a list of values (same order as <args>)"""
@abc.abstractmethod
- def forwardpayload(self, fp):
- """Read the raw payload and forward to a file.
+ def getpayload(self):
+ """Provide a generator for the raw payload.
- The payload is read in full before the function returns.
+ The caller is responsible for ensuring that the full payload is
+ processed.
"""
@abc.abstractmethod
@@ -91,16 +91,15 @@
args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
return args
- def forwardpayload(self, fp):
+ def getpayload(self):
if r'HTTP_CONTENT_LENGTH' in self._req.env:
length = int(self._req.env[r'HTTP_CONTENT_LENGTH'])
else:
length = int(self._req.env[r'CONTENT_LENGTH'])
# If httppostargs is used, we need to read Content-Length
# minus the amount that was consumed by args.
length -= int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
- for s in util.filechunkiter(self._req, limit=length):
- fp.write(s)
+ return util.filechunkiter(self._req, limit=length)
@contextlib.contextmanager
def mayberedirectstdio(self):
@@ -346,7 +345,7 @@
data[arg] = val
return [data[k] for k in keys]
- def forwardpayload(self, fpout):
+ def getpayload(self):
# We initially send an empty response. This tells the client it is
# OK to start sending data. If a client sees any other response, it
# interprets it as an error.
@@ -359,7 +358,7 @@
# 0\n
count = int(self._fin.readline())
while count:
- fpout.write(self._fin.read(count))
+ yield self._fin.read(count)
count = int(self._fin.readline())
@contextlib.contextmanager
@@ -972,14 +972,29 @@
with proto.mayberedirectstdio() as output:
try:
exchange.check_heads(repo, their_heads, 'preparing changes')
+ cleanup = lambda: None
+ try:
+ payload = proto.getpayload()
+ if repo.ui.configbool('server', 'streamunbundle'):
+ def cleanup():
+ for p in payload:
+ pass
+ fp = util.chunkbuffer(payload)
+ else:
+ # write bundle data to temporary file because it can be big
+ fp, tempname = None, None
+ def cleanup():
+ if fp:
+ fp.close()
+ if tempname:
+ os.unlink(tempname)
+ fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
+ fp = os.fdopen(fd, pycompat.sysstr('wb+'))
+ r = 0
+ for p in payload:
+ fp.write(p)
+ fp.seek(0)
- # write bundle data to temporary file because it can be big
- fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
- fp = os.fdopen(fd, pycompat.sysstr('wb+'))
- r = 0
- try:
- proto.forwardpayload(fp)
- fp.seek(0)
gen = exchange.readbundle(repo.ui, fp, None)
if (isinstance(gen, changegroupmod.cg1unpacker)
and not bundle1allowed(repo, 'push')):
@@ -1001,8 +1016,7 @@
return pushres(r, output.getvalue() if output else '')
finally:
- fp.close()
- os.unlink(tempname)
+ cleanup()
except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
# handle non-bundle2 case first
@@ -1792,6 +1792,11 @@
are highly recommended. Partial clones will still be allowed.
(default: False)
+``streamunbundle``
+ When set, servers will apply data sent from the client directly,
+ otherwise it will be written to a temporary file first. This option
+ effectively prevents concurrent pushes.
+
``concurrent-push-mode``
Level of allowed race condition between two pushing clients.
@@ -902,6 +902,9 @@
coreconfigitem('server', 'disablefullbundle',
default=False,
)
+coreconfigitem('server', 'streamunbundle',
+ default=False,
+)
coreconfigitem('server', 'maxhttpheaderlen',
default=1024,
)
@@ -41,7 +41,8 @@
tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
try:
- proto.forwardpayload(tmpfp)
+ for p in proto.getpayload():
+ tmpfp.write(p)
tmpfp._fp.seek(0)
if sha != lfutil.hexsha1(tmpfp._fp):
raise IOError(0, _('largefile contents do not match hash'))