Patchwork [07,of,10,V2] statprof: support stacked collection

login
register
mail settings
Submitter Gregory Szorc
Date Aug. 17, 2016, 4:03 p.m.
Message ID <bf247c2f0136d23100b0.1471449825@ubuntu-vm-main>
Download mbox | patch
Permalink /patch/16333/
State Changes Requested
Headers show

Comments

Gregory Szorc - Aug. 17, 2016, 4:03 p.m.
# HG changeset patch
# User Gregory Szorc <gregory.szorc@gmail.com>
# Date 1471449673 25200
#      Wed Aug 17 09:01:13 2016 -0700
# Node ID bf247c2f0136d23100b0098fe152cfabbb511c1b
# Parent  7409c675a2be190605f07277627b095976f56da8
statprof: support stacked collection

Before, statprof theoretically supported starting and stopping
profiling multiple times. But it didn't actually do anything
upon each "stacked" call to start()/stop().

This patch adds state tracking of sorts to start()/stop().

We also have stop() return a data structure with captured
data. This allows consumers to do things like display only the
just-collected data.

I'm not completely convinced this code works as advertised and does
the right things in all situations. But it does appear to get the
results I want when running the statistical profiler during `hg
serve`.
Yuya Nishihara - Sept. 4, 2016, 5:37 a.m.
On Wed, 17 Aug 2016 09:03:45 -0700, Gregory Szorc wrote:
> # HG changeset patch
> # User Gregory Szorc <gregory.szorc@gmail.com>
> # Date 1471449673 25200
> #      Wed Aug 17 09:01:13 2016 -0700
> # Node ID bf247c2f0136d23100b0098fe152cfabbb511c1b
> # Parent  7409c675a2be190605f07277627b095976f56da8
> statprof: support stacked collection

> -def samplerthread(tid):
> +def samplerthread():
>      while not stopthread.is_set():
>          state.accumulate_time(clock())
>  
> -        frame = sys._current_frames()[tid]
> +        frame = sys._current_frames()[state.threadid]
>          state.samples.append(Sample.from_frame(frame, state.accumulated_time))

[snip]

>  def start(mechanism='thread'):
>      '''Install the profiling signal handler, and start profiling.'''
> +    # Store old state if present.
> +    if state.profile_level > 0:
> +        laststate = {
> +            'samples': state.samples,
> +            'tid': state.threadid,
> +            'accumulated_time': state.accumulated_time,
> +        }
> +        _statestack.append(laststate)
> +
> +    state.samples = []
> +    state.accumulated_time = 0.0
> +    frame = inspect.currentframe()
> +    tid = [k for k, f in sys._current_frames().items() if f == frame][0]
> +    state.threadid = tid
> +
>      state.profile_level += 1
>      if state.profile_level == 1:
>          state.last_start_time = clock()
>          rpt = state.remaining_prof_time
>          state.remaining_prof_time = None
>  
>          global lastmechanism
>          lastmechanism = mechanism
>  
>          if mechanism == 'signal':
>              signal.signal(signal.SIGPROF, profile_signal_handler)
>              signal.setitimer(signal.ITIMER_PROF,
>                  rpt or state.sample_interval, 0.0)
>          elif mechanism == 'thread':
> -            frame = inspect.currentframe()
> -            tid = [k for k, f in sys._current_frames().items() if f == frame][0]
> +
>              state.thread = threading.Thread(target=samplerthread,
> -                                 args=(tid,), name="samplerthread")
> +                                            name="samplerthread")
>              state.thread.start()
>  
>  def stop():
>      '''Stop profiling, and uninstall the profiling signal handler.'''
>      state.profile_level -= 1
>      if state.profile_level == 0:
>          if lastmechanism == 'signal':
>              rpt = signal.setitimer(signal.ITIMER_PROF, 0.0, 0.0)
> @@ -317,16 +338,39 @@ def stop():
>              state.thread.join()
>  
>          state.accumulate_time(clock())
>          state.last_start_time = None
>          statprofpath = os.environ.get('STATPROF_DEST')
>          if statprofpath:
>              save_data(statprofpath)
>  
> +        return CollectedData(state.samples, state.accumulated_time,
> +                             state.sample_interval)
> +    elif state.profile_level > 0:
> +        newstate = _statestack.pop()
> +
> +        data = CollectedData(state.samples, state.accumulated_time,
> +                             state.sample_interval)
> +
> +        # Merge the just collected data onto the base state since the base
> +        # state was active while this one was collecting.
> +        newstate['samples'].extend(state.samples)
> +        newstate['threadid'] = newstate['tid']
> +        newstate['accumulated_time'] += state.accumulated_time
> +
> +        # Now make the base state the active state.
> +        state.samples = newstate['samples']
> +        state.threadid = newstate['tid']
> +        state.accumulated_time = newstate['accumulated_time']
> +
> +        return data

