test-boot: run in parallel

--quit-after-boot: fix for gem5, update path to gem5.sh

Improve the printing of results and errors:

- remove newlines from IDs at the end for ./test-boot
- remove newlines from progress for __call__ commands and don't print executed commands at all,
  otherwise there are too many lines per test and it is hard to tell what is going on
- print backtraces for any exception in the threads (bugs while developing this code)

Tests across different archs and emulators are still not running in parallel,
which is a huge loss. TODO.

thread_pool: introduce with API. This was motivate by test-boot, I've had enough
of doing separate error handling for each loop type! Greatly dries up the code, awesome.

common: make --all-emulators work properly with native hopefully for the last time,
./test-baremetal was still failing.

gem5: don't pass --command-line for baremetal. Maybe later we can use it to actually
pass command line arguments to main()? To be seen.
This commit is contained in:
Ciro Santilli 六四事件 法轮功
2019-05-21 00:00:00 +00:00
parent 75fd708099
commit 01984c2201
12 changed files with 640 additions and 318 deletions

View File

@@ -109,36 +109,28 @@ Build the baremetal examples with crosstool-NG.
with thread_pool.ThreadPool(
self._build_one,
nthreads=self.env['nproc'],
submit_raise_exit=self.env['quit_on_fail'],
) as my_thread_pool:
try:
for target in self.env['targets']:
for path, in_dirnames, in_filenames in self.sh.walk(target):
for in_filename in in_filenames:
in_ext = os.path.splitext(in_filename)[1]
if not in_ext in self.env['build_in_exts']:
continue
in_path = os.path.join(path, in_filename)
error = my_thread_pool.submit({
'cc_flags': cc_flags,
'extra_deps': [
self.env['baremetal_link_script'],
self.env['common_h']
],
'extra_objs': [syscalls_obj],
'extra_objs_baremetal_bootloader': [extra_obj_baremetal_bootloader],
'extra_objs_lkmc_common': [extra_obj_lkmc_common],
'in_path': in_path,
'out_path': self.resolve_baremetal_executable(in_path),
})
if error is not None:
raise common.ExitLoop()
except common.ExitLoop:
pass
error = my_thread_pool.get_error()
if error is not None:
print(error)
return 1
for target in self.env['targets']:
for path, in_dirnames, in_filenames in self.sh.walk(target):
for in_filename in in_filenames:
in_ext = os.path.splitext(in_filename)[1]
if not in_ext in self.env['build_in_exts']:
continue
in_path = os.path.join(path, in_filename)
my_thread_pool.submit({
'cc_flags': cc_flags,
'extra_deps': [
self.env['baremetal_link_script'],
self.env['common_h']
],
'extra_objs': [syscalls_obj],
'extra_objs_baremetal_bootloader': [extra_obj_baremetal_bootloader],
'extra_objs_lkmc_common': [extra_obj_lkmc_common],
'in_path': in_path,
'out_path': self.resolve_baremetal_executable(in_path),
})
return self._handle_thread_pool_errors(my_thread_pool)
def get_build_dir(self):
return self.env['baremetal_build_dir']

View File

