in datafusion/expr/src/window_state.rs [542:661]
fn calculate_index_of_row<const SIDE: bool, const SEARCH_SIDE: bool>(
&mut self,
range_columns: &[ArrayRef],
idx: usize,
delta: Option<&ScalarValue>,
length: usize,
) -> Result<usize> {
let delta = if let Some(delta) = delta {
if let ScalarValue::UInt64(Some(value)) = delta {
*value as usize
} else {
return internal_err!(
"Unexpectedly got a non-UInt64 value in a GROUPS mode window frame"
);
}
} else {
0
};
let mut group_start = 0;
let last_group = self.group_end_indices.back_mut();
if let Some((group_row, group_end)) = last_group {
if *group_end < length {
let new_group_row = get_row_at_idx(range_columns, *group_end)?;
// If last/current group keys are the same, we extend the last group:
if new_group_row.eq(group_row) {
// Update the end boundary of the group (search right boundary):
*group_end = search_in_slice(
range_columns,
group_row,
check_equality,
*group_end,
length,
)?;
}
}
// Start searching from the last group boundary:
group_start = *group_end;
}
// Advance groups until `idx` is inside a group:
while idx >= group_start {
let group_row = get_row_at_idx(range_columns, group_start)?;
// Find end boundary of the group (search right boundary):
let group_end = search_in_slice(
range_columns,
&group_row,
check_equality,
group_start,
length,
)?;
self.group_end_indices.push_back((group_row, group_end));
group_start = group_end;
}
// Update the group index `idx` belongs to:
while self.current_group_idx < self.group_end_indices.len()
&& idx >= self.group_end_indices[self.current_group_idx].1
{
self.current_group_idx += 1;
}
// Find the group index of the frame boundary:
let group_idx = if SEARCH_SIDE {
if self.current_group_idx > delta {
self.current_group_idx - delta
} else {
0
}
} else {
self.current_group_idx + delta
};
// Extend `group_start_indices` until it includes at least `group_idx`:
while self.group_end_indices.len() <= group_idx && group_start < length {
let group_row = get_row_at_idx(range_columns, group_start)?;
// Find end boundary of the group (search right boundary):
let group_end = search_in_slice(
range_columns,
&group_row,
check_equality,
group_start,
length,
)?;
self.group_end_indices.push_back((group_row, group_end));
group_start = group_end;
}
// Calculate index of the group boundary:
Ok(match (SIDE, SEARCH_SIDE) {
// Window frame start:
(true, _) => {
let group_idx = std::cmp::min(group_idx, self.group_end_indices.len());
if group_idx > 0 {
// Normally, start at the boundary of the previous group.
self.group_end_indices[group_idx - 1].1
} else {
// If previous group is out of the table, start at zero.
0
}
}
// Window frame end, PRECEDING n
(false, true) => {
if self.current_group_idx >= delta {
let group_idx = self.current_group_idx - delta;
self.group_end_indices[group_idx].1
} else {
// Group is out of the table, therefore end at zero.
0
}
}
// Window frame end, FOLLOWING n
(false, false) => {
let group_idx = std::cmp::min(
self.current_group_idx + delta,
self.group_end_indices.len() - 1,
);
self.group_end_indices[group_idx].1
}
})
}