diff --git a/build-baremetal b/build-baremetal index 50c6c41..cb08c91 100755 --- a/build-baremetal +++ b/build-baremetal @@ -109,36 +109,28 @@ Build the baremetal examples with crosstool-NG. with thread_pool.ThreadPool( self._build_one, nthreads=self.env['nproc'], + submit_raise_exit=self.env['quit_on_fail'], ) as my_thread_pool: - try: - for target in self.env['targets']: - for path, in_dirnames, in_filenames in self.sh.walk(target): - for in_filename in in_filenames: - in_ext = os.path.splitext(in_filename)[1] - if not in_ext in self.env['build_in_exts']: - continue - in_path = os.path.join(path, in_filename) - error = my_thread_pool.submit({ - 'cc_flags': cc_flags, - 'extra_deps': [ - self.env['baremetal_link_script'], - self.env['common_h'] - ], - 'extra_objs': [syscalls_obj], - 'extra_objs_baremetal_bootloader': [extra_obj_baremetal_bootloader], - 'extra_objs_lkmc_common': [extra_obj_lkmc_common], - 'in_path': in_path, - 'out_path': self.resolve_baremetal_executable(in_path), - }) - if error is not None: - raise common.ExitLoop() - except common.ExitLoop: - pass - error = my_thread_pool.get_error() - if error is not None: - print(error) - return 1 - + for target in self.env['targets']: + for path, in_dirnames, in_filenames in self.sh.walk(target): + for in_filename in in_filenames: + in_ext = os.path.splitext(in_filename)[1] + if not in_ext in self.env['build_in_exts']: + continue + in_path = os.path.join(path, in_filename) + my_thread_pool.submit({ + 'cc_flags': cc_flags, + 'extra_deps': [ + self.env['baremetal_link_script'], + self.env['common_h'] + ], + 'extra_objs': [syscalls_obj], + 'extra_objs_baremetal_bootloader': [extra_obj_baremetal_bootloader], + 'extra_objs_lkmc_common': [extra_obj_lkmc_common], + 'in_path': in_path, + 'out_path': self.resolve_baremetal_executable(in_path), + }) + return self._handle_thread_pool_errors(my_thread_pool) def get_build_dir(self): return self.env['baremetal_build_dir'] diff --git a/build-test-boot b/build-test-boot index 2cc5d9a..b10986a 100755 --- a/build-test-boot +++ b/build-test-boot @@ -3,7 +3,7 @@ # what to build depends on --size, which ./build does not support right now. # The best way to solve this is to move the dependency checking into the run # scripts, which will take a while to refactor. -set -eu +set -eux test_size=1 while [ $# -gt 0 ]; do case "$1" in diff --git a/build-userland b/build-userland index 9341be7..22c4c52 100755 --- a/build-userland +++ b/build-userland @@ -82,37 +82,32 @@ Default: build all examples that have their package dependencies met, e.g.: with thread_pool.ThreadPool( self._build_one, nthreads=self.env['nproc'], + submit_raise_exit=self.env['quit_on_fail'], ) as my_thread_pool: - try: - for target in self.env['targets']: - for path, in_dirnames, in_filenames in self.sh.walk(target): - for in_filename in in_filenames: - in_ext = os.path.splitext(in_filename)[1] - if not in_ext in self.env['build_in_exts']: - continue - in_path = os.path.join(path, in_filename) - error = my_thread_pool.submit({ - 'cc_flags': cc_flags, - 'extra_objs_lkmc_common': [extra_obj_lkmc_common], - 'extra_objs_userland_asm': [extra_obj_userland_asm], - 'in_path': in_path, - 'out_path': self.resolve_userland_executable(in_path), - }) - if error is not None: - raise common.ExitLoop() - except common.ExitLoop: - pass - error = my_thread_pool.get_error() - if error is not None: - print(error) - return 1 + for target in self.env['targets']: + for path, in_dirnames, in_filenames in self.sh.walk(target): + for in_filename in in_filenames: + in_ext = os.path.splitext(in_filename)[1] + if not in_ext in self.env['build_in_exts']: + continue + in_path = os.path.join(path, in_filename) + my_thread_pool.submit({ + 'cc_flags': cc_flags, + 'extra_objs_lkmc_common': [extra_obj_lkmc_common], + 'extra_objs_userland_asm': [extra_obj_userland_asm], + 'in_path': in_path, + 'out_path': self.resolve_userland_executable(in_path), + }) + exit_status = self._handle_thread_pool_errors(my_thread_pool) + if exit_status != 0: + return exit_status if not self.env['in_tree']: self.sh.copy_dir_if_update( srcdir=build_dir, destdir=self.env['out_rootfs_overlay_lkmc_dir'], filter_ext=self.env['userland_executable_ext'], ) - return 0 + return exit_status def clean(self): if self.env['in_tree']: diff --git a/common.py b/common.py index dcb5b27..f46eb9b 100644 --- a/common.py +++ b/common.py @@ -22,6 +22,7 @@ import signal import subprocess import sys import threading +from typing import Union import time import urllib import urllib.request @@ -30,6 +31,7 @@ from shell_helpers import LF import cli_function import path_properties import shell_helpers +import thread_pool common = sys.modules[__name__] @@ -265,6 +267,14 @@ TODO: implement fully, some stuff is escaping it currently. default=True, help='''\ Stop running at the first failed test. +''' + ) + self.add_argument( + '--show-cmds', + default=True, + help='''\ +Print the exact Bash command equivalents being run by this script. +Implied by --quiet. ''' ) self.add_argument( @@ -583,19 +593,40 @@ Incompatible archs are skipped. def __call__(self, *args, **kwargs): ''' - For Python code calls, in addition to base: + For Python code calls, in addition to base class behaviour: - - print the CLI equivalent of the call - - automatically forward common arguments + * print the CLI equivalent of the call + * automatically forward common arguments ''' print_cmd = ['./' + self.extra_config_params, LF] + if 'print_cmd_oneline' in kwargs: + force_oneline = kwargs['print_cmd_oneline'] + del kwargs['print_cmd_oneline'] + else: + force_oneline=False for line in self.get_cli(**kwargs): print_cmd.extend(line) print_cmd.append(LF) if not ('quiet' in kwargs and kwargs['quiet']): - shell_helpers.ShellHelpers().print_cmd(print_cmd) + shell_helpers.ShellHelpers().print_cmd( + print_cmd, + force_oneline=force_oneline + ) return super().__call__(**kwargs) + def _handle_thread_pool_errors(self, my_thread_pool): + handle_output_result = my_thread_pool.get_handle_output_result() + if handle_output_result is not None: + work_function_input, work_function_return, exception = handle_output_result + if not type(exception) is thread_pool.ThreadPoolExitException: + print('work_function or handle_output raised unexpectedly:') + print(thread_pool.ThreadPool.exception_traceback_string(exception), end='') + print('work_function_input: {}'.format(work_function_input)) + print('work_function_return: {}'.format(work_function_return)) + return 1 + else: + return 0 + def _init_env(self, env): ''' Update the kwargs from the command line with values derived from them. @@ -823,7 +854,10 @@ Incompatible archs are skipped. env['linux_image'] = env['lkmc_linux_image'] env['linux_config'] = join(env['linux_build_dir'], '.config') if env['emulator']== 'gem5': - env['userland_quit_cmd'] = './gem5_exit.sh' + env['userland_quit_cmd'] = join( + env['guest_lkmc_home'], + 'gem5_exit.sh' + ) else: env['userland_quit_cmd'] = join( env['guest_lkmc_home'], @@ -1141,11 +1175,17 @@ lunch aosp_{}-eng real_archs = consts['all_long_archs'] else: real_archs = env['archs'] - if env['all_emulators']: + real_all_emulators = env['all_emulators'] + if real_all_emulators: real_emulators = consts['all_long_emulators'] else: real_emulators = env['emulators'] return_value = 0 + if env['_args_given']['show_cmds']: + show_cmds = env['show_cmds'] + else: + show_cmds = not env['quiet'] + self.setup(env) try: for emulator in real_emulators: for arch in real_archs: @@ -1153,7 +1193,15 @@ lunch aosp_{}-eng arch = env['arch_short_to_long_dict'][arch] if emulator == 'native': if arch != env['host_arch']: - continue + if real_all_archs: + continue + else: + raise Exception('native emulator only supported in if target arch == host arch') + if env['userland'] is None: + if real_all_emulators: + continue + else: + raise Exception('native emulator only supported in user mode') if self.is_arch_supported(arch): if not env['dry_run']: start_time = time.time() @@ -1169,7 +1217,7 @@ lunch aosp_{}-eng self._init_env(self.env) self.sh = shell_helpers.ShellHelpers( dry_run=self.env['dry_run'], - quiet=self.env['quiet'], + quiet=(not show_cmds), ) self.setup_one() ret = self.timed_main() @@ -1317,6 +1365,14 @@ lunch aosp_{}-eng self.env['userland_executable_ext'], ) + def setup(self, env): + ''' + Similar to setup run before all timed_main are called. + + _init_env has not yet been called, so only primary CLI arguments may be used. + ''' + pass + def setup_one(self): ''' Run just before timed_main, after _init_env. @@ -1338,9 +1394,11 @@ lunch aosp_{}-eng ''' pass - def teardown(self): + def teardown(self) -> Union[None,int]: ''' Similar to setup, but run once after all timed_main are called. + + :return: if not None, the return integer gets used as the exit status of the program. ''' pass @@ -1602,6 +1660,7 @@ class TestCliFunction(LkmcCliFunction): def __init__(self, *args, **kwargs): defaults = { + 'quit_on_fail': False, 'show_time': False, } if 'defaults' in kwargs: @@ -1610,6 +1669,17 @@ class TestCliFunction(LkmcCliFunction): super().__init__(*args, **kwargs) self.test_results = queue.Queue() + def handle_output_function( + self, + work_function_input, + work_function_return, + work_function_exception + ): + if work_function_exception is not None: + return work_function_exception + if work_function_return.status != TestStatus.PASS: + return thread_pool.ThreadPoolExitException() + def run_test( self, run_obj, @@ -1624,17 +1694,33 @@ class TestCliFunction(LkmcCliFunction): More complex tests might need to run the steps separately, e.g. gdb tests must run multiple commands: one for the run and one GDB. + This function is meant to be called from threads. In particular, + those threads have to cross over archs: the original motivation is to parallelize + super slow gem5 boot tests. Therefore, we cannot use self.env['arch'] and selv.env['emulator'] + in this function or callees! + + Ideally, we should make this static and pass all arguments to the call... but lazy to refactor. + I have the feeling I will regret this one day down the line. + :param run_obj: callable object :param run_args: arguments to be passed to the runnable object :param test_id: test identifier, to be added in addition to of arch and emulator ids :param thread_id: which thread the test is running under ''' - if run_obj.is_arch_supported(self.env['arch']): - if run_args is None: - run_args = {} - run_args['run_id'] = thread_id - test_id_string = self.test_setup(test_id) - exit_status = run_obj(**run_args) + if run_obj.is_arch_supported(run_args['archs'][0]): + cur_run_args = { + 'background': True, + 'ctrl_c_host': True, + 'print_cmd_oneline': True, + 'run_id': thread_id, + 'show_cmds': False, + 'show_stdout': False, + 'show_time': False, + } + if run_args is not None: + cur_run_args.update(run_args) + test_id_string = self.test_setup(run_args, test_id) + exit_status = run_obj(**cur_run_args) return self.test_teardown( run_obj, exit_status, @@ -1642,11 +1728,11 @@ class TestCliFunction(LkmcCliFunction): expected_exit_status=expected_exit_status ) - def test_setup(self, test_id): - test_id_string = '{} {}'.format(self.env['emulator'], self.env['arch']) - if test_id is not None: + def test_setup(self, run_args, test_id): + test_id_string = '{} {}'.format(run_args['emulators'][0], run_args['archs'][0]) + if test_id is not None and str(test_id) != '': test_id_string += ' {}'.format(test_id) - self.log_info('test_id {}'.format(test_id_string), flush=True) + self.log_info('Starting: {}'.format(repr(test_id_string)), flush=True) return test_id_string def test_teardown( @@ -1661,24 +1747,24 @@ class TestCliFunction(LkmcCliFunction): reason = '' if not self.env['dry_run']: if exit_status == expected_exit_status: - test_result = TestStatus.PASS + test_status = TestStatus.PASS else: - test_result = TestStatus.FAIL + test_status = TestStatus.FAIL reason = 'wrong exit status, got {} expected {}'.format( exit_status, expected_exit_status ) ellapsed_seconds = run_obj.ellapsed_seconds else: - test_result = TestStatus.PASS + test_status = TestStatus.PASS ellapsed_seconds = 0 test_result = TestResult( test_id_string, - test_result, + test_status, ellapsed_seconds, reason ) - self.log_info(test_result) + self.log_info('Result: ' + str(test_result)) self.test_results.put(test_result) return test_result @@ -1686,7 +1772,7 @@ class TestCliFunction(LkmcCliFunction): ''' :return: 1 if any test failed, 0 otherwise ''' - self.log_info('\nTest result summary') + self.log_info('\nTest result summary:') passes = [] fails = [] while not self.test_results.empty(): diff --git a/path_properties.py b/path_properties.py index 2a86fcd..767aacb 100644 --- a/path_properties.py +++ b/path_properties.py @@ -68,12 +68,7 @@ class PathProperties: 'skip_run_unclassified': False, # Aruments added automatically to run when running tests, # but not on manual running. - 'test_run_args': { - 'ctrl_c_host': True, - 'show_stdout': False, - 'show_time': False, - 'background': True, - }, + 'test_run_args': {}, } ''' @@ -142,6 +137,13 @@ class PathProperties: ) ) + def _update_dict(self, other_tmp_properties, key): + if key in self.properties and key in other_tmp_properties: + other_tmp_properties[key] = { + **self.properties[key], + **other_tmp_properties[key] + } + def _update_list(self, other_tmp_properties, key): if key in self.properties and key in other_tmp_properties: other_tmp_properties[key] = \ @@ -153,11 +155,7 @@ class PathProperties: self._update_list(other_tmp_properties, 'cc_flags') self._update_list(other_tmp_properties, 'cc_flags_after') self._update_list(other_tmp_properties, 'extra_objs') - if 'test_run_args' in self.properties and 'test_run_args' in other_tmp_properties: - other_tmp_properties['test_run_args'] = { - **self.properties['test_run_args'], - **other_tmp_properties['test_run_args'] - } + self._update_dict(other_tmp_properties, 'test_run_args') return self.properties.update(other_tmp_properties) class PrefixTree: diff --git a/run b/run index d80a4c1..66f82ac 100755 --- a/run +++ b/run @@ -231,9 +231,9 @@ Setup a kernel init parameter that makes the emulator quit immediately after boo help='''\ Output directly to the terminal, don't pipe to tee as the default. With this, we don't not save the output to a file as is done by default, -but we are able to do things that require not having a pipe suh as you to -using debuggers. This option issSet automatically by --debug-vm, but you still need -it to debug gem5 Python scripts with pdb. +but we are able to do things that require not having a pipe such as +using debuggers. This option is set automatically by --debug-vm, but you +still need it to debug gem5 Python scripts with pdb. ''' ) self.add_argument( @@ -309,8 +309,6 @@ Extra options to append at the end of the emulator command line. ) def timed_main(self): - if self.env['emulator'] == 'native' and self.env['userland'] is None: - raise Exception('native emulator only supported in user mode') show_stdout = self.env['show_stdout'] # Common qemu / gem5 logic. # nokaslr: @@ -481,7 +479,8 @@ Extra options to append at the end of the emulator command line. if self.env['arch'] == 'x86_64': if self.env['kvm']: cmd.extend(['--cpu-type', 'X86KvmCPU', LF]) - cmd.extend(['--command-line', 'earlyprintk={} lpj=7999923 root=/dev/sda {}'.format(console, kernel_cli), LF]) + if self.env['baremetal'] is None: + cmd.extend(['--command-line', 'earlyprintk={} lpj=7999923 root=/dev/sda {}'.format(console, kernel_cli), LF]) elif self.env['is_arm']: if self.env['kvm']: cmd.extend(['--cpu-type', 'ArmV8KvmCPU', LF]) @@ -492,14 +491,24 @@ Extra options to append at the end of the emulator command line. cmd.extend([ # TODO why is it mandatory to pass mem= here? Not true for QEMU. # Anything smaller than physical blows up as expected, but why can't it auto-detect the right value? - '--command-line', 'earlyprintk=pl011,0x1c090000 lpj=19988480 rw loglevel=8 mem={} root=/dev/sda {}'.format(memory, kernel_cli), LF, '--machine-type', self.env['machine'], LF, ]) + if self.env['baremetal'] is None: + cmd.extend(['--command-line', 'earlyprintk=pl011,0x1c090000 lpj=19988480 rw loglevel=8 mem={} root=/dev/sda {}'.format(memory, kernel_cli), LF]) dtb = None if self.env['dtb'] is not None: dtb = self.env['dtb'] elif self.env['dp650']: - dtb = os.path.join(self.env['gem5_system_dir'], 'arm', 'dt', 'armv{}_gem5_v1_{}{}cpu.dtb'.format(self.env['armv'], dp650_cmd, self.env['cpus'])) + dtb = os.path.join( + self.env['gem5_system_dir'], + 'arm', + 'dt', + 'armv{}_gem5_v1_{}{}cpu.dtb'.format( + self.env['armv'], + dp650_cmd, + self.env['cpus'] + ) + ) if dtb is not None: cmd.extend(['--dtb-filename', dtb, LF]) if self.env['baremetal'] is None: @@ -521,7 +530,13 @@ Extra options to append at the end of the emulator command line. cpt_dir = self.gem5_list_checkpoint_dirs()[-self.env['gem5_restore']] extra_emulator_args.extend(['--restore-from', os.path.join(self.env['m5out_dir'], cpt_dir)]) cmd.extend([ - os.path.join(self.env['gem5_source_dir'], 'configs', 'example', 'arm', 'fs_bigLITTLE.py'), LF, + os.path.join( + self.env['gem5_source_dir'], + 'configs', + 'example', + 'arm', + 'fs_bigLITTLE.py' + ), LF, '--big-cpus', '2', LF, '--cpu-type', cpu_type, LF, '--disk', self.env['disk_image'], LF, @@ -529,7 +544,15 @@ Extra options to append at the end of the emulator command line. '--little-cpus', '2', LF, ]) if self.env['dtb']: - cmd.extend(['--dtb', os.path.join(self.env['gem5_system_dir'], 'arm', 'dt', 'armv8_gem5_v1_big_little_2_2.dtb'), NL]) + cmd.extend([ + '--dtb', + os.path.join(self.env['gem5_system_dir'], + 'arm', + 'dt', + 'armv8_gem5_v1_big_little_2_2.dtb' + ), + LF + ]) if self.env['gdb_wait']: # https://stackoverflow.com/questions/49296092/how-to-make-gem5-wait-for-gdb-to-connect-to-reliably-break-at-start-kernel-of-th cmd.extend(['--param', 'system.cpu[0].wait_for_remote_gdb = True', LF]) @@ -570,7 +593,10 @@ Extra options to append at the end of the emulator command line. serial_monitor = ['-serial', serial, LF] if self.env['kvm']: extra_emulator_args.extend(['-enable-kvm', LF]) - extra_emulator_args.extend(['-serial', 'tcp::{},server,nowait'.format(self.env['extra_serial_port']), LF]) + extra_emulator_args.extend([ + '-serial', + 'tcp::{},server,nowait'.format(self.env['extra_serial_port']), LF + ]) virtfs_data = [ (self.env['p9_dir'], 'host_data'), (self.env['out_dir'], 'host_out'), diff --git a/shell_helpers.py b/shell_helpers.py index 40e84de..9b6580b 100644 --- a/shell_helpers.py +++ b/shell_helpers.py @@ -11,6 +11,7 @@ import stat import subprocess import sys import threading +from typing import List, Union import urllib.request class LF: @@ -85,10 +86,22 @@ class ShellHelpers: os.chmod(path, new_mode) @staticmethod - def cmd_to_string(cmd, cwd=None, extra_env=None, extra_paths=None): + def cmd_to_string( + cmd: List[Union[str, LF]], + cwd=None, + extra_env=None, + extra_paths=None, + force_oneline: bool =False, + ): ''' Format a command given as a list of strings so that it can be viewed nicely and executed by bash directly and print it to stdout. + + If cmd contains: + + * no LF, then newlines are added after every word + * exactly one LF at the end, then no newlines are added + * otherwise: newlines are added exactly at each LF ''' last_newline = ' \\\n' newline_separator = last_newline + ' ' @@ -105,14 +118,22 @@ class ShellHelpers: newline_count = 0 for arg in cmd: if arg == LF: - cmd_quote.append(arg) - newline_count += 1 + if not force_oneline: + cmd_quote.append(arg) + newline_count += 1 else: cmd_quote.append(shlex.quote(arg)) - if newline_count > 0: - cmd_quote = [' '.join(list(y)) for x, y in itertools.groupby(cmd_quote, lambda z: z == LF) if not x] + if force_oneline or newline_count > 0: + cmd_quote = [ + ' '.join(list(y)) + for x, y in itertools.groupby( + cmd_quote, + lambda z: z == LF + ) + if not x + ] out.extend(cmd_quote) - if newline_count == 1 and cmd[-1] == LF: + if force_oneline or newline_count == 1 and cmd[-1] == LF: ending = '' else: ending = last_newline + ';' @@ -157,20 +178,31 @@ class ShellHelpers: else: shutil.copy2(src, dest) - def print_cmd(self, cmd, cwd=None, cmd_file=None, extra_env=None, extra_paths=None): + def print_cmd( + self, + cmd, + cwd=None, + cmd_file=None, + extra_env=None, + extra_paths=None, + force_oneline=False, + ): ''' Print cmd_to_string to stdout. Optionally save the command to cmd_file file, and add extra_env environment variables to the command generated. - - If cmd contains at least one LF, newlines are only added on LF. - Otherwise, newlines are added automatically after every word. ''' if type(cmd) is str: cmd_string = cmd else: - cmd_string = self.cmd_to_string(cmd, cwd=cwd, extra_env=extra_env, extra_paths=extra_paths) + cmd_string = self.cmd_to_string( + cmd, + cwd=cwd, + extra_env=extra_env, + extra_paths=extra_paths, + force_oneline=force_oneline, + ) if not self.quiet: self._print_thread_safe('+ ' + cmd_string) if cmd_file is not None: @@ -371,3 +403,29 @@ class ShellHelpers: if not self.dry_run: with open(path, mode) as f: f.write(string) + +if __name__ == '__main__': + shell_helpers = ShellHelpers() + if 'cmd_to_string': + # Default. + assert shell_helpers.cmd_to_string(['cmd']) == 'cmd \\\n;' + assert shell_helpers.cmd_to_string(['cmd', 'arg1']) == 'cmd \\\n arg1 \\\n;' + assert shell_helpers.cmd_to_string(['cmd', 'arg1', 'arg2']) == 'cmd \\\n arg1 \\\n arg2 \\\n;' + + # Argument with a space gets escaped. + assert shell_helpers.cmd_to_string(['cmd', 'arg1 arg2']) == "cmd \\\n 'arg1 arg2' \\\n;" + + # Ending in LF with no other LFs get separated only by spaces. + assert shell_helpers.cmd_to_string(['cmd', LF]) == 'cmd' + assert shell_helpers.cmd_to_string(['cmd', 'arg1', LF]) == 'cmd arg1' + assert shell_helpers.cmd_to_string(['cmd', 'arg1', 'arg2', LF]) == 'cmd arg1 arg2' + + # More than one LF adds newline separators at each LF. + assert shell_helpers.cmd_to_string(['cmd', LF, 'arg1', LF]) == 'cmd \\\n arg1 \\\n;' + assert shell_helpers.cmd_to_string(['cmd', LF, 'arg1', LF, 'arg2', LF]) == 'cmd \\\n arg1 \\\n arg2 \\\n;' + assert shell_helpers.cmd_to_string(['cmd', LF, 'arg1', 'arg2', LF]) == 'cmd \\\n arg1 arg2 \\\n;' + + # force_oneline separates everything simply by spaces. + assert \ + shell_helpers.cmd_to_string(['cmd', LF, 'arg1', LF, 'arg2', LF], force_oneline=True) \ + == 'cmd arg1 arg2' diff --git a/test-baremetal b/test-baremetal index 5802e02..f5224b7 100755 --- a/test-baremetal +++ b/test-baremetal @@ -32,43 +32,32 @@ If given, run only the given tests. Otherwise, run all tests. rootdir_abs_len = len(self.env['root_dir']) with thread_pool.ThreadPool( self.run_test, + handle_output=self.handle_output_function, nthreads=self.env['nproc'], thread_id_arg='thread_id', + submit_raise_exit=self.env['quit_on_fail'], ) as my_thread_pool: - try: - for test in self.env['tests']: - for path, in_dirnames, in_filenames in self.sh.walk(test): - path_abs = os.path.abspath(path) - dirpath_relative_root = path_abs[rootdir_abs_len + 1:] - for in_filename in in_filenames: - if os.path.splitext(in_filename)[1] in (self.env['c_ext'], self.env['asm_ext']): - path_relative_root = os.path.join(dirpath_relative_root, in_filename) - my_path_properties = path_properties.get(path_relative_root) - if my_path_properties.should_be_tested(self.env): - cur_run_args = run_args.copy() - cur_run_args.update({ - 'baremetal': os.path.relpath(os.path.join(path_abs, in_filename), os.getcwd()), - }) - cur_run_args.update(my_path_properties['test_run_args']) - test_args = { - 'expected_exit_status': my_path_properties['exit_status'], - 'run_args': cur_run_args, - 'run_obj': lkmc.import_path.import_path_main('run'), - 'test_id': path_relative_root, - } - error = my_thread_pool.submit(test_args) - if error is not None: - if self.env['quit_on_fail']: - raise common.ExitLoop() - - except common.ExitLoop: - pass - error = my_thread_pool.get_error() - if error is not None: - print(error) - return 1 - else: - return 0 + for test in self.env['tests']: + for path, in_dirnames, in_filenames in self.sh.walk(test): + path_abs = os.path.abspath(path) + dirpath_relative_root = path_abs[rootdir_abs_len + 1:] + for in_filename in in_filenames: + if os.path.splitext(in_filename)[1] in (self.env['c_ext'], self.env['asm_ext']): + path_relative_root = os.path.join(dirpath_relative_root, in_filename) + my_path_properties = path_properties.get(path_relative_root) + if my_path_properties.should_be_tested(self.env): + cur_run_args = run_args.copy() + cur_run_args.update({ + 'baremetal': os.path.relpath(os.path.join(path_abs, in_filename), os.getcwd()), + }) + cur_run_args.update(my_path_properties['test_run_args']) + my_thread_pool.submit({ + 'expected_exit_status': my_path_properties['exit_status'], + 'run_args': cur_run_args, + 'run_obj': lkmc.import_path.import_path_main('run'), + 'test_id': path_relative_root, + }) + return self._handle_thread_pool_errors(my_thread_pool) if __name__ == '__main__': Main().cli() diff --git a/test-boot b/test-boot index 1e5ff33..d9ba813 100755 --- a/test-boot +++ b/test-boot @@ -2,6 +2,7 @@ import common import lkmc.import_path +import thread_pool import shell_helpers from shell_helpers import LF @@ -21,14 +22,24 @@ See ./test --help for --size. ''' ) - def _bench(self, **kwargs): + def _bench(self, **run_args): + run_obj = lkmc.import_path.import_path_main('run') words = [] - for line in self.run.get_cli(**kwargs): + test_id_args = run_args.copy() + del test_id_args['run_id'] + for line in run_obj.get_cli(**test_id_args): words.extend(line) - extra_params = shell_helpers.ShellHelpers().cmd_to_string(words + [LF]) - run_args = kwargs.copy() - run_args.update(self.common_args) - self.run_test(self.run, run_args, extra_params) + test_id = shell_helpers.ShellHelpers().cmd_to_string(words, force_oneline=True) + return self.run_test(run_obj, run_args, test_id) + + def setup(self, env): + self.my_thread_pool = thread_pool.ThreadPool( + self._bench, + handle_output=self.handle_output_function, + nthreads=env['nproc'], + thread_id_arg='run_id', + submit_skip_exit=env['quit_on_fail'], + ) def timed_main(self): # TODO bring this benchmark code back to life. Likely should go inside run with an option @@ -46,19 +57,20 @@ See ./test --help for --size. #) # #rm -f "${self.env['test_boot_benchmark_file']}" - self.run = lkmc.import_path.import_path_main('run') - self.common_args = self.get_common_args() - self.common_args['ctrl_c_host'] = True - self.common_args['quit_after_boot'] = True + common_args = self.get_common_args() + common_args['ctrl_c_host'] = True + common_args['quit_after_boot'] = True + # To see it blow up during development. + # self.common_args['eval'] = 'insmod /lkmc/panic.ko' if (self.env['emulator'] == 'qemu' or (self.env['emulator'] == 'gem5' and self.env['size'] >= 2)): - self._bench() + self.my_thread_pool.submit(common_args) if self.env['host_arch'] == self.env['arch']: # TODO: find out why it fails. if self.env['emulator'] != 'gem5': - self._bench(kvm=True) + self.my_thread_pool.submit({**common_args, **{'kvm': True}}) if self.env['emulator'] == 'qemu' and self.env['size'] >= 2: - self._bench(trace='exec_tb') + self.my_thread_pool.submit({**common_args, **{'trace': 'exec_tb'}}) if self.env['emulator'] == 'gem5' and self.env['size'] >= 3: if self.env['arch'] == 'x86_64': cpu_types = [ @@ -71,23 +83,28 @@ See ./test --help for --size. 'HPI', ] for cpu_type in cpu_types: - self._bench( - extra_emulator_args=[ - '--cpu-type', cpu_type, - '--caches', - '--l2cache', - '--l1d_size', '1024kB', - '--l1i_size', '1024kB', - '--l2_size', '1024kB', - '--l3_size', '1024kB', + self.my_thread_pool.submit({**common_args, **{ + 'extra_emulator_args': [ + '--cpu-type', cpu_type, LF, + '--caches', LF, + '--l2cache', LF, + '--l1d_size', '1024kB', LF, + '--l1i_size', '1024kB', LF, + '--l2_size', '1024kB', LF, + '--l3_size', '1024kB', LF, ], - ) + }}) if self.env['arch'] == 'aarch64': # Do a fuller testing for aarch64. for build_type in ['debug', 'fast']: - self._bench(gem5_build_type=build_type) + self.my_thread_pool.submit({**common_args, **{'gem5_build_type': build_type}}) # Requires patching the executable. - # self._bench(gem5_script='biglittle') + # self.my_thread_pool.submit({{**common_args, 'gem5_script': 'biglittle'}}) + + def teardown(self): + self.my_thread_pool.join() + self._handle_thread_pool_errors(self.my_thread_pool) + return super().teardown() if __name__ == '__main__': Main().cli() diff --git a/test-gdb b/test-gdb index 27af259..51978f6 100755 --- a/test-gdb +++ b/test-gdb @@ -57,10 +57,10 @@ found by searching for the Python test files. test_source_base = os.path.relpath(base, self.env['root_dir']) common_args = self.get_common_args() common_args['baremetal'] = test_source_base + ext - test_id_string = self.test_setup(test_source_base) run_args = common_args.copy() run_args['gdb_wait'] = True run_args['background'] = True + test_id_string = self.test_setup(run_args, test_source_base) run_thread = threading.Thread(target=lambda: run(**run_args)) run_thread.start() gdb_args = common_args.copy() diff --git a/test-user-mode b/test-user-mode index 8776f5d..a811df0 100755 --- a/test-user-mode +++ b/test-user-mode @@ -40,44 +40,35 @@ If given, run only the given tests. Otherwise, run all tests. rootdir_abs_len = len(self.env['root_dir']) with thread_pool.ThreadPool( self.run_test, + handle_output=self.handle_output_function, nthreads=self.env['nproc'], thread_id_arg='thread_id', + submit_raise_exit=self.env['quit_on_fail'], ) as my_thread_pool: - try: - for test in self.env['tests']: - for path, in_dirnames, in_filenames in self.sh.walk(test): - path_abs = os.path.abspath(path) - dirpath_relative_root = path_abs[rootdir_abs_len + 1:] - for in_filename in in_filenames: - if os.path.splitext(in_filename)[1] in self.env['build_in_exts']: - path_relative_root = os.path.join(dirpath_relative_root, in_filename) - my_path_properties = path_properties.get(path_relative_root) - if my_path_properties.should_be_tested(self.env): - cur_run_args = run_args.copy() - cur_run_args.update({ - 'userland': os.path.relpath(os.path.join(path_abs, in_filename), os.getcwd()), - }) - cur_run_args.update(my_path_properties['test_run_args']) - run_test_args = { - 'expected_exit_status': my_path_properties['exit_status'], - 'run_args': cur_run_args, - 'run_obj': lkmc.import_path.import_path_main('run'), - 'test_id': path_relative_root, - } - if my_path_properties['receives_signal']: - run_test_args['expected_exit_status'] = 128 - my_path_properties['exit_status'] - error = my_thread_pool.submit(run_test_args) - if error is not None: - if self.env['quit_on_fail']: - raise common.ExitLoop() - except common.ExitLoop: - pass - error = my_thread_pool.get_error() - if error is not None: - print(error) - return 1 - else: - return 0 + for test in self.env['tests']: + for path, in_dirnames, in_filenames in self.sh.walk(test): + path_abs = os.path.abspath(path) + dirpath_relative_root = path_abs[rootdir_abs_len + 1:] + for in_filename in in_filenames: + if os.path.splitext(in_filename)[1] in self.env['build_in_exts']: + path_relative_root = os.path.join(dirpath_relative_root, in_filename) + my_path_properties = path_properties.get(path_relative_root) + if my_path_properties.should_be_tested(self.env): + cur_run_args = run_args.copy() + cur_run_args.update({ + 'userland': os.path.relpath(os.path.join(path_abs, in_filename), os.getcwd()), + }) + cur_run_args.update(my_path_properties['test_run_args']) + run_test_args = { + 'expected_exit_status': my_path_properties['exit_status'], + 'run_args': cur_run_args, + 'run_obj': lkmc.import_path.import_path_main('run'), + 'test_id': path_relative_root, + } + if my_path_properties['receives_signal']: + run_test_args['expected_exit_status'] = 128 - my_path_properties['exit_status'] + my_thread_pool.submit(run_test_args) + return self._handle_thread_pool_errors(my_thread_pool) if __name__ == '__main__': Main().cli() diff --git a/thread_pool.py b/thread_pool.py index 5b0b677..516790b 100644 --- a/thread_pool.py +++ b/thread_pool.py @@ -1,11 +1,26 @@ #!/usr/bin/env python3 +''' +This file is MIT Licensed because I'm posting it on Stack Overflow: +https://stackoverflow.com/questions/19369724/the-right-way-to-limit-maximum-number-of-threads-running-at-once/55263676#55263676 +''' + from typing import Any, Callable, Dict, Iterable, Union import os import queue import sys import threading import time +import traceback + +class ThreadPoolExitException(Exception): + ''' + An object of this class may be raised by output_handler_function to + request early termination. + + It is also raised by submit() if submit_raise_exit=True. + ''' + pass class ThreadPool: ''' @@ -19,65 +34,153 @@ class ThreadPool: * queue sizes closely follow number of threads * if an exception happens, optionally stop soon afterwards - Functional form and further discussion at: - https://stackoverflow.com/questions/19369724/the-right-way-to-limit-maximum-number-of-threads-running-at-once/55263676#55263676 - This class form allows to use your own while loops with submit(). - Quick test with: + Exit soon after the first failure happens: .... - python3 thread_pool.py 2 -10 20 0 - python3 thread_pool.py 2 -10 20 1 - python3 thread_pool.py 2 -10 20 2 - python3 thread_pool.py 2 -10 20 3 - python3 thread_pool.py 2 -10 20 0 1 + python3 thread_pool.py 2 -10 20 handle_output_print .... - These ensure that execution stops neatly on error. + Sample output: + + .... + {'i': -9} -1.1111111111111112 None + {'i': -8} -1.25 None + {'i': -10} -1.0 None + {'i': -6} -1.6666666666666667 None + {'i': -7} -1.4285714285714286 None + {'i': -4} -2.5 None + {'i': -5} -2.0 None + {'i': -2} -5.0 None + {'i': -3} -3.3333333333333335 None + {'i': 0} None ZeroDivisionError('float division by zero') + {'i': -1} -10.0 None + {'i': 1} 10.0 None + {'i': 2} 5.0 None + work_function or handle_output raised: + Traceback (most recent call last): + File "thread_pool.py", line 181, in _func_runner + work_function_return = self.work_function(**work_function_input) + File "thread_pool.py", line 281, in work_function_maybe_raise + return 10.0 / i + ZeroDivisionError: float division by zero + work_function_input: {'i': 0} + work_function_return: None + .... + + Don't exit after first failure, run until end: + + .... + python3 thread_pool.py 2 -10 20 handle_output_print_no_exit + .... + + Store results in a queue for later inspection instead of printing immediately, + then print everything at the end: + + .... + python3 thread_pool.py 2 -10 20 handle_output_queue + .... + + Exit soon after the handle_output raise. + + .... + python3 thread_pool.py 2 -10 20 handle_output_raise + .... + + Relying on this interface to abort execution is discouraged, this should + usually only happen due to a programming error in the handler. + + Test that the argument called "thread_id" is passed to work_function and printed: + + .... + python3 thread_pool.py 2 -10 20 handle_output_print thread_id + .... + + Test with, ThreadPoolExitException and submit_raise_exit=True, same behaviour handle_output_print + except for the different exit cause report: + + .... + python3 thread_pool.py 2 -10 20 handle_output_raise_exit_exception + .... ''' def __init__( self, - func: Callable, + work_function: Callable, handle_output: Union[Callable[[Any,Any,Exception],Any],None] = None, nthreads: Union[int,None] = None, thread_id_arg: Union[str,None] = None, + submit_raise_exit: bool = False, + submit_skip_exit: bool = False, ): ''' Start in a thread pool immediately. join() must be called afterwards at some point. - :param func: main work function to be evaluated. - :param handle_output: called on func return values as they + :param work_function: main work function to be evaluated. + :param handle_output: called on work_function return values as they are returned. - Signature is: handle_output(input, output, exception) where: + The function signature is: - * input: input given to func - * output: return value of func - * exception: the exception that func raised, or None otherwise + .... + handle_output( + work_function_input: Union[Dict,None], + work_function_return, + work_function_exception: Exception + ) -> Union[Exception,None] + .... - If this function returns non-None or raises, stop feeding - new input and exit ASAP when all currently running threads - have finished. + where work_function_exception the exception that work_function raised, + or None otherwise - Default: a handler that does nothing and just exits on exception. + The first non-None return value of a call to this function is returned by + submit(), get_handle_output_result() and join(). + + The intended semantic for this, is to return: + + * on success: + ** None to continue execution + ** ThreadPoolExitException() to request stop execution + * if work_function_input or work_function_exception raise: + ** the exception raised + + The ThreadPool user can then optionally terminate execution early on error + or request with either: + + * an explicit submit() return value check + break if a submit loop is used + * `with` + submit_raise_exit=True + + Default: a handler that just returns `exception`, which can normally be used + by the submit loop to detect an error and exit immediately. :param nthreads: number of threads to use. Default: nproc. - :param thread_id_arg: if not None, set the argument of func with this name + :param thread_id_arg: if not None, set the argument of work_function with this name to a 0-indexed thread ID. This allows function calls to coordinate usage of external resources such as files or ports. + :param submit_raise_exit: if True, submit() raises ThreadPoolExitException() if + get_handle_output_result() is not None. + :param submit_skip_exit: if True, submit() does nothing if + get_handle_output_result() is not None. + + You should avoid this interface if + you can use use submit_raise_exit with `with` instead ideally. + + However, when you can't work with with and are in a deeply nested loop, + it might just be easier to set this. ''' - self.func = func + self.work_function = work_function if handle_output is None: handle_output = lambda input, output, exception: exception self.handle_output = handle_output if nthreads is None: nthreads = len(os.sched_getaffinity(0)) self.thread_id_arg = thread_id_arg + self.submit_raise_exit = submit_raise_exit + self.submit_skip_exit = submit_skip_exit self.nthreads = nthreads - self.error_output = None - self.error_output_lock = threading.Lock() + self.handle_output_result = None + self.handle_output_result_lock = threading.Lock() self.in_queue = queue.Queue(maxsize=nthreads) self.threads = [] for i in range(self.nthreads): @@ -94,69 +197,121 @@ class ThreadPool: This is cool because it automatically ends the loop if an exception occurs. - But don't forget that errors may happen after the last submit is called, so you - likely want to check for that with get_error after the with. - - get_error() returns the same as the explicit join(). + But don't forget that errors may happen after the last submit was called, so you + likely want to check for that with get_handle_output_result() after the with. ''' return self - def __exit__(self, type, value, traceback): + def __exit__(self, exception_type, exception_value, exception_traceback): self.join() + return exception_type is ThreadPoolExitException - def get_error(self): - return self.error_output + def _func_runner(self, thread_id): + while True: + work_function_input = self.in_queue.get(block=True) + if work_function_input is None: + break + if self.thread_id_arg is not None: + work_function_input[self.thread_id_arg] = thread_id + try: + work_function_exception = None + work_function_return = self.work_function(**work_function_input) + except Exception as e: + work_function_exception = e + work_function_return = None + handle_output_exception = None + try: + handle_output_return = self.handle_output( + work_function_input, + work_function_return, + work_function_exception + ) + except Exception as e: + handle_output_exception = e + handle_output_result = None + if handle_output_exception is not None: + handle_output_result = handle_output_exception + elif handle_output_return is not None: + handle_output_result = handle_output_return + if handle_output_result is not None and self.handle_output_result is None: + with self.handle_output_result_lock: + self.handle_output_result = ( + work_function_input, + work_function_return, + handle_output_result + ) + self.in_queue.task_done() - def submit(self, work): + @staticmethod + def exception_traceback_string(exception): ''' - Submit work. Block if there is already enough work scheduled (~nthreads). - - :return: if an error occurred in some previously executed thread, the error. - Otherwise, None. This allows the caller to stop submitting further - work if desired. + Helper to get the traceback from an exception object. + This is usually what you want to print if an error happens in a thread: + https://stackoverflow.com/questions/3702675/how-to-print-the-full-traceback-without-halting-the-program/56199295#56199295 ''' - self.in_queue.put(work) - return self.error_output + return ''.join(traceback.format_exception( + None, exception, exception.__traceback__) + ) + + def get_handle_output_result(self): + ''' + :return: if a handle_output call has raised previously, return a tuple: + + .... + (work_function_input, work_function_return, exception_raised) + .... + + corresponding to the first such raise. + + Otherwise, if a handle_output returned non-None, a tuple: + + (work_function_input, work_function_return, handle_output_return) + + Otherwise, None. + ''' + return self.handle_output_result def join(self): ''' Request all threads to stop after they finish currently submitted work. - :return: same as submit() + :return: same as get_handle_output_result() ''' for thread in range(self.nthreads): self.in_queue.put(None) for thread in self.threads: thread.join() - return self.error_output + return self.get_handle_output_result() - def _func_runner(self, thread_id): - while True: - work = self.in_queue.get(block=True) - if work is None: - break - if self.thread_id_arg is not None: - work[self.thread_id_arg] = thread_id - try: - exception = None - out = self.func(**work) - except Exception as e: - exception = e - out = None - try: - handle_output_return = self.handle_output(work, out, exception) - except Exception as e: - with self.error_output_lock: - self.error_output = (work, out, e) - else: - if handle_output_return is not None: - with self.error_output_lock: - self.error_output = handle_output_return - finally: - self.in_queue.task_done() + def submit( + self, + work_function_input: Union[Dict,None] =None + ): + ''' + Submit work. Block if there is already enough work scheduled (~nthreads). + + :return: the same as get_handle_output_result + ''' + handle_output_result = self.get_handle_output_result() + if handle_output_result is not None: + if self.submit_raise_exit: + raise ThreadPoolExitException() + if self.submit_skip_exit: + return handle_output_result + if work_function_input is None: + work_function_input = {} + self.in_queue.put(work_function_input) + return handle_output_result if __name__ == '__main__': - def func_maybe_raise(i): + def get_work(min_, max_): + ''' + Generate simple range work for work_function. + ''' + for i in range(min_, max_): + yield {'i': i} + + def work_function_maybe_raise(i): ''' The main function that will be evaluated. @@ -165,17 +320,10 @@ if __name__ == '__main__': time.sleep((abs(i) % 4) / 10.0) return 10.0 / i - def func_get_thread(i, thread_id): + def work_function_get_thread(i, thread_id): time.sleep((abs(i) % 4) / 10.0) return thread_id - def get_work(min_, max_): - ''' - Generate simple range work for my_func. - ''' - for i in range(min_, max_): - yield {'i': i} - def handle_output_print(input, output, exception): ''' Print outputs and exit immediately on failure. @@ -200,13 +348,24 @@ if __name__ == '__main__': def handle_output_raise(input, output, exception): ''' - Raise if input == 10, to test that execution + Raise if input == 0, to test that execution stops nicely if this raises. ''' print('{!r} {!r} {!r}'.format(input, output, exception)) - if input['i'] == 10: + if input['i'] == 0: raise Exception + def handle_output_raise_exit_exception(input, output, exception): + ''' + Return a ThreadPoolExitException() if input == -5. + Return the work_function exception if it raised. + ''' + print('{!r} {!r} {!r}'.format(input, output, exception)) + if exception: + return exception + if output == 10.0 / -5: + return ThreadPoolExitException() + # CLI arguments. argv_len = len(sys.argv) if argv_len > 1: @@ -215,55 +374,66 @@ if __name__ == '__main__': nthreads = None else: nthreads = None - if argv_len > 2: min_ = int(sys.argv[2]) else: min_ = 1 - if argv_len > 3: max_ = int(sys.argv[3]) else: max_ = 100 - if argv_len > 4: - c = sys.argv[4][0] + handle_output_funtion_string = sys.argv[4] else: - c = '0' - if c == '1': - handle_output = handle_output_print_no_exit - elif c == '2': - handle_output = handle_output_queue - elif c == '3': - handle_output = handle_output_raise - else: - handle_output = handle_output_print - + handle_output_funtion_string = 'handle_output_print' + handle_output = eval(handle_output_funtion_string) if argv_len > 5: - c = sys.argv[5][0] + work_function = work_function_get_thread + thread_id_arg = sys.argv[5] else: - c = '0' - if c == '1': - my_func = func_get_thread - thread_id_arg = 'thread_id' - else: - my_func = func_maybe_raise + work_function = work_function_maybe_raise thread_id_arg = None # Action. - thread_pool = ThreadPool( - my_func, - handle_output, - nthreads, - thread_id_arg, - ) - for work in get_work(min_, max_): - error = thread_pool.submit(work) - if error is not None: - break - error = thread_pool.join() - if error is not None: - print('error: {!r}'.format(error)) + if handle_output is handle_output_raise_exit_exception: + # `with` version with implicit join and submit raise + # immediately when desired with ThreadPoolExitException. + # + # This is the more safe and convenient and DRY usage if + # you can use `with`, so prefer it generally. + with ThreadPool( + work_function, + handle_output, + nthreads, + thread_id_arg, + submit_raise_exit=True + ) as my_thread_pool: + for work in get_work(min_, max_): + my_thread_pool.submit(work) + handle_output_result = my_thread_pool.get_handle_output_result() + else: + # Explicit error checking in submit loop to exit immediately + # on error. + my_thread_pool = ThreadPool( + work_function, + handle_output, + nthreads, + thread_id_arg, + ) + for work_function_input in get_work(min_, max_): + handle_output_result = my_thread_pool.submit(work_function_input) + if handle_output_result is not None: + break + handle_output_result = my_thread_pool.join() + if handle_output_result is not None: + work_function_input, work_function_return, exception = handle_output_result + if type(exception) is ThreadPoolExitException: + print('Early exit requested by handle_output with ThreadPoolExitException:') + else: + print('work_function or handle_output raised:') + print(ThreadPool.exception_traceback_string(exception), end='') + print('work_function_input: {!r}'.format(work_function_input)) + print('work_function_return: {!r}'.format(work_function_return)) if handle_output == handle_output_queue: while not out_queue.empty(): print(out_queue.get())