in datafusion/expr/src/logical_plan/plan.rs [1718:2022]
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match self.0 {
LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"),
LogicalPlan::RecursiveQuery(RecursiveQuery {
is_distinct, ..
}) => {
write!(f, "RecursiveQuery: is_distinct={}", is_distinct)
}
LogicalPlan::Values(Values { ref values, .. }) => {
let str_values: Vec<_> = values
.iter()
// limit to only 5 values to avoid horrible display
.take(5)
.map(|row| {
let item = row
.iter()
.map(|expr| expr.to_string())
.collect::<Vec<_>>()
.join(", ");
format!("({item})")
})
.collect();
let eclipse = if values.len() > 5 { "..." } else { "" };
write!(f, "Values: {}{}", str_values.join(", "), eclipse)
}
LogicalPlan::TableScan(TableScan {
ref source,
ref table_name,
ref projection,
ref filters,
ref fetch,
..
}) => {
let projected_fields = match projection {
Some(indices) => {
let schema = source.schema();
let names: Vec<&str> = indices
.iter()
.map(|i| schema.field(*i).name().as_str())
.collect();
format!(" projection=[{}]", names.join(", "))
}
_ => "".to_string(),
};
write!(f, "TableScan: {table_name}{projected_fields}")?;
if !filters.is_empty() {
let mut full_filter = vec![];
let mut partial_filter = vec![];
let mut unsupported_filters = vec![];
let filters: Vec<&Expr> = filters.iter().collect();
if let Ok(results) =
source.supports_filters_pushdown(&filters)
{
filters.iter().zip(results.iter()).for_each(
|(x, res)| match res {
TableProviderFilterPushDown::Exact => {
full_filter.push(x)
}
TableProviderFilterPushDown::Inexact => {
partial_filter.push(x)
}
TableProviderFilterPushDown::Unsupported => {
unsupported_filters.push(x)
}
},
);
}
if !full_filter.is_empty() {
write!(
f,
", full_filters=[{}]",
expr_vec_fmt!(full_filter)
)?;
};
if !partial_filter.is_empty() {
write!(
f,
", partial_filters=[{}]",
expr_vec_fmt!(partial_filter)
)?;
}
if !unsupported_filters.is_empty() {
write!(
f,
", unsupported_filters=[{}]",
expr_vec_fmt!(unsupported_filters)
)?;
}
}
if let Some(n) = fetch {
write!(f, ", fetch={n}")?;
}
Ok(())
}
LogicalPlan::Projection(Projection { ref expr, .. }) => {
write!(f, "Projection: ")?;
for (i, expr_item) in expr.iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}
write!(f, "{expr_item}")?;
}
Ok(())
}
LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
write!(f, "Dml: op=[{op}] table=[{table_name}]")
}
LogicalPlan::Copy(CopyTo {
input: _,
output_url,
file_type,
options,
..
}) => {
let op_str = options
.iter()
.map(|(k, v)| format!("{k} {v}"))
.collect::<Vec<String>>()
.join(", ");
write!(f, "CopyTo: format={} output_url={output_url} options: ({op_str})", file_type.get_ext())
}
LogicalPlan::Ddl(ddl) => {
write!(f, "{}", ddl.display())
}
LogicalPlan::Filter(Filter {
predicate: ref expr,
..
}) => write!(f, "Filter: {expr}"),
LogicalPlan::Window(Window {
ref window_expr, ..
}) => {
write!(
f,
"WindowAggr: windowExpr=[[{}]]",
expr_vec_fmt!(window_expr)
)
}
LogicalPlan::Aggregate(Aggregate {
ref group_expr,
ref aggr_expr,
..
}) => write!(
f,
"Aggregate: groupBy=[[{}]], aggr=[[{}]]",
expr_vec_fmt!(group_expr),
expr_vec_fmt!(aggr_expr)
),
LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
write!(f, "Sort: ")?;
for (i, expr_item) in expr.iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}
write!(f, "{expr_item}")?;
}
if let Some(a) = fetch {
write!(f, ", fetch={a}")?;
}
Ok(())
}
LogicalPlan::Join(Join {
on: ref keys,
filter,
join_constraint,
join_type,
..
}) => {
let join_expr: Vec<String> =
keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
let filter_expr = filter
.as_ref()
.map(|expr| format!(" Filter: {expr}"))
.unwrap_or_else(|| "".to_string());
let join_type = if filter.is_none() && keys.is_empty() && matches!(join_type, JoinType::Inner) {
"Cross".to_string()
} else {
join_type.to_string()
};
match join_constraint {
JoinConstraint::On => {
write!(
f,
"{} Join: {}{}",
join_type,
join_expr.join(", "),
filter_expr
)
}
JoinConstraint::Using => {
write!(
f,
"{} Join: Using {}{}",
join_type,
join_expr.join(", "),
filter_expr,
)
}
}
}
LogicalPlan::Repartition(Repartition {
partitioning_scheme,
..
}) => match partitioning_scheme {
Partitioning::RoundRobinBatch(n) => {
write!(f, "Repartition: RoundRobinBatch partition_count={n}")
}
Partitioning::Hash(expr, n) => {
let hash_expr: Vec<String> =
expr.iter().map(|e| format!("{e}")).collect();
write!(
f,
"Repartition: Hash({}) partition_count={}",
hash_expr.join(", "),
n
)
}
Partitioning::DistributeBy(expr) => {
let dist_by_expr: Vec<String> =
expr.iter().map(|e| format!("{e}")).collect();
write!(
f,
"Repartition: DistributeBy({})",
dist_by_expr.join(", "),
)
}
},
LogicalPlan::Limit(limit) => {
// Attempt to display `skip` and `fetch` as literals if possible, otherwise as expressions.
let skip_str = match limit.get_skip_type() {
Ok(SkipType::Literal(n)) => n.to_string(),
_ => limit.skip.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string()),
};
let fetch_str = match limit.get_fetch_type() {
Ok(FetchType::Literal(Some(n))) => n.to_string(),
Ok(FetchType::Literal(None)) => "None".to_string(),
_ => limit.fetch.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string())
};
write!(
f,
"Limit: skip={}, fetch={}", skip_str,fetch_str,
)
}
LogicalPlan::Subquery(Subquery { .. }) => {
write!(f, "Subquery:")
}
LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
write!(f, "SubqueryAlias: {alias}")
}
LogicalPlan::Statement(statement) => {
write!(f, "{}", statement.display())
}
LogicalPlan::Distinct(distinct) => match distinct {
Distinct::All(_) => write!(f, "Distinct:"),
Distinct::On(DistinctOn {
on_expr,
select_expr,
sort_expr,
..
}) => write!(
f,
"DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
expr_vec_fmt!(on_expr),
expr_vec_fmt!(select_expr),
if let Some(sort_expr) = sort_expr { expr_vec_fmt!(sort_expr) } else { "".to_string() },
),
},
LogicalPlan::Explain { .. } => write!(f, "Explain"),
LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
LogicalPlan::Union(_) => write!(f, "Union"),
LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
LogicalPlan::DescribeTable(DescribeTable { .. }) => {
write!(f, "DescribeTable")
}
LogicalPlan::Unnest(Unnest {
input: plan,
list_type_columns: list_col_indices,
struct_type_columns: struct_col_indices, .. }) => {
let input_columns = plan.schema().columns();
let list_type_columns = list_col_indices
.iter()
.map(|(i,unnest_info)|
format!("{}|depth={}", &input_columns[*i].to_string(),
unnest_info.depth))
.collect::<Vec<String>>();
let struct_type_columns = struct_col_indices
.iter()
.map(|i| &input_columns[*i])
.collect::<Vec<&Column>>();
// get items from input_columns indexed by list_col_indices
write!(f, "Unnest: lists[{}] structs[{}]",
expr_vec_fmt!(list_type_columns),
expr_vec_fmt!(struct_type_columns))
}
}
}