in datafusion/physical-plan/src/aggregates/row_hash.rs [645:814]
/// Polls the grouped-hash aggregation state machine.
///
/// Runs a loop over `self.exec_state` until one of three things happens:
/// * an output `RecordBatch` is ready (`Poll::Ready(Some(Ok(..)))`),
/// * the stream is exhausted (`Poll::Ready(None)`),
/// * the input stream is not ready (`ready!` propagates `Poll::Pending`).
///
/// States:
/// * `ReadingInput`        — pull batches from the input and aggregate them
/// * `SkippingAggregation` — pass batches through as intermediate "state"
///                           batches without grouping (partial-skip mode)
/// * `ProducingOutput`     — slice the accumulated result into
///                           `batch_size`-row output batches
/// * `Done`                — release memory and terminate
fn poll_next(
    mut self: std::pin::Pin<&mut Self>,
    cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
    let elapsed_compute = self.baseline_metrics.elapsed_compute().clone();

    loop {
        match &self.exec_state {
            ExecutionState::ReadingInput => 'reading_input: {
                match ready!(self.input.poll_next_unpin(cx)) {
                    // New batch to aggregate in partial aggregation operator
                    Some(Ok(batch)) if self.mode == AggregateMode::Partial => {
                        let timer = elapsed_compute.timer();
                        let input_rows = batch.num_rows();

                        // Do the grouping
                        self.group_aggregate_batch(batch)?;

                        self.update_skip_aggregation_probe(input_rows);

                        // If we can begin emitting rows, do so,
                        // otherwise keep consuming input
                        assert!(!self.input_done);

                        // If the number of group values equals or exceeds the
                        // soft limit, emit all groups and switch to producing
                        // output
                        if self.hit_soft_group_limit() {
                            timer.done();
                            self.set_input_done_and_produce_output()?;
                            // make sure the exec_state just set is not
                            // overwritten below
                            break 'reading_input;
                        }

                        // Sorted/partially-sorted inputs may allow emitting
                        // completed groups before the input is exhausted
                        if let Some(to_emit) = self.group_ordering.emit_to() {
                            timer.done();
                            if let Some(batch) = self.emit(to_emit, false)? {
                                self.exec_state =
                                    ExecutionState::ProducingOutput(batch);
                            };
                            // make sure the exec_state just set is not
                            // overwritten below
                            break 'reading_input;
                        }

                        self.emit_early_if_necessary()?;

                        self.switch_to_skip_aggregation()?;

                        timer.done();
                    }

                    // New batch to aggregate in terminal aggregation operator
                    // (Final/FinalPartitioned/Single/SinglePartitioned)
                    Some(Ok(batch)) => {
                        let timer = elapsed_compute.timer();

                        // Make sure we have enough capacity for `batch`,
                        // otherwise spill
                        self.spill_previous_if_necessary(&batch)?;

                        // Do the grouping
                        self.group_aggregate_batch(batch)?;

                        // If we can begin emitting rows, do so,
                        // otherwise keep consuming input
                        assert!(!self.input_done);

                        // If the number of group values equals or exceeds the
                        // soft limit, emit all groups and switch to producing
                        // output
                        if self.hit_soft_group_limit() {
                            timer.done();
                            self.set_input_done_and_produce_output()?;
                            // make sure the exec_state just set is not
                            // overwritten below
                            break 'reading_input;
                        }

                        if let Some(to_emit) = self.group_ordering.emit_to() {
                            timer.done();
                            if let Some(batch) = self.emit(to_emit, false)? {
                                self.exec_state =
                                    ExecutionState::ProducingOutput(batch);
                            };
                            // make sure the exec_state just set is not
                            // overwritten below
                            break 'reading_input;
                        }

                        timer.done();
                    }

                    // Found error from input stream
                    Some(Err(e)) => {
                        // inner had error, return to caller
                        return Poll::Ready(Some(Err(e)));
                    }

                    // Found end from input stream
                    None => {
                        // inner is done, emit all rows and switch to producing
                        // output
                        self.set_input_done_and_produce_output()?;
                    }
                }
            }

            ExecutionState::SkippingAggregation => {
                match ready!(self.input.poll_next_unpin(cx)) {
                    Some(Ok(batch)) => {
                        let _timer = elapsed_compute.timer();
                        // Keep the probe statistics up to date even though we
                        // are no longer aggregating
                        if let Some(probe) = self.skip_aggregation_probe.as_mut() {
                            probe.record_skipped(&batch);
                        }
                        // Convert the raw input rows directly into the
                        // intermediate "state" representation expected by the
                        // downstream (Final) aggregation
                        let states = self.transform_to_states(batch)?;
                        return Poll::Ready(Some(Ok(
                            states.record_output(&self.baseline_metrics)
                        )));
                    }
                    Some(Err(e)) => {
                        // inner had error, return to caller
                        return Poll::Ready(Some(Err(e)));
                    }
                    None => {
                        // inner is done, switching to `Done` state
                        self.exec_state = ExecutionState::Done;
                    }
                }
            }

            ExecutionState::ProducingOutput(batch) => {
                // slice off a part of the batch, if needed
                let output_batch;
                let size = self.batch_size;
                (self.exec_state, output_batch) = if batch.num_rows() <= size {
                    // The remaining rows fit in a single output batch: emit
                    // them all and pick the next state
                    (
                        if self.input_done {
                            ExecutionState::Done
                        }
                        // In Partial aggregation, we also need to check
                        // if we should trigger partial skipping
                        else if self.mode == AggregateMode::Partial
                            && self.should_skip_aggregation()
                        {
                            ExecutionState::SkippingAggregation
                        } else {
                            ExecutionState::ReadingInput
                        },
                        batch.clone(),
                    )
                } else {
                    // output first batch_size rows; keep the rest for the next
                    // poll (slicing is zero-copy for Arrow batches)
                    let num_remaining = batch.num_rows() - size;
                    let remaining = batch.slice(size, num_remaining);
                    let output = batch.slice(0, size);
                    (ExecutionState::ProducingOutput(remaining), output)
                };
                // Empty record batches should not be emitted.
                // They need to be treated as [`Option<RecordBatch>`]es and
                // handled separately
                debug_assert!(output_batch.num_rows() > 0);
                return Poll::Ready(Some(Ok(
                    output_batch.record_output(&self.baseline_metrics)
                )));
            }

            ExecutionState::Done => {
                // release the memory reservation since sending back output
                // batch itself needs some memory reservation, so make some
                // room for it.
                self.clear_all();
                // Best-effort: the stream is finished either way, so ignore
                // any error from shrinking the reservation
                let _ = self.update_memory_reservation();
                return Poll::Ready(None);
            }
        }
    }
}