Patchwork largefiles: use multiple threads for fetching largefiles remotely

login
register
mail settings
Submitter Mads Kiilerich
Date Oct. 10, 2014, 12:59 a.m.
Message ID <483463c1d99ba5e5979b.1412902793@ssl.google-analytics.com>
Download mbox | patch
Permalink /patch/6173/
State Changes Requested
Headers show

Comments

Mads Kiilerich - Oct. 10, 2014, 12:59 a.m.
# HG changeset patch
# User Mads Kiilerich <madski@unity3d.com>
# Date 1412902786 -7200
#      Fri Oct 10 02:59:46 2014 +0200
# Node ID 483463c1d99ba5e5979b756fc3d1255f0a7bd854
# Parent  a1eb21f5caea4366310e32aa85248791d5bbfa0c
largefiles: use multiple threads for fetching largefiles remotely

Largefiles are currently fetched with one request per file. That adds a
constant overhead per file that gives bad network utilization.

To mitigate that, run multiple worker threads when fetching largefiles remotely.
The default is 2 threads, but it can be tweaked with the undocumented config
setting largefiles._remotegetthreads.

Some numbers with a slow server and 50 small files:
  1 thread  36 s
  2 threads 20 s
  3 threads 15 s
  4 threads 12 s
Siddharth Agarwal - Oct. 10, 2014, 1:10 a.m.
On 10/09/2014 05:59 PM, Mads Kiilerich wrote:
> # HG changeset patch
> # User Mads Kiilerich <madski@unity3d.com>
> # Date 1412902786 -7200
> #      Fri Oct 10 02:59:46 2014 +0200
> # Node ID 483463c1d99ba5e5979b756fc3d1255f0a7bd854
> # Parent  a1eb21f5caea4366310e32aa85248791d5bbfa0c
> largefiles: use multiple threads for fetching largefiles remotely
>
> Largefiles are currently fetched with one request per file. That adds a
> constant overhead per file that gives bad network utilization.
>
> To mitigate that, run multiple worker threads when fetching largefiles remotely.
> The default is 2 processes, but it can be tweaked with the undocumented config
> setting largefiles._remotegetthreads.

Why is this undocumented?

