in datafusion/physical-expr/src/window/built_in.rs [186:270]
/// Incrementally evaluates this built-in window expression for every partition in
/// `partition_batches`, keeping per-partition progress in `window_agg_state`.
///
/// For each partition the method:
/// 1. fetches the partition's `WindowState`, creating it with a fresh evaluator when
///    the partition has no entry yet;
/// 2. evaluates the argument columns and, when the evaluator uses the window frame or
///    includes rank, the ORDER BY columns as well;
/// 3. walks the rows starting at `state.last_calculated_index`, producing one scalar
///    per row, and stops early when a row's frame ends at `num_rows` while the
///    partition's `is_end` flag is not set;
/// 4. passes the newly produced column to `state.update` and, if the frame's start
///    bound is unbounded, calls the evaluator's `memoize`.
///
/// # Errors
///
/// Propagates the first error returned by any of the fallible calls made along the
/// way (field lookup, evaluator/state creation, argument and ORDER BY evaluation,
/// range calculation, row evaluation, array construction, state update, memoize).
///
/// # Panics
///
/// Panics via `unreachable!` if the stored window function for a partition is not
/// `WindowFn::Builtin`.
fn evaluate_stateful(
    &self,
    partition_batches: &PartitionBatches,
    window_agg_state: &mut PartitionWindowAggStates,
) -> Result<()> {
    let field = self.expr.field()?;
    let out_type = field.data_type();
    let sort_options = self.order_by.iter().map(|o| o.options).collect::<Vec<_>>();
    for (partition_row, partition_batch_state) in partition_batches.iter() {
        // Look up first so the partition key is cloned only when a new entry
        // actually has to be inserted.
        let window_state =
            if let Some(window_state) = window_agg_state.get_mut(partition_row) {
                window_state
            } else {
                let evaluator = self.expr.create_evaluator()?;
                window_agg_state
                    .entry(partition_row.clone())
                    .or_insert(WindowState {
                        state: WindowAggState::new(out_type)?,
                        window_fn: WindowFn::Builtin(evaluator),
                    })
            };
        let evaluator = match &mut window_state.window_fn {
            WindowFn::Builtin(evaluator) => evaluator,
            _ => unreachable!(),
        };
        let state = &mut window_state.state;

        let record_batch = &partition_batch_state.record_batch;
        let num_rows = record_batch.num_rows();
        // NOTE(review): queried once per partition instead of once per row; this
        // assumes the answer is fixed for a given evaluator -- confirm against the
        // evaluator trait's contract.
        let uses_window_frame = evaluator.uses_window_frame();

        // `values` holds the argument columns followed by the ORDER BY columns, so
        // the evaluator receives both in one slice while the range calculation only
        // looks at the ORDER BY tail.
        let mut values = self.evaluate_args(record_batch)?;
        let order_bys = if uses_window_frame || evaluator.include_rank() {
            get_orderby_values(self.order_by_columns(record_batch)?)
        } else {
            vec![]
        };
        let n_args = values.len();
        values.extend(order_bys);
        let order_bys_ref = &values[n_args..];

        // We iterate on each row to perform a running calculation. At most one
        // result is produced per remaining row, so reserve that upper bound up
        // front; `saturating_sub` keeps this safe if the index is past `num_rows`
        // (the range below is then simply empty).
        let mut row_wise_results: Vec<ScalarValue> =
            Vec::with_capacity(num_rows.saturating_sub(state.last_calculated_index));
        for idx in state.last_calculated_index..num_rows {
            let frame_range = if uses_window_frame {
                state
                    .window_frame_ctx
                    .get_or_insert_with(|| {
                        WindowFrameContext::new(
                            self.window_frame.clone(),
                            sort_options.clone(),
                        )
                    })
                    .calculate_range(
                        order_bys_ref,
                        // Start search from the last range
                        &state.window_frame_range,
                        num_rows,
                        idx,
                    )
            } else {
                evaluator.get_range(idx, num_rows)
            }?;
            // Exit if the range extends all the way to the end of the rows seen so
            // far while the partition is not flagged as ended.
            if frame_range.end == num_rows && !partition_batch_state.is_end {
                break;
            }
            // Update last range
            state.window_frame_range = frame_range;
            row_wise_results
                .push(evaluator.evaluate(&values, &state.window_frame_range)?);
        }
        // No rows were produced in this pass: hand an empty array of the output
        // type to the state update instead of building one from scalars.
        let out_col = if row_wise_results.is_empty() {
            new_empty_array(out_type)
        } else {
            ScalarValue::iter_to_array(row_wise_results.into_iter())?
        };
        state.update(&out_col, partition_batch_state)?;
        if self.window_frame.start_bound.is_unbounded() {
            evaluator.memoize(state)?;
        }
    }
    Ok(())
}