I don't think states can form a stack because start/stop will be called by
arbitrary threads. Instead, samplerthread would need to record activities of
all threads of profiling enabled.

But, in either ways, I don't think statprof can show a good result if
more than one threads are actively running and switched at random.

The other patches look generally good to me.
Augie Fackler - Sept. 12, 2016, 1:21 p.m.
On Sun, Sep 04, 2016 at 02:37:45PM +0900, Yuya Nishihara wrote:
> On Wed, 17 Aug 2016 09:03:45 -0700, Gregory Szorc wrote:
> > # HG changeset patch
> > # User Gregory Szorc <gregory.szorc@gmail.com>
> > # Date 1471449673 25200
> > #      Wed Aug 17 09:01:13 2016 -0700
> > # Node ID bf247c2f0136d23100b0098fe152cfabbb511c1b
> > # Parent  7409c675a2be190605f07277627b095976f56da8
> > statprof: support stacked collection

[...]

> > @@ -317,16 +338,39 @@ def stop():
> >              state.thread.join()
> >
> >          state.accumulate_time(clock())
> >          state.last_start_time = None
> >          statprofpath = os.environ.get('STATPROF_DEST')
> >          if statprofpath:
> >              save_data(statprofpath)
> >
> > +        return CollectedData(state.samples, state.accumulated_time,
> > +                             state.sample_interval)
> > +    elif state.profile_level > 0:
> > +        newstate = _statestack.pop()
> > +
> > +        data = CollectedData(state.samples, state.accumulated_time,
> > +                             state.sample_interval)
> > +
> > +        # Merge the just collected data onto the base state since the base
> > +        # state was active while this one was collecting.
> > +        newstate['samples'].extend(state.samples)
> > +        newstate['threadid'] = newstate['tid']
> > +        newstate['accumulated_time'] += state.accumulated_time
> > +
> > +        # Now make the base state the active state.
> > +        state.samples = newstate['samples']
> > +        state.threadid = newstate['tid']
> > +        state.accumulated_time = newstate['accumulated_time']
> > +
> > +        return data
>
> I don't think states can form a stack because start/stop will be called by
> arbitrary threads. Instead, samplerthread would need to record activities of
> all threads of profiling enabled.
>
> But, in either ways, I don't think statprof can show a good result if
> more than one threads are actively running and switched at random.
>
> The other patches look generally good to me.

Agreed. Greg, can you rebase these and remail them?

> _______________________________________________
> Mercurial-devel mailing list
> Mercurial-devel@mercurial-scm.org
> https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel

Patch

diff --git a/mercurial/statprof.py b/mercurial/statprof.py
--- a/mercurial/statprof.py
+++ b/mercurial/statprof.py
@@ -167,17 +167,17 @@  class ProfileState(object):
 
     def accumulate_time(self, stop_time):
         self.accumulated_time += stop_time - self.last_start_time
 
     def seconds_per_sample(self):
         return self.accumulated_time / len(self.samples)
 
 state = ProfileState()
-
+_statestack = []
 
 class CodeSite(object):
     cache = {}
 
     __slots__ = ('path', 'lineno', 'function', 'source')
 
     def __init__(self, path, lineno, function):
         self.path = path
@@ -244,69 +244,90 @@  class Sample(object):
 
         while frame:
             stack.append(CodeSite.get(frame.f_code.co_filename, frame.f_lineno,
                                       frame.f_code.co_name))
             frame = frame.f_back
 
         return Sample(stack, time)
 
