fn evaluate_stateful()

in datafusion/physical-expr/src/window/built_in.rs [186:270]


    /// Incrementally evaluates this built-in window expression over each
    /// partition in `partition_batches`, keeping per-partition progress in
    /// `window_agg_state`.
    ///
    /// For every partition:
    /// * the first time a partition key is seen, a `WindowState` is created
    ///   from a fresh evaluator (`self.expr.create_evaluator()`) and a new
    ///   `WindowAggState` for the output type;
    /// * the argument columns are evaluated on the partition's buffered
    ///   `record_batch`; the ORDER BY columns are appended after them only when
    ///   the evaluator uses a window frame or includes rank;
    /// * rows are evaluated one at a time starting at
    ///   `state.last_calculated_index`. Evaluation stops early at the first row
    ///   whose frame extends to the end of the buffered rows while the partition
    ///   is not marked as ended (`is_end == false`);
    /// * the resulting column (empty if no row was evaluated) is passed to
    ///   `state.update`, and the evaluator's `memoize` hook runs when the
    ///   window frame's start bound is unbounded.
    ///
    /// # Errors
    ///
    /// Returns the first error raised while:
    /// * resolving the output field;
    /// * creating an evaluator or aggregate state;
    /// * evaluating argument or ORDER BY columns;
    /// * computing a frame range;
    /// * evaluating a row;
    /// * building the output array;
    /// * updating the state or memoizing.
    fn evaluate_stateful(
        &self,
        partition_batches: &PartitionBatches,
        window_agg_state: &mut PartitionWindowAggStates,
    ) -> Result<()> {
        let field = self.expr.field()?;
        let out_type = field.data_type();
        let sort_options: Vec<_> = self.order_by.iter().map(|o| o.options).collect();

        for (partition_row, partition_batch_state) in partition_batches.iter() {
            // Probe with `get_mut` first so a new evaluator is only built for
            // partitions that have no state yet.
            let window_state = match window_agg_state.get_mut(partition_row) {
                Some(existing) => existing,
                None => {
                    let evaluator = self.expr.create_evaluator()?;
                    window_agg_state
                        .entry(partition_row.clone())
                        .or_insert(WindowState {
                            state: WindowAggState::new(out_type)?,
                            window_fn: WindowFn::Builtin(evaluator),
                        })
                }
            };

            // Split the borrow so the evaluator and its running state can be
            // used mutably side by side.
            let WindowState { state, window_fn } = window_state;
            let evaluator = match window_fn {
                WindowFn::Builtin(builtin) => builtin,
                _ => unreachable!(),
            };

            let batch = &partition_batch_state.record_batch;
            let num_rows = batch.num_rows();

            // Layout of `columns`: the evaluated arguments come first, then the
            // ORDER BY columns (only when the evaluator needs them).
            let mut columns = self.evaluate_args(batch)?;
            let n_args = columns.len();
            let needs_order_by = evaluator.uses_window_frame() || evaluator.include_rank();
            if needs_order_by {
                columns.extend(get_orderby_values(self.order_by_columns(batch)?));
            }
            let order_by_cols = &columns[n_args..];

            // Running, row-by-row evaluation.
            let mut row_results: Vec<ScalarValue> = Vec::new();
            for idx in state.last_calculated_index..num_rows {
                let frame_range = if evaluator.uses_window_frame() {
                    let frame_ctx = state.window_frame_ctx.get_or_insert_with(|| {
                        WindowFrameContext::new(
                            self.window_frame.clone(),
                            sort_options.clone(),
                        )
                    });
                    // Resume the search from the previously stored range.
                    frame_ctx.calculate_range(
                        order_by_cols,
                        &state.window_frame_range,
                        num_rows,
                        idx,
                    )?
                } else {
                    evaluator.get_range(idx, num_rows)?
                };

                // A frame reaching the end of the buffered rows is not emitted
                // while the partition may still grow (`is_end == false`); stop
                // here and leave the remaining rows unevaluated (presumably
                // picked up by a later call).
                let partition_open = !partition_batch_state.is_end;
                if partition_open && frame_range.end == num_rows {
                    break;
                }

                // Remember this range as the starting point for the next row.
                state.window_frame_range = frame_range;
                let value = evaluator.evaluate(&columns, &state.window_frame_range)?;
                row_results.push(value);
            }

            let out_col = if row_results.is_empty() {
                new_empty_array(out_type)
            } else {
                ScalarValue::iter_to_array(row_results.into_iter())?
            };
            state.update(&out_col, partition_batch_state)?;

            // With an unbounded frame start, give the evaluator a chance to
            // memoize against the freshly updated state.
            if self.window_frame.start_bound.is_unbounded() {
                evaluator.memoize(state)?;
            }
        }
        Ok(())
    }