"""Profiling support for unit and performance tests. These are special purpose profiling methods which operate in a more fine-grained way than nose's profiling plugin. """ import os, sys from sqlalchemy.test import config from sqlalchemy.test.util import function_named, gc_collect from nose import SkipTest __all__ = 'profiled', 'function_call_count', 'conditional_call_count' all_targets = set() profile_config = { 'targets': set(), 'report': True, 'sort': ('time', 'calls'), 'limit': None } profiler = None def profiled(target=None, **target_opts): """Optional function profiling. @profiled('label') or @profiled('label', report=True, sort=('calls',), limit=20) Enables profiling for a function when 'label' is targetted for profiling. Report options can be supplied, and override the global configuration and command-line options. """ # manual or automatic namespacing by module would remove conflict issues if target is None: target = 'anonymous_target' elif target in all_targets: print "Warning: redefining profile target '%s'" % target all_targets.add(target) filename = "%s.prof" % target def decorator(fn): def profiled(*args, **kw): if (target not in profile_config['targets'] and not target_opts.get('always', None)): return fn(*args, **kw) elapsed, load_stats, result = _profile( filename, fn, *args, **kw) report = target_opts.get('report', profile_config['report']) if report: sort_ = target_opts.get('sort', profile_config['sort']) limit = target_opts.get('limit', profile_config['limit']) print "Profile report for target '%s' (%s)" % ( target, filename) stats = load_stats() stats.sort_stats(*sort_) if limit: stats.print_stats(limit) else: stats.print_stats() #stats.print_callers() os.unlink(filename) return result return function_named(profiled, fn.__name__) return decorator def function_call_count(count=None, versions={}, variance=0.05): """Assert a target for a test case's function call count. count Optional, general target function call count. versions Optional, a dictionary of Python version strings to counts, for example:: { '2.5.1': 110, '2.5': 100, '2.4': 150 } The best match for the current running python will be used. If none match, 'count' will be used as the fallback. variance An +/- deviation percentage, defaults to 5%. """ # this could easily dump the profile report if --verbose is in effect version_info = list(sys.version_info) py_version = '.'.join([str(v) for v in sys.version_info]) try: from sqlalchemy.cprocessors import to_float cextension = True except ImportError: cextension = False while version_info: version = '.'.join([str(v) for v in version_info]) if cextension: version += "+cextension" if version in versions: count = versions[version] break version_info.pop() if count is None: return lambda fn: fn def decorator(fn): def counted(*args, **kw): try: filename = "%s.prof" % fn.__name__ elapsed, stat_loader, result = _profile( filename, fn, *args, **kw) stats = stat_loader() calls = stats.total_calls stats.sort_stats('calls', 'cumulative') stats.print_stats() #stats.print_callers() deviance = int(count * variance) if (calls < (count - deviance) or calls > (count + deviance)): raise AssertionError( "Function call count %s not within %s%% " "of expected %s. (Python version %s)" % ( calls, (variance * 100), count, py_version)) return result finally: if os.path.exists(filename): os.unlink(filename) return function_named(counted, fn.__name__) return decorator def conditional_call_count(discriminator, categories): """Apply a function call count conditionally at runtime. Takes two arguments, a callable that returns a key value, and a dict mapping key values to a tuple of arguments to function_call_count. The callable is not evaluated until the decorated function is actually invoked. If the `discriminator` returns a key not present in the `categories` dictionary, no call count assertion is applied. Useful for integration tests, where running a named test in isolation may have a function count penalty not seen in the full suite, due to lazy initialization in the DB-API, SA, etc. """ def decorator(fn): def at_runtime(*args, **kw): criteria = categories.get(discriminator(), None) if criteria is None: return fn(*args, **kw) rewrapped = function_call_count(*criteria)(fn) return rewrapped(*args, **kw) return function_named(at_runtime, fn.__name__) return decorator def _profile(filename, fn, *args, **kw): global profiler if not profiler: if sys.version_info > (2, 5): try: import cProfile profiler = 'cProfile' except ImportError: pass if not profiler: try: import hotshot profiler = 'hotshot' except ImportError: profiler = 'skip' if profiler == 'skip': raise SkipTest('Profiling not supported on this platform') elif profiler == 'cProfile': return _profile_cProfile(filename, fn, *args, **kw) else: return _profile_hotshot(filename, fn, *args, **kw) def _profile_cProfile(filename, fn, *args, **kw): import cProfile, gc, pstats, time load_stats = lambda: pstats.Stats(filename) gc_collect() began = time.time() cProfile.runctx('result = fn(*args, **kw)', globals(), locals(), filename=filename) ended = time.time() return ended - began, load_stats, locals()['result'] def _profile_hotshot(filename, fn, *args, **kw): import gc, hotshot, hotshot.stats, time load_stats = lambda: hotshot.stats.load(filename) gc_collect() prof = hotshot.Profile(filename) began = time.time() prof.start() try: result = fn(*args, **kw) finally: prof.stop() ended = time.time() prof.close() return ended - began, load_stats, result