From 5daad532890bba4ce896d74d77d0b42aea704acb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ciro=20Santilli=20=E5=85=AD=E5=9B=9B=E4=BA=8B=E4=BB=B6=20?= =?UTF-8?q?=E6=B3=95=E8=BD=AE=E5=8A=9F?= Date: Sun, 5 May 2019 00:00:00 +0000 Subject: [PATCH] thread_pool: support passing thread IDs Then use that to fix gem5 error log read race. --- common.py | 25 ++++++++++++++++--------- run | 2 +- shell_helpers.py | 1 + test-user-mode | 4 ++-- thread_pool.py | 36 ++++++++++++++++++++++++++++++++---- 5 files changed, 52 insertions(+), 16 deletions(-) diff --git a/common.py b/common.py index dc125b9..a08e2e5 100644 --- a/common.py +++ b/common.py @@ -439,7 +439,8 @@ if one was not given explicitly. ''', ) self.add_argument( - '-u', '--userland', + '-u', + '--userland', help='''\ Run the given userland executable in user mode instead of booting the Linux kernel in full system mode. In gem5, user mode is called Syscall Emulation (SE) mode and @@ -459,14 +460,16 @@ CLI arguments to pass to the userland executable. # Run. self.add_argument( - '-n', '--run-id', default='0', + '--port-offset', + type=int, help='''\ -ID for run outputs such as gem5's m5out. Allows you to do multiple runs, -and then inspect separate outputs later in different output directories. +Increase the ports to be used such as for GDB by an offset to run multiple +instances in parallel. Default: the run ID (-n) if that is an integer, otherwise 0. ''' ) self.add_argument( - '-P', '--prebuilt', default=False, + '--prebuilt', + default=False, help='''\ Use prebuilt packaged host utilities as much as possible instead of the ones we built ourselves. Saves build time, but decreases @@ -474,10 +477,11 @@ the likelihood of incompatibilities. ''' ) self.add_argument( - '--port-offset', type=int, + '--run-id', + default='0', help='''\ -Increase the ports to be used such as for GDB by an offset to run multiple -instances in parallel. Default: the run ID (-n) if that is an integer, otherwise 0. +ID for run outputs such as gem5's m5out. Allows you to do multiple runs, +and then inspect separate outputs later in different output directories. ''' ) @@ -1418,7 +1422,8 @@ class TestCliFunction(LkmcCliFunction): run_obj, run_args=None, test_id=None, - expected_exit_status=None + expected_exit_status=None, + thread_id=0, ): ''' This is a setup / run / teardown setup for simple tests that just do a single run. @@ -1429,10 +1434,12 @@ class TestCliFunction(LkmcCliFunction): :param run_obj: callable object :param run_args: arguments to be passed to the runnable object :param test_id: test identifier, to be added in addition to of arch and emulator ids + :param thread_id: which thread the test is running under ''' if run_obj.is_arch_supported(self.env['arch']): if run_args is None: run_args = {} + run_args['run_id'] = thread_id test_id_string = self.test_setup(test_id) exit_status = run_obj(**run_args) return self.test_teardown( diff --git a/run b/run index 5e6b2d5..b930171 100755 --- a/run +++ b/run @@ -715,7 +715,7 @@ Extra options to append at the end of the emulator command line. if line.rstrip() == self.env['magic_fail_string']: exit_status = 1 break - if exit_status != 0: + if exit_status != 0 and self.env['show_stdout']: self.log_error('simulation error detected by parsing logs') return exit_status diff --git a/shell_helpers.py b/shell_helpers.py index 8a458e1..a52b45b 100644 --- a/shell_helpers.py +++ b/shell_helpers.py @@ -175,6 +175,7 @@ class ShellHelpers: if not self.quiet: self._print_thread_safe('+ ' + cmd_string) if cmd_file is not None: + os.makedirs(os.path.dirname(cmd_file), exist_ok=True) with open(cmd_file, 'w') as f: f.write('#!/usr/bin/env bash\n') f.write(cmd_string) diff --git a/test-user-mode b/test-user-mode index 6feee14..9cb058b 100755 --- a/test-user-mode +++ b/test-user-mode @@ -24,7 +24,6 @@ If given, run only the given tests. Otherwise, run all tests. ) def timed_main(self): - run = self.import_path_main('run') run_args = self.get_common_args() run_args['ctrl_c_host'] = True run_args['show_stdout'] = False @@ -36,6 +35,7 @@ If given, run only the given tests. Otherwise, run all tests. with ThreadPool( self.run_test, nthreads=self.env['nproc'], + thread_id_arg='thread_id', ) as thread_pool: try: for path, in_dirnames, in_filenames in self.walk_source_targets( @@ -56,7 +56,7 @@ If given, run only the given tests. Otherwise, run all tests. error = thread_pool.submit({ 'expected_exit_status': test.exit_status, 'run_args': cur_run_args, - 'run_obj': run, + 'run_obj': self.import_path_main('run'), 'test_id': path_relative_root, }) if error is not None: diff --git a/thread_pool.py b/thread_pool.py index c92d330..5b0b677 100644 --- a/thread_pool.py +++ b/thread_pool.py @@ -31,6 +31,7 @@ class ThreadPool: python3 thread_pool.py 2 -10 20 1 python3 thread_pool.py 2 -10 20 2 python3 thread_pool.py 2 -10 20 3 + python3 thread_pool.py 2 -10 20 0 1 .... These ensure that execution stops neatly on error. @@ -39,7 +40,8 @@ class ThreadPool: self, func: Callable, handle_output: Union[Callable[[Any,Any,Exception],Any],None] = None, - nthreads: Union[int,None] = None + nthreads: Union[int,None] = None, + thread_id_arg: Union[str,None] = None, ): ''' Start in a thread pool immediately. @@ -62,6 +64,9 @@ class ThreadPool: Default: a handler that does nothing and just exits on exception. :param nthreads: number of threads to use. Default: nproc. + :param thread_id_arg: if not None, set the argument of func with this name + to a 0-indexed thread ID. This allows function calls to coordinate + usage of external resources such as files or ports. ''' self.func = func if handle_output is None: @@ -69,6 +74,7 @@ class ThreadPool: self.handle_output = handle_output if nthreads is None: nthreads = len(os.sched_getaffinity(0)) + self.thread_id_arg = thread_id_arg self.nthreads = nthreads self.error_output = None self.error_output_lock = threading.Lock() @@ -77,6 +83,7 @@ class ThreadPool: for i in range(self.nthreads): thread = threading.Thread( target=self._func_runner, + args=(i,) ) self.threads.append(thread) thread.start() @@ -123,11 +130,13 @@ class ThreadPool: thread.join() return self.error_output - def _func_runner(self): + def _func_runner(self, thread_id): while True: work = self.in_queue.get(block=True) if work is None: break + if self.thread_id_arg is not None: + work[self.thread_id_arg] = thread_id try: exception = None out = self.func(**work) @@ -147,7 +156,7 @@ class ThreadPool: self.in_queue.task_done() if __name__ == '__main__': - def my_func(i): + def func_maybe_raise(i): ''' The main function that will be evaluated. @@ -156,6 +165,10 @@ if __name__ == '__main__': time.sleep((abs(i) % 4) / 10.0) return 10.0 / i + def func_get_thread(i, thread_id): + time.sleep((abs(i) % 4) / 10.0) + return thread_id + def get_work(min_, max_): ''' Generate simple range work for my_func. @@ -202,14 +215,17 @@ if __name__ == '__main__': nthreads = None else: nthreads = None + if argv_len > 2: min_ = int(sys.argv[2]) else: min_ = 1 + if argv_len > 3: max_ = int(sys.argv[3]) else: max_ = 100 + if argv_len > 4: c = sys.argv[4][0] else: @@ -223,11 +239,23 @@ if __name__ == '__main__': else: handle_output = handle_output_print + if argv_len > 5: + c = sys.argv[5][0] + else: + c = '0' + if c == '1': + my_func = func_get_thread + thread_id_arg = 'thread_id' + else: + my_func = func_maybe_raise + thread_id_arg = None + # Action. thread_pool = ThreadPool( my_func, handle_output, - nthreads + nthreads, + thread_id_arg, ) for work in get_work(min_, max_): error = thread_pool.submit(work)