+class CollectedData(object):
+    """Represents collected data for a sampling interval."""
+    def __init__(self, samples, accumulated_time, sample_interval):
+        self.samples = samples
+        self.accumulated_time = accumulated_time
+        self.sample_interval = sample_interval
+
 ###########################################################################
 ## SIGPROF handler
 
 def profile_signal_handler(signum, frame):
     if state.profile_level > 0:
         state.accumulate_time(clock())
 
         state.samples.append(Sample.from_frame(frame, state.accumulated_time))
 
         signal.setitimer(signal.ITIMER_PROF,
             state.sample_interval, 0.0)
         state.last_start_time = clock()
 
 stopthread = threading.Event()
-def samplerthread(tid):
+def samplerthread():
     while not stopthread.is_set():
         state.accumulate_time(clock())
 
-        frame = sys._current_frames()[tid]
+        frame = sys._current_frames()[state.threadid]
         state.samples.append(Sample.from_frame(frame, state.accumulated_time))
 
         state.last_start_time = clock()
         time.sleep(state.sample_interval)
 
     stopthread.clear()
 
 ###########################################################################
 ## Profiling API
 
 def is_active():
     return state.profile_level > 0
 
 lastmechanism = None
 def start(mechanism='thread'):
     '''Install the profiling signal handler, and start profiling.'''
+    # Store old state if present.
+    if state.profile_level > 0:
+        laststate = {
+            'samples': state.samples,
+            'tid': state.threadid,
+            'accumulated_time': state.accumulated_time,
+        }
+        _statestack.append(laststate)
+
+    state.samples = []
+    state.accumulated_time = 0.0
+    frame = inspect.currentframe()
+    tid = [k for k, f in sys._current_frames().items() if f == frame][0]
+    state.threadid = tid
+
     state.profile_level += 1
     if state.profile_level == 1:
         state.last_start_time = clock()
         rpt = state.remaining_prof_time
         state.remaining_prof_time = None
 
         global lastmechanism
         lastmechanism = mechanism
 
         if mechanism == 'signal':
             signal.signal(signal.SIGPROF, profile_signal_handler)
             signal.setitimer(signal.ITIMER_PROF,
                 rpt or state.sample_interval, 0.0)
         elif mechanism == 'thread':
-            frame = inspect.currentframe()
-            tid = [k for k, f in sys._current_frames().items() if f == frame][0]
+
             state.thread = threading.Thread(target=samplerthread,
-                                 args=(tid,), name="samplerthread")
+                                            name="samplerthread")
             state.thread.start()
 
 def stop():
     '''Stop profiling, and uninstall the profiling signal handler.'''
     state.profile_level -= 1
     if state.profile_level == 0:
         if lastmechanism == 'signal':
             rpt = signal.setitimer(signal.ITIMER_PROF, 0.0, 0.0)
@@ -317,16 +338,39 @@  def stop():
             state.thread.join()
 
         state.accumulate_time(clock())
         state.last_start_time = None
         statprofpath = os.environ.get('STATPROF_DEST')
         if statprofpath:
             save_data(statprofpath)
 
+        return CollectedData(state.samples, state.accumulated_time,
+                             state.sample_interval)
+    elif state.profile_level > 0:
+        newstate = _statestack.pop()
+
+        data = CollectedData(state.samples, state.accumulated_time,
+                             state.sample_interval)
+
+        # Merge the just collected data onto the base state since the base
+        # state was active while this one was collecting.
+        newstate['samples'].extend(state.samples)
+        newstate['threadid'] = newstate['tid']
+        newstate['accumulated_time'] += state.accumulated_time
+
+        # Now make the base state the active state.
+        state.samples = newstate['samples']
+        state.threadid = newstate['tid']
+        state.accumulated_time = newstate['accumulated_time']
+
+        return data
+    else:
+        raise Exception('unbalanced calls to start()/stop()')
+
 def save_data(path):
     with open(path, 'w+') as file:
         file.write(str(state.accumulated_time) + '\n')
         for sample in state.samples:
             time = str(sample.time)
             stack = sample.stack
             sites = ['\1'.join([s.path, str(s.lineno), s.function])
                      for s in stack]