>
> Some numbers with a slow server and 50 small files:
>    1 thread  36 s
>    2 threads 20 s
>    3 threads 15 s
>    4 threads 12 s
>
> diff --git a/hgext/largefiles/basestore.py b/hgext/largefiles/basestore.py
> --- a/hgext/largefiles/basestore.py
> +++ b/hgext/largefiles/basestore.py
> @@ -8,7 +8,7 @@
>   
>   '''base class for store implementations and store-related utility code'''
>   
> -import re
> +import re, threading
>   
>   from mercurial import util, node, hg
>   from mercurial.i18n import _
> @@ -37,6 +37,7 @@ class basestore(object):
>           self.ui = ui
>           self.repo = repo
>           self.url = url
> +        self.threads = 0
>   
>       def put(self, source, hash):
>           '''Put source file into the store so it can be retrieved by hash.'''
> @@ -60,24 +61,43 @@ class basestore(object):
>           missing = []
>           ui = self.ui
>   
> -        at = 0
>           available = self.exists(set(hash for (_filename, hash) in files))
> -        for filename, hash in files:
> -            ui.progress(_('getting largefiles'), at, unit='lfile',
> -                total=len(files))
> -            at += 1
> -            ui.note(_('getting %s:%s\n') % (filename, hash))
> +        tasks = list(enumerate(reversed(files)))
>   
> -            if not available.get(hash):
> -                ui.warn(_('%s: largefile %s not available from %s\n')
> -                        % (filename, hash, util.hidepassword(self.url)))
> -                missing.append(filename)
> -                continue
> +        def worker():
> +            while True:
> +                try:
> +                    task = tasks.pop()
> +                except IndexError:
> +                    return
> +                at, (filename, hash) = task
> +                ui.progress(_('getting largefiles'), at, unit='lfile',
> +                    total=len(files))
> +                ui.note(_('getting %s:%s\n') % (filename, hash))
>   
> -            if self._gethash(filename, hash):
> -                success.append((filename, hash))
> -            else:
> -                missing.append(filename)
> +                if available.get(hash):
> +                    if self._gethash(filename, hash):
> +                        success.append((filename, hash))
> +                    else:
> +                        missing.append(filename)
> +                else:
> +                    ui.warn(_('%s: largefile %s not available from %s\n')
> +                            % (filename, hash, util.hidepassword(self.url)))
> +                    missing.append(filename)
> +
> +        if self.threads > 1:
> +            running = []
> +            for i in range(self.threads):
> +                t = threading.Thread(target=worker)
> +                t.setDaemon(True)
> +                t.start()
> +                running.append(t)
> +
> +            for t in running:
> +                while t.isAlive():
> +                    t.join(0.1)
> +        else:
> +            worker()
>   
>           ui.progress(_('getting largefiles'), None)
>           return (success, missing)
> diff --git a/hgext/largefiles/remotestore.py b/hgext/largefiles/remotestore.py
> --- a/hgext/largefiles/remotestore.py
> +++ b/hgext/largefiles/remotestore.py
> @@ -18,6 +18,7 @@ class remotestore(basestore.basestore):
>       '''a largefile store accessed over a network'''
>       def __init__(self, ui, repo, url):
>           super(remotestore, self).__init__(ui, repo, url)
> +        self.threads = ui.configint(lfutil.longname, '_remotegetthreads', 2)
>   
>       def put(self, source, hash):
>           if self.sendfile(source, hash):
> _______________________________________________
> Mercurial-devel mailing list
> Mercurial-devel@selenic.com
> http://selenic.com/mailman/listinfo/mercurial-devel
Mads Kiilerich - Oct. 10, 2014, 1:41 a.m.
On 10/10/2014 03:10 AM, Siddharth Agarwal wrote:
> On 10/09/2014 05:59 PM, Mads Kiilerich wrote:
>> # HG changeset patch
>> # User Mads Kiilerich <madski@unity3d.com>
>> # Date 1412902786 -7200
>> #      Fri Oct 10 02:59:46 2014 +0200
>> # Node ID 483463c1d99ba5e5979b756fc3d1255f0a7bd854
>> # Parent  a1eb21f5caea4366310e32aa85248791d5bbfa0c
>> largefiles: use multiple threads for fetching largefiles remotely
>>
>> Largefiles are currently fetched with one request per file. That adds a
>> constant overhead per file that gives bad network utilization.
>>
>> To mitigate that, run multiple worker threads when fetching 
>> largefiles remotely.
>> The default is 2 processes, but it can be tweaked with the 
>> undocumented config
>> setting largefiles._remotegetthreads.
>
> Why is this undocumented?

Because I want to keep the documentation short and concise. We should 
just pick a number/algorithm that works for everybody, and nobody should 
care about what it is or tweaking it. But I like to have a workaround, 
just in case someone for some reason should want another number or 
disable this feature.

/Mads
Siddharth Agarwal - Oct. 10, 2014, 1:45 a.m.
On 10/09/2014 06:41 PM, Mads Kiilerich wrote:
> On 10/10/2014 03:10 AM, Siddharth Agarwal wrote:
>> On 10/09/2014 05:59 PM, Mads Kiilerich wrote:
>>> # HG changeset patch
>>> # User Mads Kiilerich <madski@unity3d.com>
>>> # Date 1412902786 -7200
>>> #      Fri Oct 10 02:59:46 2014 +0200
>>> # Node ID 483463c1d99ba5e5979b756fc3d1255f0a7bd854
>>> # Parent  a1eb21f5caea4366310e32aa85248791d5bbfa0c
>>> largefiles: use multiple threads for fetching largefiles remotely
>>>
>>> Largefiles are currently fetched with one request per file. That adds a
>>> constant overhead per file that gives bad network utilization.
>>>
>>> To mitigate that, run multiple worker threads when fetching 
>>> largefiles remotely.
>>> The default is 2 processes, but it can be tweaked with the 
>>> undocumented config
>>> setting largefiles._remotegetthreads.
>>
>> Why is this undocumented?
>
> Because I want to keep the documentation short and concise. We should 
> just pick a number/algorithm that works for everybody, and nobody 
> should care about what it is or tweaking it. But I like to have a 
> workaround, just in case someone for some reason should want another 
> number or disable this feature.
>
> /Mads

