From 066d614f097848d2d3e41438275db6fb237a181f Mon Sep 17 00:00:00 2001 From: Franck Charras <29153872+fcharras@users.noreply.github.com> Date: Sun, 29 Jan 2023 10:36:05 +0100 Subject: [PATCH 01/13] wip: leftover enhancements --- sklearn_numba_dpex/common/_utils.py | 11 +- sklearn_numba_dpex/common/kernels.py | 237 ++++++++---------- .../common/tests/test_kernels.py | 11 +- sklearn_numba_dpex/kmeans/drivers.py | 75 +++--- 4 files changed, 148 insertions(+), 186 deletions(-) diff --git a/sklearn_numba_dpex/common/_utils.py b/sklearn_numba_dpex/common/_utils.py index 9c3cb8b..ad0a7a2 100644 --- a/sklearn_numba_dpex/common/_utils.py +++ b/sklearn_numba_dpex/common/_utils.py @@ -39,11 +39,14 @@ def _plus_closure(x, y): return _plus_closure -def _divide(): - def _divide_closure(x, y): - return x / y +def _divide_by(divisor): + def _divide_by_fn(): + def _divide_closure(x): + return x / divisor - return _divide_closure + return _divide_closure + + return _divide_by_fn def _check_max_work_group_size( diff --git a/sklearn_numba_dpex/common/kernels.py b/sklearn_numba_dpex/common/kernels.py index a99b33c..c7b35f9 100644 --- a/sklearn_numba_dpex/common/kernels.py +++ b/sklearn_numba_dpex/common/kernels.py @@ -47,80 +47,59 @@ # https://github.com/soda-inria/sklearn-numba-dpex/pull/82 when # fixed. @lru_cache -def make_elementwise_binary_op_1d_kernel(size, op, work_group_size, dtype): - """This kernel is mostly necessary to work around lack of support for this - operation in dpnp, see https://github.com/IntelPython/dpnp/issues/1238""" - op = dpex.func(op()) +def make_apply_elementwise_func(shape, func, work_group_size, dtype): + func_ = dpex.func(func()) + n_items = math.prod(shape) @dpex.kernel # fmt: off - def elementwise_ops( - data, # INOUT (size,) - operand_right # IN (1,) + def elementwise_ops_kernel( + data, # INOUT (n_items,) ): # fmt: on - item_idx = dpex.get_global_id(zero_idx) - if item_idx >= size: + if item_idx >= n_items: return - operand_left = data[item_idx] - data[item_idx] = op(operand_left, operand_right[0]) - - global_size = math.ceil(size / work_group_size) * work_group_size - return elementwise_ops[global_size, work_group_size] - - -@lru_cache -def make_initialize_to_zeros_2d_kernel(size0, size1, work_group_size, dtype): + item = data[item_idx] + data[item_idx] = func_(item) - n_items = size0 * size1 global_size = math.ceil(n_items / work_group_size) * work_group_size - zero = dtype(0.0) - - # Optimized for C-contiguous arrays - @dpex.kernel - def initialize_to_zeros(data): - item_idx = dpex.get_global_id(zero_idx) - if item_idx >= n_items: - return - - row_idx = item_idx // size1 - col_idx = item_idx % size1 - data[row_idx, col_idx] = zero + def elementwise_ops(data): + data = dpt.reshape(data, (-1,)) + elementwise_ops_kernel[global_size, work_group_size](data) - return initialize_to_zeros[global_size, work_group_size] + return elementwise_ops @lru_cache -def make_initialize_to_zeros_3d_kernel(size0, size1, size2, work_group_size, dtype): +def make_initialize_to_zeros_kernel(shape, work_group_size, dtype): - n_items = size0 * size1 * size2 - stride0 = size1 * size2 + n_items = math.prod(shape) global_size = math.ceil(n_items / work_group_size) * work_group_size zero = dtype(0.0) - # Optimized for C-contiguous arrays @dpex.kernel - def initialize_to_zeros(data): + def initialize_to_zeros_kernel(data): item_idx = dpex.get_global_id(zero_idx) if item_idx >= n_items: return - i = item_idx // stride0 - stride0_idx = item_idx % stride0 - j = stride0_idx // size2 - k = stride0_idx % size2 - data[i, j, k] = zero + data[item_idx] = zero + + def initialize_to_zeros(data): + data = dpt.reshape(data, (-1,)) + initialize_to_zeros_kernel[global_size, work_group_size](data) - return initialize_to_zeros[global_size, work_group_size] + return initialize_to_zeros @lru_cache -def make_broadcast_division_1d_2d_axis0_kernel(size0, size1, work_group_size): - global_size = math.ceil(size1 / work_group_size) * work_group_size +def make_broadcast_division_1d_2d_axis0_kernel(shape, work_group_size): + n_rows, n_cols = shape + global_size = math.ceil(n_cols / work_group_size) * work_group_size # NB: the left operand is modified inplace, the right operand is only read into. # Optimized for C-contiguous array and for @@ -129,12 +108,12 @@ def make_broadcast_division_1d_2d_axis0_kernel(size0, size1, work_group_size): def broadcast_division(dividend_array, divisor_vector): col_idx = dpex.get_global_id(zero_idx) - if col_idx >= size1: + if col_idx >= n_cols: return divisor = divisor_vector[col_idx] - for row_idx in range(size0): + for row_idx in range(n_rows): dividend_array[row_idx, col_idx] = ( dividend_array[row_idx, col_idx] / divisor ) @@ -143,15 +122,16 @@ def broadcast_division(dividend_array, divisor_vector): @lru_cache -def make_broadcast_ops_1d_2d_axis1_kernel(size0, size1, ops, work_group_size, dtype): +def make_broadcast_ops_1d_2d_axis1_kernel(shape, ops, work_group_size, dtype): """ ops must be a function that will be interpreted as a dpex.func and is subject to the same rules. It is expected to take two scalar arguments and return one scalar value. lambda functions are advised against since the cache will not work with lamda functions. sklearn_numba_dpex.common._utils expose some pre-defined `ops`. """ + n_rows, n_cols = shape - global_size = math.ceil(size1 / work_group_size) * work_group_size + global_size = math.ceil(n_cols / work_group_size) * work_group_size ops = dpex.func(ops()) # NB: the left operand is modified inplace, the right operand is only read into. @@ -161,10 +141,10 @@ def make_broadcast_ops_1d_2d_axis1_kernel(size0, size1, ops, work_group_size, dt def broadcast_ops(left_operand_array, right_operand_vector): col_idx = dpex.get_global_id(zero_idx) - if col_idx >= size1: + if col_idx >= n_cols: return - for row_idx in range(size0): + for row_idx in range(n_rows): left_operand_array[row_idx, col_idx] = ops( left_operand_array[row_idx, col_idx], right_operand_vector[row_idx] ) @@ -173,8 +153,9 @@ def broadcast_ops(left_operand_array, right_operand_vector): @lru_cache -def make_half_l2_norm_2d_axis0_kernel(size0, size1, work_group_size, dtype): - global_size = math.ceil(size1 / work_group_size) * work_group_size +def make_half_l2_norm_2d_axis0_kernel(shape, work_group_size, dtype): + n_rows, n_cols = shape + global_size = math.ceil(n_cols / work_group_size) * work_group_size zero = dtype(0.0) two = dtype(2.0) @@ -189,12 +170,12 @@ def half_l2_norm( # fmt: on col_idx = dpex.get_global_id(zero_idx) - if col_idx >= size1: + if col_idx >= n_cols: return l2_norm = zero - for row_idx in range(size0): + for row_idx in range(n_rows): item = data[row_idx, col_idx] l2_norm += item * item @@ -207,8 +188,7 @@ def half_l2_norm( # operators than sum. @lru_cache def make_sum_reduction_2d_kernel( - size0, - size1, + shape, device, dtype, work_group_size="max", @@ -300,46 +280,40 @@ def make_sum_reduction_2d_kernel( .. [3] https://shreeraman-ak.medium.com/parallel-reduction-with-cuda-d0ae10c1ae2c """ - if is_1d := not size1: + if is_1d := (len(shape) == 1): axis = 1 - size1 = size0 - size0 = 1 + shape1 = shape[0] + shape0 = 1 + else: + shape0, shape1 = shape - # NB: the shape of the work group is different in each of those two cases. Summing - # efficiently requires to adapt to very different IO patterns depending on the sum - # axis, which motivates the need for different kernels with different work group - # sizes for each cases. As a consequence, the shape of the intermediate results - # in the main driver loop, and the `global_size` (total number of work items fired - # per call) for each kernel call, are also different, and are variabilized with - # the lambda functions `get_result_shape` and `get_global_size`. if axis == 0: - work_group_size, kernels = _prepare_sum_reduction_2d_axis0( - size1, + work_group_shape, kernels, shape_update_fn = _prepare_sum_reduction_2d_axis0( + shape1, work_group_size, sub_group_size, fused_elementwise_func, dtype, device, ) - get_result_shape = lambda result_sum_axis_size: (result_sum_axis_size, size1) - get_global_size = ( - lambda result_sum_axis_size: result_sum_axis_size - * work_group_size - * math.ceil(size1 / sub_group_size) - ) else: # axis == 1 - work_group_size, kernels = _prepare_sum_reduction_2d_axis1( - size0, work_group_size, fused_elementwise_func, dtype, device - ) - get_result_shape = lambda result_sum_axis_size: (size0, result_sum_axis_size) - get_global_size = ( - lambda result_sum_axis_size: result_sum_axis_size * work_group_size * size0 + work_group_shape, kernels, shape_update_fn = _prepare_sum_reduction_2d_axis1( + shape0, work_group_size, fused_elementwise_func, dtype, device ) # XXX: The kernels seem to work fine with work_group_size==1 on GPU but fail on CPU. - if work_group_size == 1: + if math.prod(work_group_shape) == 1: raise NotImplementedError("work_group_size==1 is not supported.") + # NB: the shape of the work group is different in each of those two cases. Summing + # efficiently requires to adapt to very different IO patterns depending on the sum + # axis, which motivates the need for different kernels with different work group + # sizes for each cases. As a consequence, the shape of the intermediate results + # in the main driver loop, and the `global_size` (total number of work items fired + # per call) for each kernel call, are also different, and are variabilized with + # the functions `get_result_shape` and `get_global_size`. + get_result_shape, get_global_size = shape_update_fn + # `fused_elementwise_func` is applied elementwise during the first pass on # data, in the first kernel execution only, using `fused_func_kernel`. Subsequent # kernel calls only sum the data, using `nofunc_kernel`. @@ -353,7 +327,7 @@ def make_sum_reduction_2d_kernel( # single work item should iterates one time on the remaining values to finish the # reduction? kernel = fused_func_kernel - sum_axis_size = size0 if axis == 0 else size1 + sum_axis_size = shape0 if axis == 0 else shape1 next_input_size = sum_axis_size while next_input_size > 1: result_sum_axis_size = math.ceil(next_input_size / reduction_block_size) @@ -366,7 +340,7 @@ def make_sum_reduction_2d_kernel( result = dpt.empty(result_shape, dtype=dtype, device=device) global_size = get_global_size(result_sum_axis_size) - kernel = kernel[global_size, work_group_size] + kernel = kernel[global_size, work_group_shape] kernels_and_empty_tensors_pairs.append((kernel, result)) kernel = nofunc_kernel @@ -420,6 +394,7 @@ def fused_elementwise_func_(x): work_group_size = get_maximum_power_of_2_smaller_than(work_group_size) ( + work_group_shape, reduction_block_size, partial_sum_reduction, ) = _make_partial_sum_reduction_2d_axis1_kernel( @@ -429,15 +404,25 @@ def fused_elementwise_func_(x): if fused_elementwise_func is None: partial_sum_reduction_nofunc = partial_sum_reduction else: - _, partial_sum_reduction_nofunc = _make_partial_sum_reduction_2d_axis1_kernel( + *_, partial_sum_reduction_nofunc = _make_partial_sum_reduction_2d_axis1_kernel( n_rows, work_group_size, fused_elementwise_func_, dtype ) - return work_group_size, ( - (partial_sum_reduction, partial_sum_reduction_nofunc), # kernels - reduction_block_size, + get_result_shape = lambda result_sum_axis_size: (n_rows, result_sum_axis_size) + get_global_size = lambda result_sum_axis_size: ( + n_rows, + result_sum_axis_size * work_group_size, ) + return ( + work_group_shape, + ( + (partial_sum_reduction, partial_sum_reduction_nofunc), # kernels + reduction_block_size, + ), + (get_result_shape, get_global_size), + ) # shape_update_fn + @lru_cache def _make_partial_sum_reduction_2d_axis1_kernel( @@ -463,6 +448,7 @@ def _make_partial_sum_reduction_2d_axis1_kernel( n_local_iterations = np.int64(math.log2(work_group_size) - 1) local_values_size = work_group_size reduction_block_size = 2 * work_group_size + work_group_shape = (1, work_group_size) @dpex.kernel # fmt: off @@ -473,39 +459,32 @@ def partial_sum_reduction( # fmt: on # Each work group processes a window of the `summands` input array with # shape `(1, reduction_block_size)` with `reduction_block_size = 2 * - # work_group_size`. The windows never overlap and create a grid that + # work_group_size`, and is responsible for summing all values in the window + # it spans. The windows never overlap and create a grid that # cover all the values of the `summands` array. - # `n_work_groups_per_row` windows are necessary to cover one row of the - # input array. - n_work_groups_per_row = result.shape[minus_one_idx] - - # The group of work items identified by `group_id` is responsible for - # summing all values in the window it spans. - group_id = dpex.get_group_id(zero_idx) - # The work groups are indexed in row-major order, from that let's deduce the - # row of `summands` to process by work items in `group_id`. - row_idx = group_id // n_work_groups_per_row + # row of `summands` to process by work items in `group_id`... + row_idx = dpex.get_group_id(zero_idx) # ... and the position of the window within this row, ranging from 0 # (first window in the row) to `n_work_groups_per_row - 1` (last window # in the row): - local_work_group_id_in_row = group_id % n_work_groups_per_row + local_work_group_id_in_row = dpex.get_group_id(one_idx) # Since all windows have size `reduction_block_size`, the position of the first # item in the window is given by: first_value_idx = local_work_group_id_in_row * reduction_block_size - # To sum up, this work group `group_id` will sum items with coordinates + # To sum up, this current work group will sum items with coordinates # (`row_idx`, `col_idx`), with `col_idx` ranging from `first_value_idx` # (first item in the window) to `first_value_idx + work_group_size - 1` (last - # item in the window) + # item in the window). # The current work item is indexed locally within the group of work items, with # index `local_work_id` that can range from `0` (first item in the work group) # to `work_group_size - 1` (last item in the work group) - local_work_id = dpex.get_local_id(zero_idx) + local_work_id = dpex.get_local_id(one_idx) # Let's remember the size of the array to ensure that the last window in the # row do not try to access items outside the buffer. @@ -583,7 +562,7 @@ def partial_sum_reduction( local_values[zero_idx] + local_values[one_idx] ) - return reduction_block_size, partial_sum_reduction + return work_group_shape, reduction_block_size, partial_sum_reduction def _prepare_sum_reduction_2d_axis0( @@ -610,7 +589,8 @@ def fused_elementwise_func_(x): f"sub_group_size={sub_group_size} and " f"work_group_size={work_group_size}" ) - check_power_of_2(work_group_size // sub_group_size) + n_sub_groups_per_work_group = work_group_size // sub_group_size + check_power_of_2(n_sub_groups_per_work_group) else: # Round work_group_size to the maximum smaller power-of-two multiple of @@ -621,6 +601,7 @@ def fused_elementwise_func_(x): work_group_size = n_sub_groups_per_work_group * sub_group_size ( + work_group_shape, reduction_block_size, partial_sum_reduction, ) = _make_partial_sum_reduction_2d_axis0_kernel( @@ -630,15 +611,25 @@ def fused_elementwise_func_(x): if fused_elementwise_func is None: partial_sum_reduction_nofunc = partial_sum_reduction else: - _, partial_sum_reduction_nofunc = _make_partial_sum_reduction_2d_axis0_kernel( + *_, partial_sum_reduction_nofunc = _make_partial_sum_reduction_2d_axis0_kernel( n_cols, work_group_size, sub_group_size, fused_elementwise_func_, dtype ) - return work_group_size, ( - (partial_sum_reduction, partial_sum_reduction_nofunc), # kernels - reduction_block_size, + get_result_shape = lambda result_sum_axis_size: (result_sum_axis_size, n_cols) + get_global_size = lambda result_sum_axis_size: ( + result_sum_axis_size * n_sub_groups_per_work_group, + sub_group_size * math.ceil(n_cols / sub_group_size), ) + return ( + work_group_shape, + ( + (partial_sum_reduction, partial_sum_reduction_nofunc), # kernels + reduction_block_size, + ), + (get_result_shape, get_global_size), + ) # shape_update_fn + @lru_cache def _make_partial_sum_reduction_2d_axis0_kernel( @@ -657,6 +648,7 @@ def _make_partial_sum_reduction_2d_axis0_kernel( local_values_size = (n_sub_groups_per_work_group, sub_group_size) reduction_block_size = 2 * n_sub_groups_per_work_group + work_group_shape = (n_sub_groups_per_work_group, sub_group_size) # ???: how does this strategy compares to having each thread reducing N contiguous # items ? @@ -672,28 +664,19 @@ def partial_sum_reduction( # The work groups are mapped to window of items in the input `summands` of size # `(reduction_block_size, sub_group_size)`, # where `reduction_block_size = 2 * n_sub_groups_per_work_group` such that the - # windows never overlap and create a grid that cover all the items. The work - # groups are indexed following a column-major order. - - # `n_blocks_per_col` windows are necessary to cover one column of the input - # array - n_blocks_per_col = result.shape[zero_idx] - - # The group of work items `group_id` is responsible for summing all items in - # the window it spans over axis 0, resulting in a output of shape - # `(1, sub_group_size)` - group_id = dpex.get_group_id(zero_idx) + # windows never overlap and create a grid that cover all the items. Each + # work group is responsible for summing all items in the window it spans over + # axis 0, resulting in a output of shape `(1, sub_group_size)`. - # The work groups are indexed in column-major order. From this let's deduce the + # The work groups are indexed in row-major order. From this let's deduce the # position of the window within the column... - local_block_id_in_col = group_id % n_blocks_per_col + local_block_id_in_col = dpex.get_group_id(one_idx) # Let's map the current work item to an index in a 2D grid, where the # `work_group_size` work items are mapped in row-major order to the array # of size `(n_sub_groups_per_work_group, sub_group_size)`. - local_work_id = dpex.get_local_id(zero_idx) # 1D idx - local_row_idx = local_work_id // sub_group_size # 2D idx, first coordinate - local_col_idx = local_work_id % sub_group_size # 2D idx, second coordinate + local_row_idx = dpex.get_local_id(zero_idx) # 2D idx, first coordinate + local_col_idx = dpex.get_local_id(one_idx) # 2D idx, second coordinate # This way, each row in the 2D index can be seen as mapped to two rows in the # corresponding window of items of the input `summands`, with the first row of @@ -734,7 +717,7 @@ def partial_sum_reduction( # position of the window in the grid of windows, and by the local position of # the work item in the 2D index): col_idx = ( - (group_id // n_blocks_per_col) * sub_group_size + local_col_idx + dpex.get_group_id(one_idx) * sub_group_size + local_col_idx ) # We must be careful to not read items outside of the array ! @@ -784,7 +767,7 @@ def partial_sum_reduction( local_values[one_idx, local_col_idx] ) - return reduction_block_size, partial_sum_reduction + return work_group_shape, reduction_block_size, partial_sum_reduction @lru_cache diff --git a/sklearn_numba_dpex/common/tests/test_kernels.py b/sklearn_numba_dpex/common/tests/test_kernels.py index fb88395..fa4d620 100644 --- a/sklearn_numba_dpex/common/tests/test_kernels.py +++ b/sklearn_numba_dpex/common/tests/test_kernels.py @@ -31,14 +31,11 @@ def test_sum_reduction_2d(test_input_shape, work_group_size, axis, dtype): if work_group_size == "max": sub_group_size = min(device.sub_group_sizes) - elif work_group_size == 1: - sub_group_size = 1 else: sub_group_size = work_group_size // 2 sum_reduction_2d_kernel = make_sum_reduction_2d_kernel( - size0=len(array_in), - size1=array_in.shape[1], + shape=(len(array_in), array_in.shape[1]), work_group_size=work_group_size, device=device, dtype=dtype, @@ -60,8 +57,7 @@ def test_sum_reduction_1d(length, expected_result, dtype, work_group_size): device = array_in.device.sycl_device sum_reduction_1d_kernel = make_sum_reduction_2d_kernel( - size0=len(array_in), - size1=None, + shape=(len(array_in),), work_group_size=work_group_size, device=device, dtype=dtype, @@ -101,8 +97,7 @@ def test_sum_reduction_raise_on_invalid_size_parameters( ): with expected_error: make_sum_reduction_2d_kernel( - size0=10, - size1=10, + shape=(10, 10), work_group_size=work_group_size, sub_group_size=sub_group_size, axis=axis, diff --git a/sklearn_numba_dpex/kmeans/drivers.py b/sklearn_numba_dpex/kmeans/drivers.py index a3c4927..72583f0 100644 --- a/sklearn_numba_dpex/kmeans/drivers.py +++ b/sklearn_numba_dpex/kmeans/drivers.py @@ -7,20 +7,19 @@ import numpy as np from sklearn_numba_dpex.common._utils import ( - _divide, + _divide_by, _get_global_mem_cache_size, _minus, _plus, _square, ) from sklearn_numba_dpex.common.kernels import ( + make_apply_elementwise_func, make_argmin_reduction_1d_kernel, make_broadcast_division_1d_2d_axis0_kernel, make_broadcast_ops_1d_2d_axis1_kernel, - make_elementwise_binary_op_1d_kernel, make_half_l2_norm_2d_axis0_kernel, - make_initialize_to_zeros_2d_kernel, - make_initialize_to_zeros_3d_kernel, + make_initialize_to_zeros_kernel, make_sum_reduction_2d_kernel, ) from sklearn_numba_dpex.common.random import ( @@ -96,17 +95,14 @@ def lloyd( n_samples, n_features, max_work_group_size, compute_dtype ) - reset_cluster_sizes_private_copies_kernel = make_initialize_to_zeros_2d_kernel( - size0=n_centroids_private_copies, - size1=n_clusters, + reset_cluster_sizes_private_copies_kernel = make_initialize_to_zeros_kernel( + shape=(n_centroids_private_copies, n_clusters), work_group_size=max_work_group_size, dtype=compute_dtype, ) - reset_centroids_private_copies_kernel = make_initialize_to_zeros_3d_kernel( - size0=n_centroids_private_copies, - size1=n_features, - size2=n_clusters, + reset_centroids_private_copies_kernel = make_initialize_to_zeros_kernel( + shape=(n_centroids_private_copies, n_features, n_clusters), work_group_size=max_work_group_size, dtype=compute_dtype, ) @@ -125,23 +121,20 @@ def lloyd( ) half_l2_norm_kernel = make_half_l2_norm_2d_axis0_kernel( - size0=n_features, - size1=n_clusters, + (n_features, n_clusters), work_group_size=max_work_group_size, dtype=compute_dtype, ) reduce_inertia_kernel = make_sum_reduction_2d_kernel( - size0=n_samples, - size1=None, # 1d reduction + shape=(n_samples,), work_group_size="max", device=device, dtype=compute_dtype, ) reduce_centroid_shifts_kernel = make_sum_reduction_2d_kernel( - size0=n_clusters, - size1=None, # 1d reduction + shape=(n_clusters,), work_group_size="max", device=device, dtype=compute_dtype, @@ -463,16 +456,18 @@ def prepare_data_for_lloyd(X_t, init, tol, sample_weight, copy_x): max_work_group_size = device.max_work_group_size sum_axis1_kernel = make_sum_reduction_2d_kernel( - X_t.shape[0], - X_t.shape[1], + X_t.shape, axis=1, work_group_size="max", device=device, dtype=compute_dtype, ) - elementwise_binary_divide_kernel = make_elementwise_binary_op_1d_kernel( - n_features, _divide, max_work_group_size, compute_dtype + divide_by_n_samples_kernel = make_apply_elementwise_func( + (n_features,), + _divide_by(compute_dtype(n_samples)), + max_work_group_size, + compute_dtype, ) # At the time of writing this code, dpnp does not support functions (like `==` @@ -480,8 +475,7 @@ def prepare_data_for_lloyd(X_t, init, tol, sample_weight, copy_x): # manner. # TODO: if dpnp support extends to relevant features, use it instead ? sum_of_squares_kernel = make_sum_reduction_2d_kernel( - size0=n_features, - size1=None, + shape=(n_features,), work_group_size="max", device=device, dtype=compute_dtype, @@ -490,11 +484,8 @@ def prepare_data_for_lloyd(X_t, init, tol, sample_weight, copy_x): X_mean = sum_axis1_kernel(X_t)[:, 0] - divisor = dpt.full( - sh=(1,), fill_value=compute_dtype(n_samples), dtype=compute_dtype, device=device - ) # Change `X_mean` inplace - elementwise_binary_divide_kernel(X_mean, divisor) + divide_by_n_samples_kernel(X_mean) X_sum_squared = sum_of_squares_kernel(X_mean)[0] X_mean_is_zeroed = float(X_sum_squared) == 0.0 @@ -509,8 +500,7 @@ def prepare_data_for_lloyd(X_t, init, tol, sample_weight, copy_x): # subtract the mean of x for more accurate distance computations X_t = dpt.asarray(X_t, copy=copy_x) broadcast_X_minus_X_mean = make_broadcast_ops_1d_2d_axis1_kernel( - n_features, - n_samples, + (n_features, n_samples), ops=_minus, work_group_size=max_work_group_size, dtype=compute_dtype, @@ -522,8 +512,7 @@ def prepare_data_for_lloyd(X_t, init, tol, sample_weight, copy_x): if isinstance(init, dpt.usm_ndarray): n_clusters = init.shape[1] broadcast_init_minus_X_mean = make_broadcast_ops_1d_2d_axis1_kernel( - n_features, - n_clusters, + (n_features, n_clusters), ops=_minus, work_group_size=max_work_group_size, dtype=compute_dtype, @@ -534,8 +523,7 @@ def prepare_data_for_lloyd(X_t, init, tol, sample_weight, copy_x): n_items = n_features * n_samples variance_kernel = make_sum_reduction_2d_kernel( - size0=n_items, - size1=None, + shape=(n_items,), work_group_size="max", device=device, dtype=compute_dtype, @@ -552,8 +540,7 @@ def prepare_data_for_lloyd(X_t, init, tol, sample_weight, copy_x): # manner. # TODO: if dpnp support extends to relevant features, use it instead ? sum_sample_weight_kernel = make_sum_reduction_2d_kernel( - size0=n_samples, - size1=None, + size0=(n_samples,), work_group_size="max", device=device, dtype=compute_dtype, @@ -584,8 +571,7 @@ def restore_data_after_lloyd(X_t, best_centers_t, X_mean, copy_x): best_centers_t = dpt.asarray(best_centers_t, copy=False) broadcast_init_plus_X_mean = make_broadcast_ops_1d_2d_axis1_kernel( - n_features, - n_clusters, + (n_features, n_clusters), ops=_plus, work_group_size=max_work_group_size, dtype=compute_dtype, @@ -606,8 +592,7 @@ def restore_data_after_lloyd(X_t, best_centers_t, X_mean, copy_x): if not copy_x: X_t = dpt.asarray(X_t, copy=False) broadcast_X_plus_X_mean = make_broadcast_ops_1d_2d_axis1_kernel( - n_features, - n_samples, + (n_features, n_samples), ops=_plus, work_group_size=max_work_group_size, dtype=compute_dtype, @@ -673,8 +658,7 @@ def get_labels_inertia(X_t, centroids_t, sample_weight, with_inertia): ) half_l2_norm_kernel = make_half_l2_norm_2d_axis0_kernel( - size0=n_features, - size1=n_clusters, + (n_features, n_clusters), work_group_size=max_work_group_size, dtype=compute_dtype, ) @@ -704,8 +688,7 @@ def get_labels_inertia(X_t, centroids_t, sample_weight, with_inertia): ) reduce_inertia_kernel = make_sum_reduction_2d_kernel( - size0=n_samples, - size1=None, # 1d reduction + size0=(n_samples,), work_group_size="max", device=device, dtype=compute_dtype, @@ -827,16 +810,14 @@ def kmeans_plusplus( ) reduce_potential_1d_kernel = make_sum_reduction_2d_kernel( - size0=n_samples, - size1=None, + shape=(n_samples,), work_group_size="max", device=device, dtype=compute_dtype, ) reduce_potential_2d_kernel = make_sum_reduction_2d_kernel( - size0=n_local_trials, - size1=n_samples, + shape=(n_local_trials, n_samples), axis=1, work_group_size="max", device=device, From 9d554db8a0a6b28dba9cbf8c9d836c77971f9490 Mon Sep 17 00:00:00 2001 From: Franck Charras <29153872+fcharras@users.noreply.github.com> Date: Mon, 6 Feb 2023 18:18:08 +0100 Subject: [PATCH 02/13] wip: apply 2D grid of work items to kmeans kernels --- sklearn_numba_dpex/common/kernels.py | 12 +++- .../kernels/_base_kmeans_kernel_funcs.py | 21 +++--- .../kernels/compute_euclidean_distances.py | 35 +++++----- .../kmeans/kernels/compute_labels.py | 33 +++++++--- .../kmeans/kernels/kmeans_plusplus.py | 33 ++++++---- .../kmeans/kernels/lloyd_single_step.py | 41 ++++++------ sklearn_numba_dpex/kmeans/kernels/utils.py | 64 ++++++++++++------- 7 files changed, 147 insertions(+), 92 deletions(-) diff --git a/sklearn_numba_dpex/common/kernels.py b/sklearn_numba_dpex/common/kernels.py index c7b35f9..4856efc 100644 --- a/sklearn_numba_dpex/common/kernels.py +++ b/sklearn_numba_dpex/common/kernels.py @@ -195,6 +195,7 @@ def make_sum_reduction_2d_kernel( axis=None, sub_group_size=None, fused_elementwise_func=None, + allocate_output_buffer=True, ): """Compute data_2d.sum(axis=axis) or data_1d.sum(). @@ -336,8 +337,11 @@ def make_sum_reduction_2d_kernel( # collected. Thus it can be more efficient to re-use a same instance of # `sum_reduction` (e.g within iterations of a loop) since it avoid # deallocation and reallocation every time. - result_shape = get_result_shape(result_sum_axis_size) - result = dpt.empty(result_shape, dtype=dtype, device=device) + if (result_sum_axis_size > 1) or allocate_output_buffer: + result_shape = get_result_shape(result_sum_axis_size) + result = dpt.empty(result_shape, dtype=dtype, device=device) + else: + result = None global_size = get_global_size(result_sum_axis_size) kernel = kernel[global_size, work_group_shape] @@ -347,7 +351,7 @@ def make_sum_reduction_2d_kernel( next_input_size = result_sum_axis_size - def sum_reduction(summands): + def sum_reduction(summands, out=None): if is_1d: # Makes the 1d case a special 2d case to reuse the same kernel. summands = dpt.reshape(summands, (1, -1)) @@ -359,6 +363,8 @@ def sum_reduction(summands): # TODO: manually dispatch the kernels with a SyclQueue for kernel, result in kernels_and_empty_tensors_pairs: + if result is None: + result = out kernel(summands, result) summands = result diff --git a/sklearn_numba_dpex/kmeans/kernels/_base_kmeans_kernel_funcs.py b/sklearn_numba_dpex/kmeans/kernels/_base_kmeans_kernel_funcs.py index 90e8c88..fd9c54d 100644 --- a/sklearn_numba_dpex/kmeans/kernels/_base_kmeans_kernel_funcs.py +++ b/sklearn_numba_dpex/kmeans/kernels/_base_kmeans_kernel_funcs.py @@ -1,3 +1,4 @@ +import dpctl as dpt import numba_dpex as dpex @@ -63,7 +64,8 @@ def make_pairwise_ops_base_kernel_funcs( @dpex.func def initialize_window_of_centroids( - local_work_id, + local_row_idx, + local_col_idx, first_centroid_idx, centroids_half_l2_norm, is_last_centroid_window, @@ -73,7 +75,8 @@ def initialize_window_of_centroids( ): if is_last_centroid_window: initialize_last_window_of_centroids( - local_work_id, + local_row_idx, + local_col_idx, first_centroid_idx, centroids_half_l2_norm, # OUT @@ -82,7 +85,8 @@ def initialize_window_of_centroids( ) else: initialize_full_window_of_centroids( - local_work_id, + local_row_idx, + local_col_idx, first_centroid_idx, centroids_half_l2_norm, # OUT @@ -203,6 +207,7 @@ def __init__( def make_initialize_window_kernel_func(self, window_n_centroids): zero = self.dtype(0.0) + zero_as_a_long = dpt.int64(0) @dpex.func def _initialize_results(results): @@ -217,7 +222,8 @@ def _initialize_results(results): @dpex.func # fmt: off def _initialize_window_of_centroids( - local_work_id, # PARAM + local_row_idx, # PARAM + local_col_idx, # PARAM first_centroid_idx, # PARAM centroids_half_l2_norm, # IN window_of_centroids_half_l2_norms, # OUT @@ -229,10 +235,9 @@ def _initialize_window_of_centroids( # The first `window_n_centroids` work items cooperate on loading the # values of centroids_half_l2_norm relevant to current window. Each work # item loads one single value. - if local_work_id < window_n_centroids: - half_l2_norm_loading_idx = first_centroid_idx + local_work_id - window_of_centroids_half_l2_norms[local_work_id] = ( - centroids_half_l2_norm[half_l2_norm_loading_idx] + if local_row_idx > zero_as_a_long: + window_of_centroids_half_l2_norms[local_col_idx] = ( + centroids_half_l2_norm[first_centroid_idx + local_col_idx] ) return _initialize_window_of_centroids diff --git a/sklearn_numba_dpex/kmeans/kernels/compute_euclidean_distances.py b/sklearn_numba_dpex/kmeans/kernels/compute_euclidean_distances.py index 54d0fc8..01850ba 100644 --- a/sklearn_numba_dpex/kmeans/kernels/compute_euclidean_distances.py +++ b/sklearn_numba_dpex/kmeans/kernels/compute_euclidean_distances.py @@ -18,24 +18,24 @@ def make_compute_euclidean_distances_fixed_window_kernel( ): window_n_centroids = sub_group_size - centroids_window_width = window_n_centroids + 1 input_work_group_size = work_group_size work_group_size = _check_max_work_group_size( - work_group_size, device, centroids_window_width * np.dtype(dtype).itemsize + work_group_size, device, window_n_centroids * np.dtype(dtype).itemsize ) centroids_window_height = work_group_size // sub_group_size - if work_group_size != input_work_group_size: - work_group_size = centroids_window_height * sub_group_size - - elif centroids_window_height * sub_group_size != work_group_size: + if (work_group_size == input_work_group_size) and ( + centroids_window_height * sub_group_size != work_group_size + ): raise ValueError( "Expected work_group_size to be a multiple of sub_group_size but got " f"sub_group_size={sub_group_size} and work_group_size={work_group_size}" ) + work_group_shape = (centroids_window_height, window_n_centroids) + ( initialize_window_of_centroids, load_window_of_centroids_and_features, @@ -56,9 +56,8 @@ def make_compute_euclidean_distances_fixed_window_kernel( last_centroid_window_idx = n_windows_for_centroids - 1 last_feature_window_idx = n_windows_for_features - 1 - centroids_window_shape = (centroids_window_height, centroids_window_width) - zero_idx = np.int64(0) + one_idx = np.int64(0) @dpex.kernel # fmt: off @@ -69,17 +68,19 @@ def compute_distances( ): # fmt: on - sample_idx = dpex.get_global_id(zero_idx) - local_work_id = dpex.get_local_id(zero_idx) + sample_idx = ( + (dpex.get_global_id(zero_idx) * sub_group_size) + + dpex.get_global_id(one_idx) + ) - centroids_window = dpex.local.array(shape=centroids_window_shape, dtype=dtype) + centroids_window = dpex.local.array(shape=work_group_shape, dtype=dtype) sq_distances = dpex.private.array(shape=window_n_centroids, dtype=dtype) first_centroid_idx = zero_idx - window_loading_centroid_idx = local_work_id % window_n_centroids - window_loading_feature_offset = local_work_id // window_n_centroids + window_loading_centroid_idx = dpex.get_local_id(zero_idx) + window_loading_feature_offset = dpex.get_local_id(one_idx) for centroid_window_idx in range(n_windows_for_centroids): is_last_centroid_window = centroid_window_idx == last_centroid_window_idx @@ -127,5 +128,9 @@ def compute_distances( dpex.barrier(dpex.CLK_LOCAL_MEM_FENCE) - global_size = (math.ceil(n_samples / work_group_size)) * (work_group_size) - return compute_distances[global_size, work_group_size] + global_size = ( + math.ceil(math.ceil(n_samples / window_n_centroids) / centroids_window_height) + * centroids_window_height, + window_n_centroids, + ) + return compute_distances[global_size, work_group_shape] diff --git a/sklearn_numba_dpex/kmeans/kernels/compute_labels.py b/sklearn_numba_dpex/kmeans/kernels/compute_labels.py index 0b71de1..bfe3085 100644 --- a/sklearn_numba_dpex/kmeans/kernels/compute_labels.py +++ b/sklearn_numba_dpex/kmeans/kernels/compute_labels.py @@ -27,7 +27,7 @@ def make_label_assignment_fixed_window_kernel( work_group_size, device, required_local_memory_per_item=dtype_itemsize, - required_memory_constant=sub_group_size * dtype_itemsize, + required_memory_constant=window_n_centroids * dtype_itemsize, ) centroids_window_width = window_n_centroids @@ -36,12 +36,16 @@ def make_label_assignment_fixed_window_kernel( if work_group_size != input_work_group_size: work_group_size = centroids_window_height * sub_group_size - elif centroids_window_height * sub_group_size != work_group_size: + if (work_group_size == input_work_group_size) and ( + centroids_window_height * sub_group_size != work_group_size + ): raise ValueError( "Expected work_group_size to be a multiple of sub_group_size but got " f"sub_group_size={sub_group_size} and work_group_size={work_group_size}" ) + work_group_shape = (centroids_window_height, window_n_centroids) + ( initialize_window_of_centroids, load_window_of_centroids_and_features, @@ -70,6 +74,7 @@ def make_label_assignment_fixed_window_kernel( inf = dtype(math.inf) zero_idx = np.int64(0) + one_idx = np.int64(0) @dpex.kernel # fmt: off @@ -80,9 +85,12 @@ def assignment( assignments_idx, # OUT (n_samples,) ): # fmt: on - - sample_idx = dpex.get_global_id(zero_idx) - local_work_id = dpex.get_local_id(zero_idx) + sample_idx = ( + (dpex.get_global_id(zero_idx) * sub_group_size) + + dpex.get_global_id(one_idx) + ) + local_row_idx = dpex.get_local_id(zero_idx) + local_col_idx = dpex.get_local_id(one_idx) centroids_window = dpex.local.array(shape=centroids_window_shape, dtype=dtype) window_of_centroids_half_l2_norms = dpex.local.array( @@ -95,14 +103,15 @@ def assignment( min_idx = zero_idx min_sample_pseudo_inertia = inf - window_loading_centroid_idx = local_work_id % window_n_centroids - window_loading_feature_offset = local_work_id // window_n_centroids + window_loading_centroid_idx = local_row_idx + window_loading_feature_offset = local_col_idx for centroid_window_idx in range(n_windows_for_centroids): is_last_centroid_window = centroid_window_idx == last_centroid_window_idx initialize_window_of_centroids( - local_work_id, + local_row_idx, + local_col_idx, first_centroid_idx, centroids_half_l2_norm, is_last_centroid_window, @@ -161,5 +170,9 @@ def assignment( assignments_idx[sample_idx] = min_idx - global_size = (math.ceil(n_samples / work_group_size)) * (work_group_size) - return assignment[global_size, work_group_size] + global_size = ( + math.ceil(math.ceil(n_samples / window_n_centroids) / centroids_window_height) + * centroids_window_height, + window_n_centroids, + ) + return assignment[global_size, work_group_shape] diff --git a/sklearn_numba_dpex/kmeans/kernels/kmeans_plusplus.py b/sklearn_numba_dpex/kmeans/kernels/kmeans_plusplus.py index 57ad825..bd097b8 100644 --- a/sklearn_numba_dpex/kmeans/kernels/kmeans_plusplus.py +++ b/sklearn_numba_dpex/kmeans/kernels/kmeans_plusplus.py @@ -115,18 +115,18 @@ def make_kmeansplusplus_single_step_fixed_window_kernel( work_group_size, device, required_local_memory_per_item=np.dtype(dtype).itemsize ) - candidates_window_width = window_n_candidates candidates_window_height = work_group_size // sub_group_size - if work_group_size != input_work_group_size: - work_group_size = candidates_window_height * sub_group_size - - elif candidates_window_height * sub_group_size != work_group_size: + if (work_group_size == input_work_group_size) and ( + candidates_window_height * sub_group_size != work_group_size + ): raise ValueError( "Expected work_group_size to be a multiple of sub_group_size but got " f"sub_group_size={sub_group_size} and work_group_size={work_group_size}" ) + work_group_shape = (candidates_window_height, window_n_candidates) + ( initialize_window_of_candidates, load_window_of_candidates_and_features, @@ -147,9 +147,8 @@ def make_kmeansplusplus_single_step_fixed_window_kernel( last_candidate_window_idx = n_windows_for_candidates - 1 last_feature_window_idx = n_windows_for_features - 1 - candidates_window_shape = (candidates_window_height, candidates_window_width) - zero_idx = np.int64(0) + one_idx = np.int64(0) @dpex.kernel # fmt: off @@ -161,17 +160,19 @@ def kmeansplusplus_single_step( sq_distances_t, # OUT (n_candidates, n_samples) ): # fmt: on - sample_idx = dpex.get_global_id(zero_idx) - local_work_id = dpex.get_local_id(zero_idx) + sample_idx = ( + (dpex.get_global_id(zero_idx) * sub_group_size) + + dpex.get_global_id(one_idx) + ) - candidates_window = dpex.local.array(shape=candidates_window_shape, dtype=dtype) + candidates_window = dpex.local.array(shape=work_group_shape, dtype=dtype) sq_distances = dpex.private.array(shape=window_n_candidates, dtype=dtype) first_candidate_idx = zero_idx - window_loading_candidate_idx = local_work_id % window_n_candidates - window_loading_feature_offset = local_work_id // window_n_candidates + window_loading_candidate_idx = dpex.get_local_id(zero_idx) + window_loading_feature_offset = dpex.get_local_id(one_idx) for candidate_window_idx in range(n_windows_for_candidates): is_last_candidate_window = candidate_window_idx == last_candidate_window_idx @@ -230,5 +231,9 @@ def kmeansplusplus_single_step( dpex.barrier(dpex.CLK_LOCAL_MEM_FENCE) - global_size = (math.ceil(n_samples / work_group_size)) * (work_group_size) - return kmeansplusplus_single_step[global_size, work_group_size] + global_size = ( + math.ceil(math.ceil(n_samples / window_n_candidates) / candidates_window_height) + * candidates_window_height, + window_n_candidates, + ) + return kmeansplusplus_single_step[global_size, work_group_shape] diff --git a/sklearn_numba_dpex/kmeans/kernels/lloyd_single_step.py b/sklearn_numba_dpex/kmeans/kernels/lloyd_single_step.py index b8c4401..98adb04 100644 --- a/sklearn_numba_dpex/kmeans/kernels/lloyd_single_step.py +++ b/sklearn_numba_dpex/kmeans/kernels/lloyd_single_step.py @@ -70,18 +70,18 @@ def make_lloyd_single_step_fixed_window_kernel( required_memory_constant=sub_group_size * dtype_itemsize, ) - centroids_window_width = window_n_centroids centroids_window_height = work_group_size // sub_group_size - if work_group_size != input_work_group_size: - work_group_size = centroids_window_height * sub_group_size - - elif centroids_window_height * sub_group_size != work_group_size: + if (work_group_size == input_work_group_size) and ( + centroids_window_height * sub_group_size != work_group_size + ): raise ValueError( "Expected work_group_size to be a multiple of sub_group_size but got " f"sub_group_size={sub_group_size} and work_group_size={work_group_size}" ) + work_group_shape = (centroids_window_height, window_n_centroids) + ( initialize_window_of_centroids, load_window_of_centroids_and_features, @@ -106,10 +106,11 @@ def make_lloyd_single_step_fixed_window_kernel( last_centroid_window_idx = n_windows_for_centroids - 1 last_feature_window_idx = n_windows_for_features - 1 - centroids_window_shape = (centroids_window_height, centroids_window_width) - - global_size = (math.ceil(n_samples / work_group_size)) * (work_group_size) - n_subgroups = global_size // sub_group_size + n_subgroups = math.ceil(n_samples / window_n_centroids) + global_size = ( + math.ceil(n_subgroups / centroids_window_height) * centroids_window_height, + window_n_centroids, + ) n_cluster_items = n_clusters * (n_features + 1) n_cluster_bytes = np.dtype(dtype).itemsize * n_cluster_items @@ -122,6 +123,7 @@ def make_lloyd_single_step_fixed_window_kernel( n_centroids_private_copies = int(min(n_subgroups, n_centroids_private_copies)) zero_idx = np.int64(0) + one_idx = np.int64(0) inf = dtype(math.inf) # TODO: currently, constant memory is not supported by numba_dpex, but for read-only @@ -172,15 +174,17 @@ def fused_lloyd_single_step( centroids_half_l2_norm to reduce the overall number of floating point operations in the kernel. """ - sample_idx = dpex.get_global_id(zero_idx) - local_work_id = dpex.get_local_id(zero_idx) + sub_group_idx = dpex.get_global_id(zero_idx) + sample_idx = (sub_group_idx * sub_group_size) + dpex.get_global_id(one_idx) + local_row_idx = dpex.get_local_id(zero_idx) + local_col_idx = dpex.get_local_id(one_idx) # This array in shared memory is used as a sliding array over values of # current_centroids_t. During each iteration in the inner loop, a new one is # loaded and used by all work items in the work group to compute partial # results. The array slides over the features in the outer loop, and over the # samples in the inner loop. - centroids_window = dpex.local.array(shape=centroids_window_shape, dtype=dtype) + centroids_window = dpex.local.array(shape=work_group_shape, dtype=dtype) # This array in shared memory is used as a sliding array over the centroids. # It contains values of centroids_half_l2_norm for each centroid in the sliding @@ -204,8 +208,8 @@ def fused_lloyd_single_step( # Those variables are used in the inner loop during loading of the window of # centroids - window_loading_centroid_idx = local_work_id % window_n_centroids - window_loading_feature_offset = local_work_id // window_n_centroids + window_loading_centroid_idx = local_row_idx + window_loading_feature_offset = local_col_idx # STEP 1: compute the closest centroid # Outer loop: iterate on successive windows of size window_n_centroids that @@ -222,7 +226,8 @@ def fused_lloyd_single_step( # are modified in place. is_last_centroid_window = centroid_window_idx == last_centroid_window_idx initialize_window_of_centroids( - local_work_id, + local_row_idx, + local_col_idx, first_centroid_idx, centroids_half_l2_norm, is_last_centroid_window, @@ -334,9 +339,7 @@ def fused_lloyd_single_step( # counts. # each work item is assigned an array of centroids in a round robin manner - privatization_idx = ( - sample_idx // sub_group_size - ) % n_centroids_private_copies + privatization_idx = sub_group_idx % n_centroids_private_copies weight = sample_weight[sample_idx] @@ -354,5 +357,5 @@ def fused_lloyd_single_step( ) return ( n_centroids_private_copies, - fused_lloyd_single_step[global_size, work_group_size], + fused_lloyd_single_step[global_size, work_group_shape], ) diff --git a/sklearn_numba_dpex/kmeans/kernels/utils.py b/sklearn_numba_dpex/kmeans/kernels/utils.py index 9d04fd1..3168668 100644 --- a/sklearn_numba_dpex/kmeans/kernels/utils.py +++ b/sklearn_numba_dpex/kmeans/kernels/utils.py @@ -199,10 +199,10 @@ def make_reduce_centroid_data_kernel( work_group_size, dtype, ): + n_centroid_items = n_features * n_clusters + n_sums = n_centroid_items + n_clusters # n additional sums for cluster_sizes + global_size = math.ceil(n_sums / work_group_size) * work_group_size - n_work_groups_for_clusters = math.ceil(n_clusters / work_group_size) - n_work_items_for_clusters = n_work_groups_for_clusters * work_group_size - global_size = n_work_items_for_clusters * n_features zero = dtype(0.0) one_incr = np.int32(1) @@ -210,34 +210,26 @@ def make_reduce_centroid_data_kernel( # n_features * n_clusters >> preferred_work_group_size_multiple @dpex.kernel # fmt: off - def reduce_centroid_data( + def _reduce_centroid_data_kernel( cluster_sizes_private_copies, # IN (n_copies, n_clusters) - centroids_t_private_copies, # IN (n_copies, n_features, n_clusters) + centroids_t_private_copies, # IN (n_copies, n_features * n_clusters) (reshaped) # noqa cluster_sizes, # OUT (n_clusters,) - centroids_t, # OUT (n_features, n_clusters) + centroids_t, # OUT (n_features * n_clusters,) (flattened) empty_clusters_list, # OUT (n_clusters,) n_empty_clusters, # OUT (1,) ): # fmt: on - - group_idx = dpex.get_group_id(zero_idx) - item_idx = dpex.get_local_id(zero_idx) - feature_idx = group_idx // n_work_groups_for_clusters - cluster_idx = ( - (group_idx % n_work_groups_for_clusters) * work_group_size - ) + item_idx - if cluster_idx >= n_clusters: - return + item_idx = dpex.get_global_id(zero_idx) + sum_ = zero # reduce the centroid values - sum_ = zero - for copy_idx in range(n_centroids_private_copies): - sum_ += centroids_t_private_copies[copy_idx, feature_idx, cluster_idx] - centroids_t[feature_idx, cluster_idx] = sum_ + if item_idx < n_centroid_items: + for copy_idx in range(n_centroids_private_copies): + sum_ += centroids_t_private_copies[copy_idx, item_idx] + centroids_t[item_idx] = sum_ - # reduce the cluster sizes - if feature_idx == zero_idx: - sum_ = zero + elif item_idx < n_sums: + cluster_idx = item_idx - n_sums for copy_idx in range(n_centroids_private_copies): sum_ += cluster_sizes_private_copies[copy_idx, cluster_idx] cluster_sizes[cluster_idx] = sum_ @@ -249,7 +241,33 @@ def reduce_centroid_data( ) empty_clusters_list[current_n_empty_clusters] = cluster_idx - return reduce_centroid_data[global_size, work_group_size] + reduce_centroid_data_kernel = _reduce_centroid_data_kernel[ + global_size, work_group_size + ] + + def reduce_centroid_data( + cluster_sizes_private_copies, + centroids_t_private_copies, + cluster_sizes, + centroids_t, + empty_clusters_list, + n_empty_clusters, + ): + centroids_t_private_copies = dpt.reshape( + centroids_t_private_copies, + (n_centroids_private_copies, n_features * n_clusters), + ) + centroids_t = dpt.asarray(dpt.reshape(centroids_t, (-1,))) + reduce_centroid_data_kernel( + cluster_sizes_private_copies, + centroids_t_private_copies, + cluster_sizes, + centroids_t, + empty_clusters_list, + n_empty_clusters, + ) + + return reduce_centroid_data @lru_cache From 7d2e719396abf20a43fbc7b0a7ba9ba2d1d173da Mon Sep 17 00:00:00 2001 From: Franck Charras <29153872+fcharras@users.noreply.github.com> Date: Mon, 6 Feb 2023 18:25:35 +0100 Subject: [PATCH 03/13] wip: apply 2D grid of work items to kmeans kernels --- sklearn_numba_dpex/common/kernels.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/sklearn_numba_dpex/common/kernels.py b/sklearn_numba_dpex/common/kernels.py index 4856efc..c7b35f9 100644 --- a/sklearn_numba_dpex/common/kernels.py +++ b/sklearn_numba_dpex/common/kernels.py @@ -195,7 +195,6 @@ def make_sum_reduction_2d_kernel( axis=None, sub_group_size=None, fused_elementwise_func=None, - allocate_output_buffer=True, ): """Compute data_2d.sum(axis=axis) or data_1d.sum(). @@ -337,11 +336,8 @@ def make_sum_reduction_2d_kernel( # collected. Thus it can be more efficient to re-use a same instance of # `sum_reduction` (e.g within iterations of a loop) since it avoid # deallocation and reallocation every time. - if (result_sum_axis_size > 1) or allocate_output_buffer: - result_shape = get_result_shape(result_sum_axis_size) - result = dpt.empty(result_shape, dtype=dtype, device=device) - else: - result = None + result_shape = get_result_shape(result_sum_axis_size) + result = dpt.empty(result_shape, dtype=dtype, device=device) global_size = get_global_size(result_sum_axis_size) kernel = kernel[global_size, work_group_shape] @@ -351,7 +347,7 @@ def make_sum_reduction_2d_kernel( next_input_size = result_sum_axis_size - def sum_reduction(summands, out=None): + def sum_reduction(summands): if is_1d: # Makes the 1d case a special 2d case to reuse the same kernel. summands = dpt.reshape(summands, (1, -1)) @@ -363,8 +359,6 @@ def sum_reduction(summands, out=None): # TODO: manually dispatch the kernels with a SyclQueue for kernel, result in kernels_and_empty_tensors_pairs: - if result is None: - result = out kernel(summands, result) summands = result From 7b1da9ebd7ae0d1fa27effe5e55eb4f8766b5fd6 Mon Sep 17 00:00:00 2001 From: Franck Charras <29153872+fcharras@users.noreply.github.com> Date: Tue, 7 Feb 2023 14:25:22 +0100 Subject: [PATCH 04/13] Small enhancements accross repo --- sklearn_numba_dpex/common/kernels.py | 24 ++++--------- sklearn_numba_dpex/common/random.py | 3 +- sklearn_numba_dpex/kmeans/drivers.py | 13 ++++--- .../kernels/_base_kmeans_kernel_funcs.py | 6 ++-- .../kernels/compute_euclidean_distances.py | 20 ++++++----- .../kmeans/kernels/compute_labels.py | 14 ++++---- .../kmeans/kernels/kmeans_plusplus.py | 19 ++++++----- .../kmeans/kernels/lloyd_single_step.py | 34 +++++++++++++++---- sklearn_numba_dpex/kmeans/kernels/utils.py | 5 ++- 9 files changed, 80 insertions(+), 58 deletions(-) diff --git a/sklearn_numba_dpex/common/kernels.py b/sklearn_numba_dpex/common/kernels.py index c7b35f9..a1628a0 100644 --- a/sklearn_numba_dpex/common/kernels.py +++ b/sklearn_numba_dpex/common/kernels.py @@ -1,15 +1,6 @@ # TODO: many auxilliary kernels in the package might be better optimized and we could # benchmark alternative implementations for each of them, that could include -# - using 2D or 3D grid of work groups and work items where applicable (e.g. in -# some of the kernels that take 2D or 3D data as input) rather than using 1D grid. When -# doing so, one should be especially careful about how the segments of adjacent work -# items of size preferred_work_group_size_multiple are dispatched especially regarding -# RW operations in memory. A wrong dispatch strategy could slash memory bandwith and -# reduce performance. Using 2D or 3D grid correctly might on the other hand improve -# performance since it saves costly indexing operations (like //) -# - investigate if flat 1D-like indexing also works for ND kernels, thus saving the -# need to compute coordinates for each dimension for element-wise operations. -# - or using numba + dpnp to directly leverage kernels that are shipped in dpnp to +# using numba + dpnp to directly leverage kernels that are shipped in dpnp to # replace numpy methods. # However, in light of our main goal that is bringing a GPU KMeans to scikit-learn, the # importance of those TODOs is currently seen as secondary, since the execution time of @@ -186,7 +177,6 @@ def half_l2_norm( # TODO: this kernel could be abstracted away to support other commutative binary # operators than sum. -@lru_cache def make_sum_reduction_2d_kernel( shape, device, @@ -370,6 +360,7 @@ def sum_reduction(summands): return sum_reduction +@lru_cache def _prepare_sum_reduction_2d_axis1( n_rows, work_group_size, fused_elementwise_func, dtype, device ): @@ -424,7 +415,6 @@ def fused_elementwise_func_(x): ) # shape_update_fn -@lru_cache def _make_partial_sum_reduction_2d_axis1_kernel( n_rows, work_group_size, fused_elementwise_func, dtype ): @@ -446,7 +436,6 @@ def _make_partial_sum_reduction_2d_axis1_kernel( # Number of iteration in each execution of the kernel: n_local_iterations = np.int64(math.log2(work_group_size) - 1) - local_values_size = work_group_size reduction_block_size = 2 * work_group_size work_group_shape = (1, work_group_size) @@ -513,7 +502,7 @@ def partial_sum_reduction( augend_idx = first_value_idx + local_work_id addend_idx = first_value_idx + work_group_size + local_work_id - local_values = dpex.local.array(local_values_size, dtype=dtype) + local_values = dpex.local.array(work_group_size, dtype=dtype) # We must be careful to not read items outside of the array ! if augend_idx >= sum_axis_size: @@ -565,6 +554,7 @@ def partial_sum_reduction( return work_group_shape, reduction_block_size, partial_sum_reduction +@lru_cache def _prepare_sum_reduction_2d_axis0( n_cols, work_group_size, sub_group_size, fused_elementwise_func, dtype, device ): @@ -631,7 +621,6 @@ def fused_elementwise_func_(x): ) # shape_update_fn -@lru_cache def _make_partial_sum_reduction_2d_axis0_kernel( n_cols, work_group_size, sub_group_size, fused_elementwise_func, dtype ): @@ -646,7 +635,6 @@ def _make_partial_sum_reduction_2d_axis0_kernel( # Number of iteration in each execution of the kernel: n_local_iterations = np.int64(math.log2(n_sub_groups_per_work_group) - 1) - local_values_size = (n_sub_groups_per_work_group, sub_group_size) reduction_block_size = 2 * n_sub_groups_per_work_group work_group_shape = (n_sub_groups_per_work_group, sub_group_size) @@ -670,7 +658,7 @@ def partial_sum_reduction( # The work groups are indexed in row-major order. From this let's deduce the # position of the window within the column... - local_block_id_in_col = dpex.get_group_id(one_idx) + local_block_id_in_col = dpex.get_group_id(zero_idx) # Let's map the current work item to an index in a 2D grid, where the # `work_group_size` work items are mapped in row-major order to the array @@ -711,7 +699,7 @@ def partial_sum_reduction( # local memory of size (n_sub_groups_per_work_group, sub_group_size) (i.e one # slot for each work item and two items in the window), and yet again # contiguous work items write into contiguous slots. - local_values = dpex.local.array(local_values_size, dtype=dtype) + local_values = dpex.local.array(work_group_shape, dtype=dtype) # The current work item use the following second coordinate (given by the # position of the window in the grid of windows, and by the local position of diff --git a/sklearn_numba_dpex/common/random.py b/sklearn_numba_dpex/common/random.py index ba4335a..2b16bb1 100644 --- a/sklearn_numba_dpex/common/random.py +++ b/sklearn_numba_dpex/common/random.py @@ -191,8 +191,7 @@ def create_xoroshiro128pp_states(n_states, subsequence_start=0, seed=None, devic if from_cpu_to_device: return states.to_device(device) - else: - return states + return states @lru_cache diff --git a/sklearn_numba_dpex/kmeans/drivers.py b/sklearn_numba_dpex/kmeans/drivers.py index 72583f0..d366a98 100644 --- a/sklearn_numba_dpex/kmeans/drivers.py +++ b/sklearn_numba_dpex/kmeans/drivers.py @@ -60,6 +60,12 @@ def lloyd( max_work_group_size = device.max_work_group_size sub_group_size = min(device.sub_group_sizes) global_mem_cache_size = _get_global_mem_cache_size(device) + + # The following parameter controls the fraction of the device global cache memory + # that can be relied upon to reduce the probability of collision of atomic + # ops during execution (see `lloyd_single_step` kernel for more information). The + # following value has been chosen empirically and it might be beneficial to test it + # further on different hardware. centroids_private_copies_max_cache_occupancy = 0.7 verbose = bool(verbose) @@ -108,8 +114,7 @@ def lloyd( ) broadcast_division_kernel = make_broadcast_division_1d_2d_axis0_kernel( - size0=n_features, - size1=n_clusters, + shape=(n_features, n_clusters), work_group_size=max_work_group_size, ) @@ -540,7 +545,7 @@ def prepare_data_for_lloyd(X_t, init, tol, sample_weight, copy_x): # manner. # TODO: if dpnp support extends to relevant features, use it instead ? sum_sample_weight_kernel = make_sum_reduction_2d_kernel( - size0=(n_samples,), + shape=(n_samples,), work_group_size="max", device=device, dtype=compute_dtype, @@ -688,7 +693,7 @@ def get_labels_inertia(X_t, centroids_t, sample_weight, with_inertia): ) reduce_inertia_kernel = make_sum_reduction_2d_kernel( - size0=(n_samples,), + shape=(n_samples,), work_group_size="max", device=device, dtype=compute_dtype, diff --git a/sklearn_numba_dpex/kmeans/kernels/_base_kmeans_kernel_funcs.py b/sklearn_numba_dpex/kmeans/kernels/_base_kmeans_kernel_funcs.py index fd9c54d..c70becd 100644 --- a/sklearn_numba_dpex/kmeans/kernels/_base_kmeans_kernel_funcs.py +++ b/sklearn_numba_dpex/kmeans/kernels/_base_kmeans_kernel_funcs.py @@ -1,5 +1,5 @@ -import dpctl as dpt import numba_dpex as dpex +import numpy as np def make_pairwise_ops_base_kernel_funcs( @@ -207,7 +207,7 @@ def __init__( def make_initialize_window_kernel_func(self, window_n_centroids): zero = self.dtype(0.0) - zero_as_a_long = dpt.int64(0) + zero_as_a_long = np.int64(0) @dpex.func def _initialize_results(results): @@ -235,7 +235,7 @@ def _initialize_window_of_centroids( # The first `window_n_centroids` work items cooperate on loading the # values of centroids_half_l2_norm relevant to current window. Each work # item loads one single value. - if local_row_idx > zero_as_a_long: + if local_row_idx == zero_as_a_long: window_of_centroids_half_l2_norms[local_col_idx] = ( centroids_half_l2_norm[first_centroid_idx + local_col_idx] ) diff --git a/sklearn_numba_dpex/kmeans/kernels/compute_euclidean_distances.py b/sklearn_numba_dpex/kmeans/kernels/compute_euclidean_distances.py index 01850ba..1219e89 100644 --- a/sklearn_numba_dpex/kmeans/kernels/compute_euclidean_distances.py +++ b/sklearn_numba_dpex/kmeans/kernels/compute_euclidean_distances.py @@ -27,7 +27,7 @@ def make_compute_euclidean_distances_fixed_window_kernel( centroids_window_height = work_group_size // sub_group_size if (work_group_size == input_work_group_size) and ( - centroids_window_height * sub_group_size != work_group_size + (centroids_window_height * sub_group_size) != work_group_size ): raise ValueError( "Expected work_group_size to be a multiple of sub_group_size but got " @@ -57,7 +57,7 @@ def make_compute_euclidean_distances_fixed_window_kernel( last_feature_window_idx = n_windows_for_features - 1 zero_idx = np.int64(0) - one_idx = np.int64(0) + one_idx = np.int64(1) @dpex.kernel # fmt: off @@ -68,19 +68,21 @@ def compute_distances( ): # fmt: on - sample_idx = ( - (dpex.get_global_id(zero_idx) * sub_group_size) - + dpex.get_global_id(one_idx) - ) - centroids_window = dpex.local.array(shape=work_group_shape, dtype=dtype) sq_distances = dpex.private.array(shape=window_n_centroids, dtype=dtype) first_centroid_idx = zero_idx - window_loading_centroid_idx = dpex.get_local_id(zero_idx) - window_loading_feature_offset = dpex.get_local_id(one_idx) + local_col_idx = dpex.get_local_id(one_idx) + + window_loading_feature_offset = dpex.get_local_id(zero_idx) + window_loading_centroid_idx = local_col_idx + + sample_idx = ( + (dpex.get_global_id(zero_idx) * sub_group_size) + + local_col_idx + ) for centroid_window_idx in range(n_windows_for_centroids): is_last_centroid_window = centroid_window_idx == last_centroid_window_idx diff --git a/sklearn_numba_dpex/kmeans/kernels/compute_labels.py b/sklearn_numba_dpex/kmeans/kernels/compute_labels.py index bfe3085..0ff3ad9 100644 --- a/sklearn_numba_dpex/kmeans/kernels/compute_labels.py +++ b/sklearn_numba_dpex/kmeans/kernels/compute_labels.py @@ -37,7 +37,7 @@ def make_label_assignment_fixed_window_kernel( work_group_size = centroids_window_height * sub_group_size if (work_group_size == input_work_group_size) and ( - centroids_window_height * sub_group_size != work_group_size + (centroids_window_height * sub_group_size) != work_group_size ): raise ValueError( "Expected work_group_size to be a multiple of sub_group_size but got " @@ -74,7 +74,7 @@ def make_label_assignment_fixed_window_kernel( inf = dtype(math.inf) zero_idx = np.int64(0) - one_idx = np.int64(0) + one_idx = np.int64(1) @dpex.kernel # fmt: off @@ -85,12 +85,12 @@ def assignment( assignments_idx, # OUT (n_samples,) ): # fmt: on + local_row_idx = dpex.get_local_id(zero_idx) + local_col_idx = dpex.get_local_id(one_idx) sample_idx = ( (dpex.get_global_id(zero_idx) * sub_group_size) - + dpex.get_global_id(one_idx) + + local_col_idx ) - local_row_idx = dpex.get_local_id(zero_idx) - local_col_idx = dpex.get_local_id(one_idx) centroids_window = dpex.local.array(shape=centroids_window_shape, dtype=dtype) window_of_centroids_half_l2_norms = dpex.local.array( @@ -103,8 +103,8 @@ def assignment( min_idx = zero_idx min_sample_pseudo_inertia = inf - window_loading_centroid_idx = local_row_idx - window_loading_feature_offset = local_col_idx + window_loading_feature_offset = local_row_idx + window_loading_centroid_idx = local_col_idx for centroid_window_idx in range(n_windows_for_centroids): is_last_centroid_window = centroid_window_idx == last_centroid_window_idx diff --git a/sklearn_numba_dpex/kmeans/kernels/kmeans_plusplus.py b/sklearn_numba_dpex/kmeans/kernels/kmeans_plusplus.py index bd097b8..bbc29f1 100644 --- a/sklearn_numba_dpex/kmeans/kernels/kmeans_plusplus.py +++ b/sklearn_numba_dpex/kmeans/kernels/kmeans_plusplus.py @@ -118,7 +118,7 @@ def make_kmeansplusplus_single_step_fixed_window_kernel( candidates_window_height = work_group_size // sub_group_size if (work_group_size == input_work_group_size) and ( - candidates_window_height * sub_group_size != work_group_size + (candidates_window_height * sub_group_size) != work_group_size ): raise ValueError( "Expected work_group_size to be a multiple of sub_group_size but got " @@ -148,7 +148,7 @@ def make_kmeansplusplus_single_step_fixed_window_kernel( last_feature_window_idx = n_windows_for_features - 1 zero_idx = np.int64(0) - one_idx = np.int64(0) + one_idx = np.int64(1) @dpex.kernel # fmt: off @@ -160,10 +160,6 @@ def kmeansplusplus_single_step( sq_distances_t, # OUT (n_candidates, n_samples) ): # fmt: on - sample_idx = ( - (dpex.get_global_id(zero_idx) * sub_group_size) - + dpex.get_global_id(one_idx) - ) candidates_window = dpex.local.array(shape=work_group_shape, dtype=dtype) @@ -171,8 +167,15 @@ def kmeansplusplus_single_step( first_candidate_idx = zero_idx - window_loading_candidate_idx = dpex.get_local_id(zero_idx) - window_loading_feature_offset = dpex.get_local_id(one_idx) + local_col_idx = dpex.get_local_id(one_idx) + + window_loading_feature_offset = dpex.get_local_id(zero_idx) + window_loading_candidate_idx = local_col_idx + + sample_idx = ( + (dpex.get_global_id(zero_idx) * sub_group_size) + + local_col_idx + ) for candidate_window_idx in range(n_windows_for_candidates): is_last_candidate_window = candidate_window_idx == last_candidate_window_idx diff --git a/sklearn_numba_dpex/kmeans/kernels/lloyd_single_step.py b/sklearn_numba_dpex/kmeans/kernels/lloyd_single_step.py index 98adb04..02b0d57 100644 --- a/sklearn_numba_dpex/kmeans/kernels/lloyd_single_step.py +++ b/sklearn_numba_dpex/kmeans/kernels/lloyd_single_step.py @@ -73,7 +73,7 @@ def make_lloyd_single_step_fixed_window_kernel( centroids_window_height = work_group_size // sub_group_size if (work_group_size == input_work_group_size) and ( - centroids_window_height * sub_group_size != work_group_size + (centroids_window_height * sub_group_size) != work_group_size ): raise ValueError( "Expected work_group_size to be a multiple of sub_group_size but got " @@ -117,13 +117,35 @@ def make_lloyd_single_step_fixed_window_kernel( # TODO: control that this value is not higher than the number of sub-groups of size # sub_group_size that can effectively run concurrently. We should fetch this # information and apply it here. + + # NB: for more details about the privatization strategy the following variables + # refer too, please read the inline commenting that address it with more details in + # the kernel definition. + + # Ensure that the memory allocated for privatization will not saturate the global + # memory cache size. For this purpose we limit the number of private copies to + # a fraction of the available global memory cache size. n_centroids_private_copies = ( global_mem_cache_size * centroids_private_copies_max_cache_occupancy ) // n_cluster_bytes - n_centroids_private_copies = int(min(n_subgroups, n_centroids_private_copies)) + + # Each set of `sub_group_size` consecutive work items is assigned one private + # copy, and several such set can be assigned to the same private copy. Thus, at + # most `n_subgroups` private copies are needed. + # Moreover, collisions can only occur between sub groups that execute concurrently. + # Thus, at most `nb_concurrent_sub_groups` private copies are needed. + # TODO: `nb_concurrent_sub_groups` is considered equal to + # `device.max_compute_units`. We're not sure that this is the correct + # read of the device specs. Confirm or fix once it's made clearer. Suggested reads + # that highlight complexity of the execution model: + # - https://github.com/IntelPython/dpctl/issues/1033 + # - https://stackoverflow.com/a/6490897 + n_centroids_private_copies = int( + min(n_subgroups, n_centroids_private_copies, device.max_compute_units) + ) zero_idx = np.int64(0) - one_idx = np.int64(0) + one_idx = np.int64(1) inf = dtype(math.inf) # TODO: currently, constant memory is not supported by numba_dpex, but for read-only @@ -175,9 +197,9 @@ def fused_lloyd_single_step( operations in the kernel. """ sub_group_idx = dpex.get_global_id(zero_idx) - sample_idx = (sub_group_idx * sub_group_size) + dpex.get_global_id(one_idx) local_row_idx = dpex.get_local_id(zero_idx) local_col_idx = dpex.get_local_id(one_idx) + sample_idx = (sub_group_idx * sub_group_size) + local_col_idx # This array in shared memory is used as a sliding array over values of # current_centroids_t. During each iteration in the inner loop, a new one is @@ -208,8 +230,8 @@ def fused_lloyd_single_step( # Those variables are used in the inner loop during loading of the window of # centroids - window_loading_centroid_idx = local_row_idx - window_loading_feature_offset = local_col_idx + window_loading_feature_offset = local_row_idx + window_loading_centroid_idx = local_col_idx # STEP 1: compute the closest centroid # Outer loop: iterate on successive windows of size window_n_centroids that diff --git a/sklearn_numba_dpex/kmeans/kernels/utils.py b/sklearn_numba_dpex/kmeans/kernels/utils.py index 3168668..e92b004 100644 --- a/sklearn_numba_dpex/kmeans/kernels/utils.py +++ b/sklearn_numba_dpex/kmeans/kernels/utils.py @@ -220,6 +220,9 @@ def _reduce_centroid_data_kernel( ): # fmt: on item_idx = dpex.get_global_id(zero_idx) + if item_idx >= n_sums: + return + sum_ = zero # reduce the centroid values @@ -229,7 +232,7 @@ def _reduce_centroid_data_kernel( centroids_t[item_idx] = sum_ elif item_idx < n_sums: - cluster_idx = item_idx - n_sums + cluster_idx = item_idx - n_centroid_items for copy_idx in range(n_centroids_private_copies): sum_ += cluster_sizes_private_copies[copy_idx, cluster_idx] cluster_sizes[cluster_idx] = sum_ From 05fa4a9de70f8fe35c9fcd7abf23076a6dbfbdad Mon Sep 17 00:00:00 2001 From: Franck Charras <29153872+fcharras@users.noreply.github.com> Date: Tue, 7 Feb 2023 14:55:55 +0100 Subject: [PATCH 05/13] empty commit for github refresh ? From 49dad1ea3783fd2ad599cd6d3a03942fdb448112 Mon Sep 17 00:00:00 2001 From: Franck Charras <29153872+fcharras@users.noreply.github.com> Date: Tue, 7 Feb 2023 15:06:37 +0100 Subject: [PATCH 06/13] bump isort to version that fixes install issue at https://github.com/PyCQA/isort/issues/2077 --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ae9795e..e7cb48c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -21,7 +21,7 @@ repos: files: sklearn_numba_dpex/ additional_dependencies: [pytest==6.2.4] - repo: https://github.com/PyCQA/isort - rev: 5.10.1 + rev: 5.12.0 hooks: - id: isort files: sklearn_numba_dpex/ From 28513cc2983f66ea3be9ba0c92aded70d08b0140 Mon Sep 17 00:00:00 2001 From: Franck Charras <29153872+fcharras@users.noreply.github.com> Date: Wed, 8 Feb 2023 13:58:47 +0100 Subject: [PATCH 07/13] Fix (hack) issue with some work group sizes on CPU --- sklearn_numba_dpex/common/kernels.py | 27 ++++++++++++++----- .../common/tests/test_kernels.py | 8 +++--- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/sklearn_numba_dpex/common/kernels.py b/sklearn_numba_dpex/common/kernels.py index a1628a0..72a8d13 100644 --- a/sklearn_numba_dpex/common/kernels.py +++ b/sklearn_numba_dpex/common/kernels.py @@ -292,8 +292,8 @@ def make_sum_reduction_2d_kernel( ) # XXX: The kernels seem to work fine with work_group_size==1 on GPU but fail on CPU. - if math.prod(work_group_shape) == 1: - raise NotImplementedError("work_group_size==1 is not supported.") + # if math.prod(work_group_shape) == 1: + # raise NotImplementedError("work_group_size==1 is not supported.") # NB: the shape of the work group is different in each of those two cases. Summing # efficiently requires to adapt to very different IO patterns depending on the sum @@ -590,19 +590,26 @@ def fused_elementwise_func_(x): ) work_group_size = n_sub_groups_per_work_group * sub_group_size + _is_cpu = device.has_aspect_cpu + ( work_group_shape, reduction_block_size, partial_sum_reduction, ) = _make_partial_sum_reduction_2d_axis0_kernel( - n_cols, work_group_size, sub_group_size, fused_elementwise_func_, dtype + n_cols, work_group_size, sub_group_size, fused_elementwise_func_, dtype, _is_cpu ) if fused_elementwise_func is None: partial_sum_reduction_nofunc = partial_sum_reduction else: *_, partial_sum_reduction_nofunc = _make_partial_sum_reduction_2d_axis0_kernel( - n_cols, work_group_size, sub_group_size, fused_elementwise_func_, dtype + n_cols, + work_group_size, + sub_group_size, + fused_elementwise_func_, + dtype, + _is_cpu, ) get_result_shape = lambda result_sum_axis_size: (result_sum_axis_size, n_cols) @@ -622,7 +629,7 @@ def fused_elementwise_func_(x): def _make_partial_sum_reduction_2d_axis0_kernel( - n_cols, work_group_size, sub_group_size, fused_elementwise_func, dtype + n_cols, work_group_size, sub_group_size, fused_elementwise_func, dtype, _is_cpu ): """When axis=0, each work group performs a local reduction on axis 0 in a window of size `(sub_group_size_,work_group_size // sub_group_size)`.""" @@ -705,7 +712,7 @@ def partial_sum_reduction( # position of the window in the grid of windows, and by the local position of # the work item in the 2D index): col_idx = ( - dpex.get_group_id(one_idx) * sub_group_size + local_col_idx + (dpex.get_group_id(one_idx) * sub_group_size) + local_col_idx ) # We must be careful to not read items outside of the array ! @@ -731,13 +738,19 @@ def partial_sum_reduction( # a similar memory access pattern than seen at the previous step. n_active_sub_groups = n_sub_groups_per_work_group for i in range(n_local_iterations): + # At each iteration, half of the remaining work items with the highest id # are discarded. n_active_sub_groups = n_active_sub_groups // two_as_a_long work_item_row_idx = first_row_idx + local_row_idx + n_active_sub_groups if ( (local_row_idx < n_active_sub_groups) and - (col_idx < n_cols) and + # HACK: this condition save some memory access and was added thinking + # it might be good for performance. However on CPU it causes wrong + # results in some tests. + # TODO: create a minimal reproducer and report to `numba_dpex`. Remove + # the hack once it is fixed. + (_is_cpu or (col_idx < n_cols)) and (work_item_row_idx < sum_axis_size) ): local_values[local_row_idx, local_col_idx] += ( diff --git a/sklearn_numba_dpex/common/tests/test_kernels.py b/sklearn_numba_dpex/common/tests/test_kernels.py index fa4d620..0a1b17b 100644 --- a/sklearn_numba_dpex/common/tests/test_kernels.py +++ b/sklearn_numba_dpex/common/tests/test_kernels.py @@ -14,9 +14,9 @@ @pytest.mark.parametrize("axis", [0, 1]) -@pytest.mark.parametrize("work_group_size", [2, 4, 8, "max"]) +@pytest.mark.parametrize("work_group_size", [1, 2, 4, 8, "max"]) @pytest.mark.parametrize( - "test_input_shape", [(1, 1), (1, 3), (3, 1), (3, 5), (5, 3), (3, 4), (4, 3)] + "test_input_shape", [(1, 1), (1, 3), (3, 1), (3, 5), (5, 3), (3, 4), (4, 3), (3, 9)] ) @pytest.mark.parametrize("dtype", float_dtype_params) def test_sum_reduction_2d(test_input_shape, work_group_size, axis, dtype): @@ -31,6 +31,8 @@ def test_sum_reduction_2d(test_input_shape, work_group_size, axis, dtype): if work_group_size == "max": sub_group_size = min(device.sub_group_sizes) + elif work_group_size == 1: + sub_group_size = 1 else: sub_group_size = work_group_size // 2 @@ -48,7 +50,7 @@ def test_sum_reduction_2d(test_input_shape, work_group_size, axis, dtype): assert_allclose(expected_result, actual_result) -@pytest.mark.parametrize("work_group_size", [2, 4, 8, "max"]) +@pytest.mark.parametrize("work_group_size", [1, 2, 4, 8, "max"]) @pytest.mark.parametrize("length, expected_result", [(4, 6), (5, 10)]) @pytest.mark.parametrize("dtype", float_dtype_params) def test_sum_reduction_1d(length, expected_result, dtype, work_group_size): From d70ab27641e264f2109271572ffdca3f28138e8e Mon Sep 17 00:00:00 2001 From: Franck Charras <29153872+fcharras@users.noreply.github.com> Date: Wed, 8 Feb 2023 14:05:44 +0100 Subject: [PATCH 08/13] Apply suggestions Co-authored-by: Julien Jerphanion --- sklearn_numba_dpex/kmeans/drivers.py | 4 +++- .../kmeans/kernels/compute_euclidean_distances.py | 4 +++- sklearn_numba_dpex/kmeans/kernels/compute_labels.py | 5 ++++- sklearn_numba_dpex/kmeans/kernels/kmeans_plusplus.py | 4 +++- 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/sklearn_numba_dpex/kmeans/drivers.py b/sklearn_numba_dpex/kmeans/drivers.py index d366a98..121228b 100644 --- a/sklearn_numba_dpex/kmeans/drivers.py +++ b/sklearn_numba_dpex/kmeans/drivers.py @@ -468,9 +468,11 @@ def prepare_data_for_lloyd(X_t, init, tol, sample_weight, copy_x): dtype=compute_dtype, ) + elementwise_divide_by_n_samples_fn = _divide_by(compute_dtype(n_samples)) + divide_by_n_samples_kernel = make_apply_elementwise_func( (n_features,), - _divide_by(compute_dtype(n_samples)), + elementwise_divide_by_n_samples_fn, max_work_group_size, compute_dtype, ) diff --git a/sklearn_numba_dpex/kmeans/kernels/compute_euclidean_distances.py b/sklearn_numba_dpex/kmeans/kernels/compute_euclidean_distances.py index 1219e89..d05eb1a 100644 --- a/sklearn_numba_dpex/kmeans/kernels/compute_euclidean_distances.py +++ b/sklearn_numba_dpex/kmeans/kernels/compute_euclidean_distances.py @@ -130,8 +130,10 @@ def compute_distances( dpex.barrier(dpex.CLK_LOCAL_MEM_FENCE) + n_windows_for_sample = math.ceil(n_samples / window_n_centroids) + global_size = ( - math.ceil(math.ceil(n_samples / window_n_centroids) / centroids_window_height) + math.ceil(n_windows_for_sample / centroids_window_height) * centroids_window_height, window_n_centroids, ) diff --git a/sklearn_numba_dpex/kmeans/kernels/compute_labels.py b/sklearn_numba_dpex/kmeans/kernels/compute_labels.py index 0ff3ad9..70dbb57 100644 --- a/sklearn_numba_dpex/kmeans/kernels/compute_labels.py +++ b/sklearn_numba_dpex/kmeans/kernels/compute_labels.py @@ -170,9 +170,12 @@ def assignment( assignments_idx[sample_idx] = min_idx + n_windows_for_sample = math.ceil(n_samples / window_n_centroids) + global_size = ( - math.ceil(math.ceil(n_samples / window_n_centroids) / centroids_window_height) + math.ceil(n_windows_for_sample / centroids_window_height) * centroids_window_height, window_n_centroids, ) + return assignment[global_size, work_group_shape] diff --git a/sklearn_numba_dpex/kmeans/kernels/kmeans_plusplus.py b/sklearn_numba_dpex/kmeans/kernels/kmeans_plusplus.py index bbc29f1..ba22b93 100644 --- a/sklearn_numba_dpex/kmeans/kernels/kmeans_plusplus.py +++ b/sklearn_numba_dpex/kmeans/kernels/kmeans_plusplus.py @@ -234,8 +234,10 @@ def kmeansplusplus_single_step( dpex.barrier(dpex.CLK_LOCAL_MEM_FENCE) + n_windows_for_samples = math.ceil(n_samples / window_n_candidates) + global_size = ( - math.ceil(math.ceil(n_samples / window_n_candidates) / candidates_window_height) + math.ceil(n_windows_for_samples / candidates_window_height) * candidates_window_height, window_n_candidates, ) From a3a7530debfd4d86c4940918d83b3b86fbc28d3a Mon Sep 17 00:00:00 2001 From: Franck Charras <29153872+fcharras@users.noreply.github.com> Date: Wed, 8 Feb 2023 14:23:24 +0100 Subject: [PATCH 09/13] fixup sum with wgs 1 --- sklearn_numba_dpex/common/kernels.py | 4 ++-- sklearn_numba_dpex/common/tests/test_kernels.py | 6 ++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/sklearn_numba_dpex/common/kernels.py b/sklearn_numba_dpex/common/kernels.py index 72a8d13..c964e75 100644 --- a/sklearn_numba_dpex/common/kernels.py +++ b/sklearn_numba_dpex/common/kernels.py @@ -292,8 +292,8 @@ def make_sum_reduction_2d_kernel( ) # XXX: The kernels seem to work fine with work_group_size==1 on GPU but fail on CPU. - # if math.prod(work_group_shape) == 1: - # raise NotImplementedError("work_group_size==1 is not supported.") + if math.prod(work_group_shape) == 1: + raise NotImplementedError("work_group_size==1 is not supported.") # NB: the shape of the work group is different in each of those two cases. Summing # efficiently requires to adapt to very different IO patterns depending on the sum diff --git a/sklearn_numba_dpex/common/tests/test_kernels.py b/sklearn_numba_dpex/common/tests/test_kernels.py index 0a1b17b..b0596f7 100644 --- a/sklearn_numba_dpex/common/tests/test_kernels.py +++ b/sklearn_numba_dpex/common/tests/test_kernels.py @@ -14,7 +14,7 @@ @pytest.mark.parametrize("axis", [0, 1]) -@pytest.mark.parametrize("work_group_size", [1, 2, 4, 8, "max"]) +@pytest.mark.parametrize("work_group_size", [2, 4, 8, "max"]) @pytest.mark.parametrize( "test_input_shape", [(1, 1), (1, 3), (3, 1), (3, 5), (5, 3), (3, 4), (4, 3), (3, 9)] ) @@ -31,8 +31,6 @@ def test_sum_reduction_2d(test_input_shape, work_group_size, axis, dtype): if work_group_size == "max": sub_group_size = min(device.sub_group_sizes) - elif work_group_size == 1: - sub_group_size = 1 else: sub_group_size = work_group_size // 2 @@ -50,7 +48,7 @@ def test_sum_reduction_2d(test_input_shape, work_group_size, axis, dtype): assert_allclose(expected_result, actual_result) -@pytest.mark.parametrize("work_group_size", [1, 2, 4, 8, "max"]) +@pytest.mark.parametrize("work_group_size", [2, 4, 8, "max"]) @pytest.mark.parametrize("length, expected_result", [(4, 6), (5, 10)]) @pytest.mark.parametrize("dtype", float_dtype_params) def test_sum_reduction_1d(length, expected_result, dtype, work_group_size): From f9e099461a79beb72fe86c9a1584d0684feac05b Mon Sep 17 00:00:00 2001 From: Franck Charras <29153872+fcharras@users.noreply.github.com> Date: Thu, 9 Feb 2023 14:34:08 +0100 Subject: [PATCH 10/13] hack group sizes for sum(axis=0) on cpu --- sklearn_numba_dpex/common/kernels.py | 30 +++++++++++++--------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/sklearn_numba_dpex/common/kernels.py b/sklearn_numba_dpex/common/kernels.py index c964e75..62cedf7 100644 --- a/sklearn_numba_dpex/common/kernels.py +++ b/sklearn_numba_dpex/common/kernels.py @@ -583,33 +583,36 @@ def fused_elementwise_func_(x): check_power_of_2(n_sub_groups_per_work_group) else: + # HACK: this kernel unexpectedly mis-behave when work group has size >= 8 on + # CPU, so we force low group sizes. + work_group_size = sub_group_size = min(device.sub_group_sizes) + # TODO: there seem to be a bug that causes the last group not to be executed + # for some group sizes. The kernel for `sum(axis=1)` is very similar + # and does not show this unexpected behavior, so it suggets a bug in the JIT. + # Find a reproducer and submit it in the issue tracker. Or is it a misuse of + # group sizes for CPU ? + # Round work_group_size to the maximum smaller power-of-two multiple of # `sub_group_size` n_sub_groups_per_work_group = get_maximum_power_of_2_smaller_than( work_group_size / sub_group_size ) + n_sub_groups_per_work_group = n_sub_groups_per_work_group work_group_size = n_sub_groups_per_work_group * sub_group_size - _is_cpu = device.has_aspect_cpu - ( work_group_shape, reduction_block_size, partial_sum_reduction, ) = _make_partial_sum_reduction_2d_axis0_kernel( - n_cols, work_group_size, sub_group_size, fused_elementwise_func_, dtype, _is_cpu + n_cols, work_group_size, sub_group_size, fused_elementwise_func_, dtype ) if fused_elementwise_func is None: partial_sum_reduction_nofunc = partial_sum_reduction else: *_, partial_sum_reduction_nofunc = _make_partial_sum_reduction_2d_axis0_kernel( - n_cols, - work_group_size, - sub_group_size, - fused_elementwise_func_, - dtype, - _is_cpu, + n_cols, work_group_size, sub_group_size, fused_elementwise_func_, dtype ) get_result_shape = lambda result_sum_axis_size: (result_sum_axis_size, n_cols) @@ -629,7 +632,7 @@ def fused_elementwise_func_(x): def _make_partial_sum_reduction_2d_axis0_kernel( - n_cols, work_group_size, sub_group_size, fused_elementwise_func, dtype, _is_cpu + n_cols, work_group_size, sub_group_size, fused_elementwise_func, dtype ): """When axis=0, each work group performs a local reduction on axis 0 in a window of size `(sub_group_size_,work_group_size // sub_group_size)`.""" @@ -745,12 +748,7 @@ def partial_sum_reduction( work_item_row_idx = first_row_idx + local_row_idx + n_active_sub_groups if ( (local_row_idx < n_active_sub_groups) and - # HACK: this condition save some memory access and was added thinking - # it might be good for performance. However on CPU it causes wrong - # results in some tests. - # TODO: create a minimal reproducer and report to `numba_dpex`. Remove - # the hack once it is fixed. - (_is_cpu or (col_idx < n_cols)) and + (col_idx < n_cols) and (work_item_row_idx < sum_axis_size) ): local_values[local_row_idx, local_col_idx] += ( From 9bbdfd2166e766f6d523b56957b9b53b6c67a506 Mon Sep 17 00:00:00 2001 From: Franck Charras <29153872+fcharras@users.noreply.github.com> Date: Thu, 9 Feb 2023 17:06:45 +0100 Subject: [PATCH 11/13] investigating sum(axis=0) cpu issue --- sklearn_numba_dpex/common/kernels.py | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/sklearn_numba_dpex/common/kernels.py b/sklearn_numba_dpex/common/kernels.py index 62cedf7..dcb0ba1 100644 --- a/sklearn_numba_dpex/common/kernels.py +++ b/sklearn_numba_dpex/common/kernels.py @@ -583,15 +583,6 @@ def fused_elementwise_func_(x): check_power_of_2(n_sub_groups_per_work_group) else: - # HACK: this kernel unexpectedly mis-behave when work group has size >= 8 on - # CPU, so we force low group sizes. - work_group_size = sub_group_size = min(device.sub_group_sizes) - # TODO: there seem to be a bug that causes the last group not to be executed - # for some group sizes. The kernel for `sum(axis=1)` is very similar - # and does not show this unexpected behavior, so it suggets a bug in the JIT. - # Find a reproducer and submit it in the issue tracker. Or is it a misuse of - # group sizes for CPU ? - # Round work_group_size to the maximum smaller power-of-two multiple of # `sub_group_size` n_sub_groups_per_work_group = get_maximum_power_of_2_smaller_than( @@ -745,11 +736,12 @@ def partial_sum_reduction( # At each iteration, half of the remaining work items with the highest id # are discarded. n_active_sub_groups = n_active_sub_groups // two_as_a_long - work_item_row_idx = first_row_idx + local_row_idx + n_active_sub_groups + # work_item_row_idx = first_row_idx + local_row_idx + n_active_sub_groups if ( - (local_row_idx < n_active_sub_groups) and - (col_idx < n_cols) and - (work_item_row_idx < sum_axis_size) + (local_row_idx < n_active_sub_groups) + # and + # (col_idx < n_cols) and + # (work_item_row_idx < sum_axis_size) ): local_values[local_row_idx, local_col_idx] += ( local_values[local_row_idx + n_active_sub_groups, local_col_idx] From 90725dbe8e00c6164e0c7bf3c5141c313fe1d93b Mon Sep 17 00:00:00 2001 From: Franck Charras <29153872+fcharras@users.noreply.github.com> Date: Thu, 9 Feb 2023 17:24:24 +0100 Subject: [PATCH 12/13] investigating sum(axis=0) cpu issue #2 --- sklearn_numba_dpex/common/kernels.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/sklearn_numba_dpex/common/kernels.py b/sklearn_numba_dpex/common/kernels.py index dcb0ba1..c5d7857 100644 --- a/sklearn_numba_dpex/common/kernels.py +++ b/sklearn_numba_dpex/common/kernels.py @@ -591,19 +591,26 @@ def fused_elementwise_func_(x): n_sub_groups_per_work_group = n_sub_groups_per_work_group work_group_size = n_sub_groups_per_work_group * sub_group_size + _is_cpu = device.has_aspect_cpu + ( work_group_shape, reduction_block_size, partial_sum_reduction, ) = _make_partial_sum_reduction_2d_axis0_kernel( - n_cols, work_group_size, sub_group_size, fused_elementwise_func_, dtype + n_cols, work_group_size, sub_group_size, fused_elementwise_func_, dtype, _is_cpu ) if fused_elementwise_func is None: partial_sum_reduction_nofunc = partial_sum_reduction else: *_, partial_sum_reduction_nofunc = _make_partial_sum_reduction_2d_axis0_kernel( - n_cols, work_group_size, sub_group_size, fused_elementwise_func_, dtype + n_cols, + work_group_size, + sub_group_size, + fused_elementwise_func_, + dtype, + _is_cpu, ) get_result_shape = lambda result_sum_axis_size: (result_sum_axis_size, n_cols) @@ -623,7 +630,7 @@ def fused_elementwise_func_(x): def _make_partial_sum_reduction_2d_axis0_kernel( - n_cols, work_group_size, sub_group_size, fused_elementwise_func, dtype + n_cols, work_group_size, sub_group_size, fused_elementwise_func, dtype, _is_cpu ): """When axis=0, each work group performs a local reduction on axis 0 in a window of size `(sub_group_size_,work_group_size // sub_group_size)`.""" @@ -736,12 +743,11 @@ def partial_sum_reduction( # At each iteration, half of the remaining work items with the highest id # are discarded. n_active_sub_groups = n_active_sub_groups // two_as_a_long - # work_item_row_idx = first_row_idx + local_row_idx + n_active_sub_groups + work_item_row_idx = first_row_idx + local_row_idx + n_active_sub_groups if ( - (local_row_idx < n_active_sub_groups) - # and - # (col_idx < n_cols) and - # (work_item_row_idx < sum_axis_size) + (local_row_idx < n_active_sub_groups) and + (_is_cpu or ((col_idx < n_cols) and + (work_item_row_idx < sum_axis_size))) ): local_values[local_row_idx, local_col_idx] += ( local_values[local_row_idx + n_active_sub_groups, local_col_idx] From 33ef2dc3ce14473f0eaa7428f01314f90cf73aec Mon Sep 17 00:00:00 2001 From: Franck Charras <29153872+fcharras@users.noreply.github.com> Date: Mon, 13 Feb 2023 12:05:17 +0100 Subject: [PATCH 13/13] Apply suggestions Co-authored-by: Julien Jerphanion Co-authored-by: Olivier Grisel --- docker/Dockerfile | 1 - sklearn_numba_dpex/common/kernels.py | 35 +++++-------- .../kernels/_base_kmeans_kernel_funcs.py | 28 ++++++++--- .../kmeans/kernels/compute_labels.py | 5 +- .../kmeans/kernels/lloyd_single_step.py | 13 +++-- sklearn_numba_dpex/kmeans/kernels/utils.py | 49 ++++++++++--------- 6 files changed, 69 insertions(+), 62 deletions(-) diff --git a/docker/Dockerfile b/docker/Dockerfile index 7b64808..f103651 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -131,7 +131,6 @@ ARG ONEAPI_CACHE_DIR=$TMPDIR/intel/cache ARG ONEAPI_DOWNLOAD_DIR=$TMPDIR/intel/download ARG ONEAPI_LOG_DIR=$TMPDIR/intel/log ARG ONEAPI_COMPONENTS="intel.oneapi.lin.dpcpp-cpp-compiler:intel.oneapi.lin.tbb.devel:intel.oneapi.lin.mkl.devel" -ARG LLVM_SPIRV_INSTALL_DIR RUN mkdir -p $ONEAPI_INSTALL_DIR $ONEAPI_CACHE_DIR $ONEAPI_DOWNLOAD_DIR $ONEAPI_LOG_DIR \ && wget -P $ONEAPI_DOWNLOAD_DIR $ONEAPI_INSTALLER_URL/$ONEAPI_INSTALL_BINARY_NAME \ && chmod +x $ONEAPI_DOWNLOAD_DIR/$ONEAPI_INSTALL_BINARY_NAME \ diff --git a/sklearn_numba_dpex/common/kernels.py b/sklearn_numba_dpex/common/kernels.py index c5d7857..cb3f109 100644 --- a/sklearn_numba_dpex/common/kernels.py +++ b/sklearn_numba_dpex/common/kernels.py @@ -405,14 +405,9 @@ def fused_elementwise_func_(x): result_sum_axis_size * work_group_size, ) - return ( - work_group_shape, - ( - (partial_sum_reduction, partial_sum_reduction_nofunc), # kernels - reduction_block_size, - ), - (get_result_shape, get_global_size), - ) # shape_update_fn + kernels = (partial_sum_reduction, partial_sum_reduction_nofunc) + shape_update_fn = (get_result_shape, get_global_size) + return (work_group_shape, (kernels, reduction_block_size), shape_update_fn) def _make_partial_sum_reduction_2d_axis1_kernel( @@ -591,14 +586,12 @@ def fused_elementwise_func_(x): n_sub_groups_per_work_group = n_sub_groups_per_work_group work_group_size = n_sub_groups_per_work_group * sub_group_size - _is_cpu = device.has_aspect_cpu - ( work_group_shape, reduction_block_size, partial_sum_reduction, ) = _make_partial_sum_reduction_2d_axis0_kernel( - n_cols, work_group_size, sub_group_size, fused_elementwise_func_, dtype, _is_cpu + n_cols, work_group_size, sub_group_size, fused_elementwise_func_, dtype ) if fused_elementwise_func is None: @@ -610,7 +603,6 @@ def fused_elementwise_func_(x): sub_group_size, fused_elementwise_func_, dtype, - _is_cpu, ) get_result_shape = lambda result_sum_axis_size: (result_sum_axis_size, n_cols) @@ -619,18 +611,13 @@ def fused_elementwise_func_(x): sub_group_size * math.ceil(n_cols / sub_group_size), ) - return ( - work_group_shape, - ( - (partial_sum_reduction, partial_sum_reduction_nofunc), # kernels - reduction_block_size, - ), - (get_result_shape, get_global_size), - ) # shape_update_fn + kernels = (partial_sum_reduction, partial_sum_reduction_nofunc) + shape_update_fn = (get_result_shape, get_global_size) + return (work_group_shape, (kernels, reduction_block_size), shape_update_fn) def _make_partial_sum_reduction_2d_axis0_kernel( - n_cols, work_group_size, sub_group_size, fused_elementwise_func, dtype, _is_cpu + n_cols, work_group_size, sub_group_size, fused_elementwise_func, dtype ): """When axis=0, each work group performs a local reduction on axis 0 in a window of size `(sub_group_size_,work_group_size // sub_group_size)`.""" @@ -714,7 +701,7 @@ def partial_sum_reduction( # the work item in the 2D index): col_idx = ( (dpex.get_group_id(one_idx) * sub_group_size) + local_col_idx - ) + ) # We must be careful to not read items outside of the array ! sum_axis_size = summands.shape[zero_idx] @@ -746,8 +733,8 @@ def partial_sum_reduction( work_item_row_idx = first_row_idx + local_row_idx + n_active_sub_groups if ( (local_row_idx < n_active_sub_groups) and - (_is_cpu or ((col_idx < n_cols) and - (work_item_row_idx < sum_axis_size))) + (col_idx < n_cols) and + (work_item_row_idx < sum_axis_size) ): local_values[local_row_idx, local_col_idx] += ( local_values[local_row_idx + n_active_sub_groups, local_col_idx] diff --git a/sklearn_numba_dpex/kmeans/kernels/_base_kmeans_kernel_funcs.py b/sklearn_numba_dpex/kmeans/kernels/_base_kmeans_kernel_funcs.py index c70becd..38558d7 100644 --- a/sklearn_numba_dpex/kmeans/kernels/_base_kmeans_kernel_funcs.py +++ b/sklearn_numba_dpex/kmeans/kernels/_base_kmeans_kernel_funcs.py @@ -225,17 +225,31 @@ def _initialize_window_of_centroids( local_row_idx, # PARAM local_col_idx, # PARAM first_centroid_idx, # PARAM - centroids_half_l2_norm, # IN - window_of_centroids_half_l2_norms, # OUT - results, # OUT + centroids_half_l2_norm, # IN (self.n_clusters,) + window_of_centroids_half_l2_norms, # OUT (work_group_shape[1],) + results, # OUT (work_group_shape[1],) ): # fmt: on _initialize_results(results) - # The first `window_n_centroids` work items cooperate on loading the - # values of centroids_half_l2_norm relevant to current window. Each work - # item loads one single value. - if local_row_idx == zero_as_a_long: + # The work items are indexed in a 2D grid of shape + # `work_group_shape = (centroids_window_height, window_n_centroids)`, where + # `centroids_window_height` and `window_n_centroids` refer to a window of + # centroids that is entirely within the boundaries of the centroid array. + # The `window_n_centroids` work items in the first row cooperate on loading + # the values of `centroids_half_l2_norm` relevant to current window. Each + # work item loads one single value. + + # NB: Close to the boundaries, the value of `window_n_centroids` is + # adjusted so that the window fits within the boundaries of the array, + # however the shape of the work group does not change. The work items in + # the 2D grid such as `local_col_idx` is greater than the actual value of + # `window_n_centroids` at the boundaries must be discarded, to avoid + # reading unallocated space in global memory. + if ( + (local_row_idx == zero_as_a_long) # select first row + and (local_col_idx < window_n_centroids) # necessary condition at boundaries # noqa + ): window_of_centroids_half_l2_norms[local_col_idx] = ( centroids_half_l2_norm[first_centroid_idx + local_col_idx] ) diff --git a/sklearn_numba_dpex/kmeans/kernels/compute_labels.py b/sklearn_numba_dpex/kmeans/kernels/compute_labels.py index 70dbb57..f8f6e5b 100644 --- a/sklearn_numba_dpex/kmeans/kernels/compute_labels.py +++ b/sklearn_numba_dpex/kmeans/kernels/compute_labels.py @@ -30,7 +30,6 @@ def make_label_assignment_fixed_window_kernel( required_memory_constant=window_n_centroids * dtype_itemsize, ) - centroids_window_width = window_n_centroids centroids_window_height = work_group_size // sub_group_size if work_group_size != input_work_group_size: @@ -70,8 +69,6 @@ def make_label_assignment_fixed_window_kernel( last_centroid_window_idx = n_windows_for_centroids - 1 last_feature_window_idx = n_windows_for_features - 1 - centroids_window_shape = (centroids_window_height, centroids_window_width) - inf = dtype(math.inf) zero_idx = np.int64(0) one_idx = np.int64(1) @@ -92,7 +89,7 @@ def assignment( + local_col_idx ) - centroids_window = dpex.local.array(shape=centroids_window_shape, dtype=dtype) + centroids_window = dpex.local.array(shape=work_group_shape, dtype=dtype) window_of_centroids_half_l2_norms = dpex.local.array( shape=window_n_centroids, dtype=dtype ) diff --git a/sklearn_numba_dpex/kmeans/kernels/lloyd_single_step.py b/sklearn_numba_dpex/kmeans/kernels/lloyd_single_step.py index 02b0d57..5be9acb 100644 --- a/sklearn_numba_dpex/kmeans/kernels/lloyd_single_step.py +++ b/sklearn_numba_dpex/kmeans/kernels/lloyd_single_step.py @@ -119,8 +119,8 @@ def make_lloyd_single_step_fixed_window_kernel( # information and apply it here. # NB: for more details about the privatization strategy the following variables - # refer too, please read the inline commenting that address it with more details in - # the kernel definition. + # refer too, please read the inline commenting that address it in the kernel + # definition. # Ensure that the memory allocated for privatization will not saturate the global # memory cache size. For this purpose we limit the number of private copies to @@ -130,7 +130,7 @@ def make_lloyd_single_step_fixed_window_kernel( ) // n_cluster_bytes # Each set of `sub_group_size` consecutive work items is assigned one private - # copy, and several such set can be assigned to the same private copy. Thus, at + # copy, and several such sets can be assigned to the same private copy. Thus, at # most `n_subgroups` private copies are needed. # Moreover, collisions can only occur between sub groups that execute concurrently. # Thus, at most `nb_concurrent_sub_groups` private copies are needed. @@ -199,7 +199,14 @@ def fused_lloyd_single_step( sub_group_idx = dpex.get_global_id(zero_idx) local_row_idx = dpex.get_local_id(zero_idx) local_col_idx = dpex.get_local_id(one_idx) + + # Let's start by remapping the 2D grid of work items to a 1D grid that reflect + # how contiguous work items address one contiguoue sample_idx: sample_idx = (sub_group_idx * sub_group_size) + local_col_idx + # NB: The 2D work group shape makes it easier (and less expensive) to map + # the local memory arrays to the array of centroids. Do not get confused by the + # fact that this shape is unrelated to how the kernel is parallelized on the + # samples, where each work item applies to one sample. # This array in shared memory is used as a sliding array over values of # current_centroids_t. During each iteration in the inner loop, a new one is diff --git a/sklearn_numba_dpex/kmeans/kernels/utils.py b/sklearn_numba_dpex/kmeans/kernels/utils.py index e92b004..cbe204c 100644 --- a/sklearn_numba_dpex/kmeans/kernels/utils.py +++ b/sklearn_numba_dpex/kmeans/kernels/utils.py @@ -211,12 +211,12 @@ def make_reduce_centroid_data_kernel( @dpex.kernel # fmt: off def _reduce_centroid_data_kernel( - cluster_sizes_private_copies, # IN (n_copies, n_clusters) - centroids_t_private_copies, # IN (n_copies, n_features * n_clusters) (reshaped) # noqa - cluster_sizes, # OUT (n_clusters,) - centroids_t, # OUT (n_features * n_clusters,) (flattened) - empty_clusters_list, # OUT (n_clusters,) - n_empty_clusters, # OUT (1,) + cluster_sizes_private_copies, # IN (n_copies, n_clusters) + centroids_t_private_copies_reshaped, # IN (n_copies, n_features * n_clusters) # noqa + cluster_sizes, # OUT (n_clusters,) + centroids_t_flattened, # OUT (n_features * n_clusters,) # noqa + empty_clusters_list, # OUT (n_clusters,) + n_empty_clusters, # OUT (1,) ): # fmt: on item_idx = dpex.get_global_id(zero_idx) @@ -228,21 +228,24 @@ def _reduce_centroid_data_kernel( # reduce the centroid values if item_idx < n_centroid_items: for copy_idx in range(n_centroids_private_copies): - sum_ += centroids_t_private_copies[copy_idx, item_idx] - centroids_t[item_idx] = sum_ + sum_ += centroids_t_private_copies_reshaped[copy_idx, item_idx] + centroids_t_flattened[item_idx] = sum_ + return - elif item_idx < n_sums: - cluster_idx = item_idx - n_centroid_items - for copy_idx in range(n_centroids_private_copies): - sum_ += cluster_sizes_private_copies[copy_idx, cluster_idx] - cluster_sizes[cluster_idx] = sum_ + cluster_idx = item_idx - n_centroid_items + for copy_idx in range(n_centroids_private_copies): + sum_ += cluster_sizes_private_copies[copy_idx, cluster_idx] + cluster_sizes[cluster_idx] = sum_ - # register empty clusters - if sum_ == zero: - current_n_empty_clusters = dpex.atomic.add( - n_empty_clusters, zero_idx, one_incr - ) - empty_clusters_list[current_n_empty_clusters] = cluster_idx + if sum_ > 0: + return + + # register empty clusters + if sum_ == zero: + current_n_empty_clusters = dpex.atomic.add( + n_empty_clusters, zero_idx, one_incr + ) + empty_clusters_list[current_n_empty_clusters] = cluster_idx reduce_centroid_data_kernel = _reduce_centroid_data_kernel[ global_size, work_group_size @@ -256,16 +259,16 @@ def reduce_centroid_data( empty_clusters_list, n_empty_clusters, ): - centroids_t_private_copies = dpt.reshape( + centroids_t_private_copies_reshaped = dpt.reshape( centroids_t_private_copies, (n_centroids_private_copies, n_features * n_clusters), ) - centroids_t = dpt.asarray(dpt.reshape(centroids_t, (-1,))) + centroids_t_flattened = dpt.asarray(dpt.reshape(centroids_t, (-1,))) reduce_centroid_data_kernel( cluster_sizes_private_copies, - centroids_t_private_copies, + centroids_t_private_copies_reshaped, cluster_sizes, - centroids_t, + centroids_t_flattened, empty_clusters_list, n_empty_clusters, )