fn freeze_streamed()

in datafusion/physical-plan/src/joins/sort_merge_join.rs [1814:2005]


    /// Materializes every pending output chunk of the current streamed batch into
    /// `staging_output_record_batches` and then clears the pending chunks.
    ///
    /// For each chunk in `streamed_batch.output_indices`:
    ///
    /// 1. The streamed-side columns are gathered with `take` using the chunk's
    ///    streamed row indices. Chunks with no streamed indices are skipped.
    /// 2. The buffered-side columns are produced according to the join type:
    ///    a single `is_not_null` marker column for `LeftMark`, no columns for
    ///    `LeftSemi` / `LeftAnti` / `RightAnti`, the rows fetched from the
    ///    referenced buffered batch when the chunk has one, and unmatched
    ///    (null-joined) columns otherwise.
    /// 3. The output batch is assembled streamed-first, except for
    ///    `JoinType::Right` where the buffered columns come first.
    /// 4. If a join filter exists and filter columns were produced for the chunk,
    ///    the filter is evaluated and its mask, the streamed row indices and the
    ///    current streamed batch id are recorded next to the staged batch. Only
    ///    join types outside the deferred list below are filtered right here;
    ///    the listed ones are staged unfiltered.
    ///
    /// # Errors
    ///
    /// Propagates errors from `take`, `is_not_null`, fetching buffered columns,
    /// `RecordBatch::try_new`, filter expression evaluation, the boolean
    /// downcast of the filter result and `filter_record_batch`.
    fn freeze_streamed(&mut self) -> Result<()> {
        for chunk in self.streamed_batch.output_indices.iter_mut() {
            // The row indices of joined streamed batch
            let left_indices = chunk.streamed_indices.finish();

            if left_indices.is_empty() {
                continue;
            }

            let mut left_columns = self
                .streamed_batch
                .batch
                .columns()
                .iter()
                .map(|column| take(column, &left_indices, None))
                .collect::<Result<Vec<_>, ArrowError>>()?;

            // The row indices of joined buffered batch
            let right_indices: UInt64Array = chunk.buffered_indices.finish();
            let mut right_columns = if matches!(self.join_type, JoinType::LeftMark) {
                // Mark join: the only buffered-side output is the "matched" marker.
                vec![Arc::new(is_not_null(&right_indices)?) as ArrayRef]
            } else if matches!(
                self.join_type,
                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::RightAnti
            ) {
                // Semi / anti joins output streamed columns only.
                vec![]
            } else if let Some(buffered_idx) = chunk.buffered_batch_idx {
                fetch_right_columns_by_idxs(
                    &self.buffered_data,
                    buffered_idx,
                    &right_indices,
                )?
            } else {
                // If buffered batch none, meaning it is null joined batch.
                // We need to create null arrays for buffered columns to join with streamed rows.
                create_unmatched_columns(
                    self.join_type,
                    &self.buffered_schema,
                    right_indices.len(),
                )
            };

            // Prepare the columns we apply join filter on later.
            // Only for joined rows between streamed and buffered.
            let filter_columns = match chunk.buffered_batch_idx {
                Some(buffered_idx) => match self.join_type {
                    JoinType::Right => {
                        get_filter_column(&self.filter, &right_columns, &left_columns)
                    }
                    JoinType::LeftSemi
                    | JoinType::LeftAnti
                    | JoinType::LeftMark
                    | JoinType::RightAnti => {
                        // These join types do not carry the buffered columns in
                        // `right_columns`, so fetch them just for the filter.
                        let right_cols = fetch_right_columns_by_idxs(
                            &self.buffered_data,
                            buffered_idx,
                            &right_indices,
                        )?;

                        if matches!(self.join_type, JoinType::RightAnti) {
                            get_filter_column(&self.filter, &right_cols, &left_columns)
                        } else {
                            get_filter_column(&self.filter, &left_columns, &right_cols)
                        }
                    }
                    _ => get_filter_column(&self.filter, &left_columns, &right_columns),
                },
                // This chunk is totally for null joined rows (outer join), we don't need to
                // apply join filter. Any join filter applied only on either streamed or
                // buffered side will be pushed already.
                None => vec![],
            };

            let columns = if !matches!(self.join_type, JoinType::Right) {
                left_columns.extend(right_columns);
                left_columns
            } else {
                right_columns.extend(left_columns);
                right_columns
            };

            let output_batch = RecordBatch::try_new(Arc::clone(&self.schema), columns)?;

            // Without a join filter, or without filter columns for this chunk, the
            // batch is staged as-is. Filter columns are only ever produced for chunks
            // that reference a buffered batch, so binding the index here is equivalent
            // to checking `filter_columns` alone and removes the need to unwrap later.
            let (f, buffered_idx) = match (&self.filter, chunk.buffered_batch_idx) {
                (Some(f), Some(idx)) if !filter_columns.is_empty() => (f, idx),
                _ => {
                    self.staging_output_record_batches
                        .batches
                        .push(output_batch);
                    continue;
                }
            };

            // Construct batch with only filter columns
            let filter_batch =
                RecordBatch::try_new(Arc::clone(f.schema()), filter_columns)?;

            let filter_result = f
                .expression()
                .evaluate(&filter_batch)?
                .into_array(filter_batch.num_rows())?;

            // The boolean selection mask of the join filter result
            let pre_mask = datafusion_common::cast::as_boolean_array(&filter_result)?;

            // If there are nulls in join filter result, exclude them from selecting
            // the rows to output.
            let mask = if pre_mask.null_count() > 0 {
                compute::prep_null_mask_filter(pre_mask)
            } else {
                pre_mask.clone()
            };

            // The listed join types are staged unfiltered; the mask recorded below
            // travels with the batch (presumably resolved by a later stage — confirm
            // against the consumer of `staging_output_record_batches`). Every other
            // join type is filtered right away.
            let staged_batch = if matches!(
                self.join_type,
                JoinType::Left
                    | JoinType::LeftSemi
                    | JoinType::Right
                    | JoinType::LeftAnti
                    | JoinType::RightAnti
                    | JoinType::LeftMark
                    | JoinType::Full
            ) {
                output_batch
            } else {
                filter_record_batch(&output_batch, &mask)?
            };
            self.staging_output_record_batches
                .batches
                .push(staged_batch);

            // Full join records the raw filter result (nulls preserved); all other
            // join types record the mask prepared above.
            if matches!(self.join_type, JoinType::Full) {
                self.staging_output_record_batches
                    .filter_mask
                    .extend(pre_mask);
            } else {
                self.staging_output_record_batches.filter_mask.extend(&mask);
            }
            self.staging_output_record_batches
                .row_indices
                .extend(&left_indices);

            // Tag every row of this chunk with the id of the current streamed batch.
            let batch_ids_len =
                self.staging_output_record_batches.batch_ids.len() + left_indices.len();
            self.staging_output_record_batches
                .batch_ids
                .resize(batch_ids_len, self.streamed_batch_counter.load(Relaxed));

            // For full joins, track per buffered row whether every filter evaluation
            // seen so far has failed: an entry stays `true` only while the row keeps
            // failing the join filter, and a missing entry counts as `true`.
            if matches!(self.join_type, JoinType::Full) {
                let buffered_batch = &mut self.buffered_data.batches[buffered_idx];

                for i in 0..pre_mask.len() {
                    // If the buffered row is not joined with streamed side,
                    // skip it.
                    if right_indices.is_null(i) {
                        continue;
                    }

                    let buffered_index = right_indices.value(i);
                    let still_unmatched = buffered_batch
                        .join_filter_not_matched_map
                        .get(&buffered_index)
                        .copied()
                        .unwrap_or(true)
                        && !pre_mask.value(i);

                    buffered_batch
                        .join_filter_not_matched_map
                        .insert(buffered_index, still_unmatched);
                }
            }
        }

        self.streamed_batch.output_indices.clear();

        Ok(())
    }