@@ -3,7 +3,7 @@
# what to build depends on --size, which ./build does not support right now.
# The best way to solve this is to move the dependency checking into the run
# scripts, which will take a while to refactor.
set -eu
set -eux
test_size=1
while [ $# -gt 0 ]; do
case "$1" in

View File

@@ -82,37 +82,32 @@ Default: build all examples that have their package dependencies met, e.g.:
with thread_pool.ThreadPool(
self._build_one,
nthreads=self.env['nproc'],
submit_raise_exit=self.env['quit_on_fail'],
) as my_thread_pool:
try:
for target in self.env['targets']:
for path, in_dirnames, in_filenames in self.sh.walk(target):
for in_filename in in_filenames:
in_ext = os.path.splitext(in_filename)[1]
if not in_ext in self.env['build_in_exts']:
continue
in_path = os.path.join(path, in_filename)
error = my_thread_pool.submit({
'cc_flags': cc_flags,
'extra_objs_lkmc_common': [extra_obj_lkmc_common],
'extra_objs_userland_asm': [extra_obj_userland_asm],
'in_path': in_path,
'out_path': self.resolve_userland_executable(in_path),
})
if error is not None:
raise common.ExitLoop()
except common.ExitLoop:
pass
error = my_thread_pool.get_error()
if error is not None:
print(error)
return 1
for target in self.env['targets']:
for path, in_dirnames, in_filenames in self.sh.walk(target):
for in_filename in in_filenames:
in_ext = os.path.splitext(in_filename)[1]
if not in_ext in self.env['build_in_exts']:
continue
in_path = os.path.join(path, in_filename)
my_thread_pool.submit({
'cc_flags': cc_flags,
'extra_objs_lkmc_common': [extra_obj_lkmc_common],
'extra_objs_userland_asm': [extra_obj_userland_asm],
'in_path': in_path,
'out_path': self.resolve_userland_executable(in_path),
})
exit_status = self._handle_thread_pool_errors(my_thread_pool)
if exit_status != 0:
return exit_status
if not self.env['in_tree']:
self.sh.copy_dir_if_update(
srcdir=build_dir,
destdir=self.env['out_rootfs_overlay_lkmc_dir'],
filter_ext=self.env['userland_executable_ext'],
)
return 0
return exit_status
def clean(self):
if self.env['in_tree']:

136
common.py
View File

@@ -22,6 +22,7 @@ import signal
import subprocess
import sys
import threading
from typing import Union
import time
import urllib
import urllib.request
@@ -30,6 +31,7 @@ from shell_helpers import LF
import cli_function
import path_properties
import shell_helpers
import thread_pool
common = sys.modules[__name__]
@@ -265,6 +267,14 @@ TODO: implement fully, some stuff is escaping it currently.
default=True,
help='''\
Stop running at the first failed test.
'''
)
self.add_argument(
'--show-cmds',
default=True,
help='''\
Print the exact Bash command equivalents being run by this script.
Implied by --quiet.
'''
)
self.add_argument(
@@ -583,19 +593,40 @@ Incompatible archs are skipped.
def __call__(self, *args, **kwargs):
'''
For Python code calls, in addition to base:
For Python code calls, in addition to base class behaviour:
- print the CLI equivalent of the call
- automatically forward common arguments
* print the CLI equivalent of the call
* automatically forward common arguments
'''
print_cmd = ['./' + self.extra_config_params, LF]
if 'print_cmd_oneline' in kwargs:
force_oneline = kwargs['print_cmd_oneline']
del kwargs['print_cmd_oneline']
else:
force_oneline=False
for line in self.get_cli(**kwargs):
print_cmd.extend(line)
print_cmd.append(LF)
if not ('quiet' in kwargs and kwargs['quiet']):
shell_helpers.ShellHelpers().print_cmd(print_cmd)
shell_helpers.ShellHelpers().print_cmd(
print_cmd,
force_oneline=force_oneline
)
return super().__call__(**kwargs)
def _handle_thread_pool_errors(self, my_thread_pool):
handle_output_result = my_thread_pool.get_handle_output_result()
if handle_output_result is not None:
work_function_input, work_function_return, exception = handle_output_result
if not type(exception) is thread_pool.ThreadPoolExitException:
print('work_function or handle_output raised unexpectedly:')
print(thread_pool.ThreadPool.exception_traceback_string(exception), end='')
print('work_function_input: {}'.format(work_function_input))
print('work_function_return: {}'.format(work_function_return))
return 1
else:
return 0
def _init_env(self, env):
'''
Update the kwargs from the command line with values derived from them.
@@ -823,7 +854,10 @@ Incompatible archs are skipped.
env['linux_image'] = env['lkmc_linux_image']
env['linux_config'] = join(env['linux_build_dir'], '.config')
if env['emulator']== 'gem5':
env['userland_quit_cmd'] = './gem5_exit.sh'
env['userland_quit_cmd'] = join(
env['guest_lkmc_home'],
'gem5_exit.sh'
)
else:
env['userland_quit_cmd'] = join(
env['guest_lkmc_home'],
@@ -1141,11 +1175,17 @@ lunch aosp_{}-eng
real_archs = consts['all_long_archs']
else:
real_archs = env['archs']
if env['all_emulators']:
real_all_emulators = env['all_emulators']
if real_all_emulators:
real_emulators = consts['all_long_emulators']
else:
real_emulators = env['emulators']
return_value = 0
if env['_args_given']['show_cmds']:
show_cmds = env['show_cmds']
else:
show_cmds = not env['quiet']
self.setup(env)
try:
for emulator in real_emulators:
for arch in real_archs:
@@ -1153,7 +1193,15 @@ lunch aosp_{}-eng
arch = env['arch_short_to_long_dict'][arch]
if emulator == 'native':
if arch != env['host_arch']:
continue
if real_all_archs:
continue
else:
raise Exception('native emulator only supported in if target arch == host arch')
if env['userland'] is None:
if real_all_emulators:
continue
else:
raise Exception('native emulator only supported in user mode')
if self.is_arch_supported(arch):
if not env['dry_run']:
start_time = time.time()
@@ -1169,7 +1217,7 @@ lunch aosp_{}-eng
self._init_env(self.env)
self.sh = shell_helpers.ShellHelpers(
dry_run=self.env['dry_run'],
quiet=self.env['quiet'],
quiet=(not show_cmds),
)
self.setup_one()
ret = self.timed_main()
@@ -1317,6 +1365,14 @@ lunch aosp_{}-eng
self.env['userland_executable_ext'],
)
def setup(self, env):
'''
Similar to setup run before all timed_main are called.
_init_env has not yet been called, so only primary CLI arguments may be used.
'''
pass
def setup_one(self):
'''
Run just before timed_main, after _init_env.
@@ -1338,9 +1394,11 @@ lunch aosp_{}-eng
'''
pass
def teardown(self):
def teardown(self) -> Union[None,int]:
'''
Similar to setup, but run once after all timed_main are called.
:return: if not None, the return integer gets used as the exit status of the program.
'''
pass
@@ -1602,6 +1660,7 @@ class TestCliFunction(LkmcCliFunction):
def __init__(self, *args, **kwargs):
defaults = {
'quit_on_fail': False,
'show_time': False,
}
if 'defaults' in kwargs:
@@ -1610,6 +1669,17 @@ class TestCliFunction(LkmcCliFunction):
super().__init__(*args, **kwargs)
self.test_results = queue.Queue()
def handle_output_function(
self,
work_function_input,
work_function_return,
work_function_exception
):
if work_function_exception is not None:
return work_function_exception
if work_function_return.status != TestStatus.PASS:
return thread_pool.ThreadPoolExitException()
def run_test(
self,
run_obj,
@@ -1624,17 +1694,33 @@ class TestCliFunction(LkmcCliFunction):
More complex tests might need to run the steps separately, e.g. gdb tests
must run multiple commands: one for the run and one GDB.
This function is meant to be called from threads. In particular,
those threads have to cross over archs: the original motivation is to parallelize
super slow gem5 boot tests. Therefore, we cannot use self.env['arch'] and selv.env['emulator']
in this function or callees!
Ideally, we should make this static and pass all arguments to the call... but lazy to refactor.
I have the feeling I will regret this one day down the line.
:param run_obj: callable object
:param run_args: arguments to be passed to the runnable object
:param test_id: test identifier, to be added in addition to of arch and emulator ids
:param thread_id: which thread the test is running under
'''
if run_obj.is_arch_supported(self.env['arch']):
if run_args is None:
run_args = {}
run_args['run_id'] = thread_id
test_id_string = self.test_setup(test_id)
exit_status = run_obj(**run_args)
if run_obj.is_arch_supported(run_args['archs'][0]):
cur_run_args = {
'background': True,
'ctrl_c_host': True,
'print_cmd_oneline': True,
'run_id': thread_id,
'show_cmds': False,
'show_stdout': False,
'show_time': False,
}
if run_args is not None:
cur_run_args.update(run_args)
test_id_string = self.test_setup(run_args, test_id)
exit_status = run_obj(**cur_run_args)
return self.test_teardown(
run_obj,
exit_status,
@@ -1642,11 +1728,11 @@ class TestCliFunction(LkmcCliFunction):
expected_exit_status=expected_exit_status
)
def test_setup(self, test_id):
test_id_string = '{} {}'.format(self.env['emulator'], self.env['arch'])
if test_id is not None:
def test_setup(self, run_args, test_id):
test_id_string = '{} {}'.format(run_args['emulators'][0], run_args['archs'][0])
if test_id is not None and str(test_id) != '':
test_id_string += ' {}'.format(test_id)
self.log_info('test_id {}'.format(test_id_string), flush=True)
self.log_info('Starting: {}'.format(repr(test_id_string)), flush=True)
return test_id_string
def test_teardown(
@@ -1661,24 +1747,24 @@ class TestCliFunction(LkmcCliFunction):
reason = ''
if not self.env['dry_run']:
if exit_status == expected_exit_status:
test_result = TestStatus.PASS
test_status = TestStatus.PASS
else:
test_result = TestStatus.FAIL
test_status = TestStatus.FAIL
reason = 'wrong exit status, got {} expected {}'.format(
exit_status,
expected_exit_status
)
ellapsed_seconds = run_obj.ellapsed_seconds
else:
test_result = TestStatus.PASS
test_status = TestStatus.PASS
ellapsed_seconds = 0
test_result = TestResult(
test_id_string,
test_result,
test_status,
ellapsed_seconds,
reason
)
self.log_info(test_result)
self.log_info('Result: ' + str(test_result))
self.test_results.put(test_result)
return test_result
@@ -1686,7 +1772,7 @@ class TestCliFunction(LkmcCliFunction):
'''
:return: 1 if any test failed, 0 otherwise
'''
self.log_info('\nTest result summary')
self.log_info('\nTest result summary:')
passes = []
fails = []
while not self.test_results.empty():

View File

@@ -68,12 +68,7 @@ class PathProperties:
'skip_run_unclassified': False,
# Aruments added automatically to run when running tests,
# but not on manual running.
'test_run_args': {
'ctrl_c_host': True,
'show_stdout': False,
'show_time': False,
'background': True,
},
'test_run_args': {},
}
'''
@@ -142,6 +137,13 @@ class PathProperties:
)
)
def _update_dict(self, other_tmp_properties, key):
if key in self.properties and key in other_tmp_properties:
other_tmp_properties[key] = {
**self.properties[key],
**other_tmp_properties[key]
}
def _update_list(self, other_tmp_properties, key):
if key in self.properties and key in other_tmp_properties:
other_tmp_properties[key] = \
@@ -153,11 +155,7 @@ class PathProperties:
self._update_list(other_tmp_properties, 'cc_flags')
self._update_list(other_tmp_properties, 'cc_flags_after')
self._update_list(other_tmp_properties, 'extra_objs')
if 'test_run_args' in self.properties and 'test_run_args' in other_tmp_properties:
other_tmp_properties['test_run_args'] = {
**self.properties['test_run_args'],
**other_tmp_properties['test_run_args']
}
self._update_dict(other_tmp_properties, 'test_run_args')
return self.properties.update(other_tmp_properties)
class PrefixTree:

48
run
View File

@@ -231,9 +231,9 @@ Setup a kernel init parameter that makes the emulator quit immediately after boo
help='''\
Output directly to the terminal, don't pipe to tee as the default.
With this, we don't not save the output to a file as is done by default,
but we are able to do things that require not having a pipe suh as you to
using debuggers. This option issSet automatically by --debug-vm, but you still need
it to debug gem5 Python scripts with pdb.
but we are able to do things that require not having a pipe such as
using debuggers. This option is set automatically by --debug-vm, but you
still need it to debug gem5 Python scripts with pdb.
'''
)
self.add_argument(
@@ -309,8 +309,6 @@ Extra options to append at the end of the emulator command line.
)
def timed_main(self):
if self.env['emulator'] == 'native' and self.env['userland'] is None:
raise Exception('native emulator only supported in user mode')
show_stdout = self.env['show_stdout']
# Common qemu / gem5 logic.
# nokaslr:
@@ -481,7 +479,8 @@ Extra options to append at the end of the emulator command line.
if self.env['arch'] == 'x86_64':
if self.env['kvm']:
cmd.extend(['--cpu-type', 'X86KvmCPU', LF])
cmd.extend(['--command-line', 'earlyprintk={} lpj=7999923 root=/dev/sda {}'.format(console, kernel_cli), LF])
if self.env['baremetal'] is None:
cmd.extend(['--command-line', 'earlyprintk={} lpj=7999923 root=/dev/sda {}'.format(console, kernel_cli), LF])
elif self.env['is_arm']:
if self.env['kvm']:
cmd.extend(['--cpu-type', 'ArmV8KvmCPU', LF])
@@ -492,14 +491,24 @@ Extra options to append at the end of the emulator command line.
cmd.extend([
# TODO why is it mandatory to pass mem= here? Not true for QEMU.
# Anything smaller than physical blows up as expected, but why can't it auto-detect the right value?
'--command-line', 'earlyprintk=pl011,0x1c090000 lpj=19988480 rw loglevel=8 mem={} root=/dev/sda {}'.format(memory, kernel_cli), LF,
'--machine-type', self.env['machine'], LF,
])
if self.env['baremetal'] is None:
cmd.extend(['--command-line', 'earlyprintk=pl011,0x1c090000 lpj=19988480 rw loglevel=8 mem={} root=/dev/sda {}'.format(memory, kernel_cli), LF])
dtb = None
if self.env['dtb'] is not None:
dtb = self.env['dtb']
elif self.env['dp650']:
dtb = os.path.join(self.env['gem5_system_dir'], 'arm', 'dt', 'armv{}_gem5_v1_{}{}cpu.dtb'.format(self.env['armv'], dp650_cmd, self.env['cpus']))
dtb = os.path.join(
self.env['gem5_system_dir'],
'arm',
'dt',
'armv{}_gem5_v1_{}{}cpu.dtb'.format(
self.env['armv'],
dp650_cmd,
self.env['cpus']
)
)
if dtb is not None:
cmd.extend(['--dtb-filename', dtb, LF])
if self.env['baremetal'] is None:
@@ -521,7 +530,13 @@ Extra options to append at the end of the emulator command line.
cpt_dir = self.gem5_list_checkpoint_dirs()[-self.env['gem5_restore']]
extra_emulator_args.extend(['--restore-from', os.path.join(self.env['m5out_dir'], cpt_dir)])
cmd.extend([
os.path.join(self.env['gem5_source_dir'], 'configs', 'example', 'arm', 'fs_bigLITTLE.py'), LF,
os.path.join(
self.env['gem5_source_dir'],
'configs',
'example',
'arm',
'fs_bigLITTLE.py'
), LF,
'--big-cpus', '2', LF,
'--cpu-type', cpu_type, LF,
'--disk', self.env['disk_image'], LF,
@@ -529,7 +544,15 @@ Extra options to append at the end of the emulator command line.
'--little-cpus', '2', LF,
])
if self.env['dtb']:
cmd.extend(['--dtb', os.path.join(self.env['gem5_system_dir'], 'arm', 'dt', 'armv8_gem5_v1_big_little_2_2.dtb'), NL])
cmd.extend([
'--dtb',
os.path.join(self.env['gem5_system_dir'],
'arm',
'dt',
'armv8_gem5_v1_big_little_2_2.dtb'
),
LF
])
if self.env['gdb_wait']:
# https://stackoverflow.com/questions/49296092/how-to-make-gem5-wait-for-gdb-to-connect-to-reliably-break-at-start-kernel-of-th
cmd.extend(['--param', 'system.cpu[0].wait_for_remote_gdb = True', LF])
@@ -570,7 +593,10 @@ Extra options to append at the end of the emulator command line.
serial_monitor = ['-serial', serial, LF]
if self.env['kvm']:
extra_emulator_args.extend(['-enable-kvm', LF])
extra_emulator_args.extend(['-serial', 'tcp::{},server,nowait'.format(self.env['extra_serial_port']), LF])
extra_emulator_args.extend([
'-serial',
'tcp::{},server,nowait'.format(self.env['extra_serial_port']), LF
])
virtfs_data = [
(self.env['p9_dir'], 'host_data'),
(self.env['out_dir'], 'host_out'),

View File

@@ -11,6 +11,7 @@ import stat
import subprocess
import sys
import threading
from typing import List, Union
import urllib.request
class LF:
@@ -85,10 +86,22 @@ class ShellHelpers:
os.chmod(path, new_mode)
@staticmethod
def cmd_to_string(cmd, cwd=None, extra_env=None, extra_paths=None):
def cmd_to_string(
cmd: List[Union[str, LF]],
cwd=None,
extra_env=None,
extra_paths=None,
force_oneline: bool =False,
):
'''
Format a command given as a list of strings so that it can
be viewed nicely and executed by bash directly and print it to stdout.
If cmd contains:
* no LF, then newlines are added after every word
* exactly one LF at the end, then no newlines are added
* otherwise: newlines are added exactly at each LF
'''
last_newline = ' \\\n'
newline_separator = last_newline + ' '
@@ -105,14 +118,22 @@ class ShellHelpers:
newline_count = 0
for arg in cmd:
if arg == LF:
cmd_quote.append(arg)
newline_count += 1
if not force_oneline:
cmd_quote.append(arg)
newline_count += 1
else:
cmd_quote.append(shlex.quote(arg))
if newline_count > 0:
cmd_quote = [' '.join(list(y)) for x, y in itertools.groupby(cmd_quote, lambda z: z == LF) if not x]
if force_oneline or newline_count > 0:
cmd_quote = [
' '.join(list(y))
for x, y in itertools.groupby(
cmd_quote,
lambda z: z == LF
)
if not x
]
out.extend(cmd_quote)
if newline_count == 1 and cmd[-1] == LF:
if force_oneline or newline_count == 1 and cmd[-1] == LF:
ending = ''
else:
ending = last_newline + ';'
@@ -157,20 +178,31 @@ class ShellHelpers:
else:
shutil.copy2(src, dest)
def print_cmd(self, cmd, cwd=None, cmd_file=None, extra_env=None, extra_paths=None):
def print_cmd(
self,
cmd,
cwd=None,
cmd_file=None,
extra_env=None,
extra_paths=None,
force_oneline=False,
):
'''
Print cmd_to_string to stdout.
Optionally save the command to cmd_file file, and add extra_env
environment variables to the command generated.
If cmd contains at least one LF, newlines are only added on LF.
Otherwise, newlines are added automatically after every word.
'''
if type(cmd) is str:
cmd_string = cmd
else:
cmd_string = self.cmd_to_string(cmd, cwd=cwd, extra_env=extra_env, extra_paths=extra_paths)
cmd_string = self.cmd_to_string(
cmd,
cwd=cwd,
extra_env=extra_env,
extra_paths=extra_paths,
force_oneline=force_oneline,
)
if not self.quiet:
self._print_thread_safe('+ ' + cmd_string)
if cmd_file is not None:
@@ -371,3 +403,29 @@ class ShellHelpers:
if not self.dry_run:
with open(path, mode) as f:
f.write(string)
if __name__ == '__main__':
shell_helpers = ShellHelpers()
if 'cmd_to_string':
# Default.
assert shell_helpers.cmd_to_string(['cmd']) == 'cmd \\\n;'
assert shell_helpers.cmd_to_string(['cmd', 'arg1']) == 'cmd \\\n arg1 \\\n;'
assert shell_helpers.cmd_to_string(['cmd', 'arg1', 'arg2']) == 'cmd \\\n arg1 \\\n arg2 \\\n;'
# Argument with a space gets escaped.
assert shell_helpers.cmd_to_string(['cmd', 'arg1 arg2']) == "cmd \\\n 'arg1 arg2' \\\n;"
# Ending in LF with no other LFs get separated only by spaces.
assert shell_helpers.cmd_to_string(['cmd', LF]) == 'cmd'
assert shell_helpers.cmd_to_string(['cmd', 'arg1', LF]) == 'cmd arg1'
assert shell_helpers.cmd_to_string(['cmd', 'arg1', 'arg2', LF]) == 'cmd arg1 arg2'
# More than one LF adds newline separators at each LF.
assert shell_helpers.cmd_to_string(['cmd', LF, 'arg1', LF]) == 'cmd \\\n arg1 \\\n;'
assert shell_helpers.cmd_to_string(['cmd', LF, 'arg1', LF, 'arg2', LF]) == 'cmd \\\n arg1 \\\n arg2 \\\n;'
assert shell_helpers.cmd_to_string(['cmd', LF, 'arg1', 'arg2', LF]) == 'cmd \\\n arg1 arg2 \\\n;'
# force_oneline separates everything simply by spaces.
assert \
shell_helpers.cmd_to_string(['cmd', LF, 'arg1', LF, 'arg2', LF], force_oneline=True) \
== 'cmd arg1 arg2'

View File

@@ -32,43 +32,32 @@ If given, run only the given tests. Otherwise, run all tests.
rootdir_abs_len = len(self.env['root_dir'])
with thread_pool.ThreadPool(
self.run_test,
handle_output=self.handle_output_function,
nthreads=self.env['nproc'],
thread_id_arg='thread_id',
submit_raise_exit=self.env['quit_on_fail'],
) as my_thread_pool:
try:
for test in self.env['tests']:
for path, in_dirnames, in_filenames in self.sh.walk(test):
path_abs = os.path.abspath(path)
dirpath_relative_root = path_abs[rootdir_abs_len + 1:]
for in_filename in in_filenames:
if os.path.splitext(in_filename)[1] in (self.env['c_ext'], self.env['asm_ext']):
path_relative_root = os.path.join(dirpath_relative_root, in_filename)
my_path_properties = path_properties.get(path_relative_root)
if my_path_properties.should_be_tested(self.env):
cur_run_args = run_args.copy()
cur_run_args.update({
'baremetal': os.path.relpath(os.path.join(path_abs, in_filename), os.getcwd()),
})
cur_run_args.update(my_path_properties['test_run_args'])
test_args = {
'expected_exit_status': my_path_properties['exit_status'],
'run_args': cur_run_args,
'run_obj': lkmc.import_path.import_path_main('run'),
'test_id': path_relative_root,
}
error = my_thread_pool.submit(test_args)
if error is not None:
if self.env['quit_on_fail']:
raise common.ExitLoop()
except common.ExitLoop:
pass
error = my_thread_pool.get_error()
if error is not None:
print(error)
return 1
else:
return 0
for test in self.env['tests']:
for path, in_dirnames, in_filenames in self.sh.walk(test):
path_abs = os.path.abspath(path)
dirpath_relative_root = path_abs[rootdir_abs_len + 1:]
for in_filename in in_filenames:
if os.path.splitext(in_filename)[1] in (self.env['c_ext'], self.env['asm_ext']):
path_relative_root = os.path.join(dirpath_relative_root, in_filename)
my_path_properties = path_properties.get(path_relative_root)
if my_path_properties.should_be_tested(self.env):
cur_run_args = run_args.copy()
cur_run_args.update({
'baremetal': os.path.relpath(os.path.join(path_abs, in_filename), os.getcwd()),
})
cur_run_args.update(my_path_properties['test_run_args'])
my_thread_pool.submit({
'expected_exit_status': my_path_properties['exit_status'],
'run_args': cur_run_args,
'run_obj': lkmc.import_path.import_path_main('run'),
'test_id': path_relative_root,
})
return self._handle_thread_pool_errors(my_thread_pool)
if __name__ == '__main__':
Main().cli()

View File

@@ -2,6 +2,7 @@
import common
import lkmc.import_path
import thread_pool
import shell_helpers
from shell_helpers import LF
@@ -21,14 +22,24 @@ See ./test --help for --size.
'''
)
def _bench(self, **kwargs):
def _bench(self, **run_args):
run_obj = lkmc.import_path.import_path_main('run')
words = []
for line in self.run.get_cli(**kwargs):
test_id_args = run_args.copy()
del test_id_args['run_id']
for line in run_obj.get_cli(**test_id_args):
words.extend(line)
extra_params = shell_helpers.ShellHelpers().cmd_to_string(words + [LF])
run_args = kwargs.copy()
run_args.update(self.common_args)
self.run_test(self.run, run_args, extra_params)
test_id = shell_helpers.ShellHelpers().cmd_to_string(words, force_oneline=True)
return self.run_test(run_obj, run_args, test_id)
def setup(self, env):
self.my_thread_pool = thread_pool.ThreadPool(
self._bench,
handle_output=self.handle_output_function,
nthreads=env['nproc'],
thread_id_arg='run_id',
submit_skip_exit=env['quit_on_fail'],
)
def timed_main(self):
# TODO bring this benchmark code back to life. Likely should go inside run with an option
@@ -46,19 +57,20 @@ See ./test --help for --size.
#)
#
#rm -f "${self.env['test_boot_benchmark_file']}"
self.run = lkmc.import_path.import_path_main('run')
self.common_args = self.get_common_args()
self.common_args['ctrl_c_host'] = True
self.common_args['quit_after_boot'] = True
common_args = self.get_common_args()
common_args['ctrl_c_host'] = True
common_args['quit_after_boot'] = True
# To see it blow up during development.
# self.common_args['eval'] = 'insmod /lkmc/panic.ko'
if (self.env['emulator'] == 'qemu' or
(self.env['emulator'] == 'gem5' and self.env['size'] >= 2)):
self._bench()
self.my_thread_pool.submit(common_args)
if self.env['host_arch'] == self.env['arch']:
# TODO: find out why it fails.
if self.env['emulator'] != 'gem5':
self._bench(kvm=True)
self.my_thread_pool.submit({**common_args, **{'kvm': True}})
if self.env['emulator'] == 'qemu' and self.env['size'] >= 2:
self._bench(trace='exec_tb')
self.my_thread_pool.submit({**common_args, **{'trace': 'exec_tb'}})
if self.env['emulator'] == 'gem5' and self.env['size'] >= 3:
if self.env['arch'] == 'x86_64':
cpu_types = [
@@ -71,23 +83,28 @@ See ./test --help for --size.
'HPI',
]
for cpu_type in cpu_types:
self._bench(
extra_emulator_args=[
'--cpu-type', cpu_type,
'--caches',
'--l2cache',
'--l1d_size', '1024kB',
'--l1i_size', '1024kB',
'--l2_size', '1024kB',
'--l3_size', '1024kB',
self.my_thread_pool.submit({**common_args, **{
'extra_emulator_args': [
'--cpu-type', cpu_type, LF,
'--caches', LF,
'--l2cache', LF,
'--l1d_size', '1024kB', LF,
'--l1i_size', '1024kB', LF,
'--l2_size', '1024kB', LF,
'--l3_size', '1024kB', LF,
],
)
}})
if self.env['arch'] == 'aarch64':
# Do a fuller testing for aarch64.
for build_type in ['debug', 'fast']:
self._bench(gem5_build_type=build_type)
self.my_thread_pool.submit({**common_args, **{'gem5_build_type': build_type}})
# Requires patching the executable.
# self._bench(gem5_script='biglittle')
# self.my_thread_pool.submit({{**common_args, 'gem5_script': 'biglittle'}})
def teardown(self):
self.my_thread_pool.join()
self._handle_thread_pool_errors(self.my_thread_pool)
return super().teardown()
if __name__ == '__main__':
Main().cli()

View File

@@ -57,10 +57,10 @@ found by searching for the Python test files.
test_source_base = os.path.relpath(base, self.env['root_dir'])
common_args = self.get_common_args()
common_args['baremetal'] = test_source_base + ext
test_id_string = self.test_setup(test_source_base)
run_args = common_args.copy()
run_args['gdb_wait'] = True
run_args['background'] = True
test_id_string = self.test_setup(run_args, test_source_base)
run_thread = threading.Thread(target=lambda: run(**run_args))
run_thread.start()
gdb_args = common_args.copy()

View File

@@ -40,44 +40,35 @@ If given, run only the given tests. Otherwise, run all tests.
rootdir_abs_len = len(self.env['root_dir'])
with thread_pool.ThreadPool(
self.run_test,
handle_output=self.handle_output_function,
nthreads=self.env['nproc'],
thread_id_arg='thread_id',
submit_raise_exit=self.env['quit_on_fail'],
) as my_thread_pool:
try:
for test in self.env['tests']:
for path, in_dirnames, in_filenames in self.sh.walk(test):
path_abs = os.path.abspath(path)
dirpath_relative_root = path_abs[rootdir_abs_len + 1:]
for in_filename in in_filenames:
if os.path.splitext(in_filename)[1] in self.env['build_in_exts']:
path_relative_root = os.path.join(dirpath_relative_root, in_filename)
my_path_properties = path_properties.get(path_relative_root)
if my_path_properties.should_be_tested(self.env):
cur_run_args = run_args.copy()
cur_run_args.update({
'userland': os.path.relpath(os.path.join(path_abs, in_filename), os.getcwd()),
})
cur_run_args.update(my_path_properties['test_run_args'])
run_test_args = {
'expected_exit_status': my_path_properties['exit_status'],
'run_args': cur_run_args,
'run_obj': lkmc.import_path.import_path_main('run'),
'test_id': path_relative_root,
}
if my_path_properties['receives_signal']:
run_test_args['expected_exit_status'] = 128 - my_path_properties['exit_status']
error = my_thread_pool.submit(run_test_args)
if error is not None:
if self.env['quit_on_fail']:
raise common.ExitLoop()
except common.ExitLoop:
pass
error = my_thread_pool.get_error()
if error is not None:
print(error)
return 1
else:
return 0
for test in self.env['tests']:
for path, in_dirnames, in_filenames in self.sh.walk(test):
path_abs = os.path.abspath(path)
dirpath_relative_root = path_abs[rootdir_abs_len + 1:]
for in_filename in in_filenames:
if os.path.splitext(in_filename)[1] in self.env['build_in_exts']:
path_relative_root = os.path.join(dirpath_relative_root, in_filename)
my_path_properties = path_properties.get(path_relative_root)
if my_path_properties.should_be_tested(self.env):
cur_run_args = run_args.copy()
cur_run_args.update({
'userland': os.path.relpath(os.path.join(path_abs, in_filename), os.getcwd()),
})
cur_run_args.update(my_path_properties['test_run_args'])
run_test_args = {
'expected_exit_status': my_path_properties['exit_status'],
'run_args': cur_run_args,
'run_obj': lkmc.import_path.import_path_main('run'),
'test_id': path_relative_root,
}
if my_path_properties['receives_signal']:
run_test_args['expected_exit_status'] = 128 - my_path_properties['exit_status']
my_thread_pool.submit(run_test_args)
return self._handle_thread_pool_errors(my_thread_pool)
if __name__ == '__main__':
Main().cli()

View File

@@ -1,11 +1,26 @@
#!/usr/bin/env python3
'''
This file is MIT Licensed because I'm posting it on Stack Overflow:
https://stackoverflow.com/questions/19369724/the-right-way-to-limit-maximum-number-of-threads-running-at-once/55263676#55263676
'''
from typing import Any, Callable, Dict, Iterable, Union
import os
import queue
import sys
import threading
import time
import traceback
class ThreadPoolExitException(Exception):
'''
An object of this class may be raised by output_handler_function to
request early termination.
It is also raised by submit() if submit_raise_exit=True.
'''
pass
class ThreadPool:
'''
@@ -19,65 +34,153 @@ class ThreadPool:
* queue sizes closely follow number of threads
* if an exception happens, optionally stop soon afterwards
Functional form and further discussion at:
https://stackoverflow.com/questions/19369724/the-right-way-to-limit-maximum-number-of-threads-running-at-once/55263676#55263676
This class form allows to use your own while loops with submit().
Quick test with:
Exit soon after the first failure happens:
....
python3 thread_pool.py 2 -10 20 0
python3 thread_pool.py 2 -10 20 1
python3 thread_pool.py 2 -10 20 2
python3 thread_pool.py 2 -10 20 3
python3 thread_pool.py 2 -10 20 0 1
python3 thread_pool.py 2 -10 20 handle_output_print
....
These ensure that execution stops neatly on error.
Sample output:
....
{'i': -9} -1.1111111111111112 None
{'i': -8} -1.25 None
{'i': -10} -1.0 None
{'i': -6} -1.6666666666666667 None
{'i': -7} -1.4285714285714286 None
{'i': -4} -2.5 None
{'i': -5} -2.0 None
{'i': -2} -5.0 None
{'i': -3} -3.3333333333333335 None
{'i': 0} None ZeroDivisionError('float division by zero')
{'i': -1} -10.0 None
{'i': 1} 10.0 None
{'i': 2} 5.0 None
work_function or handle_output raised:
Traceback (most recent call last):
File "thread_pool.py", line 181, in _func_runner
work_function_return = self.work_function(**work_function_input)
File "thread_pool.py", line 281, in work_function_maybe_raise
return 10.0 / i
ZeroDivisionError: float division by zero
work_function_input: {'i': 0}
work_function_return: None
....
Don't exit after first failure, run until end:
....
python3 thread_pool.py 2 -10 20 handle_output_print_no_exit
....
Store results in a queue for later inspection instead of printing immediately,
then print everything at the end:
....
python3 thread_pool.py 2 -10 20 handle_output_queue
....
Exit soon after the handle_output raise.
....
python3 thread_pool.py 2 -10 20 handle_output_raise
....
Relying on this interface to abort execution is discouraged, this should
usually only happen due to a programming error in the handler.
Test that the argument called "thread_id" is passed to work_function and printed:
....
python3 thread_pool.py 2 -10 20 handle_output_print thread_id
....
Test with, ThreadPoolExitException and submit_raise_exit=True, same behaviour handle_output_print
except for the different exit cause report:
....
python3 thread_pool.py 2 -10 20 handle_output_raise_exit_exception
....
'''
def __init__(
self,
func: Callable,
work_function: Callable,
handle_output: Union[Callable[[Any,Any,Exception],Any],None] = None,
nthreads: Union[int,None] = None,
thread_id_arg: Union[str,None] = None,
submit_raise_exit: bool = False,
submit_skip_exit: bool = False,
):
'''
Start in a thread pool immediately.
join() must be called afterwards at some point.
:param func: main work function to be evaluated.
:param handle_output: called on func return values as they
:param work_function: main work function to be evaluated.
:param handle_output: called on work_function return values as they
are returned.
Signature is: handle_output(input, output, exception) where:
The function signature is:
* input: input given to func
* output: return value of func
* exception: the exception that func raised, or None otherwise
....
handle_output(
work_function_input: Union[Dict,None],
work_function_return,
work_function_exception: Exception
) -> Union[Exception,None]
....
If this function returns non-None or raises, stop feeding
new input and exit ASAP when all currently running threads
have finished.
where work_function_exception the exception that work_function raised,
or None otherwise
Default: a handler that does nothing and just exits on exception.
The first non-None return value of a call to this function is returned by
submit(), get_handle_output_result() and join().
The intended semantic for this, is to return:
* on success:
** None to continue execution
** ThreadPoolExitException() to request stop execution
* if work_function_input or work_function_exception raise:
** the exception raised
The ThreadPool user can then optionally terminate execution early on error
or request with either:
* an explicit submit() return value check + break if a submit loop is used
* `with` + submit_raise_exit=True
Default: a handler that just returns `exception`, which can normally be used
by the submit loop to detect an error and exit immediately.
:param nthreads: number of threads to use. Default: nproc.
:param thread_id_arg: if not None, set the argument of func with this name
:param thread_id_arg: if not None, set the argument of work_function with this name
to a 0-indexed thread ID. This allows function calls to coordinate
usage of external resources such as files or ports.
:param submit_raise_exit: if True, submit() raises ThreadPoolExitException() if
get_handle_output_result() is not None.
:param submit_skip_exit: if True, submit() does nothing if
get_handle_output_result() is not None.
You should avoid this interface if
you can use use submit_raise_exit with `with` instead ideally.
However, when you can't work with with and are in a deeply nested loop,
it might just be easier to set this.
'''
self.func = func
self.work_function = work_function
if handle_output is None:
handle_output = lambda input, output, exception: exception
self.handle_output = handle_output
if nthreads is None:
nthreads = len(os.sched_getaffinity(0))
self.thread_id_arg = thread_id_arg
self.submit_raise_exit = submit_raise_exit
self.submit_skip_exit = submit_skip_exit
self.nthreads = nthreads
self.error_output = None
self.error_output_lock = threading.Lock()
self.handle_output_result = None
self.handle_output_result_lock = threading.Lock()
self.in_queue = queue.Queue(maxsize=nthreads)
self.threads = []
for i in range(self.nthreads):
@@ -94,69 +197,121 @@ class ThreadPool:
This is cool because it automatically ends the loop if an exception occurs.
But don't forget that errors may happen after the last submit is called, so you
likely want to check for that with get_error after the with.
get_error() returns the same as the explicit join().
But don't forget that errors may happen after the last submit was called, so you
likely want to check for that with get_handle_output_result() after the with.
'''
return self
def __exit__(self, type, value, traceback):
def __exit__(self, exception_type, exception_value, exception_traceback):
self.join()
return exception_type is ThreadPoolExitException
def get_error(self):
return self.error_output
def _func_runner(self, thread_id):
while True:
work_function_input = self.in_queue.get(block=True)
if work_function_input is None:
break
if self.thread_id_arg is not None:
work_function_input[self.thread_id_arg] = thread_id
try:
work_function_exception = None
work_function_return = self.work_function(**work_function_input)
except Exception as e:
work_function_exception = e
work_function_return = None
handle_output_exception = None
try:
handle_output_return = self.handle_output(
work_function_input,
work_function_return,
work_function_exception
)
except Exception as e:
handle_output_exception = e
handle_output_result = None
if handle_output_exception is not None:
handle_output_result = handle_output_exception
elif handle_output_return is not None:
handle_output_result = handle_output_return
if handle_output_result is not None and self.handle_output_result is None:
with self.handle_output_result_lock:
self.handle_output_result = (
work_function_input,
work_function_return,
handle_output_result
)
self.in_queue.task_done()
def submit(self, work):
@staticmethod
def exception_traceback_string(exception):
'''
Submit work. Block if there is already enough work scheduled (~nthreads).
:return: if an error occurred in some previously executed thread, the error.
Otherwise, None. This allows the caller to stop submitting further
work if desired.
Helper to get the traceback from an exception object.
This is usually what you want to print if an error happens in a thread:
https://stackoverflow.com/questions/3702675/how-to-print-the-full-traceback-without-halting-the-program/56199295#56199295
'''
self.in_queue.put(work)
return self.error_output
return ''.join(traceback.format_exception(
None, exception, exception.__traceback__)
)
def get_handle_output_result(self):
'''
:return: if a handle_output call has raised previously, return a tuple:
....
(work_function_input, work_function_return, exception_raised)
....
corresponding to the first such raise.
Otherwise, if a handle_output returned non-None, a tuple:
(work_function_input, work_function_return, handle_output_return)
Otherwise, None.
'''
return self.handle_output_result
def join(self):
'''
Request all threads to stop after they finish currently submitted work.
:return: same as submit()
:return: same as get_handle_output_result()
'''
for thread in range(self.nthreads):
self.in_queue.put(None)
for thread in self.threads:
thread.join()
return self.error_output
return self.get_handle_output_result()
def _func_runner(self, thread_id):
while True:
work = self.in_queue.get(block=True)
if work is None:
break
if self.thread_id_arg is not None:
work[self.thread_id_arg] = thread_id
try:
exception = None
out = self.func(**work)
except Exception as e:
exception = e
out = None
try:
handle_output_return = self.handle_output(work, out, exception)
except Exception as e:
with self.error_output_lock:
self.error_output = (work, out, e)
else:
if handle_output_return is not None:
with self.error_output_lock:
self.error_output = handle_output_return
finally:
self.in_queue.task_done()
def submit(
self,
work_function_input: Union[Dict,None] =None
):
'''
Submit work. Block if there is already enough work scheduled (~nthreads).
:return: the same as get_handle_output_result
'''
handle_output_result = self.get_handle_output_result()
if handle_output_result is not None:
if self.submit_raise_exit:
raise ThreadPoolExitException()
if self.submit_skip_exit:
return handle_output_result
if work_function_input is None:
work_function_input = {}
self.in_queue.put(work_function_input)
return handle_output_result
if __name__ == '__main__':
def func_maybe_raise(i):
def get_work(min_, max_):
'''
Generate simple range work for work_function.
'''
for i in range(min_, max_):
yield {'i': i}
def work_function_maybe_raise(i):
'''
The main function that will be evaluated.
@@ -165,17 +320,10 @@ if __name__ == '__main__':
time.sleep((abs(i) % 4) / 10.0)
return 10.0 / i
def func_get_thread(i, thread_id):
def work_function_get_thread(i, thread_id):
time.sleep((abs(i) % 4) / 10.0)
return thread_id
def get_work(min_, max_):
'''
Generate simple range work for my_func.
'''
for i in range(min_, max_):
yield {'i': i}
def handle_output_print(input, output, exception):
'''
Print outputs and exit immediately on failure.
@@ -200,13 +348,24 @@ if __name__ == '__main__':
def handle_output_raise(input, output, exception):
'''
Raise if input == 10, to test that execution
Raise if input == 0, to test that execution
stops nicely if this raises.
'''
print('{!r} {!r} {!r}'.format(input, output, exception))
if input['i'] == 10:
if input['i'] == 0:
raise Exception
def handle_output_raise_exit_exception(input, output, exception):
'''
Return a ThreadPoolExitException() if input == -5.
Return the work_function exception if it raised.
'''
print('{!r} {!r} {!r}'.format(input, output, exception))
if exception:
return exception
if output == 10.0 / -5:
return ThreadPoolExitException()
# CLI arguments.
argv_len = len(sys.argv)
if argv_len > 1:
@@ -215,55 +374,66 @@ if __name__ == '__main__':
nthreads = None
else:
nthreads = None
if argv_len > 2:
min_ = int(sys.argv[2])
else:
min_ = 1
if argv_len > 3:
max_ = int(sys.argv[3])
else:
max_ = 100
if argv_len > 4:
c = sys.argv[4][0]
handle_output_funtion_string = sys.argv[4]
else:
c = '0'
if c == '1':
handle_output = handle_output_print_no_exit
elif c == '2':
handle_output = handle_output_queue
elif c == '3':
handle_output = handle_output_raise
else:
handle_output = handle_output_print
handle_output_funtion_string = 'handle_output_print'
handle_output = eval(handle_output_funtion_string)
if argv_len > 5:
c = sys.argv[5][0]
work_function = work_function_get_thread
thread_id_arg = sys.argv[5]
else:
c = '0'
if c == '1':
my_func = func_get_thread
thread_id_arg = 'thread_id'
else:
my_func = func_maybe_raise
work_function = work_function_maybe_raise
thread_id_arg = None
# Action.
thread_pool = ThreadPool(
my_func,
handle_output,
nthreads,
thread_id_arg,
)
for work in get_work(min_, max_):
error = thread_pool.submit(work)
if error is not None:
break
error = thread_pool.join()
if error is not None:
print('error: {!r}'.format(error))
if handle_output is handle_output_raise_exit_exception:
# `with` version with implicit join and submit raise
# immediately when desired with ThreadPoolExitException.
#
# This is the more safe and convenient and DRY usage if
# you can use `with`, so prefer it generally.
with ThreadPool(
work_function,
handle_output,
nthreads,
thread_id_arg,
submit_raise_exit=True
) as my_thread_pool:
for work in get_work(min_, max_):
my_thread_pool.submit(work)
handle_output_result = my_thread_pool.get_handle_output_result()
else:
# Explicit error checking in submit loop to exit immediately
# on error.
my_thread_pool = ThreadPool(
work_function,
handle_output,
nthreads,
thread_id_arg,
)
for work_function_input in get_work(min_, max_):
handle_output_result = my_thread_pool.submit(work_function_input)
if handle_output_result is not None:
break
handle_output_result = my_thread_pool.join()
if handle_output_result is not None:
work_function_input, work_function_return, exception = handle_output_result
if type(exception) is ThreadPoolExitException:
print('Early exit requested by handle_output with ThreadPoolExitException:')
else:
print('work_function or handle_output raised:')
print(ThreadPool.exception_traceback_string(exception), end='')
print('work_function_input: {!r}'.format(work_function_input))
print('work_function_return: {!r}'.format(work_function_return))
if handle_output == handle_output_queue:
while not out_queue.empty():
print(out_queue.get())