diff --git a/janitor/functions/_numba.py b/janitor/functions/_numba.py index 445ab4040..c40daa69f 100644 --- a/janitor/functions/_numba.py +++ b/janitor/functions/_numba.py @@ -2,256 +2,19 @@ from __future__ import annotations -from math import ceil -from typing import Any, Union +from typing import Any import numpy as np -import pandas as pd -from numba import njit, prange +from numba import literal_unroll, njit, prange, types +from numba.extending import overload from pandas.api.types import ( is_datetime64_dtype, - is_extension_array_dtype, + is_numeric_dtype, + is_timedelta64_dtype, ) # https://numba.discourse.group/t/uint64-vs-int64-indexing-performance-difference/1500 # indexing with unsigned integers offers more performance -from janitor.functions.utils import ( - _generic_func_cond_join, - greater_than_join_types, -) - - -def _convert_to_numpy( - left: np.ndarray, right: np.ndarray -) -> tuple[np.ndarray, np.ndarray]: - """ - Ensure array is a numpy array. - """ - if is_extension_array_dtype(left): - array_dtype = left.dtype.numpy_dtype - left = left.astype(array_dtype) - right = right.astype(array_dtype) - if is_datetime64_dtype(left): - left = left.view(np.int64) - right = right.view(np.int64) - return left, right - - -def _numba_equi_join( - df: pd.DataFrame, - right: pd.DataFrame, - eqs: tuple, - ge_gt: tuple, - le_lt: tuple, -) -> Union[tuple[np.ndarray, np.ndarray], None]: - """ - Compute indices when an equi join is present. - """ - # the logic is to delay searching for actual matches - # while reducing the search space - # to get the smallest possible search area - # this serves as an alternative to pandas' hash join - # and in some cases, - # usually for many to many joins, - # can offer significant performance improvements. 
- # it relies on binary searches, within the groups, - # and relies on the fact that sorting ensures the first - # two columns from the right dataframe are in ascending order - # per group - this gives us the opportunity to - # only do a linear search, within the groups, - # for the last column (if any) - # (the third column is applicable only for range joins) - # Example : - # df1: - # id value_1 - # 0 1 2 - # 1 1 5 - # 2 1 7 - # 3 2 1 - # 4 2 3 - # 5 3 4 - # - # - # df2: - # id value_2A value_2B - # 0 1 0 1 - # 1 1 3 5 - # 2 1 7 9 - # 3 1 12 15 - # 4 2 0 1 - # 5 2 2 4 - # 6 2 3 6 - # 7 3 1 3 - # - # - # join condition -> - # ('id', 'id', '==') & - # ('value_1', 'value_2A','>') & - # ('value_1', 'value_2B', '<') - # - # - # note how for df2, id and value_2A - # are sorted per group - # the third column (relevant for range join) - # may or may not be sorted per group - # (the group is determined by the values of the id column) - # and as such, we do a linear search in that space, per group - # - # first we get the slice boundaries based on id -> ('id', 'id', '==') - # value start end - # 1 0 4 - # 1 0 4 - # 1 0 4 - # 2 4 7 - # 2 4 7 - # 3 7 8 - # - # next step is to get the slice end boundaries, - # based on the greater than condition - # -> ('value_1', 'value_2A', '>') - # the search will be within each boundary - # so for the first row, value_1 is 2 - # the boundary search will be between 0, 4 - # for the last row, value_1 is 4 - # and its boundary search will be between 7, 8 - # since value_2A is sorted per group, - # a binary search is employed - # value start end value_1 new_end - # 1 0 4 2 1 - # 1 0 4 5 2 - # 1 0 4 7 2 - # 2 4 7 1 4 - # 2 4 7 3 6 - # 3 7 8 4 8 - # - # next step is to get the start boundaries, - # based on the less than condition - # -> ('value_1', 'value_2B', '<') - # note that we have new end boundaries, - # and as such, our boundaries will use that - # so for the first row, value_1 is 2 - # the boundary search will be between 0, 1 - # for 
the 5th row, value_1 is 3 - # and its boundary search will be between 4, 6 - # for value_2B, which is the third column - # sinc we are not sure whether it is sorted or not, - # a cumulative max array is used, - # to get the earliest possible slice start - # value start end value_1 new_start new_end - # 1 0 4 2 -1 1 - # 1 0 4 5 -1 2 - # 1 0 4 7 -1 2 - # 2 4 7 1 -1 5 - # 2 4 7 3 5 6 - # 3 7 8 4 -1 8 - # - # if there are no matches, boundary is reported as -1 - # from above, we can see that our search space - # is limited to just 5, 6 - # we can then search for actual matches - # id value_1 id value_2A value_2B - # 2 3 2 2 4 - # - left_column, right_column, _ = eqs - # steal some perf here within the binary search - # search for uniques - # and later index them with left_positions - left_positions, left_arr = df[left_column].factorize(sort=False) - right_arr = right[right_column]._values - left_index = df.index._values - right_index = right.index._values - slice_starts = right_arr.searchsorted(left_arr, side="left") - slice_starts = slice_starts[left_positions] - slice_ends = right_arr.searchsorted(left_arr, side="right") - slice_ends = slice_ends[left_positions] - # check if there is a search space - # this also lets us know if there are equi matches - keep_rows = slice_starts < slice_ends - if not keep_rows.any(): - return None - if not keep_rows.all(): - left_index = left_index[keep_rows] - slice_starts = slice_starts[keep_rows] - slice_ends = slice_ends[keep_rows] - - ge_arr1 = None - ge_arr2 = None - ge_strict = None - if ge_gt: - left_column, right_column, op = ge_gt - ge_arr1 = df.loc[left_index, left_column]._values - ge_arr2 = right[right_column]._values - ge_arr1, ge_arr2 = _convert_to_numpy(left=ge_arr1, right=ge_arr2) - ge_strict = True if op == ">" else False - - le_arr1 = None - le_arr2 = None - le_strict = None - if le_lt: - left_column, right_column, op = le_lt - le_arr1 = df.loc[left_index, left_column]._values - le_arr2 = right[right_column]._values 
- le_arr1, le_arr2 = _convert_to_numpy(left=le_arr1, right=le_arr2) - le_strict = True if op == "<" else False - - if le_lt and ge_gt: - group = right.groupby(eqs[1])[le_lt[1]] - # is the last column (le_lt) monotonic increasing? - # fast path if it is - all_monotonic_increasing = all( - arr.is_monotonic_increasing for _, arr in group - ) - if all_monotonic_increasing: - cum_max_arr = le_arr2[:] - else: - cum_max_arr = group.cummax()._values - if is_extension_array_dtype(cum_max_arr): - array_dtype = cum_max_arr.dtype.numpy_dtype - cum_max_arr = cum_max_arr.astype(array_dtype) - if is_datetime64_dtype(cum_max_arr): - cum_max_arr = cum_max_arr.view(np.int64) - - left_index, right_index = _numba_equi_join_range_join( - left_index, - right_index, - slice_starts, - slice_ends, - ge_arr1, - ge_arr2, - ge_strict, - le_arr1, - le_arr2, - le_strict, - all_monotonic_increasing, - cum_max_arr, - ) - - elif le_lt: - left_index, right_index = _numba_equi_le_join( - left_index, - right_index, - slice_starts, - slice_ends, - le_arr1, - le_arr2, - le_strict, - ) - - else: - left_index, right_index = _numba_equi_ge_join( - left_index, - right_index, - slice_starts, - slice_ends, - ge_arr1, - ge_arr2, - ge_strict, - ) - - if left_index is None: - return None - - return left_index, right_index @njit(parallel=True, cache=True) @@ -564,792 +327,1016 @@ def _numba_equi_join_range_join( return l_index, r_index -def _numba_single_non_equi_join( - left: pd.Series, right: pd.Series, op: str, keep: str -) -> tuple[np.ndarray, np.ndarray]: - """Return matching indices for single non-equi join.""" - if op == "!=": - outcome = _generic_func_cond_join( - left=left, right=right, op=op, multiple_conditions=False, keep=keep - ) - if outcome is None: - return None - return outcome - - outcome = _generic_func_cond_join( - left=left, right=right, op=op, multiple_conditions=True, keep="all" - ) - if outcome is None: - return None - left_index, right_index, starts = outcome - if op in 
greater_than_join_types: - right_index = right_index[::-1] - starts = right_index.size - starts - if keep == "first": - left_indices = np.empty(left_index.size, dtype=np.intp) - right_indices = np.empty(left_index.size, dtype=np.intp) - return _numba_non_equi_join_monotonic_increasing_keep_first_dual( - left_index=left_index, - right_index=right_index, - starts=starts, - left_indices=left_indices, - right_indices=right_indices, - ) - if keep == "last": - left_indices = np.empty(left_index.size, dtype=np.intp) - right_indices = np.empty(left_index.size, dtype=np.intp) - return _numba_non_equi_join_monotonic_increasing_keep_last_dual( - left_index=left_index, - right_index=right_index, +@njit(cache=True, parallel=False) +def _numba_non_equi_join_not_monotonic_keep_all( + tupled, + left_index, + right_index, + left_regions, + right_regions, + maxxes, + lengths, + sorted_array, + positions_array, + load_factor, + starts, +) -> tuple: + """ + Get indices if there are more than two join conditions + """ + left_indices, right_indices, counts = ( + _numba_non_equi_join_not_monotonic_keep_all_indices( + left_regions=left_regions, + right_regions=right_regions, + maxxes=maxxes, + lengths=lengths, + sorted_array=sorted_array, + positions_array=positions_array, starts=starts, - left_indices=left_indices, - right_indices=right_indices, + load_factor=load_factor, ) + ) + if left_indices is None: + return None, None + indices = np.ones(right_indices.size, dtype=np.bool_) + l_booleans = np.ones(left_index.size, dtype=np.bool_) start_indices = np.empty(left_index.size, dtype=np.intp) start_indices[0] = 0 - indices = (right_index.size - starts).cumsum() - start_indices[1:] = indices[:-1] - indices = indices[-1] - left_indices = np.empty(indices, dtype=np.intp) - right_indices = np.empty(indices, dtype=np.intp) - return _numba_non_equi_join_monotonic_increasing_keep_all_dual( - left_index=left_index, - right_index=right_index, - starts=starts, - left_indices=left_indices, - 
right_indices=right_indices, - start_indices=start_indices, - ) + start_indices[1:] = counts.cumsum()[:-1] + for _tuple in literal_unroll(tupled): + left_arr = _tuple[0] + right_arr = _tuple[1] + op = _tuple[2] + for n in range(left_index.size): + _n = np.uintp(n) + if (not l_booleans[_n]) | (not counts[_n]): + l_booleans[_n] = False + continue + counter = 0 + nn = left_indices[_n] + _nn = np.uintp(nn) + size = counts[_n] + start_index = start_indices[_n] + left_val = left_arr[_nn] + for ind in range(start_index, start_index + size): + _ind = np.uintp(ind) + nnn = right_indices[_ind] + _nnn = np.uintp(nnn) + right_val = right_arr[_nnn] + boolean = _compare(left_val, right_val, op) + indices[_ind] &= boolean + counter += np.intp(boolean) + boolean_ = counter > 0 + l_booleans[_n] &= boolean_ + if not np.any(l_booleans): + return None, None + total = indices.sum() + left_indexes = np.empty(total, dtype=np.intp) + right_indexes = np.empty(total, dtype=np.intp) + indexer = 0 + counter = 0 + for n in range(left_index.size): + _n = np.uintp(n) + if not l_booleans[_n]: + continue + nn = left_indices[_n] + _nn = np.uintp(nn) + size = counts[_n] + start_index = start_indices[_n] + left_val = left_index[_nn] + for ind in range(start_index, start_index + size): + _ind = np.uintp(ind) + boolean = indices[_ind] + if not boolean: + continue + nnn = right_indices[_ind] + _nnn = np.uintp(nnn) + right_val = right_index[_nnn] + _indexer = np.uintp(indexer) + left_indexes[_indexer] = left_val + right_indexes[_indexer] = right_val + indexer += 1 + if indexer == total: + counter = 1 + break + if counter == 1: + break + return left_indexes, right_indexes -def _numba_multiple_non_equi_join( - df: pd.DataFrame, right: pd.DataFrame, gt_lt: list, keep: str -) -> tuple[np.ndarray, np.ndarray]: +@njit(cache=True, parallel=False) +def _numba_non_equi_join_not_monotonic_keep_first( + tupled, + left_index, + right_index, + left_regions, + right_regions, + maxxes, + lengths, + sorted_array, + 
positions_array, + load_factor, + starts, +) -> tuple: """ - # https://www.scitepress.org/papers/2018/68268/68268.pdf - An alternative to the _range_indices algorithm - and more generalised - it covers any pair of non equi joins - in >, >=, <, <=. - Returns a tuple of left and right indices. + Get indices if there are more than two join conditions """ - # implementation is based on the algorithm described in this paper - - # https://www.scitepress.org/papers/2018/68268/68268.pdf - - # summary: - # get regions for first and second conditions in the pair - # (l_col1, r_col1, op1), (l_col2, r_col2, op2) - # the idea is that r_col1 should always be ahead of the - # appropriate value from lcol1; same applies to l_col2 & r_col2. - # if the operator is in less than join types - # the l_col should be in ascending order - # if in greater than join types, l_col should be - # in descending order - # Example : - # df1: - # id value_1 - # 0 1 2 - # 1 1 5 - # 2 1 7 - # 3 2 1 - # 4 2 3 - # 5 3 4 - # - # - # df2: - # id value_2A value_2B - # 0 1 0 1 - # 1 1 3 5 - # 2 1 7 9 - # 3 1 12 15 - # 4 2 0 1 - # 5 2 2 4 - # 6 2 3 6 - # 7 3 1 3 - # - # - # ('value_1', 'value_2A','>'), ('value_1', 'value_2B', '<') - # for the first pair, since op is greater than - # 'value_1' is sorted in descending order - # our pairing should be : - # value source region number - # 12 value_2A 0 - # 7 value_2A 1 - # 7 value_1 2 - # 5 value_1 2 - # 4 value_1 2 - # 3 value_2A 2 - # 3 value_2A 2 - # 3 value_1 3 - # 2 value_2A 3 - # 2 value_1 4 - # 1 value_2A 4 - # 1 value_1 5 - # 0 value_2A 5 - # 0 value_2A 5 - # - # note that 7 for value_2A is not matched with 7 of value_1 - # because it is >, not >=, hence the different region numbers - # looking at the output above, we can safely discard regions 0 and 1 - # since they do not have any matches with value_1 - # for the second pair, since op is <, value_1 is sorted - # in ascending order, and our pairing should be: - # value source region number - # 1 value_2B 
0 - # 1 value_2B 1 - # 1 value_1 2 - # 2 value_1 2 - # 3 value_2B 2 - # 3 value_1 3 - # 4 value_2B 3 - # 4 value_1 4 - # 5 value_2B 4 - # 5 value_1 5 - # 6 value_2B 5 - # 7 value_1 6 - # 9 value_2B 6 - # 15 value_2B 6 - # - # from the above we can safely discard regions 0 and 1, since there are - # no matches with value_1 ... note that the index for regions 0 and 1 - # coincide with the index for region 5 values in value_2A(0, 0); - # as such those regions will be discarded. - # Similarly, the index for regions 0 and 1 of value_2A(12, 7) - # coincide with the index for regions 6 for value_2B(9, 15); - # these will be discarded as well. - # let's create a table of the regions, paired with the index - # - # - # value_1 : - ############################################### - # index--> 2 1 5 4 0 3 - # pair1--> 2 2 2 3 4 5 - # pair2--> 6 5 4 3 2 2 - ############################################### - # - # - # value_2A, value_2B - ############################################## - # index --> 1 6 5 7 - # pair1 --> 2 2 3 4 - # pair2 --> 4 5 3 2 - ############################################## - # - # To find matching indices, the regions from value_1 must be less than - # or equal to the regions in value_2A/2B. 
- # pair1 <= pair1 and pair2 <= pair2 - # Starting from the highest region in value_1 - # 5 in pair1 is not less than any in value_2A/2B, so we discard - # 4 in pair1 is matched to 4 in pair1 of value_2A/2B - # we look at the equivalent value in pair2 for 4, which is 2 - # 2 matches 2 in pair 2, so we have a match -> (0, 7) - # 3 in pair 1 from value_1 matches 3 and 4 in pair1 for value_2A/2B - # next we compare the equivalent value from pair2, which is 3 - # 3 matches only 3 in value_2A/2B, so our only match is -> (4, 5) - # next is 2 (we have 3 2s in value_1 for pair1) - # they all match 2, 2, 3, 4 in pair1 of value_2A/2B - # compare the first equivalent in pair2 -> 4 - # 4 matches only 4, 5 in pair2 of value_2A/2B - # ->(5, 1), (5, 6) - # the next equivalent is -> 5 - # 5 matches only 5 in pair2 of value_2A/2B - # -> (1, 6) - # the last equivalent is -> 6 - # 6 has no match in pair2 of value_2A/2B, so we discard - # our final matching indices for the left and right pairs - ######################################################### - # left_index right_index - # 0 7 - # 4 5 - # 5 1 - # 5 6 - # 1 6 - ######################################################## - # and if we index the dataframes, we should get the output below: - ################################# - # value_1 value_2A value_2B - # 0 2 1 3 - # 1 5 3 6 - # 2 3 2 4 - # 3 4 3 5 - # 4 4 3 6 - ################################ - left_df = df[:] - right_df = right[:] - left_column, right_column, _ = gt_lt[0] - # sorting on the first column - # helps to achieve more performance - # when iterating to compare left and right regions for matches - # note - keep track of the original index - if not left_df[left_column].is_monotonic_increasing: - left_df = df.sort_values(left_column) - left_index = left_df.index._values - left_df.index = range(len(left_df)) - else: - left_index = left_df.index._values - if not right_df[right_column].is_monotonic_increasing: - right_df = right_df.sort_values(right_column) - # 
original_index and right_is_sorted - # is relevant where keep in {'first','last'} - right_index = right_df.index._values - right_df.index = range(len(right)) - right_is_sorted = False - else: - right_index = right_df.index._values - right_is_sorted = True - shape = (len(left_df), len(gt_lt)) - # use the l_booleans and r_booleans to track rows that have complete matches - left_regions = np.empty(shape=shape, dtype=np.intp, order="F") - l_booleans = np.zeros(len(df), dtype=np.intp) - shape = (len(right_df), len(gt_lt)) - right_regions = np.empty(shape=shape, dtype=np.intp, order="F") - r_booleans = np.zeros(len(right), dtype=np.intp) - for position, (left_column, right_column, op) in enumerate(gt_lt): - outcome = _generic_func_cond_join( - left=left_df[left_column], - right=right_df[right_column], - op=op, - multiple_conditions=True, - keep="all", - ) - if outcome is None: - return None - left_indexer, right_indexer, search_indices = outcome - if op in greater_than_join_types: - search_indices = right_indexer.size - search_indices - right_indexer = right_indexer[::-1] - r_region = np.zeros(right_indexer.size, dtype=np.intp) - r_region[search_indices] = 1 - r_region[0] -= 1 - r_region = r_region.cumsum() - left_regions[left_indexer, position] = r_region[search_indices] - l_booleans[left_indexer] += 1 - right_regions[right_indexer, position] = r_region - r_booleans[right_indexer] += 1 - r_region = None - search_indices = None - left_df = None - right_df = None - booleans = l_booleans == len(gt_lt) - if not booleans.any(): - return None - if not booleans.all(): - left_regions = left_regions[booleans] - left_index = left_index[booleans] - booleans = r_booleans == len(gt_lt) - if not booleans.any(): - return None - if not booleans.all(): - right_regions = right_regions[booleans] - right_index = right_index[booleans] - l_booleans = None - r_booleans = None - if gt_lt[0][-1] in greater_than_join_types: - left_regions = left_regions[::-1] - left_index = left_index[::-1] - 
right_regions = right_regions[::-1] - right_index = right_index[::-1] - right_index_flipped = True - else: - right_index_flipped = False - starts = right_regions[:, 0].searchsorted(left_regions[:, 0]) - booleans = starts < len(right_regions) - if not booleans.any(): - return None - if not booleans.all(): - starts = starts[booleans] - left_regions = left_regions[booleans] - left_index = left_index[booleans] - arr = pd.Index(right_regions[:, 1]) - check_increasing = arr.is_monotonic_increasing - check_decreasing = arr.is_monotonic_decreasing - arr = None - if check_increasing: - search_indices = right_regions[:, 1].searchsorted(left_regions[:, 1]) - booleans = search_indices < len(right_regions) - if not booleans.any(): - return None - if not booleans.all(): - starts = starts[booleans] - search_indices = search_indices[booleans] - left_regions = left_regions[booleans] - left_index = left_index[booleans] - booleans = starts > search_indices - starts = np.where(booleans, starts, search_indices) - if right_is_sorted & (len(gt_lt) == 2): - ends = np.empty(left_index.size, dtype=np.intp) - ends[:] = right_index.size - elif check_decreasing: - ends = right_regions[::-1, 1].searchsorted(left_regions[:, 1]) - booleans = starts < len(right_regions) - if not booleans.any(): - return None - if not booleans.all(): - starts = starts[booleans] - left_regions = left_regions[booleans] - left_index = left_index[booleans] - ends = len(right_regions) - ends - booleans = starts < ends - if not booleans.any(): - return None - if not booleans.all(): - starts = starts[booleans] - left_regions = left_regions[booleans] - left_index = left_index[booleans] - ends = ends[booleans] - booleans = None - if ( - (check_decreasing | check_increasing) - & right_is_sorted - & (keep == "first") - & (len(gt_lt) == 2) - & right_index_flipped - ): - return left_index, right_index[ends - 1] - if ( - (check_decreasing | check_increasing) - & right_is_sorted - & (keep == "last") - & (len(gt_lt) == 2) - & 
right_index_flipped - ): - return left_index, right_index[starts] - if ( - (check_decreasing | check_increasing) - & right_is_sorted - & (keep == "first") - & (len(gt_lt) == 2) - ): - return left_index, right_index[starts] - if ( - (check_decreasing | check_increasing) - & right_is_sorted - & (keep == "last") - & (len(gt_lt) == 2) - ): - return left_index, right_index[ends - 1] - - if (check_increasing) & (len(gt_lt) == 2) & (keep == "all"): - start_indices = np.empty(left_index.size, dtype=np.intp) - start_indices[0] = 0 - indices = (right_index.size - starts).cumsum() - start_indices[1:] = indices[:-1] - indices = indices[-1] - left_indices = np.empty(indices, dtype=np.intp) - right_indices = np.empty(indices, dtype=np.intp) - return _numba_non_equi_join_monotonic_increasing_keep_all_dual( - left_index=left_index, - right_index=right_index, + left_indices, right_indices, counts = ( + _numba_non_equi_join_not_monotonic_keep_all_indices( + left_regions=left_regions, + right_regions=right_regions, + maxxes=maxxes, + lengths=lengths, + sorted_array=sorted_array, + positions_array=positions_array, starts=starts, - left_indices=left_indices, - right_indices=right_indices, - start_indices=start_indices, + load_factor=load_factor, ) - - if (check_increasing) & (len(gt_lt) == 2) & (keep == "first"): - left_indices = np.empty(left_index.size, dtype=np.intp) - right_indices = np.empty(left_index.size, dtype=np.intp) - return _numba_non_equi_join_monotonic_increasing_keep_first_dual( - left_index=left_index, - right_index=right_index, + ) + if left_indices is None: + return None, None + indices = np.ones(right_indices.size, dtype=np.bool_) + l_booleans = np.ones(left_index.size, dtype=np.bool_) + start_indices = np.empty(left_index.size, dtype=np.intp) + start_indices[0] = 0 + start_indices[1:] = counts.cumsum()[:-1] + for _tuple in literal_unroll(tupled): + left_arr = _tuple[0] + right_arr = _tuple[1] + op = _tuple[2] + for n in range(left_index.size): + _n = np.uintp(n) + 
if (not l_booleans[_n]) | (not counts[_n]): + l_booleans[_n] = False + continue + counter = 0 + nn = left_indices[_n] + _nn = np.uintp(nn) + size = counts[_n] + start_index = start_indices[_n] + left_val = left_arr[_nn] + for ind in range(start_index, start_index + size): + _ind = np.uintp(ind) + nnn = right_indices[_ind] + _nnn = np.uintp(nnn) + right_val = right_arr[_nnn] + boolean = _compare(left_val, right_val, op) + indices[_ind] &= boolean + counter += np.intp(boolean) + boolean_ = counter > 0 + l_booleans[_n] &= boolean_ + total = l_booleans.sum() + if not total: + return None, None + left_indexes = np.empty(total, dtype=np.intp) + right_indexes = np.empty(total, dtype=np.intp) + indexer = 0 + for n in range(left_index.size): + _n = np.uintp(n) + if not l_booleans[_n]: + continue + nn = left_indices[_n] + _nn = np.uintp(nn) + size = counts[_n] + start_index = start_indices[_n] + left_val = left_index[_nn] + base = -1 + for ind in range(start_index, start_index + size): + _ind = np.uintp(ind) + boolean = indices[_ind] + if not boolean: + continue + nnn = right_indices[_ind] + _nnn = np.uintp(nnn) + right_val = right_index[_nnn] + if (base == -1) | (right_val < base): + base = right_val + _indexer = np.uintp(indexer) + left_indexes[_indexer] = left_val + right_indexes[_indexer] = base + indexer += 1 + return left_indexes, right_indexes + + +@njit(cache=True, parallel=False) +def _numba_non_equi_join_not_monotonic_keep_last( + tupled, + left_index, + right_index, + left_regions, + right_regions, + maxxes, + lengths, + sorted_array, + positions_array, + load_factor, + starts, +) -> tuple: + """ + Get indices if there are more than two join conditions + """ + left_indices, right_indices, counts = ( + _numba_non_equi_join_not_monotonic_keep_all_indices( + left_regions=left_regions, + right_regions=right_regions, + maxxes=maxxes, + lengths=lengths, + sorted_array=sorted_array, + positions_array=positions_array, starts=starts, - left_indices=left_indices, - 
right_indices=right_indices, + load_factor=load_factor, ) + ) + if left_indices is None: + return None, None + indices = np.ones(right_indices.size, dtype=np.bool_) + l_booleans = np.ones(left_index.size, dtype=np.bool_) + start_indices = np.empty(left_index.size, dtype=np.intp) + start_indices[0] = 0 + start_indices[1:] = counts.cumsum()[:-1] + for _tuple in literal_unroll(tupled): + left_arr = _tuple[0] + right_arr = _tuple[1] + op = _tuple[2] + for n in range(left_index.size): + _n = np.uintp(n) + if (not l_booleans[_n]) | (not counts[_n]): + l_booleans[_n] = False + continue + counter = 0 + nn = left_indices[_n] + _nn = np.uintp(nn) + size = counts[_n] + start_index = start_indices[_n] + left_val = left_arr[_nn] + for ind in range(start_index, start_index + size): + _ind = np.uintp(ind) + nnn = right_indices[_ind] + _nnn = np.uintp(nnn) + right_val = right_arr[_nnn] + boolean = _compare(left_val, right_val, op) + indices[_ind] &= boolean + counter += np.intp(boolean) + boolean_ = counter > 0 + l_booleans[_n] &= boolean_ + + total = l_booleans.sum() + if not total: + return None, None + left_indexes = np.empty(total, dtype=np.intp) + right_indexes = np.empty(total, dtype=np.intp) + indexer = 0 + for n in range(left_index.size): + _n = np.uintp(n) + if not l_booleans[_n]: + continue + nn = left_indices[_n] + _nn = np.uintp(nn) + size = counts[_n] + start_index = start_indices[_n] + left_val = left_index[_nn] + base = np.inf + for ind in range(start_index, start_index + size): + _ind = np.uintp(ind) + boolean = indices[_ind] + if not boolean: + continue + nnn = right_indices[_ind] + _nnn = np.uintp(nnn) + right_val = right_index[_nnn] + if (base == np.inf) | (right_val > base): + base = right_val + _indexer = np.uintp(indexer) + left_indexes[_indexer] = left_val + right_indexes[_indexer] = base + indexer += 1 + return left_indexes, right_indexes + + +@njit(inline="always") +def compare_values(left_val, right_val, op): + if op == 0: + return left_val > right_val + 
if op == 1: + return left_val >= right_val + if op == 2: + return left_val < right_val + if op == 3: + return left_val <= right_val + return left_val == right_val + + +def _compare(x, y, op): + if ( + (is_numeric_dtype(x) and is_numeric_dtype(y)) + or (is_datetime64_dtype(x) and is_datetime64_dtype(y)) + or (is_timedelta64_dtype(x) and is_timedelta64_dtype(y)) + ): + return compare_values(x, y, op) - if (check_increasing) & (len(gt_lt) == 2) & (keep == "last"): - left_indices = np.empty(left_index.size, dtype=np.intp) - right_indices = np.empty(left_index.size, dtype=np.intp) - return _numba_non_equi_join_monotonic_increasing_keep_last_dual( - left_index=left_index, - right_index=right_index, - starts=starts, - left_indices=left_indices, - right_indices=right_indices, - ) - if check_increasing: - if keep == "first": - left_indices, right_indices = ( - _numba_non_equi_join_monotonic_increasing_keep_first( - left_regions=left_regions[:, 2:], - right_regions=right_regions[:, 2:], - left_index=left_index, - right_index=right_index, - starts=starts, - ) - ) - elif keep == "last": - left_indices, right_indices = ( - _numba_non_equi_join_monotonic_increasing_keep_last( - left_regions=left_regions[:, 2:], - right_regions=right_regions[:, 2:], - left_index=left_index, - right_index=right_index, - starts=starts, - ) - ) - else: - left_indices, right_indices = ( - _numba_non_equi_join_monotonic_increasing_keep_all( - left_regions=left_regions[:, 2:], - right_regions=right_regions[:, 2:], - left_index=left_index, - right_index=right_index, - starts=starts, - ) - ) - if left_indices is None: - return None - return left_indices, right_indices - - if (check_decreasing) & (len(gt_lt) == 2) & (keep == "all"): - start_indices = np.empty(left_index.size, dtype=np.intp) - start_indices[0] = 0 - indices = (ends - starts).cumsum() - start_indices[1:] = indices[:-1] - indices = indices[-1] - left_indices = np.empty(indices, dtype=np.intp) - right_indices = np.empty(indices, 
dtype=np.intp) - return _numba_non_equi_join_monotonic_decreasing_keep_all_dual( - left_index=left_index, - right_index=right_index, - starts=starts, - ends=ends, - left_indices=left_indices, - right_indices=right_indices, - start_indices=start_indices, - ) +accepted_types = ( + types.NPDatetime, + types.Integer, + types.Float, + types.NPTimedelta, +) - if (check_decreasing) & (len(gt_lt) == 2) & (keep == "first"): - left_indices = np.empty(left_index.size, dtype=np.intp) - right_indices = np.empty(left_index.size, dtype=np.intp) - return _numba_non_equi_join_monotonic_decreasing_keep_first_dual( - left_index=left_index, - right_index=right_index, - starts=starts, - ends=ends, - left_indices=left_indices, - right_indices=right_indices, - ) - if (check_decreasing) & (len(gt_lt) == 2) & (keep == "last"): - left_indices = np.empty(left_index.size, dtype=np.intp) - right_indices = np.empty(left_index.size, dtype=np.intp) - return _numba_non_equi_join_monotonic_decreasing_keep_last_dual( - left_index=left_index, - right_index=right_index, - starts=starts, - ends=ends, - left_indices=left_indices, - right_indices=right_indices, - ) +@overload(_compare) +def _numba_compare(x, y, op): - if check_decreasing: - if keep == "first": - left_indices, right_indices = ( - _numba_non_equi_join_monotonic_decreasing_keep_first( - left_regions=left_regions[:, 2:], - right_regions=right_regions[:, 2:], - left_index=left_index, - right_index=right_index, - starts=starts, - ends=ends, - ) - ) + if ( + isinstance(x, accepted_types) + and isinstance(y, accepted_types) + and isinstance(op, types.Integer) + ): - elif keep == "last": - left_indices, right_indices = ( - _numba_non_equi_join_monotonic_decreasing_keep_last( - left_regions=left_regions[:, 2:], - right_regions=right_regions[:, 2:], - left_index=left_index, - right_index=right_index, - starts=starts, - ends=ends, - ) - ) + def impl(x, y, op): + return compare_values(x, y, op) - else: - left_indices, right_indices = ( - 
_numba_non_equi_join_monotonic_decreasing_keep_all( - left_regions=left_regions[:, 2:], - right_regions=right_regions[:, 2:], - left_index=left_index, - right_index=right_index, - starts=starts, - ends=ends, - ) - ) - if left_indices is None: - return None - return left_indices, right_indices - # logic here is based on grantjenks' sortedcontainers - # https://github.com/grantjenks/python-sortedcontainers - load_factor = 1_000 - width = load_factor * 2 - length = ceil(right_index.size / load_factor) - # maintain a sorted array of the regions - sorted_array = np.empty( - (width, length), dtype=right_regions.dtype, order="F" - ) - # keep track of the positions of each region - # within the sorted array - positions_array = np.empty( - (width, length), dtype=right_regions.dtype, order="F" - ) - # keep track of the max value per column - maxxes = np.empty(length, dtype=np.intp) - # keep track of the length of actual data for each column - lengths = np.empty(length, dtype=np.intp) - if keep == "all": - left_indices, right_indices = ( - _numba_non_equi_join_not_monotonic_keep_all( - left_regions=left_regions[:, 1:], - right_regions=right_regions[:, 1:], - left_index=left_index, - right_index=right_index, - maxxes=maxxes, - lengths=lengths, - sorted_array=sorted_array, - positions_array=positions_array, - starts=starts, - load_factor=load_factor, - ) - ) - elif keep == "first": - left_indices, right_indices = ( - _numba_non_equi_join_not_monotonic_keep_first( - left_regions=left_regions[:, 1:], - right_regions=right_regions[:, 1:], - left_index=left_index, - right_index=right_index, - maxxes=maxxes, - lengths=lengths, - sorted_array=sorted_array, - positions_array=positions_array, - starts=starts, - load_factor=load_factor, - ) - ) - # keep == 'last' + return impl else: - left_indices, right_indices = ( - _numba_non_equi_join_not_monotonic_keep_last( - left_regions=left_regions[:, 1:], - right_regions=right_regions[:, 1:], - left_index=left_index, - right_index=right_index, 
- maxxes=maxxes, - lengths=lengths, - sorted_array=sorted_array, - positions_array=positions_array, - starts=starts, - load_factor=load_factor, - ) - ) - if left_indices is None: - return None - return left_indices, right_indices - - -@njit -def _numba_less_than(arr: np.ndarray, value: Any): - """ - Get earliest position in `arr` - where arr[i] <= `value` - """ - min_idx = 0 - max_idx = len(arr) - while min_idx < max_idx: - # to avoid overflow - mid_idx = min_idx + ((max_idx - min_idx) >> 1) - _mid_idx = np.uint64(mid_idx) - if arr[_mid_idx] < value: - min_idx = mid_idx + 1 - else: - max_idx = mid_idx - # it is greater than - # the max value in the array - if min_idx == len(arr): - return -1 - return min_idx + raise TypeError("Unsupported Type") -@njit(cache=True) -def _numba_non_equi_join_not_monotonic_keep_all( - left_regions: np.ndarray, - right_regions: np.ndarray, +@njit(cache=True, parallel=True) +def _range_join_sorted_dual_keep_all( left_index: np.ndarray, right_index: np.ndarray, - maxxes: np.ndarray, - lengths: np.ndarray, - sorted_array: np.ndarray, - positions_array: np.ndarray, starts: np.ndarray, - load_factor: int, + ends: np.ndarray, + start_indices: np.ndarray, + left_indices: np.ndarray, + right_indices: np.ndarray, ): """ - Get indices for non-equi join, - where the right regions are not monotonic + Get indices for a dual non equi join """ - # first pass - get actual length - length = left_index.size - end = right_index.size - end -= 1 - # add the last region - # no need to have this checked within an if-else statement - # in the for loop below - region = right_regions[np.uint64(end), 0] - sorted_array[0, 0] = region - positions_array[0, 0] = end - # keep track of the maxxes array - # how many cells have actual values? 
- maxxes_counter = 1 - maxxes[0] = region - lengths[0] = 1 - r_count = 0 - total = 0 - l_booleans = np.zeros(length, dtype=np.bool_) - for indexer in range(length - 1, -1, -1): - _indexer = np.uint64(indexer) - start = starts[_indexer] + for ind in prange(left_index.size): + _ind = np.uintp(ind) + start = starts[_ind] + end = ends[_ind] + indexer = start_indices[_ind] + lindex = left_index[_ind] for num in range(start, end): - _num = np.uint64(num) - region = right_regions[_num, 0] - arr = maxxes[:maxxes_counter] - posn = _numba_less_than(arr=arr, value=region) - # it is larger than the max in the maxxes array - # shove it into the last column - if posn == -1: - posn = maxxes_counter - 1 - posn_ = np.uint64(posn) - len_arr = lengths[posn_] - len_arr_ = np.uint64(len_arr) - sorted_array[len_arr_, posn_] = region - positions_array[len_arr_, posn_] = num - maxxes[posn_] = region - lengths[posn_] += 1 - else: - sorted_array, positions_array, lengths, maxxes = ( - _numba_sorted_array( - sorted_array=sorted_array, - positions_array=positions_array, - maxxes=maxxes, - lengths=lengths, - region=region, - posn=posn, - num=num, - ) - ) - r_count += 1 - posn_ = np.uint64(posn) - # have we exceeded the size of this column? - # do we need to trim and move data to other columns? 
- check = (lengths[posn_] == (load_factor * 2)) & ( - r_count < right_index.size - ) - if check: - ( - sorted_array, - positions_array, - lengths, - maxxes, - maxxes_counter, - ) = _expand_sorted_array( - sorted_array=sorted_array, - positions_array=positions_array, - lengths=lengths, - maxxes=maxxes, - posn=posn, - maxxes_counter=maxxes_counter, - load_factor=load_factor, - ) - # now we do a binary search - # for left region in right region - l_region = left_regions[_indexer, 0] - arr = maxxes[:maxxes_counter] - posn = _numba_less_than(arr=arr, value=l_region) - if posn == -1: - end = start - continue - posn_ = np.uint64(posn) - len_arr = lengths[posn_] - arr = sorted_array[:len_arr, posn_] - _posn = _numba_less_than(arr=arr, value=l_region) - for ind in range(_posn, len_arr): - ind_ = np.uint64(ind) - counter = 1 - # move along the columns - # and look for matches - for loc in range(1, right_regions.shape[1]): - loc_ = np.uint64(loc) - next_left = left_regions[_indexer, loc_] - r_pos = positions_array[ind_, posn_] - r_pos = np.uint64(r_pos) - next_right = right_regions[r_pos, loc_] - if next_left > next_right: - counter = 0 - break - if counter == 0: + _num = np.uintp(num) + rindex = right_index[_num] + _indexer = np.uintp(indexer) + left_indices[_indexer] = lindex + right_indices[_indexer] = rindex + indexer += 1 + return left_indices, right_indices + + +@njit(cache=True, parallel=True) +def _numba_non_equi_join_monotonic_increasing_keep_all( + tupled, left_index, right_index, starts, indices, start_indices +) -> tuple: + """ + Get indices if there are more than two join conditions, + and a range join, sorted on both right columns, exists. 
+ """ + l_booleans = np.ones(left_index.size, dtype=np.bool_) + end = right_index.size + for _tuple in literal_unroll(tupled): + left_arr = _tuple[0] + right_arr = _tuple[1] + op = _tuple[2] + for n in prange(left_index.size): + _n = np.uintp(n) + if not l_booleans[_n]: continue - total += 1 - # check the remaining columns, if any - for ind in range(posn + 1, maxxes_counter): - ind_ = np.uint64(ind) - len_arr = lengths[ind_] - for num in range(len_arr): - _num = np.uint64(num) - counter = 1 - for loc in range(1, right_regions.shape[1]): - loc_ = np.uint64(loc) - next_left = left_regions[_indexer, loc_] - r_pos = positions_array[_num, ind_] - r_pos = np.uint64(r_pos) - next_right = right_regions[r_pos, loc_] - if next_left > next_right: - counter = 0 - break - if counter == 0: - continue - total += 1 - l_booleans[_indexer] = True - end = start - if total == 0: + start = starts[_n] + pos = 0 + counter = 0 + left_val = left_arr[_n] + ind = start_indices[_n] + for nn in range(start, end): + _nn = np.uintp(nn) + right_val = right_arr[_nn] + boolean = _compare(left_val, right_val, op) + _ind = np.uintp(ind + pos) + # pos should always increment + # no matter what happens + # with the conditionals below + pos += 1 + indices[_ind] &= boolean + counter += np.intp(boolean) + boolean_ = counter > 0 + l_booleans[_n] &= boolean_ + if not np.any(l_booleans): return None, None - # second pass - fill arrays with indices - length = left_index.size + total = indices.sum() + left_indices = np.empty(total, dtype=np.intp) + right_indices = np.empty(total, dtype=np.intp) + indexer = 0 end = right_index.size - end -= 1 - region = right_regions[np.uint64(end), 0] - sorted_array[0, 0] = region + for n in range(left_index.size): + _n = np.uintp(n) + if not l_booleans[_n]: + continue + start = starts[_n] + pos = 0 + counter = 0 + ind = start_indices[_n] + l_index = left_index[_n] + for nn in range(start, end): + _ind = np.uintp(ind + pos) + # pos should always increment + # no matter what 
happens + # with the condition below + pos += 1 + if not indices[_ind]: + continue + _nn = np.uintp(nn) + _indexer = np.uintp(indexer) + left_indices[_indexer] = l_index + right_indices[_indexer] = right_index[_nn] + indexer += 1 + if indexer == total: + counter = 1 + break + if counter == 1: + break + return left_indices, right_indices + + +@njit(cache=True, parallel=True) +def _numba_non_equi_join_monotonic_increasing_keep_first( + tupled, left_index, right_index, starts, indices, start_indices +) -> tuple: + """ + Get indices if there are more than two join conditions, + and a range join, sorted on both right columns, exists. + """ + l_booleans = np.ones(left_index.size, dtype=np.bool_) + end = right_index.size + for _tuple in literal_unroll(tupled): + left_arr = _tuple[0] + right_arr = _tuple[1] + op = _tuple[2] + for n in prange(left_index.size): + _n = np.uintp(n) + if not l_booleans[_n]: + continue + start = starts[_n] + pos = 0 + counter = 0 + ind = start_indices[_n] + for nn in range(start, end): + _nn = np.uintp(nn) + left_val = left_arr[_n] + right_val = right_arr[_nn] + boolean = _compare(left_val, right_val, op) + _ind = np.uintp(ind + pos) + # pos should always increment + # no matter what happens + # with the conditionals below + pos += 1 + indices[_ind] &= boolean + counter += np.intp(boolean) + boolean_ = counter > 0 + l_booleans[_n] &= boolean_ + + total = l_booleans.sum() + if not total: + return None, None + left_indices = np.empty(total, dtype=np.intp) + right_indices = np.empty(total, dtype=np.intp) + indexer = 0 + end = right_index.size + for n in range(left_index.size): + _n = np.uintp(n) + if not l_booleans[_n]: + continue + start = starts[_n] + pos = 0 + counter = 0 + ind = start_indices[_n] + l_index = left_index[_n] + base = -1 + for nn in range(start, end): + _ind = np.uintp(ind + pos) + # pos should always increment + # no matter what happens + # with the condition below + pos += 1 + if not indices[_ind]: + continue + _nn = 
np.uintp(nn) + value = right_index[_nn] + if (base == -1) | (value < base): + base = value + _indexer = np.uintp(indexer) + left_indices[_indexer] = l_index + right_indices[_indexer] = base + indexer += 1 + return left_indices, right_indices + + +@njit(cache=True, parallel=True) +def _numba_non_equi_join_monotonic_increasing_keep_last( + tupled, left_index, right_index, starts, indices, start_indices +) -> tuple: + """ + Get indices if there are more than two join conditions, + and a range join, sorted on both right columns, exists. + """ + l_booleans = np.ones(left_index.size, dtype=np.bool_) + base = 0 + end = right_index.size + for _tuple in literal_unroll(tupled): + left_arr = _tuple[0] + right_arr = _tuple[1] + op = _tuple[2] + for n in prange(left_index.size): + _n = np.uintp(n) + if not l_booleans[_n]: + continue + start = starts[_n] + pos = 0 + counter = 0 + ind = start_indices[_n] + for nn in range(start, end): + _nn = np.uintp(nn) + left_val = left_arr[_n] + right_val = right_arr[_nn] + boolean = _compare(left_val, right_val, op) + _ind = np.uintp(ind + pos) + # pos should always increment + # no matter what happens + # with the conditionals below + pos += 1 + indices[_ind] &= boolean + counter += np.intp(boolean) + boolean_ = counter > 0 + l_booleans[_n] &= boolean_ + base += 1 + total = l_booleans.sum() + if not total: + return None, None + left_indices = np.empty(total, dtype=np.intp) + right_indices = np.empty(total, dtype=np.intp) + indexer = 0 + end = right_index.size + for n in range(left_index.size): + _n = np.uintp(n) + if not l_booleans[_n]: + continue + start = starts[_n] + pos = 0 + counter = 0 + ind = start_indices[_n] + l_index = left_index[_n] + base = -1 + for nn in range(start, end): + _ind = np.uintp(ind + pos) + # pos should always increment + # no matter what happens + # with the condition below + pos += 1 + if not indices[_ind]: + continue + _nn = np.uintp(nn) + value = right_index[_nn] + if (base == -1) | (value > base): + base = 
value + _indexer = np.uintp(indexer) + left_indices[_indexer] = l_index + right_indices[_indexer] = base + indexer += 1 + return left_indices, right_indices + + +@njit(cache=True, parallel=True) +def _range_join_sorted_multiple_keep_all( + tupled, left_index, right_index, starts, ends, indices, start_indices +) -> tuple: + """ + Get indices if there are more than two join conditions, + and a range join, sorted on both right columns, exists. + """ + l_booleans = np.ones(left_index.size, dtype=np.bool_) + for _tuple in literal_unroll(tupled): + left_arr = _tuple[0] + right_arr = _tuple[1] + op = _tuple[2] + for n in prange(left_index.size): + _n = np.uintp(n) + if not l_booleans[_n]: + continue + start = starts[_n] + end = ends[_n] + pos = 0 + counter = 0 + ind = start_indices[_n] + for nn in range(start, end): + _nn = np.uintp(nn) + left_val = left_arr[_n] + right_val = right_arr[_nn] + boolean = _compare(left_val, right_val, op) + _ind = np.uintp(ind + pos) + # pos should always increment + # no matter what happens + # with the conditionals below + pos += 1 + indices[_ind] &= boolean + counter += np.intp(boolean) + boolean_ = counter > 0 + l_booleans[_n] &= boolean_ + if not np.any(l_booleans): + return None, None + total = indices.sum() + left_indices = np.empty(total, dtype=np.intp) + right_indices = np.empty(total, dtype=np.intp) + indexer = 0 + for n in range(left_index.size): + _n = np.uintp(n) + if not l_booleans[_n]: + continue + start = starts[_n] + end = ends[_n] + pos = 0 + counter = 0 + ind = start_indices[_n] + l_index = left_index[_n] + for nn in range(start, end): + _ind = np.uintp(ind + pos) + # pos should always increment + # no matter what happens + # with the condition below + pos += 1 + if not indices[_ind]: + continue + _nn = np.uintp(nn) + _indexer = np.uintp(indexer) + left_indices[_indexer] = l_index + right_indices[_indexer] = right_index[_nn] + indexer += 1 + if indexer == total: + counter = 1 + break + if counter == 1: + break + return 
left_indices, right_indices + + +@njit(cache=True, parallel=True) +def _range_join_sorted_multiple_keep_first( + tupled, left_index, right_index, starts, ends, indices, start_indices +) -> tuple: + """ + Get earliest indices if there are more than two join conditions, + and a range join, sorted on both right columns, exists. + """ + l_booleans = np.ones(left_index.size, dtype=np.bool_) + for _tuple in literal_unroll(tupled): + left_arr = _tuple[0] + right_arr = _tuple[1] + op = _tuple[2] + for n in prange(left_index.size): + _n = np.uintp(n) + if not l_booleans[_n]: + continue + start = starts[_n] + end = ends[_n] + pos = 0 + counter = 0 + ind = start_indices[_n] + for nn in range(start, end): + _nn = np.uintp(nn) + left_val = left_arr[_n] + right_val = right_arr[_nn] + boolean = _compare(left_val, right_val, op) + _ind = np.uintp(ind + pos) + # pos should always increment + # no matter what happens + # with the conditionals below + pos += 1 + indices[_ind] &= boolean + counter += np.intp(boolean) + boolean_ = counter > 0 + l_booleans[_n] &= boolean_ + total = l_booleans.sum() + if not total: + return None, None + left_indices = np.empty(total, dtype=np.intp) + right_indices = np.empty(total, dtype=np.intp) + indexer = 0 + for n in range(left_index.size): + _n = np.uintp(n) + if not l_booleans[_n]: + continue + start = starts[_n] + end = ends[_n] + pos = 0 + counter = 0 + ind = start_indices[_n] + l_index = left_index[_n] + base = -1 + for nn in range(start, end): + _ind = np.uintp(ind + pos) + # pos should always increment + # no matter what happens + # with the condition below + pos += 1 + if not indices[_ind]: + continue + _nn = np.uintp(nn) + value = right_index[_nn] + if (base == -1) | (value < base): + base = value + _indexer = np.uintp(indexer) + left_indices[_indexer] = l_index + right_indices[_indexer] = base + indexer += 1 + return left_indices, right_indices + + +@njit(cache=True, parallel=True) +def _range_join_sorted_multiple_keep_last( + tupled, 
left_index, right_index, starts, ends, indices, start_indices +) -> tuple: + """ + Get the latest indices if there are more than two join conditions, + and a range join, sorted on both right columns, exists. + """ + l_booleans = np.ones(left_index.size, dtype=np.bool_) + for _tuple in literal_unroll(tupled): + left_arr = _tuple[0] + right_arr = _tuple[1] + op = _tuple[2] + for n in prange(left_index.size): + _n = np.uintp(n) + if not l_booleans[_n]: + continue + start = starts[_n] + end = ends[_n] + pos = 0 + counter = 0 + ind = start_indices[_n] + for nn in range(start, end): + _nn = np.uintp(nn) + left_val = left_arr[_n] + right_val = right_arr[_nn] + boolean = _compare(left_val, right_val, op) + _ind = np.uintp(ind + pos) + # pos should always increment + # no matter what happens + # with the conditionals below + pos += 1 + indices[_ind] &= boolean + counter += np.intp(boolean) + boolean_ = counter > 0 + l_booleans[_n] &= boolean_ + total = l_booleans.sum() + if not total: + return None, None + left_indices = np.empty(total, dtype=np.intp) + right_indices = np.empty(total, dtype=np.intp) + indexer = 0 + for n in range(left_index.size): + _n = np.uintp(n) + if not l_booleans[_n]: + continue + start = starts[_n] + end = ends[_n] + pos = 0 + counter = 0 + ind = start_indices[_n] + l_index = left_index[_n] + base = -1 + for nn in range(start, end): + _ind = np.uintp(ind + pos) + # pos should always increment + # no matter what happens + # with the condition below + pos += 1 + if not indices[_ind]: + continue + _nn = np.uintp(nn) + value = right_index[_nn] + if (base == -1) | (value > base): + base = value + _indexer = np.uintp(indexer) + left_indices[_indexer] = l_index + right_indices[_indexer] = base + indexer += 1 + return left_indices, right_indices + + +@njit(cache=True, parallel=True) +def _numba_range_join_sorted_keep_first_dual( + left_index: np.ndarray, + right_index: np.ndarray, + starts: np.ndarray, + ends: np.ndarray, + left_indices: np.ndarray, + 
right_indices: np.ndarray, +): + """ + Get indices for a non equi join + """ + for ind in prange(left_index.size): + _ind = np.uintp(ind) + start = starts[_ind] + end = ends[_ind] + lindex = left_index[_ind] + base_index = right_index[np.uintp(start)] + for num in range(start, end): + _num = np.uintp(num) + rindex = right_index[_num] + if rindex < base_index: + base_index = rindex + left_indices[_ind] = lindex + right_indices[_ind] = base_index + return left_indices, right_indices + + +@njit(cache=True, parallel=True) +def _numba_range_join_sorted_keep_last_dual( + left_index: np.ndarray, + right_index: np.ndarray, + starts: np.ndarray, + ends: np.ndarray, + left_indices: np.ndarray, + right_indices: np.ndarray, +): + """ + Get indices for a non equi join + """ + for ind in prange(left_index.size): + _ind = np.uintp(ind) + start = starts[_ind] + end = ends[_ind] + lindex = left_index[_ind] + base_index = right_index[np.uintp(start)] + for num in range(start, end): + _num = np.uintp(num) + rindex = right_index[_num] + if rindex > base_index: + base_index = rindex + left_indices[_ind] = lindex + right_indices[_ind] = base_index + return left_indices, right_indices + + +@njit(cache=True, parallel=True) +def _numba_non_equi_join_monotonic_increasing_keep_first_dual( + left_index: np.ndarray, + right_index: np.ndarray, + starts: np.ndarray, + left_indices: np.ndarray, + right_indices: np.ndarray, +): + """ + Get indices for a non equi join + """ + end = right_index.size + for ind in prange(left_index.size): + _ind = np.uintp(ind) + start = starts[_ind] + lindex = left_index[_ind] + base_index = right_index[np.uintp(start)] + for num in range(start, end): + _num = np.uintp(num) + rindex = right_index[_num] + if rindex < base_index: + base_index = rindex + left_indices[_ind] = lindex + right_indices[_ind] = base_index + return left_indices, right_indices + + +@njit(cache=True, parallel=True) +def _numba_non_equi_join_monotonic_increasing_keep_last_dual( + left_index: 
np.ndarray, + right_index: np.ndarray, + starts: np.ndarray, + left_indices: np.ndarray, + right_indices: np.ndarray, +): + """ + Get indices for a non equi join + """ + end = right_index.size + for ind in prange(left_index.size): + _ind = np.uintp(ind) + start = starts[_ind] + lindex = left_index[_ind] + base_index = right_index[np.uintp(start)] + for num in range(start, end): + _num = np.uintp(num) + rindex = right_index[_num] + if rindex > base_index: + base_index = rindex + left_indices[_ind] = lindex + right_indices[_ind] = base_index + return left_indices, right_indices + + +@njit(cache=True, parallel=True) +def _numba_non_equi_join_monotonic_increasing_keep_all_dual( + left_index: np.ndarray, + right_index: np.ndarray, + starts: np.ndarray, + start_indices: np.ndarray, + left_indices: np.ndarray, + right_indices: np.ndarray, +): + """ + Get indices for a non equi join + """ + end = right_index.size + for ind in prange(left_index.size): + _ind = np.uintp(ind) + start = starts[_ind] + indexer = start_indices[_ind] + lindex = left_index[_ind] + for num in range(start, end): + _num = np.uintp(num) + rindex = right_index[_num] + _indexer = np.uintp(indexer) + left_indices[_indexer] = lindex + right_indices[_indexer] = rindex + indexer += 1 + return left_indices, right_indices + + +@njit +def _numba_less_than(arr: np.ndarray, value: Any): + """ + Get earliest position in `arr` + where arr[i] <= `value` + """ + min_idx = 0 + max_idx = len(arr) + while min_idx < max_idx: + # to avoid overflow + mid_idx = min_idx + ((max_idx - min_idx) >> 1) + _mid_idx = np.uintp(mid_idx) + if arr[_mid_idx] < value: + min_idx = mid_idx + 1 + else: + max_idx = mid_idx + return min_idx + + +@njit(cache=True) +def _numba_non_equi_join_not_monotonic_keep_all_indices( + left_regions: np.ndarray, + right_regions: np.ndarray, + maxxes: np.ndarray, + lengths: np.ndarray, + sorted_array: np.ndarray, + positions_array: np.ndarray, + starts: np.ndarray, + load_factor: int, +): + """ + Get 
indices for non-equi join, + where the right regions are not monotonic + """ + # first pass - get actual length + length = left_regions.size + end = right_regions.size + end -= 1 + # add the last region + # no need to have this checked within an if-else statement + # in the for loop below + region = right_regions[np.uintp(end)] + sorted_array[0, 0] = region positions_array[0, 0] = end + # keep track of the maxxes array + # how many cells have actual values? maxxes_counter = 1 maxxes[0] = region lengths[0] = 1 r_count = 0 - left_indices = np.empty(total, dtype=np.intp) - right_indices = np.empty(total, dtype=np.intp) - begin = 0 + total = 0 + l_booleans = np.zeros(length, dtype=np.bool_) for indexer in range(length - 1, -1, -1): - _indexer = np.uint64(indexer) - if not l_booleans[_indexer]: - continue + _indexer = np.uintp(indexer) start = starts[_indexer] for num in range(start, end): - _num = np.uint64(num) - region = right_regions[_num, 0] + _num = np.uintp(num) + region = right_regions[_num] arr = maxxes[:maxxes_counter] - posn = _numba_less_than(arr=arr, value=region) - if posn == -1: + if region > arr[-1]: + # it is larger than the max in the maxxes array + # shove it into the last column posn = maxxes_counter - 1 - posn_ = np.uint64(posn) + posn_ = np.uintp(posn) len_arr = lengths[posn_] - len_arr_ = np.uint64(len_arr) + len_arr_ = np.uintp(len_arr) sorted_array[len_arr_, posn_] = region + # we dont need to compute positions in the first run? positions_array[len_arr_, posn_] = num maxxes[posn_] = region lengths[posn_] += 1 else: + posn = _numba_less_than(arr=arr, value=region) sorted_array, positions_array, lengths, maxxes = ( _numba_sorted_array( sorted_array=sorted_array, @@ -1362,11 +1349,11 @@ def _numba_non_equi_join_not_monotonic_keep_all( ) ) r_count += 1 - posn_ = np.uint64(posn) - # have we reached the max size of this column? + posn_ = np.uintp(posn) + # have we exceeded the size of this column? # do we need to trim and move data to other columns? 
check = (lengths[posn_] == (load_factor * 2)) & ( - r_count < right_index.size + r_count < right_regions.size ) if check: ( @@ -1386,70 +1373,143 @@ def _numba_non_equi_join_not_monotonic_keep_all( ) # now we do a binary search # for left region in right region - l_region = left_regions[_indexer, 0] + # 1. find the position in maxxes + # - this indicates which column in sorted_arrays contains our region + # 2. search in the specific region for the positions + # where left_region <= right_region + l_region = left_regions[_indexer] arr = maxxes[:maxxes_counter] - posn = _numba_less_than(arr=arr, value=l_region) - if posn == -1: + if l_region > arr[-1]: end = start continue - posn_ = np.uint64(posn) + posn = _numba_less_than(arr=arr, value=l_region) + posn_ = np.uintp(posn) len_arr = lengths[posn_] arr = sorted_array[:len_arr, posn_] _posn = _numba_less_than(arr=arr, value=l_region) - l_index = left_index[_indexer] - for ind in range(_posn, len_arr): - ind_ = np.uint64(ind) - counter = 1 - # move along the columns - # and look for matches - for loc in range(1, right_regions.shape[1]): - loc_ = np.uint64(loc) - next_left = left_regions[_indexer, loc_] - r_pos = positions_array[ind_, posn_] - r_pos = np.uint64(r_pos) - next_right = right_regions[r_pos, loc_] - if next_left > next_right: - counter = 0 - break - if counter == 0: - continue - begin_ = np.uint64(begin) + difference = len_arr - _posn + total += difference + # step into the remaining columns + for ind in range(posn + 1, maxxes_counter): + ind_ = np.uintp(ind) + len_arr = lengths[ind_] + total += len_arr + l_booleans[_indexer] = True + end = start + if total == 0: + return None, None, None + # second pass - fill arrays with indices + length = left_regions.size + end = right_regions.size + end -= 1 + region = right_regions[np.uintp(end)] + sorted_array[0, 0] = region + positions_array[0, 0] = end + maxxes_counter = 1 + maxxes[0] = region + lengths[0] = 1 + r_count = 0 + left_counts = np.zeros(length, 
dtype=np.intp) + left_indices = np.empty(length, dtype=np.intp) + right_indices = np.empty(total, dtype=np.intp) + begin = 0 + l_indexer = 0 + for indexer in range(length - 1, -1, -1): + _indexer = np.uintp(indexer) + if not l_booleans[_indexer]: + l_indexer += 1 + continue + start = starts[_indexer] + for num in range(start, end): + _num = np.uintp(num) + region = right_regions[_num] + arr = maxxes[:maxxes_counter] + if region > arr[-1]: + posn = maxxes_counter - 1 + posn_ = np.uintp(posn) + len_arr = lengths[posn_] + len_arr_ = np.uintp(len_arr) + sorted_array[len_arr_, posn_] = region + positions_array[len_arr_, posn_] = num + maxxes[posn_] = region + lengths[posn_] += 1 + else: + posn = _numba_less_than(arr=arr, value=region) + sorted_array, positions_array, lengths, maxxes = ( + _numba_sorted_array( + sorted_array=sorted_array, + positions_array=positions_array, + maxxes=maxxes, + lengths=lengths, + region=region, + posn=posn, + num=num, + ) + ) + r_count += 1 + posn_ = np.uintp(posn) + # have we reached the max size of this column? + # do we need to trim and move data to other columns? 
+ check = (lengths[posn_] == (load_factor * 2)) & ( + r_count < right_regions.size + ) + if check: + ( + sorted_array, + positions_array, + lengths, + maxxes, + maxxes_counter, + ) = _expand_sorted_array( + sorted_array=sorted_array, + positions_array=positions_array, + lengths=lengths, + maxxes=maxxes, + posn=posn, + maxxes_counter=maxxes_counter, + load_factor=load_factor, + ) + # now we do a binary search + # for left region in right region + l_region = left_regions[_indexer] + arr = maxxes[:maxxes_counter] + left_indices[np.uintp(l_indexer)] = indexer + if l_region > arr[-1]: + end = start + l_indexer += 1 + continue + counter = 0 + posn = _numba_less_than(arr=arr, value=l_region) + posn_ = np.uintp(posn) + len_arr = lengths[posn_] + arr = sorted_array[:len_arr, posn_] + _posn = _numba_less_than(arr=arr, value=l_region) + for ind in range(_posn, len_arr): + ind_ = np.uintp(ind) + begin_ = np.uintp(begin) r_pos = positions_array[ind_, posn_] - r_pos = np.uint64(r_pos) - r_index = right_index[r_pos] - left_indices[begin_] = l_index - right_indices[begin_] = r_index + right_indices[begin_] = r_pos begin += 1 + counter += 1 for ind in range(posn + 1, maxxes_counter): - ind_ = np.uint64(ind) + ind_ = np.uintp(ind) len_arr = lengths[ind_] for num in range(len_arr): - _num = np.uint64(num) - counter = 1 - for loc in range(1, right_regions.shape[1]): - loc_ = np.uint64(loc) - next_left = left_regions[_indexer, loc_] - r_pos = positions_array[_num, ind_] - r_pos = np.uint64(r_pos) - next_right = right_regions[r_pos, loc_] - if next_left > next_right: - counter = 0 - break - if counter == 0: - continue - begin_ = np.uint64(begin) - left_indices[begin_] = l_index + _num = np.uintp(num) + begin_ = np.uintp(begin) r_pos = positions_array[_num, ind_] - r_pos = np.uint64(r_pos) - r_index = right_index[r_pos] - right_indices[begin_] = r_index + right_indices[begin_] = r_pos begin += 1 + counter += 1 + left_counts[l_indexer] = counter + left_indices[l_indexer] = indexer + 
l_indexer += 1 end = start - return left_indices, right_indices + return left_indices, right_indices, left_counts @njit(cache=True) -def _numba_non_equi_join_not_monotonic_keep_first( +def _numba_non_equi_join_not_monotonic_dual_keep_all( left_regions: np.ndarray, right_regions: np.ndarray, left_index: np.ndarray, @@ -1462,40 +1522,48 @@ def _numba_non_equi_join_not_monotonic_keep_first( load_factor: int, ): """ - Get indices for non-equi join - first match + Get indices for non-equi join, + where the right regions are not monotonic """ - # first pass - get the actual length + # first pass - get actual length length = left_index.size end = right_index.size end -= 1 - region = right_regions[np.uint64(end), 0] + # add the last region + # no need to have this checked within an if-else statement + # in the for loop below + region = right_regions[np.uintp(end)] sorted_array[0, 0] = region positions_array[0, 0] = end + # keep track of the maxxes array + # how many cells have actual values? maxxes_counter = 1 maxxes[0] = region lengths[0] = 1 r_count = 0 total = 0 l_booleans = np.zeros(length, dtype=np.bool_) - r_indices = np.empty(length, dtype=np.intp) for indexer in range(length - 1, -1, -1): - _indexer = np.uint64(indexer) + _indexer = np.uintp(indexer) start = starts[_indexer] for num in range(start, end): - _num = np.uint64(num) - region = right_regions[_num, 0] + _num = np.uintp(num) + region = right_regions[_num] arr = maxxes[:maxxes_counter] - posn = _numba_less_than(arr=arr, value=region) - if posn == -1: + if region > arr[-1]: + # it is larger than the max in the maxxes array + # shove it into the last column posn = maxxes_counter - 1 - posn_ = np.uint64(posn) + posn_ = np.uintp(posn) len_arr = lengths[posn_] - len_arr_ = np.uint64(len_arr) + len_arr_ = np.uintp(len_arr) sorted_array[len_arr_, posn_] = region + # we dont need to compute positions in the first run? 
positions_array[len_arr_, posn_] = num maxxes[posn_] = region lengths[posn_] += 1 else: + posn = _numba_less_than(arr=arr, value=region) sorted_array, positions_array, lengths, maxxes = ( _numba_sorted_array( sorted_array=sorted_array, @@ -1508,7 +1576,7 @@ def _numba_non_equi_join_not_monotonic_keep_first( ) ) r_count += 1 - posn_ = np.uint64(posn) + posn_ = np.uintp(posn) # have we exceeded the size of this column? # do we need to trim and move data to other columns? check = (lengths[posn_] == (load_factor * 2)) & ( @@ -1530,93 +1598,140 @@ def _numba_non_equi_join_not_monotonic_keep_first( maxxes_counter=maxxes_counter, load_factor=load_factor, ) - l_region = left_regions[_indexer, 0] + # now we do a binary search + # for left region in right region + # 1. find the position in maxxes + # - this indicates which column in sorted_arrays contains our region + # 2. search in the specific region for the positions + # where left_region <= right_region + l_region = left_regions[_indexer] arr = maxxes[:maxxes_counter] - posn = _numba_less_than(arr=arr, value=l_region) - if posn == -1: + if l_region > arr[-1]: end = start continue - posn_ = np.uint64(posn) + posn = _numba_less_than(arr=arr, value=l_region) + posn_ = np.uintp(posn) len_arr = lengths[posn_] arr = sorted_array[:len_arr, posn_] _posn = _numba_less_than(arr=arr, value=l_region) - matches = 0 - base_index = -1 - for ind in range(_posn, len_arr): - ind_ = np.uint64(ind) - counter = 1 - for loc in range(1, right_regions.shape[1]): - loc_ = np.uint64(loc) - next_left = left_regions[_indexer, loc_] - r_pos = positions_array[ind_, posn_] - r_pos = np.uint64(r_pos) - next_right = right_regions[r_pos, loc_] - if next_left > next_right: - counter = 0 - break - if counter == 0: - continue - r_pos = positions_array[ind_, posn_] - r_pos = np.uint64(r_pos) - r_index = right_index[r_pos] - if matches == 0: - base_index = r_index - matches = 1 - elif r_index < base_index: - base_index = r_index + total += len_arr - _posn # 
step into the remaining columns for ind in range(posn + 1, maxxes_counter): - ind_ = np.uint64(ind) + ind_ = np.uintp(ind) len_arr = lengths[ind_] - # step into the rows for each column - for num in range(len_arr): - _num = np.uint64(num) - counter = 1 - for loc in range(1, right_regions.shape[1]): - loc_ = np.uint64(loc) - next_left = left_regions[_indexer, loc_] - r_pos = positions_array[_num, ind_] - r_pos = np.uint64(r_pos) - next_right = right_regions[r_pos, loc_] - if next_left > next_right: - counter = 0 - break - if counter == 0: - continue - r_pos = positions_array[_num, ind_] - r_pos = np.uint64(r_pos) - r_index = right_index[r_pos] - if matches == 0: - base_index = r_index - matches = 1 - elif r_index < base_index: - base_index = r_index - if matches == 0: - end = start - continue - total += 1 + total += len_arr l_booleans[_indexer] = True - r_indices[_indexer] = base_index end = start if total == 0: return None, None # second pass - fill arrays with indices + length = left_index.size + end = right_index.size + end -= 1 + region = right_regions[np.uintp(end)] + sorted_array[0, 0] = region + positions_array[0, 0] = end + maxxes_counter = 1 + maxxes[0] = region + lengths[0] = 1 + r_count = 0 left_indices = np.empty(total, dtype=np.intp) right_indices = np.empty(total, dtype=np.intp) - n = 0 - for ind in range(length): - _ind = np.uint64(ind) - if not l_booleans[_ind]: + begin = 0 + for indexer in range(length - 1, -1, -1): + _indexer = np.uintp(indexer) + if not l_booleans[_indexer]: continue - _n = np.uint64(n) - left_indices[_n] = left_index[_ind] - right_indices[_n] = r_indices[_ind] - n += 1 + start = starts[_indexer] + for num in range(start, end): + _num = np.uintp(num) + region = right_regions[_num] + arr = maxxes[:maxxes_counter] + if region > arr[-1]: + posn = maxxes_counter - 1 + posn_ = np.uintp(posn) + len_arr = lengths[posn_] + len_arr_ = np.uintp(len_arr) + sorted_array[len_arr_, posn_] = region + positions_array[len_arr_, posn_] = num + 
maxxes[posn_] = region + lengths[posn_] += 1 + else: + posn = _numba_less_than(arr=arr, value=region) + sorted_array, positions_array, lengths, maxxes = ( + _numba_sorted_array( + sorted_array=sorted_array, + positions_array=positions_array, + maxxes=maxxes, + lengths=lengths, + region=region, + posn=posn, + num=num, + ) + ) + r_count += 1 + posn_ = np.uintp(posn) + # have we reached the max size of this column? + # do we need to trim and move data to other columns? + check = (lengths[posn_] == (load_factor * 2)) & ( + r_count < right_index.size + ) + if check: + ( + sorted_array, + positions_array, + lengths, + maxxes, + maxxes_counter, + ) = _expand_sorted_array( + sorted_array=sorted_array, + positions_array=positions_array, + lengths=lengths, + maxxes=maxxes, + posn=posn, + maxxes_counter=maxxes_counter, + load_factor=load_factor, + ) + # now we do a binary search + # for left region in right region + l_region = left_regions[_indexer] + arr = maxxes[:maxxes_counter] + if l_region > arr[-1]: + end = start + continue + posn = _numba_less_than(arr=arr, value=l_region) + posn_ = np.uintp(posn) + len_arr = lengths[posn_] + arr = sorted_array[:len_arr, posn_] + _posn = _numba_less_than(arr=arr, value=l_region) + l_index = left_index[_indexer] + for ind in range(_posn, len_arr): + ind_ = np.uintp(ind) + begin_ = np.uintp(begin) + r_pos = positions_array[ind_, posn_] + r_pos = np.uintp(r_pos) + r_index = right_index[r_pos] + left_indices[begin_] = l_index + right_indices[begin_] = r_index + begin += 1 + for ind in range(posn + 1, maxxes_counter): + ind_ = np.uintp(ind) + len_arr = lengths[ind_] + for num in range(len_arr): + _num = np.uintp(num) + begin_ = np.uintp(begin) + left_indices[begin_] = l_index + r_pos = positions_array[_num, ind_] + r_pos = np.uintp(r_pos) + r_index = right_index[r_pos] + right_indices[begin_] = r_index + begin += 1 + end = start return left_indices, right_indices @njit(cache=True) -def _numba_non_equi_join_not_monotonic_keep_last( +def 
_numba_non_equi_join_not_monotonic_dual_keep_first( left_regions: np.ndarray, right_regions: np.ndarray, left_index: np.ndarray, @@ -1629,13 +1744,13 @@ def _numba_non_equi_join_not_monotonic_keep_last( load_factor: int, ): """ - Get indices for non-equi join - last match + Get indices for non-equi join - first match """ # first pass - get the actual length length = left_index.size end = right_index.size end -= 1 - region = right_regions[np.uint64(end), 0] + region = right_regions[np.uintp(end)] sorted_array[0, 0] = region positions_array[0, 0] = end maxxes_counter = 1 @@ -1646,23 +1761,23 @@ def _numba_non_equi_join_not_monotonic_keep_last( l_booleans = np.zeros(length, dtype=np.bool_) r_indices = np.empty(length, dtype=np.intp) for indexer in range(length - 1, -1, -1): - _indexer = np.uint64(indexer) + _indexer = np.uintp(indexer) start = starts[_indexer] for num in range(start, end): - _num = np.uint64(num) - region = right_regions[_num, 0] + _num = np.uintp(num) + region = right_regions[_num] arr = maxxes[:maxxes_counter] - posn = _numba_less_than(arr=arr, value=region) - if posn == -1: + if region > arr[-1]: posn = maxxes_counter - 1 - posn_ = np.uint64(posn) + posn_ = np.uintp(posn) len_arr = lengths[posn_] - len_arr_ = np.uint64(len_arr) + len_arr_ = np.uintp(len_arr) sorted_array[len_arr_, posn_] = region positions_array[len_arr_, posn_] = num maxxes[posn_] = region lengths[posn_] += 1 else: + posn = _numba_less_than(arr=arr, value=region) sorted_array, positions_array, lengths, maxxes = ( _numba_sorted_array( sorted_array=sorted_array, @@ -1675,7 +1790,7 @@ def _numba_non_equi_join_not_monotonic_keep_last( ) ) r_count += 1 - posn_ = np.uint64(posn) + posn_ = np.uintp(posn) # have we exceeded the size of this column? # do we need to trim and move data to other columns? 
check = (lengths[posn_] == (load_factor * 2)) & ( @@ -1697,70 +1812,36 @@ def _numba_non_equi_join_not_monotonic_keep_last( maxxes_counter=maxxes_counter, load_factor=load_factor, ) - l_region = left_regions[_indexer, 0] + l_region = left_regions[_indexer] arr = maxxes[:maxxes_counter] - posn = _numba_less_than(arr=arr, value=l_region) - if posn == -1: + if l_region > arr[-1]: end = start continue - posn_ = np.uint64(posn) + posn = _numba_less_than(arr=arr, value=l_region) + posn_ = np.uintp(posn) len_arr = lengths[posn_] arr = sorted_array[:len_arr, posn_] _posn = _numba_less_than(arr=arr, value=l_region) - matches = 0 base_index = -1 for ind in range(_posn, len_arr): - ind_ = np.uint64(ind) - counter = 1 - for loc in range(1, right_regions.shape[1]): - loc_ = np.uint64(loc) - next_left = left_regions[_indexer, loc_] - r_pos = positions_array[ind_, posn_] - r_pos = np.uint64(r_pos) - next_right = right_regions[r_pos, loc_] - if next_left > next_right: - counter = 0 - break - if counter == 0: - continue + ind_ = np.uintp(ind) r_pos = positions_array[ind_, posn_] - r_pos = np.uint64(r_pos) + r_pos = np.uintp(r_pos) r_index = right_index[r_pos] - if matches == 0: - base_index = r_index - matches = 1 - elif r_index > base_index: + if (base_index == -1) | (r_index < base_index): base_index = r_index # step into the remaining columns for ind in range(posn + 1, maxxes_counter): - ind_ = np.uint64(ind) + ind_ = np.uintp(ind) len_arr = lengths[ind_] # step into the rows for each column for num in range(len_arr): - _num = np.uint64(num) - counter = 1 - for loc in range(1, right_regions.shape[1]): - loc_ = np.uint64(loc) - next_left = left_regions[_indexer, loc_] - r_pos = positions_array[_num, ind_] - r_pos = np.uint64(r_pos) - next_right = right_regions[r_pos, loc_] - if next_left > next_right: - counter = 0 - break - if counter == 0: - continue + _num = np.uintp(num) r_pos = positions_array[_num, ind_] - r_pos = np.uint64(r_pos) + r_pos = np.uintp(r_pos) r_index = 
right_index[r_pos] - if matches == 0: + if (base_index == -1) | (r_index < base_index): base_index = r_index - matches = 1 - elif r_index > base_index: - base_index = r_index - if matches == 0: - end = start - continue total += 1 l_booleans[_indexer] = True r_indices[_indexer] = base_index @@ -1772,565 +1853,149 @@ def _numba_non_equi_join_not_monotonic_keep_last( right_indices = np.empty(total, dtype=np.intp) n = 0 for ind in range(length): - _ind = np.uint64(ind) + _ind = np.uintp(ind) if not l_booleans[_ind]: continue - _n = np.uint64(n) + _n = np.uintp(n) left_indices[_n] = left_index[_ind] right_indices[_n] = r_indices[_ind] n += 1 return left_indices, right_indices -@njit(cache=True, parallel=True) -def _numba_non_equi_join_monotonic_decreasing_keep_all( +@njit(cache=True) +def _numba_non_equi_join_not_monotonic_dual_keep_last( left_regions: np.ndarray, right_regions: np.ndarray, left_index: np.ndarray, right_index: np.ndarray, + maxxes: np.ndarray, + lengths: np.ndarray, + sorted_array: np.ndarray, + positions_array: np.ndarray, starts: np.ndarray, - ends: np.ndarray, + load_factor: int, ): """ - Get indices for a non equi join. 
+ Get indices for non-equi join - last match """ + # first pass - get the actual length length = left_index.size + end = right_index.size + end -= 1 + region = right_regions[np.uintp(end)] + sorted_array[0, 0] = region + positions_array[0, 0] = end + maxxes_counter = 1 + maxxes[0] = region + lengths[0] = 1 + r_count = 0 total = 0 l_booleans = np.zeros(length, dtype=np.bool_) - # first pass - get actual length - for ind in prange(length): - _ind = np.uint64(ind) - start = starts[_ind] - end = ends[_ind] - for num in range(start, end): - _num = np.uint64(num) - counter = 1 - for loc in range(right_regions.shape[1]): - loc_ = np.uint64(loc) - next_left = left_regions[_ind, loc_] - next_right = right_regions[_num, loc_] - if next_left > next_right: - counter = 0 - break - if counter == 0: - continue - total += 1 - l_booleans[_ind] = True - if total == 0: - return None, None - n = 0 - left_indices = np.empty(total, dtype=np.intp) - right_indices = np.empty(total, dtype=np.intp) - # second pass - fill in values - for ind in range(length): - _ind = np.uint64(ind) - if not l_booleans[_ind]: - continue - start = starts[_ind] - end = ends[_ind] - lindex = left_index[_ind] + r_indices = np.empty(length, dtype=np.intp) + for indexer in range(length - 1, -1, -1): + _indexer = np.uintp(indexer) + start = starts[_indexer] for num in range(start, end): - _num = np.uint64(num) - counter = 1 - for loc in range(right_regions.shape[1]): - loc_ = np.uint64(loc) - next_left = left_regions[_ind, loc_] - next_right = right_regions[_num, loc_] - if next_left > next_right: - counter = 0 - break - if counter == 0: - continue - rindex = right_index[_num] - _n = np.uint64(n) - left_indices[_n] = lindex - right_indices[_n] = rindex - n += 1 - return left_indices, right_indices - - -@njit(cache=True, parallel=True) -def _numba_non_equi_join_monotonic_decreasing_keep_first( - left_regions: np.ndarray, - right_regions: np.ndarray, - left_index: np.ndarray, - right_index: np.ndarray, - starts: 
np.ndarray, - ends: np.ndarray, -): - """ - Get indices for a non equi join - first match. - """ - length = left_index.size - total = 0 - l_booleans = np.zeros(length, dtype=np.bool_) - r_indices = np.empty(length, dtype=np.intp) - # first pass - get actual length - for ind in prange(length): - _ind = np.uint64(ind) - start = starts[_ind] - end = ends[_ind] - matches = 0 - base = -1 - for num in range(start, end): - _num = np.uint64(num) - counter = 1 - for loc in range(right_regions.shape[1]): - loc_ = np.uint64(loc) - next_left = left_regions[_ind, loc_] - next_right = right_regions[_num, loc_] - if next_left > next_right: - counter = 0 - break - if counter == 0: - continue - rindex = right_index[_num] - if matches == 0: - base = rindex - matches = 1 - elif rindex < base: - base = rindex - if matches == 0: - continue - total += 1 - l_booleans[_ind] = True - r_indices[_ind] = base - if total == 0: - return None, None - n = 0 - left_indices = np.empty(total, dtype=np.intp) - right_indices = np.empty(total, dtype=np.intp) - # second pass - fill in values - for ind in prange(length): - _ind = np.uint64(ind) - if not l_booleans[_ind]: - continue - _n = np.uint64(n) - left_indices[_n] = left_index[_ind] - right_indices[_n] = r_indices[_ind] - n += 1 - return left_indices, right_indices - - -@njit(cache=True, parallel=True) -def _numba_non_equi_join_monotonic_decreasing_keep_last( - left_regions: np.ndarray, - right_regions: np.ndarray, - left_index: np.ndarray, - right_index: np.ndarray, - starts: np.ndarray, - ends: np.ndarray, -): - """ - Get indices for a non equi join - last match. 
- """ - length = left_index.size - total = 0 - l_booleans = np.zeros(length, dtype=np.bool_) - r_indices = np.empty(length, dtype=np.intp) - # first pass - get actual length - for ind in prange(length): - _ind = np.uint64(ind) - start = starts[_ind] - end = ends[_ind] - matches = 0 - base = -1 - for num in range(start, end): - _num = np.uint64(num) - counter = 1 - for loc in range(right_regions.shape[1]): - loc_ = np.uint64(loc) - next_left = left_regions[_ind, loc_] - next_right = right_regions[_num, loc_] - if next_left > next_right: - counter = 0 - break - if counter == 0: - continue - rindex = right_index[_num] - if matches == 0: - base = rindex - matches = 1 - elif rindex > base: - base = rindex - if matches == 0: + _num = np.uintp(num) + region = right_regions[_num] + arr = maxxes[:maxxes_counter] + if region > arr[-1]: + posn = maxxes_counter - 1 + posn_ = np.uintp(posn) + len_arr = lengths[posn_] + len_arr_ = np.uintp(len_arr) + sorted_array[len_arr_, posn_] = region + positions_array[len_arr_, posn_] = num + maxxes[posn_] = region + lengths[posn_] += 1 + else: + posn = _numba_less_than(arr=arr, value=region) + sorted_array, positions_array, lengths, maxxes = ( + _numba_sorted_array( + sorted_array=sorted_array, + positions_array=positions_array, + maxxes=maxxes, + lengths=lengths, + region=region, + posn=posn, + num=num, + ) + ) + r_count += 1 + posn_ = np.uintp(posn) + # have we exceeded the size of this column? + # do we need to trim and move data to other columns? 
+ check = (lengths[posn_] == (load_factor * 2)) & ( + r_count < right_index.size + ) + if check: + ( + sorted_array, + positions_array, + lengths, + maxxes, + maxxes_counter, + ) = _expand_sorted_array( + sorted_array=sorted_array, + positions_array=positions_array, + lengths=lengths, + maxxes=maxxes, + posn=posn, + maxxes_counter=maxxes_counter, + load_factor=load_factor, + ) + l_region = left_regions[_indexer] + arr = maxxes[:maxxes_counter] + posn = _numba_less_than(arr=arr, value=l_region) + if l_region > arr[-1]: + end = start continue + posn_ = np.uintp(posn) + len_arr = lengths[posn_] + arr = sorted_array[:len_arr, posn_] + _posn = _numba_less_than(arr=arr, value=l_region) + base_index = np.inf + for ind in range(_posn, len_arr): + ind_ = np.uintp(ind) + r_pos = positions_array[ind_, posn_] + r_pos = np.uintp(r_pos) + r_index = right_index[r_pos] + if (base_index == np.inf) | (r_index > base_index): + base_index = r_index + # step into the remaining columns + for ind in range(posn + 1, maxxes_counter): + ind_ = np.uintp(ind) + len_arr = lengths[ind_] + # step into the rows for each column + for num in range(len_arr): + _num = np.uintp(num) + r_pos = positions_array[_num, ind_] + r_pos = np.uintp(r_pos) + r_index = right_index[r_pos] + if (base_index == np.inf) | (r_index > base_index): + base_index = r_index total += 1 - l_booleans[_ind] = True - r_indices[_ind] = base + l_booleans[_indexer] = True + r_indices[_indexer] = base_index + end = start if total == 0: return None, None - n = 0 + # second pass - fill arrays with indices left_indices = np.empty(total, dtype=np.intp) right_indices = np.empty(total, dtype=np.intp) - # second pass - fill in values - for ind in prange(length): - _ind = np.uint64(ind) - if not l_booleans[_ind]: - continue - _n = np.uint64(n) - left_indices[_n] = left_index[_ind] - right_indices[_n] = r_indices[_ind] - n += 1 - return left_indices, right_indices - - -@njit(cache=True, parallel=True) -def 
_numba_non_equi_join_monotonic_increasing_keep_first( - left_regions: np.ndarray, - right_regions: np.ndarray, - left_index: np.ndarray, - right_index: np.ndarray, - starts: np.ndarray, -): - """ - Get indices for a non equi join - first match - """ - length = left_index.size - end = len(right_regions) - total = 0 - l_booleans = np.zeros(length, dtype=np.bool_) - r_indices = np.empty(length, dtype=np.intp) - # first pass - get actual length - for ind in prange(length): - _ind = np.uint64(ind) - start = starts[_ind] - matches = 0 - base = -1 - for num in range(start, end): - _num = np.uint64(num) - counter = 1 - for loc in range(right_regions.shape[1]): - loc_ = np.uint64(loc) - next_left = left_regions[_ind, loc_] - next_right = right_regions[_num, loc_] - if next_left > next_right: - counter = 0 - break - if counter == 0: - continue - rindex = right_index[_num] - if matches == 0: - base = rindex - matches = 1 - elif rindex < base: - base = rindex - if matches == 0: - continue - total += 1 - l_booleans[_ind] = True - r_indices[_ind] = base - if total == 0: - return None, None n = 0 - left_indices = np.empty(total, dtype=np.intp) - right_indices = np.empty(total, dtype=np.intp) - # second pass - fill in actual values - for ind in prange(length): - _ind = np.uint64(ind) - if not l_booleans[_ind]: - continue - _n = np.uint64(n) - left_indices[_n] = left_index[_ind] - right_indices[_n] = r_indices[_ind] - n += 1 - return left_indices, right_indices - - -@njit(cache=True, parallel=True) -def _numba_non_equi_join_monotonic_increasing_keep_last( - left_regions: np.ndarray, - right_regions: np.ndarray, - left_index: np.ndarray, - right_index: np.ndarray, - starts: np.ndarray, -): - """ - Get indices for a non equi join - last match. 
- """ - length = left_index.size - end = len(right_regions) - total = 0 - l_booleans = np.zeros(length, dtype=np.bool_) - r_indices = np.empty(length, dtype=np.intp) - # first pass - get actual length - for ind in prange(length): - _ind = np.uint64(ind) - start = starts[_ind] - matches = 0 - base = -1 - for num in range(start, end): - _num = np.uint64(num) - counter = 1 - for loc in range(right_regions.shape[1]): - loc_ = np.uint64(loc) - next_left = left_regions[_ind, loc_] - next_right = right_regions[_num, loc_] - if next_left > next_right: - counter = 0 - break - if counter == 0: - continue - rindex = right_index[_num] - if matches == 0: - base = rindex - matches = 1 - elif rindex > base: - base = rindex - if matches == 0: - continue - total += 1 - l_booleans[_ind] = True - r_indices[_ind] = base - if total == 0: - return None, None - n = 0 - left_indices = np.empty(total, dtype=np.intp) - right_indices = np.empty(total, dtype=np.intp) - # second pass - fill in values - for ind in prange(length): - _ind = np.uint64(ind) + for ind in range(length): + _ind = np.uintp(ind) if not l_booleans[_ind]: continue - _n = np.uint64(n) + _n = np.uintp(n) left_indices[_n] = left_index[_ind] right_indices[_n] = r_indices[_ind] n += 1 return left_indices, right_indices -@njit(cache=True, parallel=True) -def _numba_non_equi_join_monotonic_increasing_keep_all( - left_regions: np.ndarray, - right_regions: np.ndarray, - left_index: np.ndarray, - right_index: np.ndarray, - starts: np.ndarray, -): - """ - Get indices for a non equi join. 
- """ - length = left_index.size - end = len(right_regions) - total = 0 - l_booleans = np.zeros(length, dtype=np.bool_) - # first pass - get actual length - for ind in prange(length): - _ind = np.uint64(ind) - start = starts[_ind] - for num in range(start, end): - _num = np.uint64(num) - counter = 1 - for loc in range(right_regions.shape[1]): - loc_ = np.uint64(loc) - next_left = left_regions[_ind, loc_] - next_right = right_regions[_num, loc_] - if next_left > next_right: - counter = 0 - break - if counter == 0: - continue - total += 1 - l_booleans[_ind] = True - if total == 0: - return None, None - n = 0 - left_indices = np.empty(total, dtype=np.intp) - right_indices = np.empty(total, dtype=np.intp) - # second pass - fill in values - for ind in range(length): - _ind = np.uint64(ind) - if not l_booleans[_ind]: - continue - start = starts[_ind] - lindex = left_index[_ind] - for num in range(start, end): - _num = np.uint64(num) - counter = 1 - for loc in range(right_regions.shape[1]): - loc_ = np.uint64(loc) - next_left = left_regions[_ind, loc_] - next_right = right_regions[_num, loc_] - if next_left > next_right: - counter = 0 - break - if counter == 0: - continue - rindex = right_index[_num] - _n = np.uint64(n) - left_indices[_n] = lindex - right_indices[_n] = rindex - n += 1 - return left_indices, right_indices - - -@njit(cache=True, parallel=True) -def _numba_non_equi_join_monotonic_increasing_keep_all_dual( - left_index: np.ndarray, - right_index: np.ndarray, - starts: np.ndarray, - start_indices: np.ndarray, - left_indices: np.ndarray, - right_indices: np.ndarray, -): - """ - Get indices for a dual non equi join - """ - end = right_index.size - for ind in prange(left_index.size): - _ind = np.uint64(ind) - start = starts[_ind] - indexer = start_indices[_ind] - lindex = left_index[_ind] - for num in range(start, end): - _num = np.uint64(num) - rindex = right_index[_num] - _indexer = np.uint64(indexer) - left_indices[_indexer] = lindex - right_indices[_indexer] 
= rindex - indexer += 1 - return left_indices, right_indices - - -@njit(cache=True, parallel=True) -def _numba_non_equi_join_monotonic_increasing_keep_first_dual( - left_index: np.ndarray, - right_index: np.ndarray, - starts: np.ndarray, - left_indices: np.ndarray, - right_indices: np.ndarray, -): - """ - Get indices for a dual non equi join - """ - end = right_index.size - for ind in prange(left_index.size): - _ind = np.uint64(ind) - start = starts[_ind] - lindex = left_index[_ind] - base_index = right_index[np.uint64(start)] - for num in range(start, end): - _num = np.uint64(num) - rindex = right_index[_num] - if rindex < base_index: - base_index = rindex - left_indices[_ind] = lindex - right_indices[_ind] = base_index - return left_indices, right_indices - - -@njit(cache=True, parallel=True) -def _numba_non_equi_join_monotonic_increasing_keep_last_dual( - left_index: np.ndarray, - right_index: np.ndarray, - starts: np.ndarray, - left_indices: np.ndarray, - right_indices: np.ndarray, -): - """ - Get indices for a dual non equi join - """ - end = right_index.size - for ind in prange(left_index.size): - _ind = np.uint64(ind) - start = starts[_ind] - lindex = left_index[_ind] - base_index = right_index[np.uint64(start)] - for num in range(start, end): - _num = np.uint64(num) - rindex = right_index[_num] - if rindex > base_index: - base_index = rindex - left_indices[_ind] = lindex - right_indices[_ind] = base_index - return left_indices, right_indices - - -@njit(cache=True, parallel=True) -def _numba_non_equi_join_monotonic_decreasing_keep_all_dual( - left_index: np.ndarray, - right_index: np.ndarray, - starts: np.ndarray, - ends: np.ndarray, - start_indices: np.ndarray, - left_indices: np.ndarray, - right_indices: np.ndarray, -): - """ - Get indices for a dual non equi join - """ - for ind in prange(left_index.size): - _ind = np.uint64(ind) - start = starts[_ind] - end = ends[_ind] - indexer = start_indices[_ind] - lindex = left_index[_ind] - for num in range(start, 
end): - _num = np.uint64(num) - rindex = right_index[_num] - _indexer = np.uint64(indexer) - left_indices[_indexer] = lindex - right_indices[_indexer] = rindex - indexer += 1 - return left_indices, right_indices - - -@njit(cache=True, parallel=True) -def _numba_non_equi_join_monotonic_decreasing_keep_first_dual( - left_index: np.ndarray, - right_index: np.ndarray, - starts: np.ndarray, - ends: np.ndarray, - left_indices: np.ndarray, - right_indices: np.ndarray, -): - """ - Get indices for a dual non equi join - """ - for ind in prange(left_index.size): - _ind = np.uint64(ind) - start = starts[_ind] - end = ends[_ind] - lindex = left_index[_ind] - base_index = right_index[np.uint64(start)] - for num in range(start, end): - _num = np.uint64(num) - rindex = right_index[_num] - if rindex < base_index: - base_index = rindex - left_indices[_ind] = lindex - right_indices[_ind] = base_index - return left_indices, right_indices - - -@njit(cache=True, parallel=True) -def _numba_non_equi_join_monotonic_decreasing_keep_last_dual( - left_index: np.ndarray, - right_index: np.ndarray, - starts: np.ndarray, - ends: np.ndarray, - left_indices: np.ndarray, - right_indices: np.ndarray, -): - """ - Get indices for a dual non equi join - """ - for ind in prange(left_index.size): - _ind = np.uint64(ind) - start = starts[_ind] - end = ends[_ind] - lindex = left_index[_ind] - base_index = right_index[np.uint64(start)] - for num in range(start, end): - _num = np.uint64(num) - rindex = right_index[_num] - if rindex > base_index: - base_index = rindex - left_indices[_ind] = lindex - right_indices[_ind] = base_index - return left_indices, right_indices - - @njit def _numba_sorted_array( sorted_array: np.ndarray, @@ -2359,7 +2024,7 @@ def _numba_sorted_array( """ # the sorted array is an adaptation # of grantjenks' sortedcontainers - posn_ = np.uint64(posn) + posn_ = np.uintp(posn) len_arr = lengths[posn_] # grab the specific column that the region falls into arr = sorted_array[:len_arr, 
posn_] @@ -2370,17 +2035,17 @@ def _numba_sorted_array( # shift in this order to avoid issues with assignment override # which could create wrong values for ind in range(len_arr - 1, insort_posn - 1, -1): - ind_ = np.uint64(ind) - _ind = np.uint64(ind + 1) + ind_ = np.uintp(ind) + _ind = np.uintp(ind + 1) sorted_array[_ind, posn_] = sorted_array[ind_, posn_] positions_array[_ind, posn_] = positions_array[ind_, posn_] # now we can safely insert the region - insort = np.uint64(insort_posn) + insort = np.uintp(insort_posn) sorted_array[insort, posn_] = region positions_array[insort, posn_] = num # update the length and the maxxes arrays lengths[posn_] += 1 - maxxes[posn_] = sorted_array[np.uint64(len_arr), posn_] + maxxes[posn_] = sorted_array[np.uintp(len_arr), posn_] return sorted_array, positions_array, lengths, maxxes @@ -2400,15 +2065,15 @@ def _expand_sorted_array( """ # shift from left+1 to right for pos in range(maxxes_counter - 1, posn, -1): - forward = np.uint64(pos + 1) - current = np.uint64(pos) + forward = np.uintp(pos + 1) + current = np.uintp(pos) sorted_array[:, forward] = sorted_array[:, current] positions_array[:, forward] = positions_array[:, current] maxxes[forward] = maxxes[current] lengths[forward] = lengths[current] # share half the load from left to left+1 - forward = np.uint64(posn + 1) - current = np.uint64(posn) + forward = np.uintp(posn + 1) + current = np.uintp(posn) maxxes[forward] = sorted_array[-1, current] lengths[forward] = load_factor sorted_array[:load_factor, forward] = sorted_array[load_factor:, current] @@ -2417,6 +2082,6 @@ def _expand_sorted_array( ] # update the length and maxxes arrays lengths[current] = load_factor - maxxes[current] = sorted_array[np.uint64(load_factor - 1), current] + maxxes[current] = sorted_array[np.uintp(load_factor - 1), current] maxxes_counter += 1 return sorted_array, positions_array, lengths, maxxes, maxxes_counter diff --git a/janitor/functions/conditional_join.py 
b/janitor/functions/conditional_join.py index c48e7b125..5598a0877 100644 --- a/janitor/functions/conditional_join.py +++ b/janitor/functions/conditional_join.py @@ -1,5 +1,7 @@ from __future__ import annotations +import itertools +import math import operator from typing import Any, Hashable, Literal, Optional, Union @@ -532,8 +534,6 @@ def _conditional_join_compute( df=df, right=right, conditions=conditions, keep=keep ) elif use_numba: - from janitor.functions._numba import _numba_single_non_equi_join - result = _numba_single_non_equi_join( left=df[left_on], right=right[right_on], @@ -554,8 +554,6 @@ def _conditional_join_compute( if result is None: result = np.array([], dtype=np.intp), np.array([], dtype=np.intp) - # return result - if return_matching_indices: return result @@ -681,8 +679,6 @@ def _multiple_conditional_join_eq( ) if use_numba: - from janitor.functions._numba import _numba_equi_join - eqs = None for left_on, right_on, op in conditions: if op == _JoinOperator.STRICTLY_EQUAL.value: @@ -775,7 +771,7 @@ def _multiple_conditional_join_eq( indices = _generate_indices(*indices, rest) - if not indices: + if indices is None: return None return _keep_output(keep, *indices) @@ -795,34 +791,51 @@ def _multiple_conditional_join_eq( return_ragged_arrays=return_ragged_arrays, ) + left_df = df[:] + right_df = right[:] eqs = [ (left_on, right_on) for left_on, right_on, op in conditions if op == _JoinOperator.STRICTLY_EQUAL.value ] + left_on, right_on = zip(*eqs) + left_on = list(set(left_on)) + right_on = list(set(right_on)) + any_nulls = left_df.loc[:, left_on].isna().any(axis=1) + if any_nulls.all(): + return None + if any_nulls.any(): + left_df = left_df.loc[~any_nulls] + any_nulls = right_df.loc[:, right_on].isna().any(axis=1) + if any_nulls.all(): + return None + if any_nulls.any(): + right_df = right_df.loc[~any_nulls] left_on, right_on = zip(*eqs) left_on = [*left_on] right_on = [*right_on] - left_index, right_index = _MergeOperation( - df, - right, + 
left_df, + right_df, left_on=left_on, right_on=right_on, sort=False, )._get_join_indexers() + if left_index is not None: + if not left_index.size: + return None + left_index = left_df.index[left_index] # patch based on updates in internal code # pandas/core/reshape/merge.py#L1692 # for pandas 2.2 - if left_index is None: - left_index = df.index._values - if right_index is None: - right_index = right.index._values - - if not left_index.size: - return None + elif left_index is None: + left_index = left_df.index._values + if right_index is not None: + right_index = right_df.index[right_index] + else: + right_index = right_df.index._values rest = [ (df[left_on], right[right_on], op) @@ -834,7 +847,7 @@ def _multiple_conditional_join_eq( return _keep_output(keep, left_index, right_index) indices = _generate_indices(left_index, right_index, rest) - if not indices: + if indices is None: return None return _keep_output(keep, *indices) @@ -856,11 +869,6 @@ def _multiple_conditional_join_le_lt( Returns a tuple of (df_index, right_index) """ if use_numba: - from janitor.functions._numba import ( - _numba_multiple_non_equi_join, - _numba_single_non_equi_join, - ) - gt_lt = [ condition for condition in conditions @@ -870,17 +878,27 @@ def _multiple_conditional_join_le_lt( conditions = [ condition for condition in conditions if condition not in gt_lt ] - if (len(gt_lt) > 1) and not conditions: - return _numba_multiple_non_equi_join(df, right, gt_lt, keep=keep) - if len(gt_lt) == 1: + if len(gt_lt) > 1: + first_two = [op for *_, op in gt_lt[:2]] + range_join_ops = itertools.product( + less_than_join_types, greater_than_join_types + ) + range_join_ops = map(set, range_join_ops) + is_range_join = set(first_two) in range_join_ops + if is_range_join and (first_two[0] in less_than_join_types): + gt_lt = [gt_lt[1], gt_lt[0], *gt_lt[2:]] + if not conditions: + return _numba_multiple_non_equi_join( + df, right, gt_lt, keep=keep, is_range_join=is_range_join + ) + indices = 
_numba_multiple_non_equi_join( + df, right, gt_lt, keep="all", is_range_join=False + ) + else: left_on, right_on, op = gt_lt[0] indices = _numba_single_non_equi_join( df[left_on], right[right_on], op, keep="all" ) - else: - indices = _numba_multiple_non_equi_join( - df, right, gt_lt, keep="all" - ) if indices is None: return None else: @@ -913,29 +931,23 @@ def _multiple_conditional_join_le_lt( # the aim of this for loop is to see if there is # the possibility of a range join, and if there is, # then use the optimised path - le_lt = None - ge_gt = None - # keep the first match for le_lt or ge_gt - for condition in conditions: - *_, op = condition - if op in less_than_join_types: - if le_lt: - continue - le_lt = condition - elif op in greater_than_join_types: - if ge_gt: - continue - ge_gt = condition - if le_lt and ge_gt: - break + first_two = [op for *_, op in conditions[:2]] + range_join_ops = itertools.product( + less_than_join_types, greater_than_join_types + ) + range_join_ops = map(set, range_join_ops) + is_range_join = set(first_two) in range_join_ops # optimised path - if le_lt and ge_gt: + if is_range_join: + if first_two[0] in less_than_join_types: + le_lt, ge_gt = conditions[:2] + else: + ge_gt, le_lt = conditions[:2] conditions = [ condition for condition in conditions if condition not in (ge_gt, le_lt) ] - if conditions: _keep = None return_ragged_arrays = False @@ -960,7 +972,7 @@ def _multiple_conditional_join_le_lt( return_ragged_arrays=return_ragged_arrays, right_is_sorted=right_is_sorted, ) - if not indices: + if indices is None: return None if _keep or (return_ragged_arrays & isinstance(indices[1], list)): return indices @@ -968,17 +980,17 @@ def _multiple_conditional_join_le_lt( # no optimised path # blow up the rows and prune else: - if le_lt: - conditions = [ - condition for condition in conditions if condition != le_lt - ] - left_on, right_on, op = le_lt - else: - conditions = [ - condition for condition in conditions if condition != ge_gt - 
] - left_on, right_on, op = ge_gt - + lt_or_gt = None + for condition in conditions: + if condition[-1] in less_than_join_types.union( + greater_than_join_types + ): + lt_or_gt = condition + break + conditions = [ + condition for condition in conditions if condition != lt_or_gt + ] + left_on, right_on, op = lt_or_gt indices = _generic_func_cond_join( left=df[left_on], right=right[right_on], @@ -986,17 +998,15 @@ def _multiple_conditional_join_le_lt( multiple_conditions=False, keep="all", ) - # return indices - if not indices: + if indices is None: return None if conditions: conditions = ( (df[left_on], right[right_on], op) for left_on, right_on, op in conditions ) - indices = _generate_indices(*indices, conditions) - if not indices: + if indices is None: return None return _keep_output(keep, *indices) @@ -1038,7 +1048,7 @@ def _range_indices( any_nulls = right[right_on].isna() if any_nulls.any(): right_c = right_c[~any_nulls] - any_nulls = right_c.hasnans + any_nulls = any_nulls.any() outcome = _generic_func_cond_join( left=left_c, @@ -1120,7 +1130,6 @@ def _range_indices( right_index = [right_index[start:end] for start, end in zip(starts, ends)] if return_ragged_arrays & fastpath: return left_index, right_index - # return right_index right_index = np.concatenate(right_index) left_index = left_index.repeat(repeater) if fastpath: @@ -1527,3 +1536,1117 @@ def construct_1d_array_from_inferred_fill_value( arr = sanitize_array(value, Index(range(1)), copy=False) taker = -1 * np.ones(length, dtype=np.intp) return take_nd(arr, taker) + + +def _numba_single_non_equi_join( + left: pd.Series, right: pd.Series, op: str, keep: str +) -> tuple[np.ndarray, np.ndarray]: + """Return matching indices for single non-equi join.""" + if op == "!=": + return _generic_func_cond_join( + left=left, right=right, op=op, multiple_conditions=False, keep=keep + ) + from janitor.functions import _numba + + outcome = _generic_func_cond_join( + left=left, right=right, op=op, 
multiple_conditions=True, keep="all" + ) + if outcome is None: + return None + left_index, right_index, starts = outcome + if op in greater_than_join_types: + right_index = right_index[::-1] + starts = right_index.size - starts + if keep == "first": + left_indices = np.empty(left_index.size, dtype=np.intp) + right_indices = np.empty(left_index.size, dtype=np.intp) + return ( + _numba._numba_non_equi_join_monotonic_increasing_keep_first_dual( + left_index=left_index, + right_index=right_index, + starts=starts, + left_indices=left_indices, + right_indices=right_indices, + ) + ) + if keep == "last": + left_indices = np.empty(left_index.size, dtype=np.intp) + right_indices = np.empty(left_index.size, dtype=np.intp) + return _numba._numba_non_equi_join_monotonic_increasing_keep_last_dual( + left_index=left_index, + right_index=right_index, + starts=starts, + left_indices=left_indices, + right_indices=right_indices, + ) + start_indices = np.empty(left_index.size, dtype=np.intp) + start_indices[0] = 0 + indices = (right_index.size - starts).cumsum() + start_indices[1:] = indices[:-1] + indices = indices[-1] + left_indices = np.empty(indices, dtype=np.intp) + right_indices = np.empty(indices, dtype=np.intp) + return _numba._numba_non_equi_join_monotonic_increasing_keep_all_dual( + left_index=left_index, + right_index=right_index, + starts=starts, + left_indices=left_indices, + right_indices=right_indices, + start_indices=start_indices, + ) + + +def _numba_multiple_non_equi_join( + df: pd.DataFrame, + right: pd.DataFrame, + gt_lt: list, + keep: str, + is_range_join: bool, +) -> tuple[np.ndarray, np.ndarray]: + """ + # https://www.scitepress.org/papers/2018/68268/68268.pdf + An alternative to the _range_indices algorithm + and more generalised - it covers any pair of non equi joins + in >, >=, <, <=. + Returns a tuple of left and right indices. 
+ """ + # implementation is based on the algorithm described in this paper - + # https://www.scitepress.org/papers/2018/68268/68268.pdf + + # summary: + # get regions for first and second conditions in the pair + # (l_col1, r_col1, op1), (l_col2, r_col2, op2) + # the idea is that r_col1 should always be ahead of the + # appropriate value from lcol1; same applies to l_col2 & r_col2. + # if the operator is in less than join types + # the l_col should be in ascending order + # if in greater than join types, l_col should be + # in descending order + # Example : + # df1: + # id value_1 + # 0 1 2 + # 1 1 5 + # 2 1 7 + # 3 2 1 + # 4 2 3 + # 5 3 4 + # + # + # df2: + # id value_2A value_2B + # 0 1 0 1 + # 1 1 3 5 + # 2 1 7 9 + # 3 1 12 15 + # 4 2 0 1 + # 5 2 2 4 + # 6 2 3 6 + # 7 3 1 3 + # + # + # ('value_1', 'value_2A','>'), ('value_1', 'value_2B', '<') + # for the first pair, since op is greater than + # 'value_1' is sorted in descending order + # our pairing should be : + # value source region number + # 12 value_2A 0 + # 7 value_2A 1 + # 7 value_1 2 + # 5 value_1 2 + # 4 value_1 2 + # 3 value_2A 2 + # 3 value_2A 2 + # 3 value_1 3 + # 2 value_2A 3 + # 2 value_1 4 + # 1 value_2A 4 + # 1 value_1 5 + # 0 value_2A 5 + # 0 value_2A 5 + # + # note that 7 for value_2A is not matched with 7 of value_1 + # because it is >, not >=, hence the different region numbers + # looking at the output above, we can safely discard regions 0 and 1 + # since they do not have any matches with value_1 + # for the second pair, since op is <, value_1 is sorted + # in ascending order, and our pairing should be: + # value source region number + # 1 value_2B 0 + # 1 value_2B 1 + # 1 value_1 2 + # 2 value_1 2 + # 3 value_2B 2 + # 3 value_1 3 + # 4 value_2B 3 + # 4 value_1 4 + # 5 value_2B 4 + # 5 value_1 5 + # 6 value_2B 5 + # 7 value_1 6 + # 9 value_2B 6 + # 15 value_2B 6 + # + # from the above we can safely discard regions 0 and 1, since there are + # no matches with value_1 ... 
note that the index for regions 0 and 1 + # coincide with the index for region 5 values in value_2A(0, 0); + # as such those regions will be discarded. + # Similarly, the index for regions 0 and 1 of value_2A(12, 7) + # coincide with the index for regions 6 for value_2B(9, 15); + # these will be discarded as well. + # let's create a table of the regions, paired with the index + # + # + # value_1 : + ############################################### + # index--> 2 1 5 4 0 3 + # pair1--> 2 2 2 3 4 5 + # pair2--> 6 5 4 3 2 2 + ############################################### + # + # + # value_2A, value_2B + ############################################## + # index --> 1 6 5 7 + # pair1 --> 2 2 3 4 + # pair2 --> 4 5 3 2 + ############################################## + # + # To find matching indices, the regions from value_1 must be less than + # or equal to the regions in value_2A/2B. + # pair1 <= pair1 and pair2 <= pair2 + # Starting from the highest region in value_1 + # 5 in pair1 is not less than any in value_2A/2B, so we discard + # 4 in pair1 is matched to 4 in pair1 of value_2A/2B + # we look at the equivalent value in pair2 for 4, which is 2 + # 2 matches 2 in pair 2, so we have a match -> (0, 7) + # 3 in pair 1 from value_1 matches 3 and 4 in pair1 for value_2A/2B + # next we compare the equivalent value from pair2, which is 3 + # 3 matches only 3 in value_2A/2B, so our only match is -> (4, 5) + # next is 2 (we have 3 2s in value_1 for pair1) + # they all match 2, 2, 3, 4 in pair1 of value_2A/2B + # compare the first equivalent in pair2 -> 4 + # 4 matches only 4, 5 in pair2 of value_2A/2B + # ->(5, 1), (5, 6) + # the next equivalent is -> 5 + # 5 matches only 5 in pair2 of value_2A/2B + # -> (1, 6) + # the last equivalent is -> 6 + # 6 has no match in pair2 of value_2A/2B, so we discard + # our final matching indices for the left and right pairs + ######################################################### + # left_index right_index + # 0 7 + # 4 5 + # 5 1 + # 5 6 
+ # 1 6 + ######################################################## + # and if we index the dataframes, we should get the output below: + ################################# + # value_1 value_2A value_2B + # 0 2 1 3 + # 1 5 3 6 + # 2 3 2 4 + # 3 4 3 5 + # 4 4 3 6 + ################################ + mapping = {">": 0, ">=": 1, "<": 2, "<=": 3} + first, second, *rest = gt_lt + if rest: + left_on, right_on, _ = zip(*rest) + left_on = list(set(left_on)) + any_nulls = df.loc[:, left_on].isna().any(axis=1) + if any_nulls.all(): + return None + if any_nulls.any(): + df = df.loc[~any_nulls] + right_on = list(set(right_on)) + any_nulls = right.loc[:, right_on].isna().any(axis=1) + if any_nulls.all(): + return None + if any_nulls.any(): + right = right.loc[~any_nulls] + if right[first[1]].is_monotonic_increasing: + right_is_sorted = True + else: + right_is_sorted = False + right = right.sort_values([first[1], second[1]], ignore_index=False) + if is_range_join & right[second[1]].is_monotonic_increasing: + return _range_join_sorted( + first=first, + second=second, + df=df, + right=right, + keep=keep, + gt_lt=gt_lt, + mapping=mapping, + rest=rest, + right_is_sorted=right_is_sorted, + ) + + if not df[first[0]].is_monotonic_increasing: + df = df.sort_values(first[0], ignore_index=False) + left_index = df.index._values + right_index = right.index._values + l_index = pd.RangeIndex(start=0, stop=left_index.size) + df.index = l_index + r_index = pd.RangeIndex(start=0, stop=right_index.size) + right.index = r_index + shape = (left_index.size, 2) + # use the l_booleans and r_booleans + # to track rows that have complete matches + left_regions = np.empty(shape=shape, dtype=np.intp, order="F") + l_booleans = np.zeros(left_index.size, dtype=np.intp) + shape = (right_index.size, 2) + right_regions = np.empty(shape=shape, dtype=np.intp, order="F") + r_booleans = np.zeros(right_index.size, dtype=np.intp) + for position, (left_column, right_column, op) in enumerate( + (first, second) + ): + 
outcome = _generic_func_cond_join( + left=df[left_column], + right=right[right_column], + op=op, + multiple_conditions=True, + keep="all", + ) + if outcome is None: + return None + left_indexer, right_indexer, search_indices = outcome + if op in greater_than_join_types: + search_indices = right_indexer.size - search_indices + right_indexer = right_indexer[::-1] + r_region = np.zeros(right_indexer.size, dtype=np.intp) + r_region[search_indices] = 1 + r_region[0] -= 1 + r_region = r_region.cumsum() + left_regions[left_indexer, position] = r_region[search_indices] + l_booleans[left_indexer] += 1 + right_regions[right_indexer, position] = r_region + r_booleans[right_indexer[search_indices.min() :]] += 1 + r_region = None + search_indices = None + booleans = l_booleans == 2 + if not booleans.any(): + return None + if not booleans.all(): + left_regions = left_regions[booleans] + left_index = left_index[booleans] + l_index = l_index[booleans] + booleans = r_booleans == 2 + if not booleans.any(): + return None + if not booleans.all(): + right_regions = right_regions[booleans] + right_index = right_index[booleans] + r_index = r_index[booleans] + l_booleans = None + r_booleans = None + if gt_lt[0][-1] in greater_than_join_types: + left_regions = left_regions[::-1] + left_index = left_index[::-1] + l_index = l_index[::-1] + right_regions = right_regions[::-1] + right_index = right_index[::-1] + r_index = r_index[::-1] + starts = right_regions[:, 0].searchsorted(left_regions[:, 0]) + booleans = starts < len(right_regions) + if not booleans.any(): + return None + if not booleans.all(): + starts = starts[booleans] + left_regions = left_regions[booleans] + left_index = left_index[booleans] + l_index = l_index[booleans] + + rest = tuple( + ( + df.loc[l_index, left_on].to_numpy(), + right.loc[r_index, right_on].to_numpy(), + mapping[op], + ) + for left_on, right_on, op in rest + ) + + # a range join will have > and < + # > and < will be in opposite directions + # if the first 
condition is > + # and the second condition is < + # and the second condition is monotonic increasing + # then this kicks in + if pd.Index(right_regions[:, 1]).is_monotonic_decreasing: + return _range_join_right_region_monotonic_decreasing( + left_regions=left_regions, + right_regions=right_regions, + left_index=left_index, + right_index=right_index, + keep=keep, + rest=rest, + starts=starts, + gt_lt=gt_lt, + right_is_sorted=right_is_sorted, + ) + if pd.Index(right_regions[:, 1]).is_monotonic_increasing: + return _numba_non_equi_join_monotonic_increasing( + left_regions=left_regions, + right_regions=right_regions, + left_index=left_index, + right_index=right_index, + keep=keep, + gt_lt=gt_lt, + rest=rest, + starts=starts, + ) + from janitor.functions import _numba + + # logic here is based on grantjenks' sortedcontainers + # https://github.com/grantjenks/python-sortedcontainers + load_factor = 1_000 + width = load_factor * 2 + length = math.ceil(right_index.size / load_factor) + # maintain a sorted array of the regions + sorted_array = np.empty( + (width, length), dtype=right_regions.dtype, order="F" + ) + # keep track of the positions of each region + # within the sorted array + positions_array = np.empty( + (width, length), dtype=right_regions.dtype, order="F" + ) + # keep track of the max value per column + maxxes = np.empty(length, dtype=np.intp) + # keep track of the length of actual data for each column + lengths = np.empty(length, dtype=np.intp) + if (keep == "all") & (len(gt_lt) == 2): + left_indices, right_indices = ( + _numba._numba_non_equi_join_not_monotonic_dual_keep_all( + left_regions=left_regions[:, 1], + right_regions=right_regions[:, 1], + left_index=left_index, + right_index=right_index, + maxxes=maxxes, + lengths=lengths, + sorted_array=sorted_array, + positions_array=positions_array, + starts=starts, + load_factor=load_factor, + ) + ) + elif (keep == "first") & (len(gt_lt) == 2): + + left_indices, right_indices = ( + 
_numba._numba_non_equi_join_not_monotonic_dual_keep_first( + left_regions=left_regions[:, 1], + right_regions=right_regions[:, 1], + left_index=left_index, + right_index=right_index, + maxxes=maxxes, + lengths=lengths, + sorted_array=sorted_array, + positions_array=positions_array, + starts=starts, + load_factor=load_factor, + ) + ) + elif (keep == "last") & (len(gt_lt) == 2): + left_indices, right_indices = ( + _numba._numba_non_equi_join_not_monotonic_dual_keep_last( + left_regions=left_regions[:, 1], + right_regions=right_regions[:, 1], + left_index=left_index, + right_index=right_index, + maxxes=maxxes, + lengths=lengths, + sorted_array=sorted_array, + positions_array=positions_array, + starts=starts, + load_factor=load_factor, + ) + ) + + elif keep == "all": + left_indices, right_indices = ( + _numba._numba_non_equi_join_not_monotonic_keep_all( + tupled=rest, + left_index=left_index, + right_index=right_index, + left_regions=left_regions[:, 1], + right_regions=right_regions[:, 1], + maxxes=maxxes, + lengths=lengths, + sorted_array=sorted_array, + positions_array=positions_array, + load_factor=load_factor, + starts=starts, + ) + ) + elif keep == "first": + left_indices, right_indices = ( + _numba._numba_non_equi_join_not_monotonic_keep_first( + tupled=rest, + left_index=left_index, + right_index=right_index, + left_regions=left_regions[:, 1], + right_regions=right_regions[:, 1], + maxxes=maxxes, + lengths=lengths, + sorted_array=sorted_array, + positions_array=positions_array, + load_factor=load_factor, + starts=starts, + ) + ) + else: + left_indices, right_indices = ( + _numba._numba_non_equi_join_not_monotonic_keep_last( + tupled=rest, + left_index=left_index, + right_index=right_index, + left_regions=left_regions[:, 1], + right_regions=right_regions[:, 1], + maxxes=maxxes, + lengths=lengths, + sorted_array=sorted_array, + positions_array=positions_array, + load_factor=load_factor, + starts=starts, + ) + ) + if left_indices is None: + return None + return 
left_indices, right_indices + + +def _range_join_sorted( + first: tuple, + second: tuple, + df: pd.DataFrame, + right: pd.DataFrame, + keep: str, + gt_lt: tuple, + mapping: dict, + rest: list, + right_is_sorted: bool, +) -> tuple: + """ + Get indices for a range join + if both columns from the right + are monotonically sorted + """ + from janitor.functions import _numba + + left_on, right_on, op = first + outcome = _generic_func_cond_join( + left=df[left_on], + right=right[right_on], + op=op, + multiple_conditions=True, + keep="all", + ) + if not outcome: + return None + left_index, right_index, ends = outcome + left_on, right_on, op = second + outcome = _generic_func_cond_join( + left=df.loc[left_index, left_on], + right=right.loc[right_index, right_on], + op=op, + multiple_conditions=True, + keep="all", + ) + if outcome is None: + return None + left_c, right_index, starts = outcome + if left_c.size < left_index.size: + keep_rows = pd.Index(left_c).get_indexer(left_index) != -1 + ends = ends[keep_rows] + left_index = left_c + # no point searching within (a, b) + # if a == b + # since range(a, b) yields none + keep_rows = starts < ends + if not keep_rows.any(): + return None + if not keep_rows.all(): + left_index = left_index[keep_rows] + starts = starts[keep_rows] + ends = ends[keep_rows] + repeater = ends - starts + if (len(gt_lt) == 2) & (repeater.max() == 1): + # no point running a comparison op + # if the width is all 1 + # this also implies that the intervals + # do not overlap on the right side + return left_index, right_index[starts] + if (len(gt_lt) == 2) & (keep == "first") & right_is_sorted: + return left_index, right_index[starts] + if (len(gt_lt) == 2) & (keep == "first"): + left_indices = np.empty(left_index.size, dtype=np.intp) + right_indices = np.empty(left_index.size, dtype=np.intp) + return _numba._numba_range_join_sorted_keep_first_dual( + left_index=left_index, + right_index=right_index, + starts=starts, + ends=ends, + 
left_indices=left_indices, + right_indices=right_indices, + ) + if (len(gt_lt) == 2) & (keep == "last") & right_is_sorted: + return left_index, right_index[ends - 1] + if (len(gt_lt) == 2) & (keep == "last"): + left_indices = np.empty(left_index.size, dtype=np.intp) + right_indices = np.empty(left_index.size, dtype=np.intp) + return _numba._numba_range_join_sorted_keep_last_dual( + left_index=left_index, + right_index=right_index, + starts=starts, + ends=ends, + left_indices=left_indices, + right_indices=right_indices, + ) + if (len(gt_lt) == 2) & (keep == "all"): + start_indices = np.empty(left_index.size, dtype=np.intp) + start_indices[0] = 0 + indices = (ends - starts).cumsum() + start_indices[1:] = indices[:-1] + indices = indices[-1] + left_indices = np.empty(indices, dtype=np.intp) + right_indices = np.empty(indices, dtype=np.intp) + return _numba._range_join_sorted_dual_keep_all( + left_index=left_index, + right_index=right_index, + starts=starts, + ends=ends, + left_indices=left_indices, + right_indices=right_indices, + start_indices=start_indices, + ) + + rest = tuple( + ( + df.loc[left_index, left_on].to_numpy(), + right.loc[right_index, right_on].to_numpy(), + mapping[op], + ) + for left_on, right_on, op in rest + ) + + start_indices = np.empty(left_index.size, dtype=np.intp) + start_indices[0] = 0 + indices = (ends - starts).cumsum() + start_indices[1:] = indices[:-1] + indices = indices[-1] + indices = np.ones(indices, dtype=np.bool_) + if keep == "all": + left_indices, right_indices = ( + _numba._range_join_sorted_multiple_keep_all( + rest, + left_index=left_index, + starts=starts, + ends=ends, + right_index=right_index, + indices=indices, + start_indices=start_indices, + ) + ) + elif keep == "first": + left_indices, right_indices = ( + _numba._range_join_sorted_multiple_keep_first( + rest, + left_index=left_index, + starts=starts, + ends=ends, + right_index=right_index, + indices=indices, + start_indices=start_indices, + ) + ) + else: + left_indices, 
right_indices = ( + _numba._range_join_sorted_multiple_keep_last( + rest, + left_index=left_index, + starts=starts, + ends=ends, + right_index=right_index, + indices=indices, + start_indices=start_indices, + ) + ) + if left_indices is None: + return None + return left_indices, right_indices + + +def _range_join_right_region_monotonic_decreasing( + left_regions: np.ndarray, + right_regions: np.ndarray, + left_index: np.ndarray, + right_index: np.ndarray, + keep: str, + gt_lt: tuple, + rest: tuple, + starts: np.ndarray, + right_is_sorted: bool, +): + """ + Get indices for a range join, + if the second column in the right region + is monotonic decreasing + """ + from janitor.functions import _numba + + ends = right_regions[::-1, 1].searchsorted(left_regions[:, 1]) + ends = len(right_regions) - ends + booleans = starts < ends + if not booleans.any(): + return None + if not booleans.all(): + starts = starts[booleans] + left_regions = left_regions[booleans] + left_index = left_index[booleans] + ends = ends[booleans] + rest = tuple( + (left_arr[booleans], right_arr, op) + for left_arr, right_arr, op in rest + ) + booleans = None + if (keep == "first") & (len(gt_lt) == 2) & right_is_sorted: + return left_index, right_index[ends - 1] + if (keep == "first") & (len(gt_lt) == 2): + left_indices = np.empty(left_index.size, dtype=np.intp) + right_indices = np.empty(left_index.size, dtype=np.intp) + return _numba._numba_range_join_sorted_keep_first_dual( + left_index=left_index, + right_index=right_index, + starts=starts, + ends=ends, + left_indices=left_indices, + right_indices=right_indices, + ) + if (keep == "last") & (len(gt_lt) == 2) & right_is_sorted: + return left_index, right_index[starts] + if (keep == "last") & (len(gt_lt) == 2): + left_indices = np.empty(left_index.size, dtype=np.intp) + right_indices = np.empty(left_index.size, dtype=np.intp) + return _numba._numba_range_join_sorted_keep_last_dual( + left_index=left_index, + right_index=right_index, + starts=starts, + 
ends=ends, + left_indices=left_indices, + right_indices=right_indices, + ) + if (keep == "all") & (len(gt_lt) == 2): + start_indices = np.empty(left_index.size, dtype=np.intp) + start_indices[0] = 0 + indices = (ends - starts).cumsum() + start_indices[1:] = indices[:-1] + indices = indices[-1] + left_indices = np.empty(indices, dtype=np.intp) + right_indices = np.empty(indices, dtype=np.intp) + return _numba._range_join_sorted_dual_keep_all( + left_index=left_index, + right_index=right_index, + starts=starts, + ends=ends, + left_indices=left_indices, + right_indices=right_indices, + start_indices=start_indices, + ) + start_indices = np.empty(left_index.size, dtype=np.intp) + start_indices[0] = 0 + indices = (ends - starts).cumsum() + start_indices[1:] = indices[:-1] + indices = indices[-1] + indices = np.ones(indices, dtype=np.bool_) + if keep == "all": + left_indices, right_indices = ( + _numba._range_join_sorted_multiple_keep_all( + rest, + left_index=left_index, + starts=starts, + ends=ends, + right_index=right_index, + indices=indices, + start_indices=start_indices, + ) + ) + elif keep == "first": + left_indices, right_indices = ( + _numba._range_join_sorted_multiple_keep_first( + rest, + left_index=left_index, + starts=starts, + ends=ends, + right_index=right_index, + indices=indices, + start_indices=start_indices, + ) + ) + + else: + left_indices, right_indices = ( + _numba._range_join_sorted_multiple_keep_last( + rest, + left_index=left_index, + starts=starts, + ends=ends, + right_index=right_index, + indices=indices, + start_indices=start_indices, + ) + ) + if left_indices is None: + return None + return left_indices, right_indices + + +def _numba_non_equi_join_monotonic_increasing( + left_regions: np.ndarray, + right_regions: np.ndarray, + left_index: np.ndarray, + right_index: np.ndarray, + keep: str, + gt_lt: tuple, + rest: tuple, + starts: np.ndarray, +): + """ + Get indices for a non equi join, + if the second column in the right region + is monotonic 
increasing + """ + from janitor.functions import _numba + + _starts = right_regions[:, 1].searchsorted(left_regions[:, 1]) + starts = np.where(starts > _starts, starts, _starts) + booleans = starts == right_index.size + if booleans.all(): + return None + if booleans.any(): + booleans = ~booleans + left_index = left_index[booleans] + starts = starts[booleans] + left_regions = left_regions[booleans] + rest = tuple( + (left_arr[booleans], right_arr, op) + for left_arr, right_arr, op in rest + ) + if (keep == "first") & (len(gt_lt) == 2): + left_indices = np.empty(left_index.size, dtype=np.intp) + right_indices = np.empty(left_index.size, dtype=np.intp) + return ( + _numba._numba_non_equi_join_monotonic_increasing_keep_first_dual( + left_index=left_index, + right_index=right_index, + starts=starts, + left_indices=left_indices, + right_indices=right_indices, + ) + ) + if (keep == "last") & (len(gt_lt) == 2): + left_indices = np.empty(left_index.size, dtype=np.intp) + right_indices = np.empty(left_index.size, dtype=np.intp) + return _numba._numba_non_equi_join_monotonic_increasing_keep_last_dual( + left_index=left_index, + right_index=right_index, + starts=starts, + left_indices=left_indices, + right_indices=right_indices, + ) + if (keep == "all") & (len(gt_lt) == 2): + start_indices = np.empty(left_index.size, dtype=np.intp) + start_indices[0] = 0 + indices = (right_index.size - starts).cumsum() + start_indices[1:] = indices[:-1] + indices = indices[-1] + left_indices = np.empty(indices, dtype=np.intp) + right_indices = np.empty(indices, dtype=np.intp) + return _numba._numba_non_equi_join_monotonic_increasing_keep_all_dual( + left_index=left_index, + right_index=right_index, + starts=starts, + left_indices=left_indices, + right_indices=right_indices, + start_indices=start_indices, + ) + start_indices = np.empty(left_index.size, dtype=np.intp) + start_indices[0] = 0 + indices = (right_index.size - starts).cumsum() + start_indices[1:] = indices[:-1] + indices = 
indices[-1] + indices = np.ones(indices, dtype=np.bool_) + if keep == "first": + left_indices, right_indices = ( + _numba._numba_non_equi_join_monotonic_increasing_keep_first( + rest, + left_index=left_index, + starts=starts, + right_index=right_index, + indices=indices, + start_indices=start_indices, + ) + ) + elif keep == "last": + left_indices, right_indices = ( + _numba._numba_non_equi_join_monotonic_increasing_keep_last( + rest, + left_index=left_index, + starts=starts, + right_index=right_index, + indices=indices, + start_indices=start_indices, + ) + ) + + else: + left_indices, right_indices = ( + _numba._numba_non_equi_join_monotonic_increasing_keep_all( + rest, + left_index=left_index, + starts=starts, + right_index=right_index, + indices=indices, + start_indices=start_indices, + ) + ) + if left_indices is None: + return None + return left_indices, right_indices + + +def _numba_equi_join( + df: pd.DataFrame, + right: pd.DataFrame, + eqs: tuple, + ge_gt: tuple, + le_lt: tuple, +) -> Union[tuple[np.ndarray, np.ndarray], None]: + """ + Compute indices when an equi join is present. + """ + # the logic is to delay searching for actual matches + # while reducing the search space + # to get the smallest possible search area + # this serves as an alternative to pandas' hash join + # and in some cases, + # usually for many to many joins, + # can offer significant performance improvements. 
+ # it relies on binary searches, within the groups, + # and relies on the fact that sorting ensures the first + # two columns from the right dataframe are in ascending order + # per group - this gives us the opportunity to + # only do a linear search, within the groups, + # for the last column (if any) + # (the third column is applicable only for range joins) + # Example : + # df1: + # id value_1 + # 0 1 2 + # 1 1 5 + # 2 1 7 + # 3 2 1 + # 4 2 3 + # 5 3 4 + # + # + # df2: + # id value_2A value_2B + # 0 1 0 1 + # 1 1 3 5 + # 2 1 7 9 + # 3 1 12 15 + # 4 2 0 1 + # 5 2 2 4 + # 6 2 3 6 + # 7 3 1 3 + # + # + # join condition -> + # ('id', 'id', '==') & + # ('value_1', 'value_2A','>') & + # ('value_1', 'value_2B', '<') + # + # + # note how for df2, id and value_2A + # are sorted per group + # the third column (relevant for range join) + # may or may not be sorted per group + # (the group is determined by the values of the id column) + # and as such, we do a linear search in that space, per group + # + # first we get the slice boundaries based on id -> ('id', 'id', '==') + # value start end + # 1 0 4 + # 1 0 4 + # 1 0 4 + # 2 4 7 + # 2 4 7 + # 3 7 8 + # + # next step is to get the slice end boundaries, + # based on the greater than condition + # -> ('value_1', 'value_2A', '>') + # the search will be within each boundary + # so for the first row, value_1 is 2 + # the boundary search will be between 0, 4 + # for the last row, value_1 is 4 + # and its boundary search will be between 7, 8 + # since value_2A is sorted per group, + # a binary search is employed + # value start end value_1 new_end + # 1 0 4 2 1 + # 1 0 4 5 2 + # 1 0 4 7 2 + # 2 4 7 1 5 + # 2 4 7 3 6 + # 3 7 8 4 8 + # + # next step is to get the start boundaries, + # based on the less than condition + # -> ('value_1', 'value_2B', '<') + # note that we have new end boundaries, + # and as such, our boundaries will use that + # so for the first row, value_1 is 2 + # the boundary search will be between 0, 1 + # for
the 5th row, value_1 is 3 + # and its boundary search will be between 4, 6 + # for value_2B, which is the third column + # since we are not sure whether it is sorted or not, + # a cumulative max array is used, + # to get the earliest possible slice start + # value start end value_1 new_start new_end + # 1 0 4 2 -1 1 + # 1 0 4 5 -1 2 + # 1 0 4 7 -1 2 + # 2 4 7 1 -1 5 + # 2 4 7 3 5 6 + # 3 7 8 4 -1 8 + # + # if there are no matches, boundary is reported as -1 + # from above, we can see that our search space + # is limited to just 5, 6 + # we can then search for actual matches + # id value_1 id value_2A value_2B + # 2 3 2 2 4 + # + from janitor.functions import _numba + + left_column, right_column, _ = eqs + # steal some perf here within the binary search + # search for uniques + # and later index them with left_positions + left_positions, left_arr = df[left_column].factorize(sort=False) + right_arr = right[right_column]._values + left_index = df.index._values + right_index = right.index._values + slice_starts = right_arr.searchsorted(left_arr, side="left") + slice_starts = slice_starts[left_positions] + slice_ends = right_arr.searchsorted(left_arr, side="right") + slice_ends = slice_ends[left_positions] + # check if there is a search space + # this also lets us know if there are equi matches + keep_rows = slice_starts < slice_ends + if not keep_rows.any(): + return None + if not keep_rows.all(): + left_index = left_index[keep_rows] + slice_starts = slice_starts[keep_rows] + slice_ends = slice_ends[keep_rows] + + ge_arr1 = None + ge_arr2 = None + ge_strict = None + if ge_gt: + left_column, right_column, op = ge_gt + ge_arr1 = df.loc[left_index, left_column]._values + ge_arr2 = right[right_column]._values + ge_arr1, ge_arr2 = _convert_to_numpy(left=ge_arr1, right=ge_arr2) + ge_strict = True if op == ">" else False + + le_arr1 = None + le_arr2 = None + le_strict = None + if le_lt: + left_column, right_column, op = le_lt + le_arr1 = df.loc[left_index, left_column]._values
+ le_arr2 = right[right_column]._values + le_arr1, le_arr2 = _convert_to_numpy(left=le_arr1, right=le_arr2) + le_strict = True if op == "<" else False + + if le_lt and ge_gt: + group = right.groupby(eqs[1])[le_lt[1]] + # is the last column (le_lt) monotonic increasing? + # fast path if it is + all_monotonic_increasing = all( + arr.is_monotonic_increasing for _, arr in group + ) + if all_monotonic_increasing: + cum_max_arr = le_arr2[:] + else: + cum_max_arr = group.cummax()._values + if is_extension_array_dtype(cum_max_arr): + array_dtype = cum_max_arr.dtype.numpy_dtype + cum_max_arr = cum_max_arr.astype(array_dtype) + if is_datetime64_dtype(cum_max_arr): + cum_max_arr = cum_max_arr.view(np.int64) + + left_index, right_index = _numba._numba_equi_join_range_join( + left_index, + right_index, + slice_starts, + slice_ends, + ge_arr1, + ge_arr2, + ge_strict, + le_arr1, + le_arr2, + le_strict, + all_monotonic_increasing, + cum_max_arr, + ) + + elif le_lt: + left_index, right_index = _numba._numba_equi_le_join( + left_index, + right_index, + slice_starts, + slice_ends, + le_arr1, + le_arr2, + le_strict, + ) + + else: + left_index, right_index = _numba._numba_equi_ge_join( + left_index, + right_index, + slice_starts, + slice_ends, + ge_arr1, + ge_arr2, + ge_strict, + ) + + if left_index is None: + return None + + return left_index, right_index + + +def _convert_to_numpy( + left: np.ndarray, right: np.ndarray +) -> tuple[np.ndarray, np.ndarray]: + """ + Ensure array is a numpy array. 
+ """ + if is_extension_array_dtype(left): + array_dtype = left.dtype.numpy_dtype + left = left.astype(array_dtype) + right = right.astype(array_dtype) + if is_datetime64_dtype(left): + left = left.view(np.int64) + right = right.view(np.int64) + return left, right diff --git a/janitor/functions/utils.py b/janitor/functions/utils.py index 9be1edc2b..ecba2ca19 100644 --- a/janitor/functions/utils.py +++ b/janitor/functions/utils.py @@ -487,7 +487,6 @@ def _greater_than_indices( if not outcome: return None left, right, left_index, right_index, right_is_sorted, any_nulls = outcome - search_indices = right.searchsorted(left, side="right") # if any of the positions in `search_indices` # is equal to 0 (less than 1), it implies that diff --git a/tests/functions/test_conditional_join.py b/tests/functions/test_conditional_join.py index d456fa08e..e119a65f8 100644 --- a/tests/functions/test_conditional_join.py +++ b/tests/functions/test_conditional_join.py @@ -2355,7 +2355,12 @@ def test_dual_conditions_ne_and_eq(df, right): filters = ["A", "E", "Integers", "Dates"] expected = ( df[["A", "E"]] - .merge(right[["Integers", "Dates"]], left_on="E", right_on="Dates") + .dropna(subset="E") + .merge( + right[["Integers", "Dates"]].dropna(subset="Dates"), + left_on="E", + right_on="Dates", + ) .loc[lambda df: df.A.ne(df.Integers)] .sort_values(filters, ignore_index=True) ) @@ -2924,8 +2929,13 @@ def test_ge_eq_and_le_numbers_variant(df, right): columns = ["B", "A", "E", "Floats", "Integers", "Dates"] expected = ( - df.merge( - right, left_on="B", right_on="Floats", how="inner", sort=False + df.dropna(subset="B") + .merge( + right.dropna(subset="Floats"), + left_on="B", + right_on="Floats", + how="inner", + sort=False, ) .loc[lambda df: df.A.ge(df.Integers) & df.E.le(df.Dates), columns] .sort_values(columns, ignore_index=True) @@ -3249,8 +3259,13 @@ def test_ge_eq_and_le_numbers_variant_numba(df, right): columns = ["B", "A", "E", "Floats", "Integers", "Dates"] expected = ( - df.merge( 
- right, left_on="B", right_on="Floats", how="inner", sort=False + df.dropna(subset="B") + .merge( + right.dropna(subset="Floats"), + left_on="B", + right_on="Floats", + how="inner", + sort=False, ) .loc[lambda df: df.A.lt(df.Integers) & df.E.gt(df.Dates), columns] .sort_values(columns, ignore_index=True) @@ -3598,10 +3613,8 @@ def test_ge_eq_and_le_datess_numba_indices(df, right): expected = pd.Index(expected) actual, _ = get_join_indices( - df[["B", "A", "E"]].dropna(subset=["E"]), - right[["Floats", "Integers", "Dates", "Numeric"]].dropna( - subset=["Dates"] - ), + df[["B", "A", "E"]], + right[["Floats", "Integers", "Dates", "Numeric"]], [ ("A", "Integers", "<"), ("E", "Dates", "=="), @@ -3634,10 +3647,9 @@ def test_eq_indices(df, right): ) expected = pd.Index(expected) - # get rid of the dropna in future PR actual, _ = get_join_indices( - df.dropna(subset=["E"]), - right.dropna(subset=["Dates"]), + df, + right, [ ("E", "Dates", "=="), ], @@ -3676,15 +3688,11 @@ def test_eq_indices_ragged_arrays(df, right): ], return_ragged_arrays=True, ) - if isinstance(ractual, list): - ractual = [right.index[arr] for arr in ractual] - ractual = np.concatenate(ractual) - ractual = pd.Index(ractual) - lactual = pd.Index(lactual) - if isinstance(ractual, list): + if isinstance(ractual, (slice, list)): ractual = [right.index[arr] for arr in ractual] lengths = [len(arr) for arr in ractual] ractual = np.concatenate(ractual) + ractual = pd.Index(ractual) lactual = pd.Index(lactual).repeat(lengths) ractual = pd.Index(ractual) lactual = pd.Index(lactual) @@ -4046,9 +4054,10 @@ def test_range_indices_ragged_arrays(df, right): def test_ge_eq_and_le_datess_indices(df, right): """compare join indices for multiple conditions.""" expected = ( - df.reset_index() + df.dropna(subset="E") + .reset_index() .merge( - right, + right.dropna(subset="Dates"), left_on="E", right_on="Dates", how="inner",