Jiri Olsa d4fcf0a8b9 perf tests: Move attr.py temp dir cleanup into finally section
Currently if there's 'Unsup' exception raised, we do not clean up the
temp directory. Solving this by adding 'finally' to make the cleanup in
any case.

Signed-off-by: Jiri Olsa <jolsa@redhat.com>
Cc: Corey Ashford <cjashfor@linux.vnet.ibm.com>
Cc: Frederic Weisbecker <fweisbec@gmail.com>
Cc: Ingo Molnar <mingo@elte.hu>
Cc: Paul Mackerras <paulus@samba.org>
Cc: Peter Zijlstra <a.p.zijlstra@chello.nl>
Link: http://lkml.kernel.org/r/1352390461-15404-1-git-send-email-jolsa@redhat.com
Signed-off-by: Arnaldo Carvalho de Melo <acme@redhat.com>
2012-11-08 13:16:19 -03:00

323 lines
8.9 KiB
Python

#! /usr/bin/python
import os
import sys
import glob
import optparse
import tempfile
import logging
import shutil
import ConfigParser
class Fail(Exception):
def __init__(self, test, msg):
self.msg = msg
self.test = test
def getMsg(self):
return '\'%s\' - %s' % (self.test.path, self.msg)
class Unsup(Exception):
def __init__(self, test):
self.test = test
def getMsg(self):
return '\'%s\'' % self.test.path
class Event(dict):
terms = [
'flags',
'type',
'size',
'config',
'sample_period',
'sample_type',
'read_format',
'disabled',
'inherit',
'pinned',
'exclusive',
'exclude_user',
'exclude_kernel',
'exclude_hv',
'exclude_idle',
'mmap',
'comm',
'freq',
'inherit_stat',
'enable_on_exec',
'task',
'watermark',
'precise_ip',
'mmap_data',
'sample_id_all',
'exclude_host',
'exclude_guest',
'exclude_callchain_kernel',
'exclude_callchain_user',
'wakeup_events',
'bp_type',
'config1',
'config2',
'branch_sample_type',
'sample_regs_user',
'sample_stack_user',
]
def add(self, data):
for key, val in data:
log.debug(" %s = %s" % (key, val))
self[key] = val
def __init__(self, name, data, base):
log.info(" Event %s" % name);
self.name = name;
self.group = ''
self.add(base)
self.add(data)
def compare_data(self, a, b):
# Allow multiple values in assignment separated by '|'
a_list = a.split('|')
b_list = b.split('|')
for a_item in a_list:
for b_item in b_list:
if (a_item == b_item):
return True
elif (a_item == '*') or (b_item == '*'):
return True
return False
def equal(self, other):
for t in Event.terms:
log.debug(" [%s] %s %s" % (t, self[t], other[t]));
if not self.has_key(t) or not other.has_key(t):
return False
if not self.compare_data(self[t], other[t]):
return False
return True
# Test file description needs to have following sections:
# [config]
# - just single instance in file
# - needs to specify:
# 'command' - perf command name
# 'args' - special command arguments
# 'ret' - expected command return value (0 by default)
#
# [eventX:base]
# - one or multiple instances in file
# - expected values assignments
class Test(object):
def __init__(self, path, options):
parser = ConfigParser.SafeConfigParser()
parser.read(path)
log.warning("running '%s'" % path)
self.path = path
self.test_dir = options.test_dir
self.perf = options.perf
self.command = parser.get('config', 'command')
self.args = parser.get('config', 'args')
try:
self.ret = parser.get('config', 'ret')
except:
self.ret = 0
self.expect = {}
self.result = {}
log.info(" loading expected events");
self.load_events(path, self.expect)
def is_event(self, name):
if name.find("event") == -1:
return False
else:
return True
def load_events(self, path, events):
parser_event = ConfigParser.SafeConfigParser()
parser_event.read(path)
# The event record section header contains 'event' word,
# optionaly followed by ':' allowing to load 'parent
# event' first as a base
for section in filter(self.is_event, parser_event.sections()):
parser_items = parser_event.items(section);
base_items = {}
# Read parent event if there's any
if (':' in section):
base = section[section.index(':') + 1:]
parser_base = ConfigParser.SafeConfigParser()
parser_base.read(self.test_dir + '/' + base)
base_items = parser_base.items('event')
e = Event(section, parser_items, base_items)
events[section] = e
def run_cmd(self, tempdir):
cmd = "PERF_TEST_ATTR=%s %s %s -o %s/perf.data %s" % (tempdir,
self.perf, self.command, tempdir, self.args)
ret = os.WEXITSTATUS(os.system(cmd))
log.info(" running '%s' ret %d " % (cmd, ret))
if ret != int(self.ret):
raise Unsup(self)
def compare(self, expect, result):
match = {}
log.info(" compare");
# For each expected event find all matching
# events in result. Fail if there's not any.
for exp_name, exp_event in expect.items():
exp_list = []
log.debug(" matching [%s]" % exp_name)
for res_name, res_event in result.items():
log.debug(" to [%s]" % res_name)
if (exp_event.equal(res_event)):
exp_list.append(res_name)
log.debug(" ->OK")
else:
log.debug(" ->FAIL");
log.info(" match: [%s] matches %s" % (exp_name, str(exp_list)))
# we did not any matching event - fail
if (not exp_list):
raise Fail(self, 'match failure');
match[exp_name] = exp_list
# For each defined group in the expected events
# check we match the same group in the result.
for exp_name, exp_event in expect.items():
group = exp_event.group
if (group == ''):
continue
for res_name in match[exp_name]:
res_group = result[res_name].group
if res_group not in match[group]:
raise Fail(self, 'group failure')
log.info(" group: [%s] matches group leader %s" %
(exp_name, str(match[group])))
log.info(" matched")
def resolve_groups(self, events):
for name, event in events.items():
group_fd = event['group_fd'];
if group_fd == '-1':
continue;
for iname, ievent in events.items():
if (ievent['fd'] == group_fd):
event.group = iname
log.debug('[%s] has group leader [%s]' % (name, iname))
break;
def run(self):
tempdir = tempfile.mkdtemp();
try:
# run the test script
self.run_cmd(tempdir);
# load events expectation for the test
log.info(" loading result events");
for f in glob.glob(tempdir + '/event*'):
self.load_events(f, self.result);
# resolve group_fd to event names
self.resolve_groups(self.expect);
self.resolve_groups(self.result);
# do the expectation - results matching - both ways
self.compare(self.expect, self.result)
self.compare(self.result, self.expect)
finally:
# cleanup
shutil.rmtree(tempdir)
def run_tests(options):
for f in glob.glob(options.test_dir + '/' + options.test):
try:
Test(f, options).run()
except Unsup, obj:
log.warning("unsupp %s" % obj.getMsg())
def setup_log(verbose):
global log
level = logging.CRITICAL
if verbose == 1:
level = logging.WARNING
if verbose == 2:
level = logging.INFO
if verbose >= 3:
level = logging.DEBUG
log = logging.getLogger('test')
log.setLevel(level)
ch = logging.StreamHandler()
ch.setLevel(level)
formatter = logging.Formatter('%(message)s')
ch.setFormatter(formatter)
log.addHandler(ch)
USAGE = '''%s [OPTIONS]
-d dir # tests dir
-p path # perf binary
-t test # single test
-v # verbose level
''' % sys.argv[0]
def main():
parser = optparse.OptionParser(usage=USAGE)
parser.add_option("-t", "--test",
action="store", type="string", dest="test")
parser.add_option("-d", "--test-dir",
action="store", type="string", dest="test_dir")
parser.add_option("-p", "--perf",
action="store", type="string", dest="perf")
parser.add_option("-v", "--verbose",
action="count", dest="verbose")
options, args = parser.parse_args()
if args:
parser.error('FAILED wrong arguments %s' % ' '.join(args))
return -1
setup_log(options.verbose)
if not options.test_dir:
print 'FAILED no -d option specified'
sys.exit(-1)
if not options.test:
options.test = 'test*'
try:
run_tests(options)
except Fail, obj:
print "FAILED %s" % obj.getMsg();
sys.exit(-1)
sys.exit(0)
if __name__ == '__main__':
main()