in datafusion/functions-window/src/nth_value.rs [365:464]
fn evaluate(
&mut self,
values: &[ArrayRef],
range: &Range<usize>,
) -> Result<ScalarValue> {
if let Some(ref result) = self.state.finalized_result {
Ok(result.clone())
} else {
// FIRST_VALUE, LAST_VALUE, NTH_VALUE window functions take a single column, values will have size 1.
let arr = &values[0];
let n_range = range.end - range.start;
if n_range == 0 {
// We produce None if the window is empty.
return ScalarValue::try_from(arr.data_type());
}
// If null values exist and need to be ignored, extract the valid indices.
let valid_indices = if self.ignore_nulls {
// Calculate valid indices, inside the window frame boundaries.
let slice = arr.slice(range.start, n_range);
match slice.nulls() {
Some(nulls) => {
let valid_indices = nulls
.valid_indices()
.map(|idx| {
// Add offset `range.start` to valid indices, to point correct index in the original arr.
idx + range.start
})
.collect::<Vec<_>>();
if valid_indices.is_empty() {
// If all values are null, return directly.
return ScalarValue::try_from(arr.data_type());
}
Some(valid_indices)
}
None => None,
}
} else {
None
};
match self.state.kind {
NthValueKind::First => {
if let Some(valid_indices) = &valid_indices {
ScalarValue::try_from_array(arr, valid_indices[0])
} else {
ScalarValue::try_from_array(arr, range.start)
}
}
NthValueKind::Last => {
if let Some(valid_indices) = &valid_indices {
ScalarValue::try_from_array(
arr,
valid_indices[valid_indices.len() - 1],
)
} else {
ScalarValue::try_from_array(arr, range.end - 1)
}
}
NthValueKind::Nth => {
match self.n.cmp(&0) {
Ordering::Greater => {
// SQL indices are not 0-based.
let index = (self.n as usize) - 1;
if index >= n_range {
// Outside the range, return NULL:
ScalarValue::try_from(arr.data_type())
} else if let Some(valid_indices) = valid_indices {
if index >= valid_indices.len() {
return ScalarValue::try_from(arr.data_type());
}
ScalarValue::try_from_array(&arr, valid_indices[index])
} else {
ScalarValue::try_from_array(arr, range.start + index)
}
}
Ordering::Less => {
let reverse_index = (-self.n) as usize;
if n_range < reverse_index {
// Outside the range, return NULL:
ScalarValue::try_from(arr.data_type())
} else if let Some(valid_indices) = valid_indices {
if reverse_index > valid_indices.len() {
return ScalarValue::try_from(arr.data_type());
}
let new_index =
valid_indices[valid_indices.len() - reverse_index];
ScalarValue::try_from_array(&arr, new_index)
} else {
ScalarValue::try_from_array(
arr,
range.start + n_range - reverse_index,
)
}
}
Ordering::Equal => ScalarValue::try_from(arr.data_type()),
}
}
}
}
}