in native/core/src/execution/planner.rs [1453:1592]
/// Parses the common parameters for a join operator.
///
/// Builds the physical plans for both join children, resolves the left/right
/// join-key expressions against each child's schema, maps the Spark (protobuf)
/// join type to its DataFusion counterpart, and — when a join condition is
/// present — converts it into a DataFusion `JoinFilter` bound to the
/// intermediate filter schema.
///
/// # Arguments
/// * `inputs` - JVM input references collected while building child plans.
/// * `children` - exactly two child operators (left, right); panics otherwise.
/// * `left_join_keys` / `right_join_keys` - Spark join-key expressions.
/// * `join_type` - protobuf join-type discriminant.
/// * `condition` - optional Spark join filter expression.
/// * `partition_count` - partition count forwarded to child plan creation.
///
/// # Errors
/// Returns `ExecutionError` if a child plan or expression cannot be created,
/// or if `join_type` does not map to a supported DataFusion join type.
fn parse_join_parameters(
    &self,
    inputs: &mut Vec<Arc<GlobalRef>>,
    children: &[Operator],
    left_join_keys: &[Expr],
    right_join_keys: &[Expr],
    join_type: i32,
    condition: &Option<Expr>,
    partition_count: usize,
) -> Result<(JoinParameters, Vec<ScanExec>), ExecutionError> {
    assert_eq!(children.len(), 2);
    let (mut left_scans, left) = self.create_plan(&children[0], inputs, partition_count)?;
    let (mut right_scans, right) = self.create_plan(&children[1], inputs, partition_count)?;
    // The caller needs the scans from both sides; merge them into one list.
    left_scans.append(&mut right_scans);
    let left_join_exprs: Vec<_> = left_join_keys
        .iter()
        .map(|expr| self.create_expr(expr, left.schema()))
        .collect::<Result<Vec<_>, _>>()?;
    let right_join_exprs: Vec<_> = right_join_keys
        .iter()
        .map(|expr| self.create_expr(expr, right.schema()))
        .collect::<Result<Vec<_>, _>>()?;
    let join_on = left_join_exprs
        .into_iter()
        .zip(right_join_exprs)
        .collect::<Vec<_>>();
    // Map the Spark (protobuf) join type onto the DataFusion join type.
    let join_type = match join_type.try_into() {
        Ok(JoinType::Inner) => DFJoinType::Inner,
        Ok(JoinType::LeftOuter) => DFJoinType::Left,
        Ok(JoinType::RightOuter) => DFJoinType::Right,
        Ok(JoinType::FullOuter) => DFJoinType::Full,
        Ok(JoinType::LeftSemi) => DFJoinType::LeftSemi,
        Ok(JoinType::RightSemi) => DFJoinType::RightSemi,
        Ok(JoinType::LeftAnti) => DFJoinType::LeftAnti,
        Ok(JoinType::RightAnti) => DFJoinType::RightAnti,
        Err(_) => {
            return Err(GeneralError(format!(
                "Unsupported join type: {:?}",
                join_type
            )));
        }
    };
    // Handle join filter as DataFusion `JoinFilter` struct
    let join_filter = if let Some(expr) = condition {
        let left_schema = left.schema();
        let right_schema = right.schema();
        let left_fields = left_schema.fields();
        let right_fields = right_schema.fields();
        // Build the combined (left ++ right) schema in a single pass.
        // Because we cast dictionary array to array in scan operator,
        // we need to change dictionary type to data type for join filter expression.
        let fields: Vec<_> = left_fields
            .iter()
            .chain(right_fields)
            .map(|f| match f.data_type() {
                DataType::Dictionary(_, val_type) => Arc::new(Field::new(
                    f.name(),
                    val_type.as_ref().clone(),
                    f.is_nullable(),
                )),
                _ => Arc::clone(f),
            })
            .collect();
        let full_schema = Arc::new(Schema::new(fields));
        let physical_expr = self.create_expr(expr, full_schema)?;
        let (left_field_indices, right_field_indices) =
            expr_to_columns(&physical_expr, left_fields.len(), right_fields.len())?;
        // Intermediate schema holding only the fields referenced by the filter.
        // Because we cast dictionary array to array in scan operator,
        // we need to change dictionary type to data type for join filter expression.
        let filter_fields: Vec<Field> = left_field_indices
            .iter()
            .map(|&i| left.schema().field(i).clone())
            .chain(
                right_field_indices
                    .iter()
                    .map(|&i| right.schema().field(i).clone()),
            )
            .map(|f| match f.data_type() {
                DataType::Dictionary(_, val_type) => {
                    Field::new(f.name(), val_type.as_ref().clone(), f.is_nullable())
                }
                _ => f,
            })
            .collect();
        let filter_schema = Schema::new_with_metadata(filter_fields, HashMap::new());
        // Rewrite the physical expression to use the new column indices.
        // DataFusion's join filter is bound to intermediate schema which contains
        // only the fields used in the filter expression. But the Spark's join filter
        // expression is bound to the full schema. We need to rewrite the physical
        // expression to use the new column indices.
        let rewritten_physical_expr = rewrite_physical_expr(
            physical_expr,
            left_schema.fields.len(),
            right_schema.fields.len(),
            &left_field_indices,
            &right_field_indices,
        )?;
        // Index vectors are no longer needed after the rewrite, so they can be
        // moved here instead of cloned.
        let column_indices =
            JoinFilter::build_column_indices(left_field_indices, right_field_indices);
        Some(JoinFilter::new(
            rewritten_physical_expr,
            column_indices,
            filter_schema.into(),
        ))
    } else {
        None
    };
    Ok((
        JoinParameters {
            left: Arc::clone(&left),
            right: Arc::clone(&right),
            join_on,
            join_type,
            join_filter,
        },
        left_scans,
    ))
}