"""Profiling support for unit and performance tests.

These are special purpose profiling methods which operate
in a more fine-grained way than nose's profiling plugin.

"""

import os, sys
from sqlalchemy.test import config
from sqlalchemy.test.util import function_named, gc_collect
from nose import SkipTest

__all__ = 'profiled', 'function_call_count', 'conditional_call_count'

all_targets = set()
profile_config = { 'targets': set(),
                   'report': True,
                   'sort': ('time', 'calls'),
                   'limit': None }
profiler = None
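
# The 'targets' set above is normally populated from the test runner's
# command-line options; a hypothetical manual equivalent, for illustration
# only:
#
#   profile_config['targets'].add('insert_speed')
#   profile_config['limit'] = 25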
def profiled(target=None, **target_opts):
    """Optional function profiling.

    @profiled('label')
    or
    @profiled('label', report=True, sort=('calls',), limit=20)

    Enables profiling for a function when 'label' is targeted for
    profiling.  Report options can be supplied and will override the
    global configuration and command-line options.
    """

    # manual or automatic namespacing by module would remove conflict issues
    if target is None:
        target = 'anonymous_target'
    elif target in all_targets:
        print "Warning: redefining profile target '%s'" % target
    all_targets.add(target)

    filename = "%s.prof" % target

    def decorator(fn):
        def profiled(*args, **kw):
            # pass through untouched unless this target is active or the
            # decorator was declared with always=True
            if (target not in profile_config['targets'] and
                not target_opts.get('always', None)):
                return fn(*args, **kw)

            elapsed, load_stats, result = _profile(
                filename, fn, *args, **kw)

            # per-decorator options take precedence over the global config
            report = target_opts.get('report', profile_config['report'])
            if report:
                sort_ = target_opts.get('sort', profile_config['sort'])
                limit = target_opts.get('limit', profile_config['limit'])
                print "Profile report for target '%s' (%s)" % (
                    target, filename)

                stats = load_stats()
                stats.sort_stats(*sort_)
                if limit:
                    stats.print_stats(limit)
                else:
                    stats.print_stats()
                #stats.print_callers()
            os.unlink(filename)
            return result
        return function_named(profiled, fn.__name__)
    return decorator
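
# A hypothetical usage sketch (the test name and 'insert_speed' label are
# invented for illustration); the body is profiled only when that label is
# an active target, or when the decorator passes always=True:
#
#   @profiled('insert_speed', sort=('cumulative',), limit=30)
#   def test_insert_speed(self):
#       ...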
def function_call_count(count=None, versions={}, variance=0.05):
    """Assert a target for a test case's function call count.

    count
      Optional, general target function call count.

    versions
      Optional, a dictionary of Python version strings to counts,
      for example::

        { '2.5.1': 110,
          '2.5': 100,
          '2.4': 150 }

      The best match for the current running Python will be used.
      If none match, 'count' will be used as the fallback.

    variance
      A +/- deviation percentage, defaults to 5%.
    """

    # this could easily dump the profile report if --verbose is in effect

    version_info = list(sys.version_info)
    py_version = '.'.join([str(v) for v in sys.version_info])
    try:
        # probe for the compiled C extensions; to_float itself is unused
        from sqlalchemy.cprocessors import to_float
        cextension = True
    except ImportError:
        cextension = False

    # search from the most specific version key ('2.5.1') down to the
    # least specific ('2'), tagging keys with '+cextension' when the
    # C extensions are in use
    while version_info:
        version = '.'.join([str(v) for v in version_info])
        if cextension:
            version += "+cextension"
        if version in versions:
            count = versions[version]
            break
        version_info.pop()

    if count is None:
        return lambda fn: fn

    def decorator(fn):
        def counted(*args, **kw):
            try:
                filename = "%s.prof" % fn.__name__

                elapsed, stat_loader, result = _profile(
                    filename, fn, *args, **kw)

                stats = stat_loader()
                calls = stats.total_calls

                stats.sort_stats('calls', 'cumulative')
                stats.print_stats()
                #stats.print_callers()
                deviance = int(count * variance)
                if (calls < (count - deviance) or
                    calls > (count + deviance)):
                    raise AssertionError(
                        "Function call count %s not within %s%% "
                        "of expected %s. (Python version %s)" % (
                        calls, (variance * 100), count, py_version))

                return result
            finally:
                if os.path.exists(filename):
                    os.unlink(filename)
        return function_named(counted, fn.__name__)
    return decorator
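
# A hypothetical usage sketch (counts and version overrides invented for
# illustration); the wrapped test fails if its call count strays more than
# 5% from the expected figure for the running interpreter:
#
#   @function_call_count(95, versions={'2.4': 118, '2.6+cextension': 72})
#   def test_baseline(self):
#       ...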
def conditional_call_count(discriminator, categories):
    """Apply a function call count conditionally at runtime.

    Takes two arguments, a callable that returns a key value, and a dict
    mapping key values to a tuple of arguments to function_call_count.

    The callable is not evaluated until the decorated function is actually
    invoked.  If the `discriminator` returns a key not present in the
    `categories` dictionary, no call count assertion is applied.

    Useful for integration tests, where running a named test in isolation may
    have a function count penalty not seen in the full suite, due to lazy
    initialization in the DB-API, SA, etc.
    """

    def decorator(fn):
        def at_runtime(*args, **kw):
            criteria = categories.get(discriminator(), None)
            if criteria is None:
                return fn(*args, **kw)

            rewrapped = function_call_count(*criteria)(fn)
            return rewrapped(*args, **kw)
        return function_named(at_runtime, fn.__name__)
    return decorator
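
# A hypothetical usage sketch, keyed on the active engine's dialect name
# (the discriminator and the counts are invented for illustration); a
# backend missing from the dict disables the assertion entirely:
#
#   @conditional_call_count(
#       lambda: config.db.name,
#       {'sqlite': (186,), 'postgresql': (204,)})
#   def test_roundtrip(self):
#       ...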


def _profile(filename, fn, *args, **kw):
    global profiler
    if not profiler:
        # prefer cProfile on 2.5+, fall back to hotshot, and mark the
        # platform as unprofilable if neither is importable
        if sys.version_info > (2, 5):
            try:
                import cProfile
                profiler = 'cProfile'
            except ImportError:
                pass
        if not profiler:
            try:
                import hotshot
                profiler = 'hotshot'
            except ImportError:
                profiler = 'skip'

    if profiler == 'skip':
        raise SkipTest('Profiling not supported on this platform')
    elif profiler == 'cProfile':
        return _profile_cProfile(filename, fn, *args, **kw)
    else:
        return _profile_hotshot(filename, fn, *args, **kw)
def _profile_cProfile(filename, fn, *args, **kw):
    import cProfile, pstats, time

    load_stats = lambda: pstats.Stats(filename)
    gc_collect()

    began = time.time()
    # runctx writes the profile data to `filename`; the wrapped function's
    # return value is recovered from the exec'd local namespace afterward
    cProfile.runctx('result = fn(*args, **kw)', globals(), locals(),
                    filename=filename)
    ended = time.time()

    return ended - began, load_stats, locals()['result']
def _profile_hotshot(filename, fn, *args, **kw):
    import hotshot, hotshot.stats, time
    load_stats = lambda: hotshot.stats.load(filename)

    gc_collect()
    prof = hotshot.Profile(filename)
    began = time.time()
    prof.start()
    try:
        result = fn(*args, **kw)
    finally:
        prof.stop()
        ended = time.time()
        prof.close()

    return ended - began, load_stats, result
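
# A minimal self-check sketch (hypothetical; not part of the suite): push a
# trivial callable through _profile and report the elapsed time:
#
#   if __name__ == '__main__':
#       elapsed, loader, value = _profile(
#           'demo.prof', lambda: sum(range(100)))
#       print "profiled in %.6f sec, result %s" % (elapsed, value)
#       os.unlink('demo.prof')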