fn evaluate()

in datafusion/functions-window/src/nth_value.rs [365:464]


    /// Evaluates FIRST_VALUE / LAST_VALUE / NTH_VALUE for one window frame.
    ///
    /// `values` holds the argument columns (only `values[0]` is read) and
    /// `range` is the half-open interval of rows in that column that make up
    /// the current frame.
    ///
    /// Returns, in order of precedence:
    /// * a clone of `self.state.finalized_result` when one is already stored;
    /// * a NULL scalar of the column's data type when the frame is empty, when
    ///   nulls are ignored and every value in the frame is null, when `n == 0`,
    ///   or when the requested position lies outside the frame (or outside
    ///   the frame's non-null values when nulls are ignored);
    /// * otherwise the value found at the selected row.
    ///
    /// For the `Nth` kind, a positive `n` counts from the start of the frame
    /// (1-based) and a negative `n` counts from its end (`-1` is the last row).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the `ScalarValue` conversions.
    ///
    /// # Panics
    ///
    /// Indexes `values[0]` unconditionally, so an empty `values` slice panics.
    fn evaluate(
        &mut self,
        values: &[ArrayRef],
        range: &Range<usize>,
    ) -> Result<ScalarValue> {
        // A finalized result is already stored in the state; hand back a copy.
        if let Some(ref result) = self.state.finalized_result {
            return Ok(result.clone());
        }

        // FIRST_VALUE, LAST_VALUE, NTH_VALUE window functions take a single column, values will have size 1.
        let arr = &values[0];
        let n_range = range.end - range.start;
        if n_range == 0 {
            // We produce NULL if the window is empty.
            return ScalarValue::try_from(arr.data_type());
        }

        // If null values exist and need to be ignored, extract the valid indices.
        // `None` means "every row in the frame is a candidate".
        let valid_indices = if self.ignore_nulls {
            // Calculate valid indices, inside the window frame boundaries.
            let slice = arr.slice(range.start, n_range);
            match slice.nulls() {
                Some(nulls) => {
                    // Indices from the slice are frame-relative; add
                    // `range.start` so they point into the original `arr`.
                    let valid_indices = nulls
                        .valid_indices()
                        .map(|idx| idx + range.start)
                        .collect::<Vec<_>>();
                    if valid_indices.is_empty() {
                        // If all values are null, return directly.
                        return ScalarValue::try_from(arr.data_type());
                    }
                    Some(valid_indices)
                }
                None => None,
            }
        } else {
            None
        };

        match self.state.kind {
            NthValueKind::First => {
                // `valid_indices`, when present, is non-empty (checked above).
                let row = match &valid_indices {
                    Some(valid_indices) => valid_indices[0],
                    None => range.start,
                };
                ScalarValue::try_from_array(arr, row)
            }
            NthValueKind::Last => {
                let row = match &valid_indices {
                    Some(valid_indices) => valid_indices[valid_indices.len() - 1],
                    None => range.end - 1,
                };
                ScalarValue::try_from_array(arr, row)
            }
            NthValueKind::Nth => match self.n.cmp(&0) {
                Ordering::Greater => {
                    // SQL indices are not 0-based.
                    let index = (self.n as usize) - 1;
                    if index >= n_range {
                        // Outside the range, return NULL:
                        ScalarValue::try_from(arr.data_type())
                    } else if let Some(valid_indices) = valid_indices {
                        // Fewer non-null values than requested also yields NULL.
                        match valid_indices.get(index) {
                            Some(&row) => ScalarValue::try_from_array(arr, row),
                            None => ScalarValue::try_from(arr.data_type()),
                        }
                    } else {
                        ScalarValue::try_from_array(arr, range.start + index)
                    }
                }
                Ordering::Less => {
                    // `unsigned_abs` cannot overflow, unlike `-self.n`, which
                    // overflows when `n` is the minimum value of its type.
                    let reverse_index = self.n.unsigned_abs() as usize;
                    if n_range < reverse_index {
                        // Outside the range, return NULL:
                        ScalarValue::try_from(arr.data_type())
                    } else if let Some(valid_indices) = valid_indices {
                        // `checked_sub` fails exactly when there are fewer
                        // non-null values than `reverse_index`.
                        match valid_indices.len().checked_sub(reverse_index) {
                            Some(pos) => {
                                ScalarValue::try_from_array(arr, valid_indices[pos])
                            }
                            None => ScalarValue::try_from(arr.data_type()),
                        }
                    } else {
                        ScalarValue::try_from_array(
                            arr,
                            range.start + n_range - reverse_index,
                        )
                    }
                }
                // NTH_VALUE(expr, 0) has no matching row; return NULL.
                Ordering::Equal => ScalarValue::try_from(arr.data_type()),
            },
        }
    }