I think we should document and make this an explicit knob for sysadmins 
to set to get higher performance. For example, people on long fat pipes 
will probably want higher concurrency. TCP receive windows within 
kernels are tunable for this reason, and this should be too.
Matt Mackall - Oct. 14, 2014, 7:45 p.m.
On Fri, 2014-10-10 at 02:59 +0200, Mads Kiilerich wrote:
> # HG changeset patch
> # User Mads Kiilerich <madski@unity3d.com>
> # Date 1412902786 -7200
> #      Fri Oct 10 02:59:46 2014 +0200
> # Node ID 483463c1d99ba5e5979b756fc3d1255f0a7bd854
> # Parent  a1eb21f5caea4366310e32aa85248791d5bbfa0c
> largefiles: use multiple threads for fetching largefiles remotely
> 
> Largefiles are currently fetched with one request per file. That adds a
> constant overhead per file that gives bad network utilization.

By constant overhead, you mean round-trip and connection setup time? If
so, wouldn't that be better mitigated with some form of pipelining or
batching? How much of this is Mercurial startup time on the server?
Does this imply multiple SSH connections too?

For files that are _actually large_, streaming multiple files to/from
spinning disk simultaneously is suboptimal because it creates an
interleaved I/O pattern. On the write side, you might get interleaved
storage too, which persists.

It's also kind of frowned on to use multiple TCP streams for data
transfer as it defeats TCP's fairness model, so on-by-default might not
be the right answer.

Patch

diff --git a/hgext/largefiles/basestore.py b/hgext/largefiles/basestore.py
--- a/hgext/largefiles/basestore.py
+++ b/hgext/largefiles/basestore.py
@@ -8,7 +8,7 @@ 
 
 '''base class for store implementations and store-related utility code'''
 
-import re
+import re, threading
 
 from mercurial import util, node, hg
 from mercurial.i18n import _
@@ -37,6 +37,7 @@  class basestore(object):
         self.ui = ui
         self.repo = repo
         self.url = url
+        self.threads = 0
 
     def put(self, source, hash):
         '''Put source file into the store so it can be retrieved by hash.'''
@@ -60,24 +61,43 @@  class basestore(object):
         missing = []
         ui = self.ui
 
-        at = 0
         available = self.exists(set(hash for (_filename, hash) in files))
-        for filename, hash in files:
-            ui.progress(_('getting largefiles'), at, unit='lfile',
-                total=len(files))
-            at += 1
-            ui.note(_('getting %s:%s\n') % (filename, hash))
+        tasks = list(enumerate(reversed(files)))
 
-            if not available.get(hash):
-                ui.warn(_('%s: largefile %s not available from %s\n')
-                        % (filename, hash, util.hidepassword(self.url)))
-                missing.append(filename)
-                continue
+        def worker():
+            while True:
+                try:
+                    task = tasks.pop()
+                except IndexError:
+                    return
+                at, (filename, hash) = task
+                ui.progress(_('getting largefiles'), at, unit='lfile',
+                    total=len(files))
+                ui.note(_('getting %s:%s\n') % (filename, hash))
 
-            if self._gethash(filename, hash):
-                success.append((filename, hash))
-            else:
-                missing.append(filename)
+                if available.get(hash):
+                    if self._gethash(filename, hash):
+                        success.append((filename, hash))
+                    else:
+                        missing.append(filename)
+                else:
+                    ui.warn(_('%s: largefile %s not available from %s\n')
+                            % (filename, hash, util.hidepassword(self.url)))
+                    missing.append(filename)
+
+        if self.threads > 1:
+            running = []
+            for i in range(self.threads):
+                t = threading.Thread(target=worker)
+                t.setDaemon(True)
+                t.start()
+                running.append(t)
+
+            for t in running:
+                while t.isAlive():
+                    t.join(0.1)
+        else:
+            worker()
 
         ui.progress(_('getting largefiles'), None)
         return (success, missing)
diff --git a/hgext/largefiles/remotestore.py b/hgext/largefiles/remotestore.py
--- a/hgext/largefiles/remotestore.py
+++ b/hgext/largefiles/remotestore.py
@@ -18,6 +18,7 @@  class remotestore(basestore.basestore):
     '''a largefile store accessed over a network'''
     def __init__(self, ui, repo, url):
         super(remotestore, self).__init__(ui, repo, url)
+        self.threads = ui.configint(lfutil.longname, '_remotegetthreads', 2)
 
     def put(self, source, hash):
         if self.sendfile(source, hash):