From 37c5338938e2514911815bbbbd8aa6c9272f3f3c Mon Sep 17 00:00:00 2001 From: Alessio Sclocco Date: Fri, 23 May 2025 11:12:20 +0200 Subject: [PATCH 01/25] Typo. --- kernel_tuner/runners/sequential.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel_tuner/runners/sequential.py b/kernel_tuner/runners/sequential.py index aeebd5116..194eb0545 100644 --- a/kernel_tuner/runners/sequential.py +++ b/kernel_tuner/runners/sequential.py @@ -55,7 +55,7 @@ def run(self, parameter_space, tuning_options): :param tuning_options: A dictionary with all options regarding the tuning process. - :type tuning_options: kernel_tuner.iterface.Options + :type tuning_options: kernel_tuner.interface.Options :returns: A list of dictionaries for executed kernel configurations and their execution times. From 6c5b360cad115d840852f618f6ffda9cf89addbd Mon Sep 17 00:00:00 2001 From: Alessio Sclocco Date: Fri, 23 May 2025 11:12:43 +0200 Subject: [PATCH 02/25] Add missing parameter to the interface. --- kernel_tuner/runners/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel_tuner/runners/runner.py b/kernel_tuner/runners/runner.py index 80ab32146..8c4de22d7 100644 --- a/kernel_tuner/runners/runner.py +++ b/kernel_tuner/runners/runner.py @@ -14,7 +14,7 @@ def __init__( pass @abstractmethod - def get_environment(self): + def get_environment(self, tuning_options): pass @abstractmethod From a21caf8398306cb9f7b136043ae97f4a084c2759 Mon Sep 17 00:00:00 2001 From: Alessio Sclocco Date: Fri, 23 May 2025 11:53:57 +0200 Subject: [PATCH 03/25] Formatting. --- kernel_tuner/runners/sequential.py | 53 +++++++++++++++++------------- kernel_tuner/runners/simulation.py | 40 +++++++++++----------- 2 files changed, 50 insertions(+), 43 deletions(-) diff --git a/kernel_tuner/runners/sequential.py b/kernel_tuner/runners/sequential.py index 194eb0545..eeeedbd29 100644 --- a/kernel_tuner/runners/sequential.py +++ b/kernel_tuner/runners/sequential.py @@ -20,15 +20,13 @@ def __init__(self, kernel_source, kernel_options, device_options, iterations, ob :param kernel_options: A dictionary with all options for the kernel. :type kernel_options: kernel_tuner.interface.Options - :param device_options: A dictionary with all options for the device - on which the kernel should be tuned. + :param device_options: A dictionary with all options for the device on which the kernel should be tuned. :type device_options: kernel_tuner.interface.Options - :param iterations: The number of iterations used for benchmarking - each kernel instance. + :param iterations: The number of iterations used for benchmarking each kernel instance. :type iterations: int """ - #detect language and create high-level device interface + # detect language and create high-level device interface self.dev = DeviceInterface(kernel_source, iterations=iterations, observers=observers, **device_options) self.units = self.dev.units @@ -41,7 +39,7 @@ def __init__(self, kernel_source, kernel_options, device_options, iterations, ob self.last_strategy_time = 0 self.kernel_options = kernel_options - #move data to the GPU + # move data to the GPU self.gpu_args = self.dev.ready_argument_list(kernel_options.arguments) def get_environment(self, tuning_options): @@ -53,16 +51,14 @@ def run(self, parameter_space, tuning_options): :param parameter_space: The parameter space as an iterable. :type parameter_space: iterable - :param tuning_options: A dictionary with all options regarding the tuning - process. + :param tuning_options: A dictionary with all options regarding the tuning process. :type tuning_options: kernel_tuner.interface.Options - :returns: A list of dictionaries for executed kernel configurations and their - execution times. - :rtype: dict()) + :returns: A list of dictionaries for executed kernel configurations and their execution times. + :rtype: dict() """ - logging.debug('sequential runner started for ' + self.kernel_options.kernel_name) + logging.debug("sequential runner started for " + self.kernel_options.kernel_name) results = [] @@ -77,33 +73,46 @@ def run(self, parameter_space, tuning_options): x_int = ",".join([str(i) for i in element]) if tuning_options.cache and x_int in tuning_options.cache: params.update(tuning_options.cache[x_int]) - params['compile_time'] = 0 - params['verification_time'] = 0 - params['benchmark_time'] = 0 + params["compile_time"] = 0 + params["verification_time"] = 0 + params["benchmark_time"] = 0 else: # attempt to warmup the GPU by running the first config in the parameter space and ignoring the result if not self.warmed_up: warmup_time = perf_counter() - self.dev.compile_and_benchmark(self.kernel_source, self.gpu_args, params, self.kernel_options, tuning_options) + self.dev.compile_and_benchmark( + self.kernel_source, self.gpu_args, params, self.kernel_options, tuning_options + ) self.warmed_up = True warmup_time = 1e3 * (perf_counter() - warmup_time) - result = self.dev.compile_and_benchmark(self.kernel_source, self.gpu_args, params, self.kernel_options, tuning_options) + result = self.dev.compile_and_benchmark( + self.kernel_source, self.gpu_args, params, self.kernel_options, tuning_options + ) params.update(result) if tuning_options.objective in result and isinstance(result[tuning_options.objective], ErrorConfig): - logging.debug('kernel configuration was skipped silently due to compile or runtime failure') + logging.debug("kernel configuration was skipped silently due to compile or runtime failure") # only compute metrics on configs that have not errored if tuning_options.metrics and not isinstance(params.get(tuning_options.objective), ErrorConfig): params = process_metrics(params, tuning_options.metrics) # get the framework time by estimating based on other times - total_time = 1000 * ((perf_counter() - self.start_time) - warmup_time) - params['strategy_time'] = self.last_strategy_time - params['framework_time'] = max(total_time - (params['compile_time'] + params['verification_time'] + params['benchmark_time'] + params['strategy_time']), 0) - params['timestamp'] = str(datetime.now(timezone.utc)) + total_time = 1000 * ((perf_counter() - self.start_time) - warmup_time) + params["strategy_time"] = self.last_strategy_time + params["framework_time"] = max( + total_time + - ( + params["compile_time"] + + params["verification_time"] + + params["benchmark_time"] + + params["strategy_time"] + ), + 0, + ) + params["timestamp"] = str(datetime.now(timezone.utc)) self.start_time = perf_counter() if result: diff --git a/kernel_tuner/runners/simulation.py b/kernel_tuner/runners/simulation.py index 22c7c667c..7a167bfcf 100644 --- a/kernel_tuner/runners/simulation.py +++ b/kernel_tuner/runners/simulation.py @@ -14,11 +14,11 @@ class SimulationDevice(_SimulationDevice): @property def name(self): - return self.env['device_name'] + return self.env["device_name"] @name.setter def name(self, value): - self.env['device_name'] = value + self.env["device_name"] = value if not self.quiet: print("Simulating: " + value) @@ -38,12 +38,10 @@ def __init__(self, kernel_source, kernel_options, device_options, iterations, ob :param kernel_options: A dictionary with all options for the kernel. :type kernel_options: kernel_tuner.interface.Options - :param device_options: A dictionary with all options for the device - on which the kernel should be tuned. + :param device_options: A dictionary with all options for the device on which the kernel should be tuned. :type device_options: kernel_tuner.interface.Options - :param iterations: The number of iterations used for benchmarking - each kernel instance. + :param iterations: The number of iterations used for benchmarking each kernel instance. :type iterations: int """ self.quiet = device_options.quiet @@ -70,21 +68,18 @@ def run(self, parameter_space, tuning_options): :param parameter_space: The parameter space as an iterable. :type parameter_space: iterable - :param tuning_options: A dictionary with all options regarding the tuning - process. + :param tuning_options: A dictionary with all options regarding the tuning process. :type tuning_options: kernel_tuner.iterface.Options - :returns: A list of dictionaries for executed kernel configurations and their - execution times. + :returns: A list of dictionaries for executed kernel configurations and their execution times. :rtype: dict() """ - logging.debug('simulation runner started for ' + self.kernel_options.kernel_name) + logging.debug("simulation runner started for " + self.kernel_options.kernel_name) results = [] # iterate over parameter space for element in parameter_space: - # check if element is in the cache x_int = ",".join([str(i) for i in element]) if tuning_options.cache and x_int in tuning_options.cache: @@ -98,21 +93,22 @@ def run(self, parameter_space, tuning_options): # configuration is already counted towards the unique_results. # It is the responsibility of cost_func to add configs to unique_results. if x_int in tuning_options.unique_results: - - result['compile_time'] = 0 - result['verification_time'] = 0 - result['benchmark_time'] = 0 + result["compile_time"] = 0 + result["verification_time"] = 0 + result["benchmark_time"] = 0 else: # configuration is evaluated for the first time, print to the console - util.print_config_output(tuning_options.tune_params, result, self.quiet, tuning_options.metrics, self.units) + util.print_config_output( + tuning_options.tune_params, result, self.quiet, tuning_options.metrics, self.units + ) # Everything but the strategy time and framework time are simulated, # self.last_strategy_time is set by cost_func - result['strategy_time'] = self.last_strategy_time + result["strategy_time"] = self.last_strategy_time try: - simulated_time = result['compile_time'] + result['verification_time'] + result['benchmark_time'] + simulated_time = result["compile_time"] + result["verification_time"] + result["benchmark_time"] tuning_options.simulated_time += simulated_time except KeyError: if "time_limit" in tuning_options: @@ -122,13 +118,15 @@ def run(self, parameter_space, tuning_options): total_time = 1000 * (perf_counter() - self.start_time) self.start_time = perf_counter() - result['framework_time'] = total_time - self.last_strategy_time + result["framework_time"] = total_time - self.last_strategy_time results.append(result) continue # if the element is not in the cache, raise an error - check = util.check_restrictions(tuning_options.restrictions, dict(zip(tuning_options['tune_params'].keys(), element)), True) + check = util.check_restrictions( + tuning_options.restrictions, dict(zip(tuning_options["tune_params"].keys(), element)), True + ) err_string = f"kernel configuration {element} not in cache, does {'' if check else 'not '}pass extra restriction check ({check})" logging.debug(err_string) raise ValueError(f"{err_string} - in simulation mode, all configurations must be present in the cache") From a2328c4c5eb5abdfc99219dcda53615f2c7ed42f Mon Sep 17 00:00:00 2001 From: Alessio Sclocco Date: Thu, 5 Jun 2025 11:12:02 +0200 Subject: [PATCH 04/25] First early draft of the parallel runner. --- examples/cuda/vector_add_parallel.py | 35 ++++++ kernel_tuner/interface.py | 14 ++- kernel_tuner/runners/parallel.py | 166 +++++++++++++++++++++++++++ 3 files changed, 214 insertions(+), 1 deletion(-) create mode 100644 examples/cuda/vector_add_parallel.py create mode 100644 kernel_tuner/runners/parallel.py diff --git a/examples/cuda/vector_add_parallel.py b/examples/cuda/vector_add_parallel.py new file mode 100644 index 000000000..d1c112aa5 --- /dev/null +++ b/examples/cuda/vector_add_parallel.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python + +import numpy +from kernel_tuner import tune_kernel + + +def tune(): + kernel_string = """ + __global__ void vector_add(float *c, float *a, float *b, int n) { + int i = (blockIdx.x * block_size_x) + threadIdx.x; + if ( i < n ) { + c[i] = a[i] + b[i]; + } + } + """ + + size = 10000000 + + a = numpy.random.randn(size).astype(numpy.float32) + b = numpy.random.randn(size).astype(numpy.float32) + c = numpy.zeros_like(b) + n = numpy.int32(size) + + args = [c, a, b, n] + + tune_params = dict() + tune_params["block_size_x"] = [32 * i for i in range(1, 33)] + + results, env = tune_kernel("vector_add", kernel_string, size, args, tune_params, parallel_runner=4) + + return results + + +if __name__ == "__main__": + tune() diff --git a/kernel_tuner/interface.py b/kernel_tuner/interface.py index bd421aeab..ed7b56487 100644 --- a/kernel_tuner/interface.py +++ b/kernel_tuner/interface.py @@ -32,6 +32,7 @@ import kernel_tuner.core as core import kernel_tuner.util as util from kernel_tuner.integration import get_objective_defaults +from kernel_tuner.runners.parallel import ParallelRunner from kernel_tuner.runners.sequential import SequentialRunner from kernel_tuner.runners.simulation import SimulationRunner from kernel_tuner.searchspace import Searchspace @@ -463,6 +464,7 @@ def __deepcopy__(self, _): ), ("metrics", ("specifies user-defined metrics, please see :ref:`metrics`.", "dict")), ("simulation_mode", ("Simulate an auto-tuning search from an existing cachefile", "bool")), + ("parallel_runner", ("If the value is larger than 1 use that number as the number of parallel runners doing the tuning", "int")), ("observers", ("""A list of Observers to use during tuning, please see :ref:`observers`.""", "list")), ] ) @@ -574,6 +576,7 @@ def tune_kernel( cache=None, metrics=None, simulation_mode=False, + parallel_runner=1, observers=None, objective=None, objective_higher_is_better=None, @@ -600,6 +603,8 @@ def tune_kernel( if iterations < 1: raise ValueError("Iterations should be at least one!") + if parallel_runner < 1: + logging.warning("The number of parallel runners should be at least one!") # sort all the options into separate dicts opts = locals() @@ -650,7 +655,14 @@ def tune_kernel( strategy = brute_force # select the runner for this job based on input - selected_runner = SimulationRunner if simulation_mode else SequentialRunner + # TODO: we could use the "match case" syntax when removing support for 3.9 + if simulation_mode: + selected_runner = SimulationRunner + elif parallel_runner > 1: + selected_runner = ParallelRunner + tuning_options.parallel_runner = parallel_runner + else: + selected_runner = SequentialRunner tuning_options.simulated_time = 0 runner = selected_runner(kernelsource, kernel_options, device_options, iterations, observers) diff --git a/kernel_tuner/runners/parallel.py b/kernel_tuner/runners/parallel.py new file mode 100644 index 000000000..a4362b0eb --- /dev/null +++ b/kernel_tuner/runners/parallel.py @@ -0,0 +1,166 @@ +"""A specialized runner that tunes in parallel the parameter space.""" +import logging +from time import perf_counter +from datetime import datetime, timezone + +from ray import remote, get, put + +from kernel_tuner.runners.runner import Runner +from kernel_tuner.core import DeviceInterface +from kernel_tuner.util import ErrorConfig, print_config_output, process_metrics, store_cache + + +class ParallelRunnerState: + """This class represents the state of a parallel tuning run.""" + + def __init__(self, observers, iterations): + self.device_options = None + self.quiet = False + self.kernel_source = None + self.warmed_up = False + self.simulation_mode = False + self.start_time = None + self.last_strategy_start_time = None + self.last_strategy_time = 0 + self.kernel_options = None + self.observers = observers + self.iterations = iterations + + +@remote +def parallel_run(task_id: int, state: ParallelRunnerState, parameter_space, tuning_options): + dev = DeviceInterface( + state.kernel_source, iterations=state.iterations, observers=state.observers, **state.device_options + ) + # move data to the GPU + gpu_args = dev.ready_argument_list(state.kernel_options.arguments) + # iterate over parameter space + results = [] + elements_per_task = len(parameter_space) / tuning_options.parallel_runner + first_element = task_id * elements_per_task + last_element = ( + (task_id + 1) * elements_per_task if task_id + 1 < tuning_options.parallel_runner else len(parameter_space) + ) + for element in parameter_space[first_element:last_element]: + params = dict(zip(tuning_options.tune_params.keys(), element)) + + result = None + warmup_time = 0 + + # check if configuration is in the cache + x_int = ",".join([str(i) for i in element]) + if tuning_options.cache and x_int in tuning_options.cache: + params.update(tuning_options.cache[x_int]) + params["compile_time"] = 0 + params["verification_time"] = 0 + params["benchmark_time"] = 0 + else: + # attempt to warm up the GPU by running the first config in the parameter space and ignoring the result + if not state.warmed_up: + warmup_time = perf_counter() + dev.compile_and_benchmark(state.kernel_source, gpu_args, params, state.kernel_options, tuning_options) + state.warmed_up = True + warmup_time = 1e3 * (perf_counter() - warmup_time) + + result = dev.compile_and_benchmark( + state.kernel_source, gpu_args, params, state.kernel_options, tuning_options + ) + + params.update(result) + + if tuning_options.objective in result and isinstance(result[tuning_options.objective], ErrorConfig): + logging.debug("kernel configuration was skipped silently due to compile or runtime failure") + + # only compute metrics on configs that have not errored + if tuning_options.metrics and not isinstance(params.get(tuning_options.objective), ErrorConfig): + params = process_metrics(params, tuning_options.metrics) + + # get the framework time by estimating based on other times + total_time = 1000 * ((perf_counter() - state.start_time) - warmup_time) + params["strategy_time"] = state.last_strategy_time + params["framework_time"] = max( + total_time + - ( + params["compile_time"] + + params["verification_time"] + + params["benchmark_time"] + + params["strategy_time"] + ), + 0, + ) + params["timestamp"] = str(datetime.now(timezone.utc)) + state.start_time = perf_counter() + + if result: + # print configuration to the console + print_config_output(tuning_options.tune_params, params, state.quiet, tuning_options.metrics, dev.units) + + # add configuration to cache + store_cache(x_int, params, tuning_options) + + # all visited configurations are added to results to provide a trace for optimization strategies + results.append(params) + + return results + + +class ParallelRunner(Runner): + """ParallelRunner is used to distribute configurations across multiple nodes.""" + + def __init__(self, kernel_source, kernel_options, device_options, iterations, observers): + """Instantiate the ParallelRunner. + + :param kernel_source: The kernel source + :type kernel_source: kernel_tuner.core.KernelSource + + :param kernel_options: A dictionary with all options for the kernel. + :type kernel_options: kernel_tuner.interface.Options + + :param device_options: A dictionary with all options for the device + on which the kernel should be tuned. + :type device_options: kernel_tuner.interface.Options + + :param iterations: The number of iterations used for benchmarking + each kernel instance. + :type iterations: int + """ + self.state = ParallelRunnerState(observers, iterations) + self.state.quiet = device_options.quiet + self.state.kernel_source = kernel_source + self.state.warmed_up = False + self.state.simulation_mode = False + self.state.start_time = perf_counter() + self.state.last_strategy_start_time = self.state.start_time + self.state.last_strategy_time = 0 + self.state.kernel_options = kernel_options + + def get_environment(self, tuning_options): + # TODO: we are going to fix this one later + return None + + def run(self, parameter_space, tuning_options): + """Iterate through the entire parameter space using a single Python process. + + :param parameter_space: The parameter space as an iterable. + :type parameter_space: iterable + + :param tuning_options: A dictionary with all options regarding the tuning process. + :type tuning_options: kernel_tuner.interface.Options + + :returns: A list of dictionaries for executed kernel configurations and their execution times. + :rtype: dict() + """ + # given the parameter_space, distribute it over Ray tasks + logging.debug("parallel runner started for " + self.state.kernel_options.kernel_name) + + results = [] + tasks = [] + parameter_space_ref = put(parameter_space) + state_ref = put(self.state) + tuning_options_ref = put(tuning_options) + for task_id in range(0, tuning_options.parallel_runner): + tasks.append(parallel_run.remote(task_id, state_ref, parameter_space_ref, tuning_options_ref)) + for task in tasks: + results.append(get(task)) + + return results From 68a569ba8849673cde3735824e3fd0a6a5c4e3ef Mon Sep 17 00:00:00 2001 From: Alessio Sclocco Date: Thu, 5 Jun 2025 11:36:35 +0200 Subject: [PATCH 05/25] Need a dummy DeviceInterface even on the master. --- kernel_tuner/runners/parallel.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kernel_tuner/runners/parallel.py b/kernel_tuner/runners/parallel.py index a4362b0eb..e8d759ecd 100644 --- a/kernel_tuner/runners/parallel.py +++ b/kernel_tuner/runners/parallel.py @@ -133,10 +133,12 @@ def __init__(self, kernel_source, kernel_options, device_options, iterations, ob self.state.last_strategy_start_time = self.state.start_time self.state.last_strategy_time = 0 self.state.kernel_options = kernel_options + # define a dummy device interface + self.dev = DeviceInterface(kernel_source) def get_environment(self, tuning_options): - # TODO: we are going to fix this one later - return None + # dummy environment + return self.dev.get_environment() def run(self, parameter_space, tuning_options): """Iterate through the entire parameter space using a single Python process. From 9d0dee4a4870ef7d7694ae363855c0cd5ca237ef Mon Sep 17 00:00:00 2001 From: Alessio Sclocco Date: Thu, 5 Jun 2025 11:41:59 +0200 Subject: [PATCH 06/25] Missing device_options in state. --- kernel_tuner/runners/parallel.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kernel_tuner/runners/parallel.py b/kernel_tuner/runners/parallel.py index e8d759ecd..5b501d9c5 100644 --- a/kernel_tuner/runners/parallel.py +++ b/kernel_tuner/runners/parallel.py @@ -125,6 +125,7 @@ def __init__(self, kernel_source, kernel_options, device_options, iterations, ob :type iterations: int """ self.state = ParallelRunnerState(observers, iterations) + self.state.device_options = device_options self.state.quiet = device_options.quiet self.state.kernel_source = kernel_source self.state.warmed_up = False From aff21f035dd68b19e6b0de8c64c75efee3e01a4d Mon Sep 17 00:00:00 2001 From: Alessio Sclocco Date: Thu, 5 Jun 2025 15:12:08 +0200 Subject: [PATCH 07/25] Flatten the results. --- kernel_tuner/runners/parallel.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kernel_tuner/runners/parallel.py b/kernel_tuner/runners/parallel.py index 5b501d9c5..2d20bd4bc 100644 --- a/kernel_tuner/runners/parallel.py +++ b/kernel_tuner/runners/parallel.py @@ -2,6 +2,7 @@ import logging from time import perf_counter from datetime import datetime, timezone +from itertools import chain from ray import remote, get, put @@ -166,4 +167,4 @@ def run(self, parameter_space, tuning_options): for task in tasks: results.append(get(task)) - return results + return [chain.from_iterable(results)] From d7e8cae2778b7aad7408b76bbfe313aa69f05841 Mon Sep 17 00:00:00 2001 From: Alessio Sclocco Date: Thu, 5 Jun 2025 15:18:59 +0200 Subject: [PATCH 08/25] Various bug fixes. --- kernel_tuner/runners/parallel.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kernel_tuner/runners/parallel.py b/kernel_tuner/runners/parallel.py index 2d20bd4bc..f454d0686 100644 --- a/kernel_tuner/runners/parallel.py +++ b/kernel_tuner/runners/parallel.py @@ -28,7 +28,7 @@ def __init__(self, observers, iterations): self.iterations = iterations -@remote +@remote(num_cpus=1, num_gpus=1) def parallel_run(task_id: int, state: ParallelRunnerState, parameter_space, tuning_options): dev = DeviceInterface( state.kernel_source, iterations=state.iterations, observers=state.observers, **state.device_options @@ -37,9 +37,9 @@ def parallel_run(task_id: int, state: ParallelRunnerState, parameter_space, tuni gpu_args = dev.ready_argument_list(state.kernel_options.arguments) # iterate over parameter space results = [] - elements_per_task = len(parameter_space) / tuning_options.parallel_runner - first_element = task_id * elements_per_task - last_element = ( + elements_per_task = int(len(parameter_space) / tuning_options.parallel_runner) + first_element = int(task_id * elements_per_task) + last_element = int( (task_id + 1) * elements_per_task if task_id + 1 < tuning_options.parallel_runner else len(parameter_space) ) for element in parameter_space[first_element:last_element]: @@ -167,4 +167,4 @@ def run(self, parameter_space, tuning_options): for task in tasks: results.append(get(task)) - return [chain.from_iterable(results)] + return list(chain.from_iterable(results)) From b4ff7fa49574c8e277a71f70c289102e73243388 Mon Sep 17 00:00:00 2001 From: Alessio Sclocco Date: Fri, 6 Jun 2025 11:21:07 +0200 Subject: [PATCH 09/25] Add another example for the parallel runner. --- examples/cuda/sepconv_parallel.py | 88 +++++++++++++++++++++++++++++++ kernel_tuner/runners/parallel.py | 5 +- 2 files changed, 92 insertions(+), 1 deletion(-) create mode 100644 examples/cuda/sepconv_parallel.py diff --git a/examples/cuda/sepconv_parallel.py b/examples/cuda/sepconv_parallel.py new file mode 100644 index 000000000..074200e1b --- /dev/null +++ b/examples/cuda/sepconv_parallel.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python +import numpy +from kernel_tuner import tune_kernel +from collections import OrderedDict + + +def tune(): + with open("convolution.cu", "r") as f: + kernel_string = f.read() + + # setup tunable parameters + tune_params = OrderedDict() + tune_params["filter_height"] = [i for i in range(3, 19, 2)] + tune_params["filter_width"] = [i for i in range(3, 19, 2)] + tune_params["block_size_x"] = [16 * i for i in range(1, 65)] + tune_params["block_size_y"] = [2**i for i in range(6)] + tune_params["tile_size_x"] = [i for i in range(1, 11)] + tune_params["tile_size_y"] = [i for i in range(1, 11)] + + tune_params["use_padding"] = [0, 1] # toggle the insertion of padding in shared memory + tune_params["read_only"] = [0, 1] # toggle using the read-only cache + + # limit the search to only use padding when its effective, and at least 32 threads in a block + restrict = ["use_padding==0 or (block_size_x % 32 != 0)", "block_size_x*block_size_y >= 32"] + + # setup input and output dimensions + problem_size = (4096, 4096) + size = numpy.prod(problem_size) + largest_fh = max(tune_params["filter_height"]) + largest_fw = max(tune_params["filter_width"]) + input_size = (problem_size[0] + largest_fw - 1) * (problem_size[1] + largest_fh - 1) + + # create input data + output_image = numpy.zeros(size).astype(numpy.float32) + input_image = numpy.random.randn(input_size).astype(numpy.float32) + filter_weights = numpy.random.randn(largest_fh * largest_fw).astype(numpy.float32) + + # setup kernel arguments + cmem_args = {"d_filter": filter_weights} + args = [output_image, input_image, filter_weights] + + # tell the Kernel Tuner how to compute grid dimensions + grid_div_x = ["block_size_x", "tile_size_x"] + grid_div_y = ["block_size_y", "tile_size_y"] + + # start tuning separable convolution (row) + tune_params["filter_height"] = [1] + tune_params["tile_size_y"] = [1] + results_row = tune_kernel( + "convolution_kernel", + kernel_string, + problem_size, + args, + tune_params, + grid_div_y=grid_div_y, + grid_div_x=grid_div_x, + cmem_args=cmem_args, + verbose=False, + restrictions=restrict, + parallel_runner=1024, + cache="convolution_kernel_row", + ) + + # start tuning separable convolution (col) + tune_params["filter_height"] = tune_params["filter_width"][:] + tune_params["file_size_y"] = tune_params["tile_size_x"][:] + tune_params["filter_width"] = [1] + tune_params["tile_size_x"] = [1] + results_col = tune_kernel( + "convolution_kernel", + kernel_string, + problem_size, + args, + tune_params, + grid_div_y=grid_div_y, + grid_div_x=grid_div_x, + cmem_args=cmem_args, + verbose=False, + restrictions=restrict, + parallel_runner=1024, + cache="convolution_kernel_col", + ) + + return results_row, results_col + + +if __name__ == "__main__": + results_row, results_col = tune() diff --git a/kernel_tuner/runners/parallel.py b/kernel_tuner/runners/parallel.py index f454d0686..e689096f9 100644 --- a/kernel_tuner/runners/parallel.py +++ b/kernel_tuner/runners/parallel.py @@ -135,7 +135,10 @@ def __init__(self, kernel_source, kernel_options, device_options, iterations, ob self.state.last_strategy_start_time = self.state.start_time self.state.last_strategy_time = 0 self.state.kernel_options = kernel_options - # define a dummy device interface + # fields used directly by strategies + self.last_strategy_time = perf_counter() + self.state.last_strategy_start_time = self.last_strategy_time + # define a dummy device interface on the master node self.dev = DeviceInterface(kernel_source) def get_environment(self, tuning_options): From 426dd2a188ab22c5b8b5c8e784fb6a8be7bcae8e Mon Sep 17 00:00:00 2001 From: stijn Date: Mon, 19 Jan 2026 16:43:49 +0100 Subject: [PATCH 10/25] Rewrite parallel runner to use stateful actors --- examples/cuda/vector_add_parallel.py | 4 +- kernel_tuner/interface.py | 37 +-- kernel_tuner/runners/parallel.py | 342 +++++++++++++++++---------- kernel_tuner/runners/runner.py | 7 + kernel_tuner/runners/sequential.py | 3 + kernel_tuner/runners/simulation.py | 3 + 6 files changed, 248 insertions(+), 148 deletions(-) diff --git a/examples/cuda/vector_add_parallel.py b/examples/cuda/vector_add_parallel.py index d1c112aa5..8d35ce7c7 100644 --- a/examples/cuda/vector_add_parallel.py +++ b/examples/cuda/vector_add_parallel.py @@ -26,8 +26,8 @@ def tune(): tune_params = dict() tune_params["block_size_x"] = [32 * i for i in range(1, 33)] - results, env = tune_kernel("vector_add", kernel_string, size, args, tune_params, parallel_runner=4) - + results, env = tune_kernel("vector_add", kernel_string, size, args, tune_params, parallel_workers=True) + print(env) return results diff --git a/kernel_tuner/interface.py b/kernel_tuner/interface.py index 2faa96213..46974cd28 100644 --- a/kernel_tuner/interface.py +++ b/kernel_tuner/interface.py @@ -37,9 +37,6 @@ import kernel_tuner.util as util from kernel_tuner.file_utils import get_input_file, get_t4_metadata, get_t4_results from kernel_tuner.integration import get_objective_defaults -from kernel_tuner.runners.parallel import ParallelRunner -from kernel_tuner.runners.sequential import SequentialRunner -from kernel_tuner.runners.simulation import SimulationRunner from kernel_tuner.searchspace import Searchspace try: @@ -469,7 +466,7 @@ def __deepcopy__(self, _): ), ("metrics", ("specifies user-defined metrics, please see :ref:`metrics`.", "dict")), ("simulation_mode", ("Simulate an auto-tuning search from an existing cachefile", "bool")), - ("parallel_runner", ("If the value is larger than 1 use that number as the number of parallel runners doing the tuning", "int")), + ("parallel_workers", ("Set to `True` or an integer to enable parallel tuning. If set to an integer, this will be the number of parallel workers.", "int|bool")), ("observers", ("""A list of Observers to use during tuning, please see :ref:`observers`.""", "list")), ] ) @@ -581,7 +578,7 @@ def tune_kernel( cache=None, metrics=None, simulation_mode=False, - parallel_runner=1, + parallel_workers=None, observers=None, objective=None, objective_higher_is_better=None, @@ -608,8 +605,6 @@ def tune_kernel( if iterations < 1: raise ValueError("Iterations should be at least one!") - if parallel_runner < 1: - logging.warning("The number of parallel runners should be at least one!") # sort all the options into separate dicts opts = locals() @@ -669,15 +664,21 @@ def tune_kernel( # select the runner for this job based on input # TODO: we could use the "match case" syntax when removing support for 3.9 - if simulation_mode: - selected_runner = SimulationRunner - elif parallel_runner > 1: - selected_runner = ParallelRunner - tuning_options.parallel_runner = parallel_runner - else: - selected_runner = SequentialRunner tuning_options.simulated_time = 0 - runner = selected_runner(kernelsource, kernel_options, device_options, iterations, observers) + + if parallel_workers and simulation_mode: + raise ValueError("Enabling `parallel_workers` and `simulation_mode` together is not supported") + elif simulation_mode: + from kernel_tuner.runners.simulation import SimulationRunner + runner = SimulationRunner(kernelsource, kernel_options, device_options, iterations, observers) + elif parallel_workers: + from kernel_tuner.runners.parallel import ParallelRunner + num_workers = None if parallel_workers is True else parallel_workers + runner = ParallelRunner(kernelsource, kernel_options, device_options, iterations, observers, num_workers=num_workers) + else: + from kernel_tuner.runners.sequential import SequentialRunner + runner = SequentialRunner(kernelsource, kernel_options, device_options, iterations, observers) + # the user-specified function may or may not have an optional atol argument; # we normalize it so that it always accepts atol. @@ -696,7 +697,8 @@ def tune_kernel( tuning_options.cachefile = None # create search space - searchspace = Searchspace(tune_params, restrictions, runner.dev.max_threads, **searchspace_construction_options) + device_info = runner.get_device_info() + searchspace = Searchspace(tune_params, restrictions, device_info.max_threads, **searchspace_construction_options) restrictions = searchspace._modified_restrictions tuning_options.restrictions = restrictions if verbose: @@ -716,6 +718,9 @@ def tune_kernel( results = strategy.tune(searchspace, runner, tuning_options) env = runner.get_environment(tuning_options) + # Shut down the runner + runner.shutdown() + # finished iterating over search space if results: # checks if results is not empty best_config = util.get_best_config(results, objective, objective_higher_is_better) diff --git a/kernel_tuner/runners/parallel.py b/kernel_tuner/runners/parallel.py index e689096f9..18d1efbad 100644 --- a/kernel_tuner/runners/parallel.py +++ b/kernel_tuner/runners/parallel.py @@ -1,84 +1,92 @@ """A specialized runner that tunes in parallel the parameter space.""" import logging +import socket from time import perf_counter +from kernel_tuner.interface import Options +from kernel_tuner.util import ErrorConfig, print_config, print_config_output, process_metrics, store_cache +from kernel_tuner.core import DeviceInterface +from kernel_tuner.runners.runner import Runner from datetime import datetime, timezone -from itertools import chain -from ray import remote, get, put +try: + import ray +except ImportError as e: + raise Exception(f"Unable to initialize the parallel runner: {e}") -from kernel_tuner.runners.runner import Runner -from kernel_tuner.core import DeviceInterface -from kernel_tuner.util import ErrorConfig, print_config_output, process_metrics, store_cache +@ray.remote(num_gpus=1) +class DeviceActor: + def __init__(self, kernel_source, kernel_options, device_options, iterations, observers): + # detect language and create high-level device interface + self.dev = DeviceInterface(kernel_source, iterations=iterations, observers=observers, **device_options) + + self.units = self.dev.units + self.quiet = device_options.quiet + self.kernel_source = kernel_source + self.warmed_up = False if self.dev.requires_warmup else True + self.start_time = perf_counter() + self.last_strategy_start_time = self.start_time + self.last_strategy_time = 0 + self.kernel_options = kernel_options -class ParallelRunnerState: - """This class represents the state of a parallel tuning run.""" + # move data to the GPU + self.gpu_args = self.dev.ready_argument_list(kernel_options.arguments) - def __init__(self, observers, iterations): - self.device_options = None - self.quiet = False - self.kernel_source = None - self.warmed_up = False - self.simulation_mode = False - self.start_time = None - self.last_strategy_start_time = None - self.last_strategy_time = 0 - self.kernel_options = None - self.observers = observers - self.iterations = iterations - - -@remote(num_cpus=1, num_gpus=1) -def parallel_run(task_id: int, state: ParallelRunnerState, parameter_space, tuning_options): - dev = DeviceInterface( - state.kernel_source, iterations=state.iterations, observers=state.observers, **state.device_options - ) - # move data to the GPU - gpu_args = dev.ready_argument_list(state.kernel_options.arguments) - # iterate over parameter space - results = [] - elements_per_task = int(len(parameter_space) / tuning_options.parallel_runner) - first_element = int(task_id * elements_per_task) - last_element = int( - (task_id + 1) * elements_per_task if task_id + 1 < tuning_options.parallel_runner else len(parameter_space) - ) - for element in parameter_space[first_element:last_element]: - params = dict(zip(tuning_options.tune_params.keys(), element)) + def shutdown(self): + ray.actor.exit_actor() + + def get_environment(self): + # Get the device properties + env = dict(self.dev.get_environment()) + + # Get the host name + env["host_name"] = socket.gethostname() + # Get info about the ray instance + ray_info = ray.get_runtime_context() + env["ray"] = dict( + node_id=ray_info.get_node_id(), + worker_id=ray_info.get_worker_id(), + actor_id=ray_info.get_actor_id(), + ) + + return env + + def run(self, element, tuning_options): + # TODO: logging.debug("sequential runner started for " + self.kernel_options.kernel_name) + objective = tuning_options.objective + metrics = tuning_options.metrics + tune_params = tuning_options.tune_params + + params = dict(element) result = None warmup_time = 0 - # check if configuration is in the cache - x_int = ",".join([str(i) for i in element]) - if tuning_options.cache and x_int in tuning_options.cache: - params.update(tuning_options.cache[x_int]) - params["compile_time"] = 0 - params["verification_time"] = 0 - params["benchmark_time"] = 0 - else: - # attempt to warm up the GPU by running the first config in the parameter space and ignoring the result - if not state.warmed_up: - warmup_time = perf_counter() - dev.compile_and_benchmark(state.kernel_source, gpu_args, params, state.kernel_options, tuning_options) - state.warmed_up = True - warmup_time = 1e3 * (perf_counter() - warmup_time) - - result = dev.compile_and_benchmark( - state.kernel_source, gpu_args, params, state.kernel_options, tuning_options + # attempt to warmup the GPU by running the first config in the parameter space and ignoring the result + if not self.warmed_up: + warmup_time = perf_counter() + self.dev.compile_and_benchmark( + self.kernel_source, self.gpu_args, params, self.kernel_options, tuning_options ) + self.warmed_up = True + warmup_time = 1e3 * (perf_counter() - warmup_time) - params.update(result) + result = self.dev.compile_and_benchmark( + self.kernel_source, self.gpu_args, params, self.kernel_options, tuning_options + ) - if tuning_options.objective in result and isinstance(result[tuning_options.objective], ErrorConfig): - logging.debug("kernel configuration was skipped silently due to compile or runtime failure") + if objective in result and isinstance(result[objective], ErrorConfig): + logging.debug("kernel configuration was skipped silently due to compile or runtime failure") + + params.update(result) # only compute metrics on configs that have not errored - if tuning_options.metrics and not isinstance(params.get(tuning_options.objective), ErrorConfig): - params = process_metrics(params, tuning_options.metrics) + if metrics and not isinstance(params.get(objective), ErrorConfig): + params = process_metrics(params, metrics) # get the framework time by estimating based on other times - total_time = 1000 * ((perf_counter() - state.start_time) - warmup_time) - params["strategy_time"] = state.last_strategy_time + total_time = 1000 * ((perf_counter() - self.start_time) - warmup_time) + params["strategy_time"] = self.last_strategy_time params["framework_time"] = max( total_time - ( @@ -89,85 +97,159 @@ def parallel_run(task_id: int, state: ParallelRunnerState, parameter_space, tuni ), 0, ) - params["timestamp"] = str(datetime.now(timezone.utc)) - state.start_time = perf_counter() - if result: - # print configuration to the console - print_config_output(tuning_options.tune_params, params, state.quiet, tuning_options.metrics, dev.units) + params["timestamp"] = str(datetime.now(timezone.utc)) + params["ray_actor_id"] = ray.get_runtime_context().get_actor_id() + params["host_name"] = socket.gethostname() - # add configuration to cache - store_cache(x_int, params, tuning_options) + self.start_time = perf_counter() # all visited configurations are added to results to provide a trace for optimization strategies - results.append(params) + return params + - return results +class DeviceActorState: + def __init__(self, actor): + self.actor = actor + self.running_jobs = [] + self.maximum_running_jobs = 1 + self.is_running = True + self.env = ray.get(actor.get_environment.remote()) + + def __repr__(self): + actor_id = self.env["ray"]["actor_id"] + host_name = self.env["host_name"] + return f"{actor_id} ({host_name})" + + def shutdown(self): + if self.is_running: + self.is_running = False + self.actor.shutdown.remote() + + def submit(self, *args): + job = self.actor.run.remote(*args) + self.running_jobs.append(job) + return job + + def is_available(self): + if not self.is_running: + return False + + # Check for ready jobs, but do not block + ready_jobs, self.running_jobs = ray.wait(self.running_jobs, timeout=0) + ray.get(ready_jobs) + + # Available if this actor can run another job + return len(self.running_jobs) < self.maximum_running_jobs class ParallelRunner(Runner): - """ParallelRunner is used to distribute configurations across multiple nodes.""" + def __init__(self, kernel_source, kernel_options, device_options, iterations, observers, num_workers=None): + if not ray.is_initialized(): + ray.init() + + if num_workers is None: + num_workers = int(ray.cluster_resources().get("GPU", 0)) + + if num_workers == 0: + raise Exception("failed to initialize parallel runner: no GPUs found") + + if num_workers < 1: + raise Exception(f"failed to initialize parallel runner: invalid number of GPUs specified: {num_workers}") + + self.workers = [] + + try: + for index in range(num_workers): + actor = DeviceActor.remote(kernel_source, kernel_options, device_options, iterations, observers) + worker = DeviceActorState(actor) + self.workers.append(worker) + + logging.info(f"launched worker {index}: {worker}") + except: + # If an exception occurs, shut down the worker + self.shutdown() + raise + + # Check if all workers have the same device + device_names = {w.env.get("device_name") for w in self.workers} + if len(device_names) != 1: + self.shutdown() + raise Exception( + f"failed to initialize parallel runner: workers have different devices: {sorted(device_names)}" + ) - def __init__(self, kernel_source, kernel_options, device_options, iterations, observers): - """Instantiate the ParallelRunner. - - :param kernel_source: The kernel source - :type kernel_source: kernel_tuner.core.KernelSource - - :param kernel_options: A dictionary with all options for the kernel. - :type kernel_options: kernel_tuner.interface.Options - - :param device_options: A dictionary with all options for the device - on which the kernel should be tuned. - :type device_options: kernel_tuner.interface.Options - - :param iterations: The number of iterations used for benchmarking - each kernel instance. - :type iterations: int - """ - self.state = ParallelRunnerState(observers, iterations) - self.state.device_options = device_options - self.state.quiet = device_options.quiet - self.state.kernel_source = kernel_source - self.state.warmed_up = False - self.state.simulation_mode = False - self.state.start_time = perf_counter() - self.state.last_strategy_start_time = self.state.start_time - self.state.last_strategy_time = 0 - self.state.kernel_options = kernel_options - # fields used directly by strategies - self.last_strategy_time = perf_counter() - self.state.last_strategy_start_time = self.last_strategy_time - # define a dummy device interface on the master node - self.dev = DeviceInterface(kernel_source) + self.device_name = device_names.pop() + + # TODO + self.units = "sec" + self.quiet = device_options.quiet + + def get_device_info(self): + return Options(dict(max_threads=1024)) def get_environment(self, tuning_options): - # dummy environment - return self.dev.get_environment() + return dict( + device_name=self.device_name, + workers=[w.env for w in self.workers], + ) + + def shutdown(self): + for worker in self.workers: + try: + worker.shutdown() + except Exception as err: + logging.warning(f"error while shutting down worker {worker}: {err}") + + def submit_job(self, *args): + while True: + # Find an idle actor + for i, worker in enumerate(list(self.workers)): + if worker.is_available(): + # push the worker to the end + self.workers.pop(i) + self.workers.append(worker) + + # Submit the work + return worker.submit(*args) + + # Gather all running jobs + running_jobs = [job for w in self.workers for job in w.running_jobs] + + # If there are no running jobs, then something must be wrong. + # Maybe a worker has crashed or gotten into an invalid state. + if not running_jobs: + raise Exception("invalid state: no Ray workers are available to run job") + + # Wait until any running job completes + ray.wait(running_jobs, num_returns=1) def run(self, parameter_space, tuning_options): - """Iterate through the entire parameter space using a single Python process. - - :param parameter_space: The parameter space as an iterable. - :type parameter_space: iterable - - :param tuning_options: A dictionary with all options regarding the tuning process. - :type tuning_options: kernel_tuner.interface.Options - - :returns: A list of dictionaries for executed kernel configurations and their execution times. - :rtype: dict() - """ - # given the parameter_space, distribute it over Ray tasks - logging.debug("parallel runner started for " + self.state.kernel_options.kernel_name) - - results = [] - tasks = [] - parameter_space_ref = put(parameter_space) - state_ref = put(self.state) - tuning_options_ref = put(tuning_options) - for task_id in range(0, tuning_options.parallel_runner): - tasks.append(parallel_run.remote(task_id, state_ref, parameter_space_ref, tuning_options_ref)) - for task in tasks: - results.append(get(task)) - - return list(chain.from_iterable(results)) + running_jobs = dict() + completed_jobs = dict() + + # Submit jobs which are not in the cache + for config in parameter_space: + params = dict(zip(tuning_options.tune_params.keys(), config)) + key = ",".join([str(i) for i in config]) + + if key in tuning_options.cache: + completed_jobs[key] = tuning_options.cache[key] + else: + assert key not in running_jobs + running_jobs[key] = self.submit_job(params, tuning_options) + completed_jobs[key] = None + + # Wait for the running jobs to finish + for key, job in running_jobs.items(): + result = ray.get(job) + completed_jobs[key] = result + + if result: + # print configuration to the console + print_config_output(tuning_options.tune_params, result, self.quiet, tuning_options.metrics, self.units) + + # add configuration to cache + store_cache(key, result, tuning_options) + + return list(completed_jobs.values()) diff --git a/kernel_tuner/runners/runner.py b/kernel_tuner/runners/runner.py index 8c4de22d7..3a886ad16 100644 --- a/kernel_tuner/runners/runner.py +++ b/kernel_tuner/runners/runner.py @@ -13,6 +13,13 @@ def __init__( ): pass + def shutdown(self): + pass + + @abstractmethod + def get_device_info(self): + pass + @abstractmethod def get_environment(self, tuning_options): pass diff --git a/kernel_tuner/runners/sequential.py b/kernel_tuner/runners/sequential.py index 8228402dc..c7b865d44 100644 --- a/kernel_tuner/runners/sequential.py +++ b/kernel_tuner/runners/sequential.py @@ -42,6 +42,9 @@ def __init__(self, kernel_source, kernel_options, device_options, iterations, ob # move data to the GPU self.gpu_args = self.dev.ready_argument_list(kernel_options.arguments) + def get_device_info(self): + return self.dev + def get_environment(self, tuning_options): return self.dev.get_environment() diff --git a/kernel_tuner/runners/simulation.py b/kernel_tuner/runners/simulation.py index 7a167bfcf..f1d392c98 100644 --- a/kernel_tuner/runners/simulation.py +++ b/kernel_tuner/runners/simulation.py @@ -56,6 +56,9 @@ def __init__(self, kernel_source, kernel_options, device_options, iterations, ob self.last_strategy_time = 0 self.units = {} + def get_device_info(self): + return self.dev + def get_environment(self, tuning_options): env = self.dev.get_environment() env["simulation"] = True From f585d42b6d11f2a83755977018ae5ba63554a5ef Mon Sep 17 00:00:00 2001 From: stijn Date: Tue, 20 Jan 2026 12:00:06 +0100 Subject: [PATCH 11/25] Move `tuning_options` to constructor of `ParallelRunner` --- kernel_tuner/interface.py | 5 +-- kernel_tuner/runners/parallel.py | 43 +++++++++++------------ kernel_tuner/runners/sequential.py | 2 +- kernel_tuner/util.py | 56 ++++++++++++++---------------- test/test_util_functions.py | 2 +- 5 files changed, 53 insertions(+), 55 deletions(-) diff --git a/kernel_tuner/interface.py b/kernel_tuner/interface.py index 41331ebea..73c028c21 100644 --- a/kernel_tuner/interface.py +++ b/kernel_tuner/interface.py @@ -682,10 +682,11 @@ def preprocess_cache(filepath): # process cache if cache: cache = preprocess_cache(cache) - util.process_cache(cache, kernel_options, tuning_options, runner) + tuning_options.cachefile = cache + tuning_options.cache = util.process_cache(cache, kernel_options, tuning_options, runner) else: - tuning_options.cache = {} tuning_options.cachefile = None + tuning_options.cache = {} # create search space tuning_options.restrictions_unmodified = deepcopy(restrictions) diff --git a/kernel_tuner/runners/parallel.py b/kernel_tuner/runners/parallel.py index 18d1efbad..f033f3857 100644 --- a/kernel_tuner/runners/parallel.py +++ b/kernel_tuner/runners/parallel.py @@ -16,7 +16,7 @@ @ray.remote(num_gpus=1) class DeviceActor: - def __init__(self, kernel_source, kernel_options, device_options, iterations, observers): + def __init__(self, kernel_source, kernel_options, device_options, tuning_options, iterations, observers): # detect language and create high-level device interface self.dev = DeviceInterface(kernel_source, iterations=iterations, observers=observers, **device_options) @@ -28,6 +28,7 @@ def __init__(self, kernel_source, kernel_options, device_options, iterations, ob self.last_strategy_start_time = self.start_time self.last_strategy_time = 0 self.kernel_options = kernel_options + self.tuning_options = tuning_options # move data to the GPU self.gpu_args = self.dev.ready_argument_list(kernel_options.arguments) @@ -52,11 +53,10 @@ def get_environment(self): return env - def run(self, element, tuning_options): + def run(self, element): # TODO: logging.debug("sequential runner started for " + self.kernel_options.kernel_name) - objective = tuning_options.objective - metrics = tuning_options.metrics - tune_params = tuning_options.tune_params + objective = self.tuning_options.objective + metrics = self.tuning_options.metrics params = dict(element) result = None @@ -66,16 +66,16 @@ def run(self, element, tuning_options): if not self.warmed_up: warmup_time = perf_counter() self.dev.compile_and_benchmark( - self.kernel_source, self.gpu_args, params, self.kernel_options, tuning_options + self.kernel_source, self.gpu_args, params, self.kernel_options, self.tuning_options ) self.warmed_up = True warmup_time = 1e3 * (perf_counter() - warmup_time) result = self.dev.compile_and_benchmark( - self.kernel_source, self.gpu_args, params, self.kernel_options, tuning_options + self.kernel_source, self.gpu_args, params, self.kernel_options, self.tuning_options ) - if objective in result and isinstance(result[objective], ErrorConfig): + if isinstance(result.get(objective), ErrorConfig): logging.debug("kernel configuration was skipped silently due to compile or runtime failure") params.update(result) @@ -139,12 +139,12 @@ def is_available(self): ready_jobs, self.running_jobs = ray.wait(self.running_jobs, timeout=0) ray.get(ready_jobs) - # Available if this actor can run another job + # Available if this actor can now run another job return len(self.running_jobs) < self.maximum_running_jobs class ParallelRunner(Runner): - def __init__(self, kernel_source, kernel_options, device_options, iterations, observers, num_workers=None): + def __init__(self, kernel_source, kernel_options, device_options, tuning_options, iterations, observers, num_workers=None): if not ray.is_initialized(): ray.init() @@ -152,16 +152,16 @@ def __init__(self, kernel_source, kernel_options, device_options, iterations, ob num_workers = int(ray.cluster_resources().get("GPU", 0)) if num_workers == 0: - raise Exception("failed to initialize parallel runner: no GPUs found") + raise RuntimeError("failed to initialize parallel runner: no GPUs found") if num_workers < 1: - raise Exception(f"failed to initialize parallel runner: invalid number of GPUs specified: {num_workers}") + raise RuntimeError(f"failed to initialize parallel runner: invalid number of GPUs specified: {num_workers}") self.workers = [] try: for index in range(num_workers): - actor = DeviceActor.remote(kernel_source, kernel_options, device_options, iterations, observers) + actor = DeviceActor.remote(kernel_source, kernel_options, device_options, tuning_options, iterations, observers) worker = DeviceActorState(actor) self.workers.append(worker) @@ -175,14 +175,14 @@ def __init__(self, kernel_source, kernel_options, device_options, iterations, ob device_names = {w.env.get("device_name") for w in self.workers} if len(device_names) != 1: self.shutdown() - raise Exception( + raise RuntimeError( f"failed to initialize parallel runner: workers have different devices: {sorted(device_names)}" ) self.device_name = device_names.pop() - # TODO - self.units = "sec" + # TODO: Get this from the device + self.units = {"time": "ms"} self.quiet = device_options.quiet def get_device_info(self): @@ -237,7 +237,7 @@ def run(self, parameter_space, tuning_options): completed_jobs[key] = tuning_options.cache[key] else: assert key not in running_jobs - running_jobs[key] = self.submit_job(params, tuning_options) + running_jobs[key] = self.submit_job(params) completed_jobs[key] = None # Wait for the running jobs to finish @@ -245,11 +245,10 @@ def run(self, parameter_space, tuning_options): result = ray.get(job) completed_jobs[key] = result - if result: - # print configuration to the console - print_config_output(tuning_options.tune_params, result, self.quiet, tuning_options.metrics, self.units) + # print configuration to the console + print_config_output(tuning_options.tune_params, result, self.quiet, tuning_options.metrics, self.units) - # add configuration to cache - store_cache(key, result, tuning_options) + # add configuration to cache + store_cache(key, result, tuning_options.cachefile, tuning_options.cache) return list(completed_jobs.values()) diff --git a/kernel_tuner/runners/sequential.py b/kernel_tuner/runners/sequential.py index c7b865d44..2bd554bfc 100644 --- a/kernel_tuner/runners/sequential.py +++ b/kernel_tuner/runners/sequential.py @@ -123,7 +123,7 @@ def run(self, parameter_space, tuning_options): print_config_output(tuning_options.tune_params, params, self.quiet, tuning_options.metrics, self.units) # add configuration to cache - store_cache(x_int, params, tuning_options) + store_cache(x_int, params, tuning_options.cachefile, tuning_options.cache) # all visited configurations are added to results to provide a trace for optimization strategies results.append(params) diff --git a/kernel_tuner/util.py b/kernel_tuner/util.py index 2d9e3f1b3..635c6de78 100644 --- a/kernel_tuner/util.py +++ b/kernel_tuner/util.py @@ -1152,7 +1152,7 @@ def check_matching_problem_size(cached_problem_size, problem_size): if cached_problem_size_arr.size != problem_size_arr.size or not (cached_problem_size_arr == problem_size_arr).all(): raise ValueError(f"Cannot load cache which contains results for different problem_size, cache: {cached_problem_size}, requested: {problem_size}") -def process_cache(cache, kernel_options, tuning_options, runner): +def process_cache(cachefile, kernel_options, tuning_options, runner): """Cache file for storing tuned configurations. the cache file is stored using JSON and uses the following format: @@ -1181,9 +1181,9 @@ def process_cache(cache, kernel_options, tuning_options, runner): raise ValueError("Caching only works correctly when tunable parameters are stored in a dictionary") # if file does not exist, create new cache - if not os.path.isfile(cache): + if not os.path.isfile(cachefile): if tuning_options.simulation_mode: - raise ValueError(f"Simulation mode requires an existing cachefile: file {cache} does not exist") + raise ValueError(f"Simulation mode requires an existing cachefile: file {cachefile} does not exist") c = dict() c["device_name"] = runner.dev.name @@ -1197,15 +1197,14 @@ def process_cache(cache, kernel_options, tuning_options, runner): contents = json.dumps(c, cls=NpEncoder, indent="")[:-3] # except the last "}\n}" # write the header to the cachefile - with open(cache, "w") as cachefile: - cachefile.write(contents) + with open(cachefile, "w") as f: + f.write(contents) - tuning_options.cachefile = cache - tuning_options.cache = {} + return {} # if file exists else: - cached_data = read_cache(cache, open_cache=not tuning_options.simulation_mode) + cached_data = read_cache(cachefile, open_cache=not tuning_options.simulation_mode) # if in simulation mode, use the device name from the cache file as the runner device name if runner.simulation_mode: @@ -1231,17 +1230,16 @@ def process_cache(cache, kernel_options, tuning_options, runner): ) raise ValueError( f"Cannot load cache which contains results obtained with different tunable parameters. \ - Cache at '{cache}' has: {cached_data['tune_params_keys']}, tuning_options has: {list(tuning_options.tune_params.keys())}" + Cache at '{cachefile}' has: {cached_data['tune_params_keys']}, tuning_options has: {list(tuning_options.tune_params.keys())}" ) - tuning_options.cachefile = cache - tuning_options.cache = cached_data["cache"] + return cached_data["cache"] -def correct_open_cache(cache, open_cache=True): +def correct_open_cache(cachefile, open_cache=True): """If cache file was not properly closed, pretend it was properly closed.""" - with open(cache, "r") as cachefile: - filestr = cachefile.read().strip() + with open(cachefile, "r") as f: + filestr = f.read().strip() # if file was not properly closed, pretend it was properly closed if len(filestr) > 0 and filestr[-3:] not in ["}\n}", "}}}"]: @@ -1253,15 +1251,15 @@ def correct_open_cache(cache, open_cache=True): else: if open_cache: # if it was properly closed, open it for appending new entries - with open(cache, "w") as cachefile: - cachefile.write(filestr[:-3] + ",") + with open(cachefile, "w") as f: + f.write(filestr[:-3] + ",") return filestr -def read_cache(cache, open_cache=True): +def read_cache(cachefile, open_cache=True): """Read the cachefile into a dictionary, if open_cache=True prepare the cachefile for appending.""" - filestr = correct_open_cache(cache, open_cache) + filestr = correct_open_cache(cachefile, open_cache) error_configs = { "InvalidConfig": InvalidConfig(), @@ -1279,25 +1277,25 @@ def read_cache(cache, open_cache=True): return cache_data -def close_cache(cache): - if not os.path.isfile(cache): +def close_cache(cachefile): + if not os.path.isfile(cachefile): raise ValueError("close_cache expects cache file to exist") - with open(cache, "r") as fh: + with open(cachefile, "r") as fh: contents = fh.read() # close to file to make sure it can be read by JSON parsers if contents[-1] == ",": - with open(cache, "w") as fh: + with open(cachefile, "w") as fh: fh.write(contents[:-1] + "}\n}") -def store_cache(key, params, tuning_options): +def store_cache(key, params, cachefile, cache): """Stores a new entry (key, params) to the cachefile.""" # logging.debug('store_cache called, cache=%s, cachefile=%s' % (tuning_options.cache, tuning_options.cachefile)) - if isinstance(tuning_options.cache, dict): - if key not in tuning_options.cache: - tuning_options.cache[key] = params + if isinstance(cache, dict): + if key not in cache: + cache[key] = params # Convert ErrorConfig objects to string, wanted to do this inside the JSONconverter but couldn't get it to work output_params = params.copy() @@ -1305,9 +1303,9 @@ def store_cache(key, params, tuning_options): if isinstance(v, ErrorConfig): output_params[k] = str(v) - if tuning_options.cachefile: - with open(tuning_options.cachefile, "a") as cachefile: - cachefile.write("\n" + json.dumps({key: output_params}, cls=NpEncoder)[1:-1] + ",") + if cachefile: + with open(cachefile, "a") as f: + f.write("\n" + json.dumps({key: output_params}, cls=NpEncoder)[1:-1] + ",") def dump_cache(obj: str, tuning_options): diff --git a/test/test_util_functions.py b/test/test_util_functions.py index 4a1858f37..ae50dac18 100644 --- a/test/test_util_functions.py +++ b/test/test_util_functions.py @@ -632,7 +632,7 @@ def assert_open_cachefile_is_correctly_parsed(cache): # store one entry in the cache params = {"x": 4, "time": np.float32(0.1234)} - store_cache("4", params, tuning_options) + store_cache("4", params, tuning_options.cachefile, tuning_options.cache) assert len(tuning_options.cache) == 1 # close the cache From ad55ba47cc57a646282f29b3c7ecaa01fb8c6cd9 Mon Sep 17 00:00:00 2001 From: stijn Date: Tue, 20 Jan 2026 14:28:42 +0100 Subject: [PATCH 12/25] Fix several errors related to parallel runner --- kernel_tuner/interface.py | 2 +- kernel_tuner/runners/parallel.py | 60 ++++++++++++++++++-------------- test/test_util_functions.py | 14 ++++---- 3 files changed, 41 insertions(+), 35 deletions(-) diff --git a/kernel_tuner/interface.py b/kernel_tuner/interface.py index 73c028c21..053f71f29 100644 --- a/kernel_tuner/interface.py +++ b/kernel_tuner/interface.py @@ -662,7 +662,7 @@ def tune_kernel( elif parallel_workers: from kernel_tuner.runners.parallel import ParallelRunner num_workers = None if parallel_workers is True else parallel_workers - runner = ParallelRunner(kernelsource, kernel_options, device_options, iterations, observers, num_workers=num_workers) + runner = ParallelRunner(kernelsource, kernel_options, device_options, tuning_options, iterations, observers, num_workers=num_workers) else: from kernel_tuner.runners.sequential import SequentialRunner runner = SequentialRunner(kernelsource, kernel_options, device_options, iterations, observers) diff --git a/kernel_tuner/runners/parallel.py b/kernel_tuner/runners/parallel.py index f033f3857..a05fc2fa5 100644 --- a/kernel_tuner/runners/parallel.py +++ b/kernel_tuner/runners/parallel.py @@ -2,16 +2,18 @@ import logging import socket from time import perf_counter -from kernel_tuner.interface import Options -from kernel_tuner.util import ErrorConfig, print_config, print_config_output, process_metrics, store_cache from kernel_tuner.core import DeviceInterface +from kernel_tuner.interface import Options from kernel_tuner.runners.runner import Runner +from kernel_tuner.util import ErrorConfig, print_config_output, process_metrics, store_cache from datetime import datetime, timezone +logger = logging.getLogger(__name__) + try: import ray except ImportError as e: - raise Exception(f"Unable to initialize the parallel runner: {e}") + raise ImportError(f"unable to initialize the parallel runner: {e}") from e @ray.remote(num_gpus=1) @@ -44,12 +46,12 @@ def get_environment(self): env["host_name"] = socket.gethostname() # Get info about the ray instance - ray_info = ray.get_runtime_context() - env["ray"] = dict( - node_id=ray_info.get_node_id(), - worker_id=ray_info.get_worker_id(), - actor_id=ray_info.get_actor_id(), - ) + ctx = ray.get_runtime_context() + env["ray"] = { + "node_id": ctx.get_node_id(), + "worker_id": ctx.get_worker_id(), + "actor_id": ctx.get_actor_id(), + } return env @@ -98,7 +100,7 @@ def run(self, element): 0, ) - params["timestamp"] = str(datetime.now(timezone.utc)) + params["timestamp"] = datetime.now(timezone.utc).isoformat() params["ray_actor_id"] = ray.get_runtime_context().get_actor_id() params["host_name"] = socket.gethostname() @@ -122,15 +124,22 @@ def __repr__(self): return f"{actor_id} ({host_name})" def shutdown(self): - if self.is_running: - self.is_running = False + if not self.is_running: + return + + self.is_running = False + + try: self.actor.shutdown.remote() + except Exception: + logger.exception("Failed to request actor shutdown: %s", self) - def submit(self, *args): - job = self.actor.run.remote(*args) + def submit(self, config): + logger.info(f"jobs submitted to worker {self}: {config}") + job = self.actor.run.remote(config) self.running_jobs.append(job) return job - + def is_available(self): if not self.is_running: return False @@ -165,7 +174,7 @@ def __init__(self, kernel_source, kernel_options, device_options, tuning_options worker = DeviceActorState(actor) self.workers.append(worker) - logging.info(f"launched worker {index}: {worker}") + logger.info(f"launched worker {index}: {worker}") except: # If an exception occurs, shut down the worker self.shutdown() @@ -186,31 +195,28 @@ def __init__(self, kernel_source, kernel_options, device_options, tuning_options self.quiet = device_options.quiet def get_device_info(self): - return Options(dict(max_threads=1024)) + return Options({"max_threads": 1024}) def get_environment(self, tuning_options): - return dict( - device_name=self.device_name, - workers=[w.env for w in self.workers], - ) + return { + "device_name": self.device_name, + "workers": [w.env for w in self.workers] + } def shutdown(self): for worker in self.workers: try: worker.shutdown() except Exception as err: - logging.warning(f"error while shutting down worker {worker}: {err}") + logger.exception("error while shutting down worker {worker}") def submit_job(self, *args): while True: - # Find an idle actor + # Round-robin: first available worker gets the job and goes to the back of the list for i, worker in enumerate(list(self.workers)): if worker.is_available(): - # push the worker to the end self.workers.pop(i) self.workers.append(worker) - - # Submit the work return worker.submit(*args) # Gather all running jobs @@ -219,7 +225,7 @@ def submit_job(self, *args): # If there are no running jobs, then something must be wrong. # Maybe a worker has crashed or gotten into an invalid state. if not running_jobs: - raise Exception("invalid state: no Ray workers are available to run job") + raise RuntimeError("invalid state: no Ray workers are available to run job") # Wait until any running job completes ray.wait(running_jobs, num_returns=1) diff --git a/test/test_util_functions.py b/test/test_util_functions.py index ae50dac18..56a5a7617 100644 --- a/test/test_util_functions.py +++ b/test/test_util_functions.py @@ -621,25 +621,25 @@ def assert_open_cachefile_is_correctly_parsed(cache): try: # call process_cache without pre-existing cache - process_cache(cache, kernel_options, tuning_options, runner) + tuning_options.cachefile = cache + tuning_options.cache = process_cache(cache, kernel_options, tuning_options, runner) # check if file has been created assert os.path.isfile(cache) assert_open_cachefile_is_correctly_parsed(cache) - assert tuning_options.cachefile == cache assert isinstance(tuning_options.cache, dict) assert len(tuning_options.cache) == 0 # store one entry in the cache params = {"x": 4, "time": np.float32(0.1234)} - store_cache("4", params, tuning_options.cachefile, tuning_options.cache) + store_cache("4", params, cache, tuning_options.cache) assert len(tuning_options.cache) == 1 # close the cache close_cache(cache) # now test process cache with a pre-existing cache file - process_cache(cache, kernel_options, tuning_options, runner) + tuning_options.cache = process_cache(cache, kernel_options, tuning_options, runner) assert_open_cachefile_is_correctly_parsed(cache) assert tuning_options.cache["4"]["time"] == params["time"] @@ -648,7 +648,7 @@ def assert_open_cachefile_is_correctly_parsed(cache): # a different kernel, device, or parameter set with pytest.raises(ValueError) as excep: kernel_options.kernel_name = "wrong_kernel" - process_cache(cache, kernel_options, tuning_options, runner) + tuning_options.cache = process_cache(cache, kernel_options, tuning_options, runner) assert "kernel" in str(excep.value) # correct the kernel name from last test @@ -656,7 +656,7 @@ def assert_open_cachefile_is_correctly_parsed(cache): with pytest.raises(ValueError) as excep: runner.dev.name = "wrong_device" - process_cache(cache, kernel_options, tuning_options, runner) + tuning_options.cache = process_cache(cache, kernel_options, tuning_options, runner) assert "device" in str(excep.value) # correct the device from last test @@ -664,7 +664,7 @@ def assert_open_cachefile_is_correctly_parsed(cache): with pytest.raises(ValueError) as excep: tuning_options.tune_params["y"] = ["a", "b"] - process_cache(cache, kernel_options, tuning_options, runner) + tuning_options.cache = process_cache(cache, kernel_options, tuning_options, runner) assert "parameter" in str(excep.value) finally: From 4d8f4f53c50a3e5c76299e20badd252f8b427d74 Mon Sep 17 00:00:00 2001 From: stijn Date: Tue, 20 Jan 2026 17:33:59 +0100 Subject: [PATCH 13/25] Extend several strategies with support for parallel tuning: DiffEvo, FF, GA, PSO, all hillclimbers, random --- kernel_tuner/strategies/bayes_opt.py | 27 ++++++++++++++++ kernel_tuner/strategies/common.py | 6 ++++ kernel_tuner/strategies/diff_evo.py | 4 +-- kernel_tuner/strategies/firefly_algorithm.py | 13 ++++---- kernel_tuner/strategies/genetic_algorithm.py | 20 ++++++------ kernel_tuner/strategies/hillclimbers.py | 28 ++++++++++------- kernel_tuner/strategies/pso.py | 33 +++++++++++--------- kernel_tuner/strategies/random_sample.py | 13 +++----- 8 files changed, 92 insertions(+), 52 deletions(-) diff --git a/kernel_tuner/strategies/bayes_opt.py b/kernel_tuner/strategies/bayes_opt.py index a814e7ce2..31a68ca89 100644 --- a/kernel_tuner/strategies/bayes_opt.py +++ b/kernel_tuner/strategies/bayes_opt.py @@ -455,6 +455,33 @@ def fit_observations_to_model(self): """Update the model based on the current list of observations.""" self.__model.fit(self.__valid_params, self.__valid_observations) + def evaluate_parallel_objective_function(self, param_configs: list[tuple]) -> list[float]: + """Evaluates the objective function for multiple configurations in parallel.""" + results = [] + valid_param_configs = [] + valid_indices = [] + + # Extract the valid configurations + for param_config in param_configs: + param_config = self.unprune_param_config(param_config) + denormalized_param_config = self.denormalize_param_config(param_config) + if not self.__searchspace_obj.is_param_config_valid(denormalized_param_config): + results.append(self.invalid_value) + else: + valid_indices.append(len(results)) + results.append(None) + valid_param_configs.append(param_config) + + # Run valid configurations in parallel + scores = self.cost_func.run_all(valid_param_configs) + + # Put the scores at the right location in the result + for idx, score in zip(valid_indices, scores): + results[idx] = score + + self.fevals += len(scores) + return results + def evaluate_objective_function(self, param_config: tuple) -> float: """Evaluates the objective function.""" param_config = self.unprune_param_config(param_config) diff --git a/kernel_tuner/strategies/common.py b/kernel_tuner/strategies/common.py index 9ffe999b7..e567d9981 100644 --- a/kernel_tuner/strategies/common.py +++ b/kernel_tuner/strategies/common.py @@ -103,6 +103,12 @@ def __init__( def __call__(self, x, check_restrictions=True): + return self.run_one(x, check_restrictions=check_restrictions) + + def run_all(self, xs, check_restrictions=True): + return [self.run_one(x, check_restrictions=check_restrictions) for x in xs] + + def run_one(self, x, check_restrictions=True): """Cost function used by almost all strategies.""" self.runner.last_strategy_time = 1000 * (perf_counter() - self.runner.last_strategy_start_time) diff --git a/kernel_tuner/strategies/diff_evo.py b/kernel_tuner/strategies/diff_evo.py index d80b6e8e0..6350b7d9f 100644 --- a/kernel_tuner/strategies/diff_evo.py +++ b/kernel_tuner/strategies/diff_evo.py @@ -140,7 +140,7 @@ def differential_evolution(searchspace, cost_func, bounds, popsize, maxiter, F, population[0] = cost_func.get_start_pos() # Calculate the initial cost for each individual in the population - population_cost = np.array([cost_func(ind) for ind in population]) + population_cost = np.array(cost_func.run_all(population)) # Keep track of the best solution found so far best_idx = np.argmin(population_cost) @@ -209,7 +209,7 @@ def differential_evolution(searchspace, cost_func, bounds, popsize, maxiter, F, # --- c. Selection --- # Calculate the cost of the new trial vectors - trial_population_cost = np.array([cost_func(ind) for ind in trial_population]) + trial_population_cost = np.array(cost_func.run_all(trial_population)) # Keep track of whether population changes over time no_change = True diff --git a/kernel_tuner/strategies/firefly_algorithm.py b/kernel_tuner/strategies/firefly_algorithm.py index a732d4041..861c5f861 100644 --- a/kernel_tuner/strategies/firefly_algorithm.py +++ b/kernel_tuner/strategies/firefly_algorithm.py @@ -44,13 +44,14 @@ def tune(searchspace: Searchspace, runner, tuning_options): swarm[0].position = x0 # compute initial intensities - for j in range(num_particles): - try: + try: + for j in range(num_particles): swarm[j].compute_intensity(cost_func) - except StopCriterionReached as e: - if tuning_options.verbose: - print(e) - return cost_func.results + except StopCriterionReached as e: + if tuning_options.verbose: + print(e) + return cost_func.results + for j in range(num_particles): if swarm[j].score <= best_score_global: best_position_global = swarm[j].position best_score_global = swarm[j].score diff --git a/kernel_tuner/strategies/genetic_algorithm.py b/kernel_tuner/strategies/genetic_algorithm.py index 804758eef..230cfd49a 100644 --- a/kernel_tuner/strategies/genetic_algorithm.py +++ b/kernel_tuner/strategies/genetic_algorithm.py @@ -43,19 +43,17 @@ def tune(searchspace: Searchspace, runner, tuning_options): # determine fitness of population members weighted_population = [] - for dna in population: - try: - # if we are not constraint-aware we should check restrictions upon evaluation - time = cost_func(dna, check_restrictions=not constraint_aware) - num_evaluated += 1 - except StopCriterionReached as e: - if tuning_options.verbose: - print(e) - return cost_func.results - - weighted_population.append((dna, time)) + try: + # if we are not constraint-aware we should check restrictions upon evaluation + times = cost_func.run_all(population, check_restrictions=not constraint_aware) + num_evaluated += len(population) + except StopCriterionReached as e: + if tuning_options.verbose: + print(e) + return cost_func.results # population is sorted such that better configs have higher chance of reproducing + weighted_population = list(zip(population, times)) weighted_population.sort(key=lambda x: x[1]) # 'best_score' is used only for printing diff --git a/kernel_tuner/strategies/hillclimbers.py b/kernel_tuner/strategies/hillclimbers.py index ccd4eebf0..cc53d7db4 100644 --- a/kernel_tuner/strategies/hillclimbers.py +++ b/kernel_tuner/strategies/hillclimbers.py @@ -72,33 +72,39 @@ def base_hillclimb(base_sol: tuple, neighbor_method: str, max_fevals: int, searc if randomize: random.shuffle(indices) + children = [] + # in each dimension see the possible values for index in indices: neighbors = searchspace.get_param_neighbors(tuple(child), index, neighbor_method, randomize) # for each value in this dimension for val in neighbors: - orig_val = child[index] + child = list(child) child[index] = val + children.append(child) + if restart: + for child in children: # get score for this position score = cost_func(child, check_restrictions=False) - # generalize this to other tuning objectives if score < best_score: best_score = score base_sol = child[:] found_improved = True - if restart: - break - else: - child[index] = orig_val + break + else: + # get score for all positions in parallel + scores = cost_func.run_all(children, check_restrictions=False) - fevals = len(tuning_options.unique_results) - if fevals >= max_fevals: - return base_sol + for child, score in zip(children, scores): + if score < best_score: + best_score = score + base_sol = child[:] + found_improved = True - if found_improved and restart: - break + if found_improved and restart: + break return base_sol diff --git a/kernel_tuner/strategies/pso.py b/kernel_tuner/strategies/pso.py index e8489d12a..4e38aa311 100644 --- a/kernel_tuner/strategies/pso.py +++ b/kernel_tuner/strategies/pso.py @@ -51,24 +51,26 @@ def tune(searchspace: Searchspace, runner, tuning_options): if tuning_options.verbose: print("start iteration ", i, "best time global", best_score_global) + try: + scores = cost_func.run_all([p.position for p in swarm]) + except StopCriterionReached as e: + if tuning_options.verbose: + print(e) + return cost_func.results + # evaluate particle positions - for j in range(num_particles): - try: - swarm[j].evaluate(cost_func) - except StopCriterionReached as e: - if tuning_options.verbose: - print(e) - return cost_func.results + for p, score in zip(swarm, scores): + p.set_score(score) # update global best if needed - if swarm[j].score <= best_score_global: - best_position_global = swarm[j].position - best_score_global = swarm[j].score + if score <= best_score_global: + best_position_global = p.position + best_score_global = score # update particle velocities and positions - for j in range(0, num_particles): - swarm[j].update_velocity(best_position_global, w, c1, c2) - swarm[j].update_position(bounds) + for p in swarm: + p.update_velocity(best_position_global, w, c1, c2) + p.update_position(bounds) if tuning_options.verbose: print("Final result:") @@ -92,7 +94,10 @@ def __init__(self, bounds): self.score = sys.float_info.max def evaluate(self, cost_func): - self.score = cost_func(self.position) + self.set_score(cost_func(self.position)) + + def set_score(self, score): + self.score = score # update best_pos if needed if self.score < self.best_score: self.best_pos = self.position diff --git a/kernel_tuner/strategies/random_sample.py b/kernel_tuner/strategies/random_sample.py index 33b5075d3..4efe86151 100644 --- a/kernel_tuner/strategies/random_sample.py +++ b/kernel_tuner/strategies/random_sample.py @@ -20,16 +20,13 @@ def tune(searchspace: Searchspace, runner, tuning_options): num_samples = min(tuning_options.max_fevals, searchspace.size) samples = searchspace.get_random_sample(num_samples) - cost_func = CostFunc(searchspace, tuning_options, runner) - for sample in samples: - try: - cost_func(sample, check_restrictions=False) - except StopCriterionReached as e: - if tuning_options.verbose: - print(e) - return cost_func.results + try: + cost_func.run_all(samples, check_restrictions=False) + except StopCriterionReached as e: + if tuning_options.verbose: + print(e) return cost_func.results From fd41333745477b0d8a0b0b3496894c27ccaf3b74 Mon Sep 17 00:00:00 2001 From: stijn Date: Tue, 27 Jan 2026 10:11:01 +0100 Subject: [PATCH 14/25] Add `pcu_bus_id` to environment for Nvidia backends --- kernel_tuner/backends/cupy.py | 1 + kernel_tuner/backends/nvcuda.py | 1 + kernel_tuner/backends/pycuda.py | 1 + 3 files changed, 3 insertions(+) diff --git a/kernel_tuner/backends/cupy.py b/kernel_tuner/backends/cupy.py index 51613be7c..87ba1514c 100644 --- a/kernel_tuner/backends/cupy.py +++ b/kernel_tuner/backends/cupy.py @@ -73,6 +73,7 @@ def __init__(self, device=0, iterations=7, compiler_options=None, observers=None s.split(":")[0].strip(): s.split(":")[1].strip() for s in cupy_info } env["device_name"] = info_dict[f"Device {device} Name"] + env["pci_bus_id"] = info_dict[f"Device {device} PCI Bus ID"] env["cuda_version"] = cp.cuda.runtime.driverGetVersion() env["compute_capability"] = self.cc diff --git a/kernel_tuner/backends/nvcuda.py b/kernel_tuner/backends/nvcuda.py index 15259cb23..ef5e0bdf1 100644 --- a/kernel_tuner/backends/nvcuda.py +++ b/kernel_tuner/backends/nvcuda.py @@ -99,6 +99,7 @@ def __init__(self, device=0, iterations=7, compiler_options=None, observers=None cuda_error_check(err) env = dict() env["device_name"] = device_properties.name.decode() + env["pci_bus_id"] = device_properties.pciBusID env["cuda_version"] = cuda.CUDA_VERSION env["compute_capability"] = self.cc env["iterations"] = self.iterations diff --git a/kernel_tuner/backends/pycuda.py b/kernel_tuner/backends/pycuda.py index c8f3e689a..8f9326c2d 100644 --- a/kernel_tuner/backends/pycuda.py +++ b/kernel_tuner/backends/pycuda.py @@ -139,6 +139,7 @@ def _finish_up(): # collect environment information env = dict() env["device_name"] = self.context.get_device().name() + env["pci_bus_id"] = self.context.get_device().pci_bus_id() env["cuda_version"] = ".".join([str(i) for i in drv.get_version()]) env["compute_capability"] = self.cc env["iterations"] = self.iterations From 96e168d53c2a374584ef7bc4d02517969e84cba3 Mon Sep 17 00:00:00 2001 From: stijn Date: Tue, 27 Jan 2026 10:25:06 +0100 Subject: [PATCH 15/25] Add support `eval_all` in `CostFunc` --- kernel_tuner/strategies/bayes_opt.py | 2 +- kernel_tuner/strategies/common.py | 154 +++++++++++-------- kernel_tuner/strategies/diff_evo.py | 4 +- kernel_tuner/strategies/genetic_algorithm.py | 2 +- kernel_tuner/strategies/hillclimbers.py | 2 +- kernel_tuner/strategies/pso.py | 2 +- kernel_tuner/strategies/random_sample.py | 2 +- 7 files changed, 98 insertions(+), 70 deletions(-) diff --git a/kernel_tuner/strategies/bayes_opt.py b/kernel_tuner/strategies/bayes_opt.py index 31a68ca89..64d4c6234 100644 --- a/kernel_tuner/strategies/bayes_opt.py +++ b/kernel_tuner/strategies/bayes_opt.py @@ -473,7 +473,7 @@ def evaluate_parallel_objective_function(self, param_configs: list[tuple]) -> li valid_param_configs.append(param_config) # Run valid configurations in parallel - scores = self.cost_func.run_all(valid_param_configs) + scores = self.cost_func.eval_all(valid_param_configs) # Put the scores at the right location in the result for idx, score in zip(valid_indices, scores): diff --git a/kernel_tuner/strategies/common.py b/kernel_tuner/strategies/common.py index e567d9981..3da151733 100644 --- a/kernel_tuner/strategies/common.py +++ b/kernel_tuner/strategies/common.py @@ -91,6 +91,7 @@ def __init__( self.tuning_options["max_fevals"] = min( tuning_options["max_fevals"] if "max_fevals" in tuning_options else np.inf, searchspace.size ) + self.constraint_aware = tuning_options.strategy_options.get("constraint_aware") self.runner = runner self.scaling = scaling self.snap = snap @@ -100,90 +101,117 @@ def __init__( self.return_raw = f"{tuning_options['objective']}s" self.results = [] self.budget_spent_fraction = 0.0 + + def _normalize_and_validate_config(self, x, check_restrictions=True): + # snap values in x to nearest actual value for each parameter, unscale x if needed + if not self.snap: + if self.scaling: + config = unscale_and_snap_to_nearest(x, self.searchspace.tune_params, self.tuning_options.eps) + else: + config = snap_to_nearest_config(x, self.searchspace.tune_params) + else: + config = x + is_legal = True - def __call__(self, x, check_restrictions=True): - return self.run_one(x, check_restrictions=check_restrictions) - - def run_all(self, xs, check_restrictions=True): - return [self.run_one(x, check_restrictions=check_restrictions) for x in xs] - - def run_one(self, x, check_restrictions=True): - """Cost function used by almost all strategies.""" + # else check if this is a legal (non-restricted) configuration + if check_restrictions: + is_legal = self.searchspace.is_param_config_valid(tuple(config)) + + # Attempt to repare the config + if not is_legal and self.constraint_aware: + # attempt to repair + new_config = unscale_and_snap_to_nearest_valid(x, config, self.searchspace, self.tuning_options.eps) + + if new_config: + config = new_config + is_legal = True + + return config, is_legal + + def _run_configs(self, xs, check_restrictions=True): + """ Takes a list of Euclidian coordinates and evaluates the configurations at those points. """ self.runner.last_strategy_time = 1000 * (perf_counter() - self.runner.last_strategy_start_time) # error value to return for numeric optimizers that need a numerical value logging.debug("_cost_func called") - logging.debug("x: %s", str(x)) # check if max_fevals is reached or time limit is exceeded self.budget_spent_fraction = util.check_stop_criterion(self.tuning_options) - # snap values in x to nearest actual value for each parameter, unscale x if needed - if self.snap: - if self.scaling: - params = unscale_and_snap_to_nearest(x, self.searchspace.tune_params, self.tuning_options.eps) + batch_configs = [] # The configs to run + batch_indices = [] # Where to store result in `final_results`` + final_results = [] # List returned to the user + + for x in xs: + config, is_legal = self._normalize_and_validate_config(x, check_restrictions=check_restrictions) + logging.debug("normalize config: %s -> %s (legal: %s)", str(x), str(config), is_legal) + + if is_legal: + batch_configs.append(config) + batch_indices.append(len(final_results)) + final_results.append(None) else: - params = snap_to_nearest_config(x, self.searchspace.tune_params) - else: - params = x - logging.debug("params %s", str(params)) + result = dict(zip(self.searchspace.tune_params.keys(), config)) + result[self.tuning_options.objective] = util.InvalidConfig() + final_results.append(result) - legal = True - result = {} - x_int = ",".join([str(i) for i in params]) + # compile and benchmark the batch + batch_results = self.runner.run(batch_configs, self.tuning_options) + self.results.extend(batch_results) - # else check if this is a legal (non-restricted) configuration - if check_restrictions and self.searchspace.restrictions: - legal = self.searchspace.is_param_config_valid(tuple(params)) - - - if not legal: - if "constraint_aware" in self.tuning_options.strategy_options and self.tuning_options.strategy_options["constraint_aware"]: - # attempt to repair - new_params = unscale_and_snap_to_nearest_valid(x, params, self.searchspace, self.tuning_options.eps) - if new_params: - params = new_params - legal = True - x_int = ",".join([str(i) for i in params]) - - if not legal: - params_dict = dict(zip(self.searchspace.tune_params.keys(), params)) - result = params_dict - result[self.tuning_options.objective] = util.InvalidConfig() - - if legal: - # compile and benchmark this instance - res = self.runner.run([params], self.tuning_options) - result = res[0] - - # append to tuning results + # set in the results array + for index, result in zip(batch_indices, batch_results): + final_results[index] = result + + # append to `unique_results` + for config, result in zip(batch_configs, batch_results): + x_int = ",".join([str(i) for i in config]) if x_int not in self.tuning_options.unique_results: self.tuning_options.unique_results[x_int] = result - self.results.append(result) + # upon returning from this function control will be given back to the strategy, so reset the start time + self.runner.last_strategy_start_time = perf_counter() + return final_results + + def eval_all(self, xs, check_restrictions=True): + """Cost function used by almost all strategies.""" + results = self._run_configs(xs, check_restrictions=check_restrictions) + return_values = [] + return_raws = [] + + for result in results: + # get numerical return value, taking optimization direction into account + return_value = result[self.tuning_options.objective] + + if not isinstance(return_value, util.ErrorConfig): + # this is a valid configuration, so invert value in case of maximization + if self.tuning_options.objective_higher_is_better: + return_value = -return_value + else: + # this is not a valid configuration, replace with float max if needed + if not self.return_invalid: + return_value = sys.float_info.max + + # include raw data in return if requested + if self.return_raw is not None: + try: + return_raws.append(result[self.return_raw]) + except KeyError: + return_raws.append([np.nan]) - # upon returning from this function control will be given back to the strategy, so reset the start time - self.runner.last_strategy_start_time = perf_counter() + return_values.append(return_value) - # get numerical return value, taking optimization direction into account - return_value = result[self.tuning_options.objective] - if not isinstance(return_value, util.ErrorConfig): - # this is a valid configuration, so invert value in case of maximization - return_value = -return_value if self.tuning_options.objective_higher_is_better else return_value + if self.return_raw is not None: + return return_values, return_raws else: - # this is not a valid configuration, replace with float max if needed - if not self.return_invalid: - return_value = sys.float_info.max + return return_values - # include raw data in return if requested - if self.return_raw is not None: - try: - return return_value, result[self.return_raw] - except KeyError: - return return_value, [np.nan] + def eval(self, x, check_restrictions=True): + return self.eval_all([x], check_restrictions=check_restrictions)[0] - return return_value + def __call__(self, x, check_restrictions=True): + return self.eval(x, check_restrictions=check_restrictions) def get_start_pos(self): """Get starting position for optimization.""" diff --git a/kernel_tuner/strategies/diff_evo.py b/kernel_tuner/strategies/diff_evo.py index 6350b7d9f..9f7986e60 100644 --- a/kernel_tuner/strategies/diff_evo.py +++ b/kernel_tuner/strategies/diff_evo.py @@ -140,7 +140,7 @@ def differential_evolution(searchspace, cost_func, bounds, popsize, maxiter, F, population[0] = cost_func.get_start_pos() # Calculate the initial cost for each individual in the population - population_cost = np.array(cost_func.run_all(population)) + population_cost = np.array(cost_func.eval_all(population)) # Keep track of the best solution found so far best_idx = np.argmin(population_cost) @@ -209,7 +209,7 @@ def differential_evolution(searchspace, cost_func, bounds, popsize, maxiter, F, # --- c. Selection --- # Calculate the cost of the new trial vectors - trial_population_cost = np.array(cost_func.run_all(trial_population)) + trial_population_cost = np.array(cost_func.eval_all(trial_population)) # Keep track of whether population changes over time no_change = True diff --git a/kernel_tuner/strategies/genetic_algorithm.py b/kernel_tuner/strategies/genetic_algorithm.py index 230cfd49a..fa1f6cc98 100644 --- a/kernel_tuner/strategies/genetic_algorithm.py +++ b/kernel_tuner/strategies/genetic_algorithm.py @@ -45,7 +45,7 @@ def tune(searchspace: Searchspace, runner, tuning_options): weighted_population = [] try: # if we are not constraint-aware we should check restrictions upon evaluation - times = cost_func.run_all(population, check_restrictions=not constraint_aware) + times = cost_func.eval_all(population, check_restrictions=not constraint_aware) num_evaluated += len(population) except StopCriterionReached as e: if tuning_options.verbose: diff --git a/kernel_tuner/strategies/hillclimbers.py b/kernel_tuner/strategies/hillclimbers.py index cc53d7db4..1218ed03e 100644 --- a/kernel_tuner/strategies/hillclimbers.py +++ b/kernel_tuner/strategies/hillclimbers.py @@ -96,7 +96,7 @@ def base_hillclimb(base_sol: tuple, neighbor_method: str, max_fevals: int, searc break else: # get score for all positions in parallel - scores = cost_func.run_all(children, check_restrictions=False) + scores = cost_func.eval_all(children, check_restrictions=False) for child, score in zip(children, scores): if score < best_score: diff --git a/kernel_tuner/strategies/pso.py b/kernel_tuner/strategies/pso.py index 4e38aa311..eefbc8661 100644 --- a/kernel_tuner/strategies/pso.py +++ b/kernel_tuner/strategies/pso.py @@ -52,7 +52,7 @@ def tune(searchspace: Searchspace, runner, tuning_options): print("start iteration ", i, "best time global", best_score_global) try: - scores = cost_func.run_all([p.position for p in swarm]) + scores = cost_func.eval_all([p.position for p in swarm]) except StopCriterionReached as e: if tuning_options.verbose: print(e) diff --git a/kernel_tuner/strategies/random_sample.py b/kernel_tuner/strategies/random_sample.py index 4efe86151..194401491 100644 --- a/kernel_tuner/strategies/random_sample.py +++ b/kernel_tuner/strategies/random_sample.py @@ -23,7 +23,7 @@ def tune(searchspace: Searchspace, runner, tuning_options): cost_func = CostFunc(searchspace, tuning_options, runner) try: - cost_func.run_all(samples, check_restrictions=False) + cost_func.eval_all(samples, check_restrictions=False) except StopCriterionReached as e: if tuning_options.verbose: print(e) From d7129cdd9b593c2685a2841693be00eb7989b434 Mon Sep 17 00:00:00 2001 From: stijn Date: Tue, 27 Jan 2026 10:28:55 +0100 Subject: [PATCH 16/25] Remove `return_raw` from `CostFunc` as it is unused --- kernel_tuner/strategies/common.py | 27 +++++++-------------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/kernel_tuner/strategies/common.py b/kernel_tuner/strategies/common.py index 3da151733..413e74877 100644 --- a/kernel_tuner/strategies/common.py +++ b/kernel_tuner/strategies/common.py @@ -72,7 +72,6 @@ def __init__( scaling=False, snap=True, return_invalid=False, - return_raw=None, ): """An abstract method to handle evaluation of configurations. @@ -83,7 +82,6 @@ def __init__( scaling: whether to internally scale parameter values. Defaults to False. snap: whether to snap given configurations to their closests equivalent in the space. Defaults to True. return_invalid: whether to return the util.ErrorConfig of an invalid configuration. Defaults to False. - return_raw: returns (result, results[raw]). Key inferred from objective if set to True. Defaults to None. """ self.searchspace = searchspace self.tuning_options = tuning_options @@ -91,14 +89,13 @@ def __init__( self.tuning_options["max_fevals"] = min( tuning_options["max_fevals"] if "max_fevals" in tuning_options else np.inf, searchspace.size ) - self.constraint_aware = tuning_options.strategy_options.get("constraint_aware") + self.objective = tuning_options.objective + self.objective_higher_is_better = tuning_options.objective_higher_is_better + self.constraint_aware = bool(tuning_options.strategy_options.get("constraint_aware")) self.runner = runner self.scaling = scaling self.snap = snap self.return_invalid = return_invalid - self.return_raw = return_raw - if return_raw is True: - self.return_raw = f"{tuning_options['objective']}s" self.results = [] self.budget_spent_fraction = 0.0 @@ -153,7 +150,7 @@ def _run_configs(self, xs, check_restrictions=True): final_results.append(None) else: result = dict(zip(self.searchspace.tune_params.keys(), config)) - result[self.tuning_options.objective] = util.InvalidConfig() + result[self.objective] = util.InvalidConfig() final_results.append(result) # compile and benchmark the batch @@ -178,15 +175,14 @@ def eval_all(self, xs, check_restrictions=True): """Cost function used by almost all strategies.""" results = self._run_configs(xs, check_restrictions=check_restrictions) return_values = [] - return_raws = [] for result in results: # get numerical return value, taking optimization direction into account - return_value = result[self.tuning_options.objective] + return_value = result[self.objective] if not isinstance(return_value, util.ErrorConfig): # this is a valid configuration, so invert value in case of maximization - if self.tuning_options.objective_higher_is_better: + if self.objective_higher_is_better: return_value = -return_value else: # this is not a valid configuration, replace with float max if needed @@ -194,18 +190,9 @@ def eval_all(self, xs, check_restrictions=True): return_value = sys.float_info.max # include raw data in return if requested - if self.return_raw is not None: - try: - return_raws.append(result[self.return_raw]) - except KeyError: - return_raws.append([np.nan]) - return_values.append(return_value) - if self.return_raw is not None: - return return_values, return_raws - else: - return return_values + return return_values def eval(self, x, check_restrictions=True): return self.eval_all([x], check_restrictions=check_restrictions)[0] From 57fd617297aed461f3ac0782a30f068ca9756219 Mon Sep 17 00:00:00 2001 From: stijn Date: Tue, 27 Jan 2026 10:31:08 +0100 Subject: [PATCH 17/25] Fix timings and handling of duplicate jobs in parallel runner --- kernel_tuner/runners/parallel.py | 246 ++++++++++++++++++++----------- kernel_tuner/runners/runner.py | 4 + 2 files changed, 160 insertions(+), 90 deletions(-) diff --git a/kernel_tuner/runners/parallel.py b/kernel_tuner/runners/parallel.py index a05fc2fa5..a14a30823 100644 --- a/kernel_tuner/runners/parallel.py +++ b/kernel_tuner/runners/parallel.py @@ -1,4 +1,5 @@ """A specialized runner that tunes in parallel the parameter space.""" +from collections import deque import logging import socket from time import perf_counter @@ -18,17 +19,18 @@ @ray.remote(num_gpus=1) class DeviceActor: - def __init__(self, kernel_source, kernel_options, device_options, tuning_options, iterations, observers): + def __init__( + self, kernel_source, kernel_options, device_options, tuning_options, iterations, observers + ): # detect language and create high-level device interface - self.dev = DeviceInterface(kernel_source, iterations=iterations, observers=observers, **device_options) + self.dev = DeviceInterface( + kernel_source, iterations=iterations, observers=observers, **device_options + ) self.units = self.dev.units self.quiet = device_options.quiet self.kernel_source = kernel_source self.warmed_up = False if self.dev.requires_warmup else True - self.start_time = perf_counter() - self.last_strategy_start_time = self.start_time - self.last_strategy_time = 0 self.kernel_options = kernel_options self.tuning_options = tuning_options @@ -55,11 +57,8 @@ def get_environment(self): return env - def run(self, element): + def run(self, key, element): # TODO: logging.debug("sequential runner started for " + self.kernel_options.kernel_name) - objective = self.tuning_options.objective - metrics = self.tuning_options.metrics - params = dict(element) result = None warmup_time = 0 @@ -77,41 +76,19 @@ def run(self, element): self.kernel_source, self.gpu_args, params, self.kernel_options, self.tuning_options ) - if isinstance(result.get(objective), ErrorConfig): - logging.debug("kernel configuration was skipped silently due to compile or runtime failure") - params.update(result) - # only compute metrics on configs that have not errored - if metrics and not isinstance(params.get(objective), ErrorConfig): - params = process_metrics(params, metrics) - - # get the framework time by estimating based on other times - total_time = 1000 * ((perf_counter() - self.start_time) - warmup_time) - params["strategy_time"] = self.last_strategy_time - params["framework_time"] = max( - total_time - - ( - params["compile_time"] - + params["verification_time"] - + params["benchmark_time"] - + params["strategy_time"] - ), - 0, - ) - params["timestamp"] = datetime.now(timezone.utc).isoformat() params["ray_actor_id"] = ray.get_runtime_context().get_actor_id() params["host_name"] = socket.gethostname() - self.start_time = perf_counter() - # all visited configurations are added to results to provide a trace for optimization strategies - return params + return key, params class DeviceActorState: - def __init__(self, actor): + def __init__(self, index, actor): + self.index = index self.actor = actor self.running_jobs = [] self.maximum_running_jobs = 1 @@ -121,7 +98,7 @@ def __init__(self, actor): def __repr__(self): actor_id = self.env["ray"]["actor_id"] host_name = self.env["host_name"] - return f"{actor_id} ({host_name})" + return f"{self.index} ({host_name}, {actor_id})" def shutdown(self): if not self.is_running: @@ -134,26 +111,41 @@ def shutdown(self): except Exception: logger.exception("Failed to request actor shutdown: %s", self) - def submit(self, config): - logger.info(f"jobs submitted to worker {self}: {config}") - job = self.actor.run.remote(config) + def submit(self, key, config): + logger.info(f"job submitted to worker {self}: {key}") + job = self.actor.run.remote(key, config) self.running_jobs.append(job) return job - + def is_available(self): if not self.is_running: return False # Check for ready jobs, but do not block ready_jobs, self.running_jobs = ray.wait(self.running_jobs, timeout=0) - ray.get(ready_jobs) + + for job in ready_jobs: + try: + key, _result = ray.get(job) + logger.info(f"job finished on worker {self}: {key}") + except Exception: + logger.exception(f"job failed on worker {self}") # Available if this actor can now run another job return len(self.running_jobs) < self.maximum_running_jobs class ParallelRunner(Runner): - def __init__(self, kernel_source, kernel_options, device_options, tuning_options, iterations, observers, num_workers=None): + def __init__( + self, + kernel_source, + kernel_options, + device_options, + tuning_options, + iterations, + observers, + num_workers=None, + ): if not ray.is_initialized(): ray.init() @@ -164,97 +156,171 @@ def __init__(self, kernel_source, kernel_options, device_options, tuning_options raise RuntimeError("failed to initialize parallel runner: no GPUs found") if num_workers < 1: - raise RuntimeError(f"failed to initialize parallel runner: invalid number of GPUs specified: {num_workers}") + raise RuntimeError( + f"failed to initialize parallel runner: invalid number of GPUs specified: {num_workers}" + ) self.workers = [] try: + # Start workers for index in range(num_workers): - actor = DeviceActor.remote(kernel_source, kernel_options, device_options, tuning_options, iterations, observers) - worker = DeviceActorState(actor) + actor = DeviceActor.remote( + kernel_source, + kernel_options, + device_options, + tuning_options, + iterations, + observers, + ) + worker = DeviceActorState(index, actor) self.workers.append(worker) - logger.info(f"launched worker {index}: {worker}") + logger.info(f"connected to worker {worker}") + + # Check if all workers have the same device + device_names = {w.env.get("device_name") for w in self.workers} + if len(device_names) != 1: + raise RuntimeError( + f"failed to initialize parallel runner: workers have different devices: {sorted(device_names)}" + ) except: - # If an exception occurs, shut down the worker + # If an exception occurs, shut down the worker and reraise error self.shutdown() raise - # Check if all workers have the same device - device_names = {w.env.get("device_name") for w in self.workers} - if len(device_names) != 1: - self.shutdown() - raise RuntimeError( - f"failed to initialize parallel runner: workers have different devices: {sorted(device_names)}" - ) - self.device_name = device_names.pop() - # TODO: Get this from the device + # TODO: Get units from the device? + self.start_time = perf_counter() self.units = {"time": "ms"} self.quiet = device_options.quiet def get_device_info(self): + # TODO: Get this from the device? return Options({"max_threads": 1024}) def get_environment(self, tuning_options): - return { - "device_name": self.device_name, - "workers": [w.env for w in self.workers] - } + return {"device_name": self.device_name, "workers": [w.env for w in self.workers]} def shutdown(self): for worker in self.workers: try: worker.shutdown() except Exception as err: - logger.exception("error while shutting down worker {worker}") + logger.exception(f"error while shutting down worker {worker}") - def submit_job(self, *args): - while True: - # Round-robin: first available worker gets the job and goes to the back of the list - for i, worker in enumerate(list(self.workers)): - if worker.is_available(): - self.workers.pop(i) - self.workers.append(worker) - return worker.submit(*args) + def available_parallelism(self): + return len(self.workers) - # Gather all running jobs - running_jobs = [job for w in self.workers for job in w.running_jobs] + def submit_jobs(self, jobs): + pending_jobs = deque(jobs) + running_jobs = [] - # If there are no running jobs, then something must be wrong. - # Maybe a worker has crashed or gotten into an invalid state. - if not running_jobs: - raise RuntimeError("invalid state: no Ray workers are available to run job") + while pending_jobs or running_jobs: + should_wait = True - # Wait until any running job completes - ray.wait(running_jobs, num_returns=1) + # If there is still work left, submit it now + if pending_jobs: + for i, worker in enumerate(list(self.workers)): + if worker.is_available(): + # Push worker to back of list + self.workers.pop(i) + self.workers.append(worker) + + # Pop job and submit it + job = pending_jobs.popleft() + ref = worker.submit(*job) + running_jobs.append(ref) + + should_wait = False + break + + # If no work was submitted, wait until a worker is available + if should_wait: + if not running_jobs: + raise RuntimeError("invalid state: no ray workers available") + + ready_jobs, running_jobs = ray.wait(running_jobs, num_returns=1) + + for result in ready_jobs: + yield ray.get(result) def run(self, parameter_space, tuning_options): - running_jobs = dict() - completed_jobs = dict() + metrics = tuning_options.metrics + objective = tuning_options.objective - # Submit jobs which are not in the cache - for config in parameter_space: + jobs = [] # Jobs that need to be executed + results = [] # Results that will be returned at the end + key2index = dict() # Used to insert job result back into `results` + duplicate_entries = [] # Used for duplicate entries in `parameter_space` + + # Select jobs which are not in the cache + for index, config in enumerate(parameter_space): params = dict(zip(tuning_options.tune_params.keys(), config)) key = ",".join([str(i) for i in config]) if key in tuning_options.cache: - completed_jobs[key] = tuning_options.cache[key] + params.update(tuning_options.cache[key]) + params["compile_time"] = 0 + params["verification_time"] = 0 + params["benchmark_time"] = 0 + results.append(params) else: - assert key not in running_jobs - running_jobs[key] = self.submit_job(params) - completed_jobs[key] = None + if key not in key2index: + key2index[key] = index + else: + duplicate_entries.append((key2index[key], index)) + + jobs.append((key, params)) + results.append(None) + + total_worker_time = 0 + + # Submit jobs and wait for them to finish + for key, result in self.submit_jobs(jobs): + results[key2index[key]] = result + + # Collect total time spent by worker + total_worker_time += ( + params["compile_time"] + params["verification_time"] + params["benchmark_time"] + ) - # Wait for the running jobs to finish - for key, job in running_jobs.items(): - result = ray.get(job) - completed_jobs[key] = result + if isinstance(result.get(objective), ErrorConfig): + logging.error( + "kernel configuration {key} was skipped silently due to compile or runtime failure", + key, + ) # print configuration to the console - print_config_output(tuning_options.tune_params, result, self.quiet, tuning_options.metrics, self.units) + print_config_output( + tuning_options.tune_params, result, self.quiet, tuning_options.metrics, self.units + ) # add configuration to cache store_cache(key, result, tuning_options.cachefile, tuning_options.cache) - return list(completed_jobs.values()) + # Copy each `i` to `j` for every `i,j` in `duplicate_entries` + for i, j in duplicate_entries: + results[j] = dict(results[i]) + + total_time = 1000 * (perf_counter() - self.start_time) + self.start_time = perf_counter() + + strategy_time = self.last_strategy_time + self.last_strategy_time = 0 + + runner_time = total_time - strategy_time + framework_time = max(runner_time * len(self.workers) - total_worker_time, 0) + + # Post-process all the results + for params in results: + # Amortize the time over all the results + params["strategy_time"] = strategy_time / len(results) + params["framework_time"] = framework_time / len(results) + + # only compute metrics on configs that have not errored + if metrics and not isinstance(params.get(objective), ErrorConfig): + params = process_metrics(params, metrics) + + return results diff --git a/kernel_tuner/runners/runner.py b/kernel_tuner/runners/runner.py index 3a886ad16..e95b7811c 100644 --- a/kernel_tuner/runners/runner.py +++ b/kernel_tuner/runners/runner.py @@ -16,6 +16,10 @@ def __init__( def shutdown(self): pass + def available_parallelism(self): + """ Gives an indication of how many jobs this runner can execute in parallel. """ + return 1 + @abstractmethod def get_device_info(self): pass From e1259b1a2cd943953636bb56d156e70c8560d95a Mon Sep 17 00:00:00 2001 From: Ben van Werkhoven Date: Thu, 29 Jan 2026 16:06:31 +0000 Subject: [PATCH 18/25] fix bug for continuous optimization --- kernel_tuner/strategies/common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel_tuner/strategies/common.py b/kernel_tuner/strategies/common.py index 413e74877..898835f79 100644 --- a/kernel_tuner/strategies/common.py +++ b/kernel_tuner/strategies/common.py @@ -101,7 +101,7 @@ def __init__( def _normalize_and_validate_config(self, x, check_restrictions=True): # snap values in x to nearest actual value for each parameter, unscale x if needed - if not self.snap: + if self.snap: if self.scaling: config = unscale_and_snap_to_nearest(x, self.searchspace.tune_params, self.tuning_options.eps) else: From 72bfe945f3be2d1f6c52ee4a799903300270a848 Mon Sep 17 00:00:00 2001 From: Ben van Werkhoven Date: Thu, 29 Jan 2026 16:07:29 +0000 Subject: [PATCH 19/25] fix test_time_keeping test ensuring at least two GA generations --- test/test_runners.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_runners.py b/test/test_runners.py index 3a0a26e22..de15352b4 100644 --- a/test/test_runners.py +++ b/test/test_runners.py @@ -163,8 +163,8 @@ def test_time_keeping(env): answer = [args[1] + args[2], None, None, None] options = dict(method="uniform", - popsize=10, - maxiter=1, + popsize=5, + maxiter=50, mutation_chance=1, max_fevals=10) start = time.perf_counter() From cc6bb9781eae5db28ce8942de034c39b6b30a6cc Mon Sep 17 00:00:00 2001 From: Ben van Werkhoven Date: Thu, 29 Jan 2026 16:08:21 +0000 Subject: [PATCH 20/25] fix tests needing more context for tuning_options --- test/strategies/test_bayesian_optimization.py | 2 ++ test/test_common.py | 18 ++++++++++++------ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/test/strategies/test_bayesian_optimization.py b/test/strategies/test_bayesian_optimization.py index f8c889aab..5c7bab322 100644 --- a/test/strategies/test_bayesian_optimization.py +++ b/test/strategies/test_bayesian_optimization.py @@ -19,6 +19,8 @@ strategy_options = dict(popsize=0, max_fevals=10) tuning_options = Options(dict(restrictions=[], tune_params=tune_params, strategy_options=strategy_options)) tuning_options["scaling"] = True +tuning_options["objective"] = "time" +tuning_options["objective_higher_is_better"] = False tuning_options["snap"] = True max_threads = 1024 searchspace = Searchspace(tune_params, [], max_threads) diff --git a/test/test_common.py b/test/test_common.py index 7c1bd6838..40d25de81 100644 --- a/test/test_common.py +++ b/test/test_common.py @@ -1,20 +1,26 @@ import random import numpy as np +import pytest import kernel_tuner.strategies.common as common from kernel_tuner.interface import Options from kernel_tuner.searchspace import Searchspace +@pytest.fixture +def tuning_options(): + tuning_options = Options() + tuning_options["strategy_options"] = {} + tuning_options["objective"] = "time" + tuning_options["objective_higher_is_better"] = False + return tuning_options + -def test_get_bounds_x0_eps(): +def test_get_bounds_x0_eps(tuning_options): tune_params = dict() tune_params['x'] = [0, 1, 2, 3, 4] searchspace = Searchspace(tune_params, [], 1024) - tuning_options = Options() - tuning_options["strategy_options"] = {} - bounds, x0, eps = common.CostFunc(searchspace, tuning_options, None, scaling=True).get_bounds_x0_eps() assert bounds == [(0.0, 1.0)] @@ -27,7 +33,7 @@ def test_get_bounds_x0_eps(): assert eps == 1.0 -def test_get_bounds(): +def test_get_bounds(tuning_options): tune_params = dict() tune_params['x'] = [0, 1, 2, 3, 4] @@ -39,7 +45,7 @@ def test_get_bounds(): expected = [(0, 4), (0, 9900), (-11.2, 123.27)] searchspace = Searchspace(tune_params, None, 1024) - cost_func = common.CostFunc(searchspace, None, None) + cost_func = common.CostFunc(searchspace, tuning_options, None) answer = cost_func.get_bounds() assert answer == expected From ce8123a04129e6052103f1cdb0362c2aef106839 Mon Sep 17 00:00:00 2001 From: Ben van Werkhoven Date: Thu, 29 Jan 2026 18:36:27 +0000 Subject: [PATCH 21/25] do not count invalid for unique_results and avoid overshooting budget --- kernel_tuner/strategies/common.py | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/kernel_tuner/strategies/common.py b/kernel_tuner/strategies/common.py index 898835f79..77838828d 100644 --- a/kernel_tuner/strategies/common.py +++ b/kernel_tuner/strategies/common.py @@ -139,6 +139,7 @@ def _run_configs(self, xs, check_restrictions=True): batch_configs = [] # The configs to run batch_indices = [] # Where to store result in `final_results`` final_results = [] # List returned to the user + benchmark_config = [] for x in xs: config, is_legal = self._normalize_and_validate_config(x, check_restrictions=check_restrictions) @@ -148,10 +149,24 @@ def _run_configs(self, xs, check_restrictions=True): batch_configs.append(config) batch_indices.append(len(final_results)) final_results.append(None) + x_int = ",".join([str(i) for i in config]) + benchmark_config.append(x_int not in self.tuning_options.unique_results) else: result = dict(zip(self.searchspace.tune_params.keys(), config)) result[self.objective] = util.InvalidConfig() final_results.append(result) + benchmark_config.append(False) + + # do not overshoot max_fevals if we can avoid it + if "max_fevals" in self.tuning_options: + budget = self.tuning_options.max_fevals - len(self.tuning_options.unique_results) + if sum(benchmark_config) > budget: + # find index 'budget'th True value (+1 for including this last index) + last_index = benchmark_config.index(True, budget)+1 + # mask configs we cannot benchmark + batch_configs = batch_configs[:last_index] + batch_indices = batch_indices[:last_index] + final_results = final_results[:last_index] # compile and benchmark the batch batch_results = self.runner.run(batch_configs, self.tuning_options) @@ -162,10 +177,14 @@ def _run_configs(self, xs, check_restrictions=True): final_results[index] = result # append to `unique_results` - for config, result in zip(batch_configs, batch_results): - x_int = ",".join([str(i) for i in config]) - if x_int not in self.tuning_options.unique_results: - self.tuning_options.unique_results[x_int] = result + for config, result, benchmarked in zip(batch_configs, batch_results, benchmark_config): + if benchmarked: + x_int = ",".join([str(i) for i in config]) + if x_int not in self.tuning_options.unique_results: + self.tuning_options.unique_results[x_int] = result + + # check again for stop condition + self.budget_spent_fraction = util.check_stop_criterion(self.tuning_options) # upon returning from this function control will be given back to the strategy, so reset the start time self.runner.last_strategy_start_time = perf_counter() From f6c63bff630b61d1f12dcb032b3b8045ea06de46 Mon Sep 17 00:00:00 2001 From: Ben van Werkhoven Date: Thu, 29 Jan 2026 22:23:15 +0000 Subject: [PATCH 22/25] fix for not overshooting/undershooting budget --- kernel_tuner/strategies/common.py | 15 ++++++++++----- test/strategies/test_strategies.py | 2 +- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/kernel_tuner/strategies/common.py b/kernel_tuner/strategies/common.py index 77838828d..83793c256 100644 --- a/kernel_tuner/strategies/common.py +++ b/kernel_tuner/strategies/common.py @@ -123,9 +123,10 @@ def _normalize_and_validate_config(self, x, check_restrictions=True): if new_config: config = new_config is_legal = True - + return config, is_legal + def _run_configs(self, xs, check_restrictions=True): """ Takes a list of Euclidian coordinates and evaluates the configurations at those points. """ self.runner.last_strategy_time = 1000 * (perf_counter() - self.runner.last_strategy_start_time) @@ -137,7 +138,7 @@ def _run_configs(self, xs, check_restrictions=True): self.budget_spent_fraction = util.check_stop_criterion(self.tuning_options) batch_configs = [] # The configs to run - batch_indices = [] # Where to store result in `final_results`` + batch_indices = [] # Where to store result in `final_results`` final_results = [] # List returned to the user benchmark_config = [] @@ -155,14 +156,13 @@ def _run_configs(self, xs, check_restrictions=True): result = dict(zip(self.searchspace.tune_params.keys(), config)) result[self.objective] = util.InvalidConfig() final_results.append(result) - benchmark_config.append(False) # do not overshoot max_fevals if we can avoid it if "max_fevals" in self.tuning_options: budget = self.tuning_options.max_fevals - len(self.tuning_options.unique_results) if sum(benchmark_config) > budget: - # find index 'budget'th True value (+1 for including this last index) - last_index = benchmark_config.index(True, budget)+1 + # find index 'budget'th True value + last_index = _get_nth_true(benchmark_config, budget)+1 # mask configs we cannot benchmark batch_configs = batch_configs[:last_index] batch_indices = batch_indices[:last_index] @@ -272,6 +272,11 @@ def get_bounds(self): return bounds +def _get_nth_true(lst, n): + # Returns the index of the nth True value in a list + return [i for i, x in enumerate(lst) if x][n-1] + + def setup_method_arguments(method, bounds): """Prepare method specific arguments.""" kwargs = {} diff --git a/test/strategies/test_strategies.py b/test/strategies/test_strategies.py index ea5a2994d..8f77a9516 100644 --- a/test/strategies/test_strategies.py +++ b/test/strategies/test_strategies.py @@ -96,7 +96,7 @@ def test_strategies(vector_add, strategy): tune_params = vector_add[-1] unique_results = {} for result in results: - x_int = ",".join([str(v) for k, v in result.items() if k in tune_params]) + x_int = ",".join([str(v) for k, v in result.items() if k in tune_params.keys()]) if not isinstance(result["time"], InvalidConfig): unique_results[x_int] = result["time"] assert len(unique_results) <= filter_options["max_fevals"] From ce7e330d2926e850678d1d869f25599744f9a697 Mon Sep 17 00:00:00 2001 From: Ben van Werkhoven Date: Thu, 29 Jan 2026 22:24:27 +0000 Subject: [PATCH 23/25] fix time accounting when using batched costfunc --- kernel_tuner/runners/simulation.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/kernel_tuner/runners/simulation.py b/kernel_tuner/runners/simulation.py index b369b85a0..f09772635 100644 --- a/kernel_tuner/runners/simulation.py +++ b/kernel_tuner/runners/simulation.py @@ -117,6 +117,10 @@ def run(self, parameter_space, tuning_options): # self.last_strategy_time is set by cost_func result["strategy_time"] = self.last_strategy_time + # Reset last strategy time to avoid counting it more than once in the + # framework time when strategy requests multiple configs at once + self.last_strategy_time = 0 + try: simulated_time = result["compile_time"] + result["verification_time"] + result["benchmark_time"] tuning_options.simulated_time += simulated_time @@ -128,7 +132,7 @@ def run(self, parameter_space, tuning_options): total_time = 1000 * (perf_counter() - self.start_time) self.start_time = perf_counter() - result["framework_time"] = total_time - self.last_strategy_time + result["framework_time"] = total_time - result["strategy_time"] results.append(result) continue From faf6cdcc19dbfad6733d09dd63b078e0ec7c6a3c Mon Sep 17 00:00:00 2001 From: Ben van Werkhoven Date: Fri, 30 Jan 2026 08:10:33 +0000 Subject: [PATCH 24/25] fix timing issues --- kernel_tuner/runners/sequential.py | 11 ++++++++--- kernel_tuner/runners/simulation.py | 21 +++++++++++---------- kernel_tuner/strategies/common.py | 3 ++- kernel_tuner/util.py | 12 +++++++++++- 4 files changed, 32 insertions(+), 15 deletions(-) diff --git a/kernel_tuner/runners/sequential.py b/kernel_tuner/runners/sequential.py index 2bd554bfc..7e9d76934 100644 --- a/kernel_tuner/runners/sequential.py +++ b/kernel_tuner/runners/sequential.py @@ -5,7 +5,7 @@ from kernel_tuner.core import DeviceInterface from kernel_tuner.runners.runner import Runner -from kernel_tuner.util import ErrorConfig, print_config_output, process_metrics, store_cache +from kernel_tuner.util import ErrorConfig, print_config_output, process_metrics, store_cache, stop_criterion_reached class SequentialRunner(Runner): @@ -65,10 +65,16 @@ def run(self, parameter_space, tuning_options): results = [] + # self.last_strategy_time is set by cost_func + strategy_time_per_config = self.last_strategy_time / len(parameter_space) if len(parameter_space) > 0 else 0 + # iterate over parameter space for element in parameter_space: params = dict(zip(tuning_options.tune_params.keys(), element)) + if stop_criterion_reached(tuning_options): + return results + result = None warmup_time = 0 @@ -104,14 +110,13 @@ def run(self, parameter_space, tuning_options): # get the framework time by estimating based on other times total_time = 1000 * ((perf_counter() - self.start_time) - warmup_time) - params["strategy_time"] = self.last_strategy_time + params["strategy_time"] = strategy_time_per_config params["framework_time"] = max( total_time - ( params["compile_time"] + params["verification_time"] + params["benchmark_time"] - + params["strategy_time"] ), 0, ) diff --git a/kernel_tuner/runners/simulation.py b/kernel_tuner/runners/simulation.py index f09772635..986bf9b25 100644 --- a/kernel_tuner/runners/simulation.py +++ b/kernel_tuner/runners/simulation.py @@ -83,8 +83,15 @@ def run(self, parameter_space, tuning_options): results = [] + # self.last_strategy_time is set by cost_func + strategy_time_per_config = self.last_strategy_time / len(parameter_space) if len(parameter_space) > 0 else 0 + # iterate over parameter space for element in parameter_space: + + if util.stop_criterion_reached(tuning_options): + return results + # check if element is in the cache x_int = ",".join([str(i) for i in element]) if tuning_options.cache and x_int in tuning_options.cache: @@ -94,7 +101,6 @@ def run(self, parameter_space, tuning_options): if tuning_options.metrics and not isinstance(result.get(tuning_options.objective), util.ErrorConfig): result = util.process_metrics(result, tuning_options.metrics) - # Simulate behavior of sequential runner that when a configuration is # served from the cache by the sequential runner, the compile_time, # verification_time, and benchmark_time are set to 0. @@ -114,12 +120,7 @@ def run(self, parameter_space, tuning_options): ) # Everything but the strategy time and framework time are simulated, - # self.last_strategy_time is set by cost_func - result["strategy_time"] = self.last_strategy_time - - # Reset last strategy time to avoid counting it more than once in the - # framework time when strategy requests multiple configs at once - self.last_strategy_time = 0 + result["strategy_time"] = strategy_time_per_config try: simulated_time = result["compile_time"] + result["verification_time"] + result["benchmark_time"] @@ -132,7 +133,7 @@ def run(self, parameter_space, tuning_options): total_time = 1000 * (perf_counter() - self.start_time) self.start_time = perf_counter() - result["framework_time"] = total_time - result["strategy_time"] + result["framework_time"] = total_time results.append(result) continue @@ -145,11 +146,11 @@ def run(self, parameter_space, tuning_options): result['compile_time'] = 0 result['verification_time'] = 0 result['benchmark_time'] = 0 - result['strategy_time'] = self.last_strategy_time + result['strategy_time'] = strategy_time_per_config total_time = 1000 * (perf_counter() - self.start_time) self.start_time = perf_counter() - result['framework_time'] = total_time - self.last_strategy_time + result['framework_time'] = total_time result[tuning_options.objective] = util.InvalidConfig() results.append(result) diff --git a/kernel_tuner/strategies/common.py b/kernel_tuner/strategies/common.py index 83793c256..ab3929bf9 100644 --- a/kernel_tuner/strategies/common.py +++ b/kernel_tuner/strategies/common.py @@ -130,6 +130,7 @@ def _normalize_and_validate_config(self, x, check_restrictions=True): def _run_configs(self, xs, check_restrictions=True): """ Takes a list of Euclidian coordinates and evaluates the configurations at those points. """ self.runner.last_strategy_time = 1000 * (perf_counter() - self.runner.last_strategy_start_time) + self.runner.start_time = perf_counter() # start framework time # error value to return for numeric optimizers that need a numerical value logging.debug("_cost_func called") @@ -166,7 +167,7 @@ def _run_configs(self, xs, check_restrictions=True): # mask configs we cannot benchmark batch_configs = batch_configs[:last_index] batch_indices = batch_indices[:last_index] - final_results = final_results[:last_index] + final_results = final_results[:batch_indices[-1]+1] # compile and benchmark the batch batch_results = self.runner.run(batch_configs, self.tuning_options) diff --git a/kernel_tuner/util.py b/kernel_tuner/util.py index 635c6de78..7abd7a60d 100644 --- a/kernel_tuner/util.py +++ b/kernel_tuner/util.py @@ -213,8 +213,18 @@ def check_stop_criterion(to: dict) -> float: if time_spent > to.time_limit: raise StopCriterionReached("time limit exceeded") return time_spent / to.time_limit - +def stop_criterion_reached(to: dict) -> bool: + """Returns True if stop criterion has been reached""" + res = False + if "max_fevals" in to: + if len(to.unique_results) >= to.max_fevals: + res = True + if "time_limit" in to: + time_spent = (time.perf_counter() - to.start_time) + (to.simulated_time * 1e-3) + to.startup_time + if time_spent > to.time_limit: + res |= True + return res def check_tune_params_list(tune_params, observers, simulation_mode=False): """Raise an exception if a tune parameter has a forbidden name.""" From ec30052a330f11e3466772b52704cfef5df73861 Mon Sep 17 00:00:00 2001 From: Ben van Werkhoven Date: Fri, 30 Jan 2026 09:10:38 +0000 Subject: [PATCH 25/25] fix budget overshoot issue for sequential tuning --- kernel_tuner/runners/sequential.py | 2 ++ kernel_tuner/runners/simulation.py | 2 ++ kernel_tuner/strategies/common.py | 2 ++ test/test_runners.py | 1 + 4 files changed, 7 insertions(+) diff --git a/kernel_tuner/runners/sequential.py b/kernel_tuner/runners/sequential.py index 7e9d76934..4b7805946 100644 --- a/kernel_tuner/runners/sequential.py +++ b/kernel_tuner/runners/sequential.py @@ -132,5 +132,7 @@ def run(self, parameter_space, tuning_options): # all visited configurations are added to results to provide a trace for optimization strategies results.append(params) + if x_int not in tuning_options.unique_results: + tuning_options.unique_results[x_int] = result return results diff --git a/kernel_tuner/runners/simulation.py b/kernel_tuner/runners/simulation.py index 986bf9b25..659de3fe7 100644 --- a/kernel_tuner/runners/simulation.py +++ b/kernel_tuner/runners/simulation.py @@ -136,6 +136,8 @@ def run(self, parameter_space, tuning_options): result["framework_time"] = total_time results.append(result) + if x_int not in tuning_options.unique_results: + tuning_options.unique_results[x_int] = result continue # if the configuration is not in the cache and not within restrictions, simulate an InvalidConfig with warning diff --git a/kernel_tuner/strategies/common.py b/kernel_tuner/strategies/common.py index baff0f14e..f6375cf78 100644 --- a/kernel_tuner/strategies/common.py +++ b/kernel_tuner/strategies/common.py @@ -188,6 +188,8 @@ def _run_configs(self, xs, check_restrictions=True): self.tuning_options.unique_results[x_int] = result # check again for stop condition + # this check is necessary because some strategies cannot handle partially completed requests + # for example when only half of the configs in a population have been evaluated self.budget_spent_fraction = util.check_stop_criterion(self.tuning_options) # upon returning from this function control will be given back to the strategy, so reset the start time diff --git a/test/test_runners.py b/test/test_runners.py index de15352b4..609ccad32 100644 --- a/test/test_runners.py +++ b/test/test_runners.py @@ -287,6 +287,7 @@ def test_runner(env): device_options = Options([(k, opts.get(k, None)) for k in _device_options.keys()]) tuning_options.cachefile = None + tuning_options.unique_results = {} # create runner runner = SequentialRunner(kernelsource,