in datafusion/physical-plan/src/joins/sort_merge_join.rs [1814:2005]
/// Materializes every pending output chunk of the current streamed batch into
/// `staging_output_record_batches`, then clears `streamed_batch.output_indices`.
///
/// For each chunk with at least one streamed row index this method:
/// 1. takes the selected rows from the streamed batch,
/// 2. builds the buffered-side columns: a single `is_not_null` marker column for
///    `LeftMark`, no columns for `LeftSemi` / `LeftAnti` / `RightAnti`, the rows
///    fetched from the referenced buffered batch when the chunk has one, and the
///    output of `create_unmatched_columns` otherwise,
/// 3. assembles the output batch (buffered columns first for `Right`, streamed
///    columns first for every other join type),
/// 4. when a join filter exists and filter columns were produced, evaluates the
///    filter and records the resulting mask, the streamed row indices and the
///    streamed batch id next to the staged batch.
///
/// # Errors
///
/// Returns an error if taking rows, fetching buffered columns, building a
/// `RecordBatch`, evaluating the filter expression, downcasting its result to a
/// boolean array, or filtering the output batch fails.
///
/// # Panics
///
/// For `Full` joins, panics if a chunk produced filter columns without
/// referencing a buffered batch. Filter columns are only built for chunks whose
/// `buffered_batch_idx` is `Some`, so this indicates a broken invariant.
fn freeze_streamed(&mut self) -> Result<()> {
    for chunk in self.streamed_batch.output_indices.iter_mut() {
        // Row indices into the streamed batch for this chunk.
        let left_indices = chunk.streamed_indices.finish();
        if left_indices.is_empty() {
            continue;
        }

        let mut left_columns = self
            .streamed_batch
            .batch
            .columns()
            .iter()
            .map(|column| take(column, &left_indices, None))
            .collect::<Result<Vec<_>, ArrowError>>()?;

        // Row indices into the buffered batch for this chunk.
        let right_indices: UInt64Array = chunk.buffered_indices.finish();
        let mut right_columns = if matches!(self.join_type, JoinType::LeftMark) {
            vec![Arc::new(is_not_null(&right_indices)?) as ArrayRef]
        } else if matches!(
            self.join_type,
            JoinType::LeftSemi | JoinType::LeftAnti | JoinType::RightAnti
        ) {
            vec![]
        } else if let Some(buffered_idx) = chunk.buffered_batch_idx {
            fetch_right_columns_by_idxs(
                &self.buffered_data,
                buffered_idx,
                &right_indices,
            )?
        } else {
            // No buffered batch: this chunk holds null-joined rows, so the
            // buffered side is produced by `create_unmatched_columns`.
            create_unmatched_columns(
                self.join_type,
                &self.buffered_schema,
                right_indices.len(),
            )
        };

        // Columns the join filter is evaluated on. They are only built for
        // chunks that actually joined streamed rows with a buffered batch;
        // null-joined chunks get no filter columns and therefore skip the filter.
        let filter_columns = match chunk.buffered_batch_idx {
            Some(buffered_idx) => match self.join_type {
                JoinType::Right => {
                    get_filter_column(&self.filter, &right_columns, &left_columns)
                }
                // `right_columns` does not carry the buffered rows for these
                // join types (it is empty or a marker column), so the buffered
                // rows are fetched here just for the filter.
                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
                    let right_cols = fetch_right_columns_by_idxs(
                        &self.buffered_data,
                        buffered_idx,
                        &right_indices,
                    )?;
                    get_filter_column(&self.filter, &left_columns, &right_cols)
                }
                // Same as above, but with the buffered columns passed first.
                JoinType::RightAnti => {
                    let right_cols = fetch_right_columns_by_idxs(
                        &self.buffered_data,
                        buffered_idx,
                        &right_indices,
                    )?;
                    get_filter_column(&self.filter, &right_cols, &left_columns)
                }
                _ => get_filter_column(&self.filter, &left_columns, &right_columns),
            },
            None => vec![],
        };

        // `Right` places the buffered columns first; every other join type
        // places the streamed columns first.
        let columns = if matches!(self.join_type, JoinType::Right) {
            right_columns.extend(left_columns);
            right_columns
        } else {
            left_columns.extend(right_columns);
            left_columns
        };
        let output_batch = RecordBatch::try_new(Arc::clone(&self.schema), columns)?;

        // Without a filter, or without filter columns, the batch is staged as is.
        let f = match &self.filter {
            Some(f) if !filter_columns.is_empty() => f,
            _ => {
                self.staging_output_record_batches
                    .batches
                    .push(output_batch);
                continue;
            }
        };

        // Evaluate the join filter on a batch holding only the filter columns.
        let filter_batch = RecordBatch::try_new(Arc::clone(f.schema()), filter_columns)?;
        let filter_result = f
            .expression()
            .evaluate(&filter_batch)?
            .into_array(filter_batch.num_rows())?;
        // Raw filter result; may contain nulls.
        let pre_mask = datafusion_common::cast::as_boolean_array(&filter_result)?;
        // Selection mask in which null filter results do not select a row.
        let mask = if pre_mask.null_count() > 0 {
            compute::prep_null_mask_filter(pre_mask)
        } else {
            pre_mask.clone()
        };

        // These join types stage the unfiltered batch together with the mask
        // recorded below; all remaining join types stage only the rows that
        // passed the filter.
        // NOTE(review): presumably a later stage consumes `filter_mask` to decide
        // which staged rows to emit or null-pad — confirm against its consumer.
        let keep_unfiltered = matches!(
            self.join_type,
            JoinType::Left
                | JoinType::LeftSemi
                | JoinType::Right
                | JoinType::LeftAnti
                | JoinType::RightAnti
                | JoinType::LeftMark
                | JoinType::Full
        );
        let staged_batch = if keep_unfiltered {
            output_batch
        } else {
            filter_record_batch(&output_batch, &mask)?
        };
        self.staging_output_record_batches
            .batches
            .push(staged_batch);

        // `Full` records the raw result (nulls preserved); every other join
        // type records the null-free selection mask.
        if matches!(self.join_type, JoinType::Full) {
            self.staging_output_record_batches
                .filter_mask
                .extend(pre_mask);
        } else {
            self.staging_output_record_batches.filter_mask.extend(&mask);
        }

        // Remember which streamed row, and which streamed batch, each staged
        // row came from.
        self.staging_output_record_batches
            .row_indices
            .extend(&left_indices);
        self.staging_output_record_batches.batch_ids.resize(
            self.staging_output_record_batches.batch_ids.len() + left_indices.len(),
            self.streamed_batch_counter.load(Relaxed),
        );

        // For `Full` joins, track per buffered row whether every join attempt so
        // far failed the filter: the flag starts as `true` and is cleared as soon
        // as one joined row passes.
        if matches!(self.join_type, JoinType::Full) {
            let buffered_idx = chunk.buffered_batch_idx.expect(
                "filter columns are only produced for chunks that reference a buffered batch",
            );
            let buffered_batch = &mut self.buffered_data.batches[buffered_idx];
            for i in 0..pre_mask.len() {
                // A null buffered index means this row was not joined with a
                // buffered row; nothing to track.
                if right_indices.is_null(i) {
                    continue;
                }
                let not_matched = buffered_batch
                    .join_filter_not_matched_map
                    .entry(right_indices.value(i))
                    .or_insert(true);
                *not_matched &= !pre_mask.value(i);
            }
        }
    }

    self.streamed_batch.output_indices.clear();
    Ok(())
}