in datafusion/expr/src/logical_plan/display.rs [313:654]
/// Converts a single logical plan node into a JSON object.
///
/// Every returned object carries a `"Node Type"` entry; the remaining keys
/// depend on the plan variant. Only the given node is described: this
/// function does not descend into the node's inputs.
fn to_json_value(node: &LogicalPlan) -> serde_json::Value {
    match node {
        LogicalPlan::EmptyRelation(_) => {
            json!({
                "Node Type": "EmptyRelation",
            })
        }
        LogicalPlan::RecursiveQuery(RecursiveQuery { is_distinct, .. }) => {
            json!({
                "Node Type": "RecursiveQuery",
                "Is Distinct": is_distinct,
            })
        }
        LogicalPlan::Values(Values { values, .. }) => {
            // Show only the first few rows so huge VALUES lists stay readable.
            const MAX_DISPLAYED_ROWS: usize = 5;
            let str_values = values
                .iter()
                .take(MAX_DISPLAYED_ROWS)
                .map(|row| {
                    let item = row
                        .iter()
                        .map(|expr| expr.to_string())
                        .collect::<Vec<_>>()
                        .join(", ");
                    format!("({item})")
                })
                .collect::<Vec<_>>()
                .join(", ");
            // Signal truncation when rows were dropped from the display.
            let ellipsis = if values.len() > MAX_DISPLAYED_ROWS {
                "..."
            } else {
                ""
            };
            let values_str = format!("{str_values}{ellipsis}");
            json!({
                "Node Type": "Values",
                "Values": values_str
            })
        }
        LogicalPlan::TableScan(TableScan {
            source,
            table_name,
            filters,
            fetch,
            ..
        }) => {
            let mut object = json!({
                "Node Type": "TableScan",
                "Relation Name": table_name.table(),
            });
            if let Some(s) = table_name.schema() {
                object["Schema"] = serde_json::Value::String(s.to_string());
            }
            if let Some(c) = table_name.catalog() {
                object["Catalog"] = serde_json::Value::String(c.to_string());
            }
            if !filters.is_empty() {
                // Bucket each filter by the pushdown support the source reports.
                let mut full_filter = vec![];
                let mut partial_filter = vec![];
                let mut unsupported_filters = vec![];
                let filters: Vec<&Expr> = filters.iter().collect();
                // NOTE(review): an error from `supports_filters_pushdown` is
                // ignored here, in which case no filter keys are emitted.
                if let Ok(results) = source.supports_filters_pushdown(&filters) {
                    for (x, res) in filters.iter().zip(results.iter()) {
                        match res {
                            TableProviderFilterPushDown::Exact => full_filter.push(x),
                            TableProviderFilterPushDown::Inexact => {
                                partial_filter.push(x)
                            }
                            TableProviderFilterPushDown::Unsupported => {
                                unsupported_filters.push(x)
                            }
                        }
                    }
                }
                if !full_filter.is_empty() {
                    object["Full Filters"] =
                        serde_json::Value::String(expr_vec_fmt!(full_filter));
                }
                if !partial_filter.is_empty() {
                    object["Partial Filters"] =
                        serde_json::Value::String(expr_vec_fmt!(partial_filter));
                }
                if !unsupported_filters.is_empty() {
                    object["Unsupported Filters"] =
                        serde_json::Value::String(expr_vec_fmt!(unsupported_filters));
                }
            }
            if let Some(f) = fetch {
                object["Fetch"] = serde_json::Value::Number((*f).into());
            }
            object
        }
        LogicalPlan::Projection(Projection { expr, .. }) => {
            json!({
                "Node Type": "Projection",
                "Expressions": expr.iter().map(|e| e.to_string()).collect::<Vec<_>>()
            })
        }
        LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
            // Labelled "Dml" so DML nodes are distinguishable from projections.
            json!({
                "Node Type": "Dml",
                "Operation": op.name(),
                "Table Name": table_name.table()
            })
        }
        LogicalPlan::Copy(CopyTo {
            input: _,
            output_url,
            file_type,
            partition_by: _,
            options,
        }) => {
            // Render the options as a flat "key=value, key=value" string.
            let op_str = options
                .iter()
                .map(|(k, v)| format!("{k}={v}"))
                .collect::<Vec<_>>()
                .join(", ");
            json!({
                "Node Type": "CopyTo",
                "Output URL": output_url,
                "File Type": file_type.get_ext().to_string(),
                "Options": op_str
            })
        }
        LogicalPlan::Ddl(ddl) => {
            json!({
                "Node Type": "Ddl",
                "Operation": ddl.display().to_string()
            })
        }
        LogicalPlan::Filter(Filter {
            predicate: expr, ..
        }) => {
            json!({
                "Node Type": "Filter",
                "Condition": expr.to_string()
            })
        }
        LogicalPlan::Window(Window { window_expr, .. }) => {
            json!({
                "Node Type": "WindowAggr",
                "Expressions": expr_vec_fmt!(window_expr)
            })
        }
        LogicalPlan::Aggregate(Aggregate {
            group_expr,
            aggr_expr,
            ..
        }) => {
            json!({
                "Node Type": "Aggregate",
                "Group By": expr_vec_fmt!(group_expr),
                "Aggregates": expr_vec_fmt!(aggr_expr)
            })
        }
        LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
            let mut object = json!({
                "Node Type": "Sort",
                "Sort Key": expr_vec_fmt!(expr),
            });
            if let Some(fetch) = fetch {
                object["Fetch"] = serde_json::Value::Number((*fetch).into());
            }
            object
        }
        LogicalPlan::Join(Join {
            on: keys,
            filter,
            join_constraint,
            join_type,
            ..
        }) => {
            let join_expr: Vec<String> =
                keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
            // NOTE(review): the value keeps its historical " Filter: " prefix
            // (empty string when there is no filter); confirm whether
            // consumers depend on it before simplifying.
            let filter_expr = filter
                .as_ref()
                .map(|expr| format!(" Filter: {expr}"))
                .unwrap_or_default();
            json!({
                "Node Type": format!("{join_type} Join"),
                "Join Constraint": format!("{join_constraint:?}"),
                "Join Keys": join_expr.join(", "),
                "Filter": filter_expr
            })
        }
        LogicalPlan::Repartition(Repartition {
            partitioning_scheme,
            ..
        }) => match partitioning_scheme {
            Partitioning::RoundRobinBatch(n) => {
                json!({
                    "Node Type": "Repartition",
                    "Partitioning Scheme": "RoundRobinBatch",
                    "Partition Count": n
                })
            }
            Partitioning::Hash(expr, n) => {
                let hash_expr: Vec<String> =
                    expr.iter().map(|e| e.to_string()).collect();
                json!({
                    "Node Type": "Repartition",
                    "Partitioning Scheme": "Hash",
                    "Partition Count": n,
                    "Partitioning Key": hash_expr
                })
            }
            Partitioning::DistributeBy(expr) => {
                let dist_by_expr: Vec<String> =
                    expr.iter().map(|e| e.to_string()).collect();
                json!({
                    "Node Type": "Repartition",
                    "Partitioning Scheme": "DistributeBy",
                    "Partitioning Key": dist_by_expr
                })
            }
        },
        LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
            let mut object = json!({
                "Node Type": "Limit",
            });
            // Unlike the numeric "Fetch" on TableScan/Sort, both values are
            // emitted as JSON strings here.
            if let Some(s) = skip {
                object["Skip"] = s.to_string().into();
            }
            if let Some(f) = fetch {
                object["Fetch"] = f.to_string().into();
            }
            object
        }
        LogicalPlan::Subquery(Subquery { .. }) => {
            json!({
                "Node Type": "Subquery"
            })
        }
        LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
            // NOTE(review): shares the "Subquery" label with the arm above and
            // is told apart only by the "Alias" key; confirm this is intended.
            json!({
                "Node Type": "Subquery",
                "Alias": alias.table(),
            })
        }
        LogicalPlan::Statement(statement) => {
            json!({
                "Node Type": "Statement",
                "Statement": statement.display().to_string()
            })
        }
        LogicalPlan::Distinct(distinct) => match distinct {
            Distinct::All(_) => {
                json!({
                    "Node Type": "DistinctAll"
                })
            }
            Distinct::On(DistinctOn {
                on_expr,
                select_expr,
                sort_expr,
                ..
            }) => {
                let mut object = json!({
                    "Node Type": "DistinctOn",
                    "On": expr_vec_fmt!(on_expr),
                    "Select": expr_vec_fmt!(select_expr),
                });
                if let Some(sort_expr) = sort_expr {
                    object["Sort"] =
                        serde_json::Value::String(expr_vec_fmt!(sort_expr));
                }
                object
            }
        },
        LogicalPlan::Explain { .. } => {
            json!({
                "Node Type": "Explain"
            })
        }
        LogicalPlan::Analyze { .. } => {
            json!({
                "Node Type": "Analyze"
            })
        }
        LogicalPlan::Union(_) => {
            json!({
                "Node Type": "Union"
            })
        }
        LogicalPlan::Extension(e) => {
            json!({
                "Node Type": e.node.name(),
                "Detail": format!("{:?}", e.node)
            })
        }
        LogicalPlan::DescribeTable(DescribeTable { .. }) => {
            json!({
                "Node Type": "DescribeTable"
            })
        }
        LogicalPlan::Unnest(Unnest {
            input: plan,
            list_type_columns: list_col_indices,
            struct_type_columns: struct_col_indices,
            ..
        }) => {
            // The stored indices refer to columns of the input plan's schema.
            let input_columns = plan.schema().columns();
            let list_type_columns = list_col_indices
                .iter()
                .map(|(i, unnest_info)| {
                    format!("{}|depth={:?}", input_columns[*i], unnest_info.depth)
                })
                .collect::<Vec<String>>();
            let struct_type_columns = struct_col_indices
                .iter()
                .map(|i| &input_columns[*i])
                .collect::<Vec<&Column>>();
            json!({
                "Node Type": "Unnest",
                "ListColumn": expr_vec_fmt!(list_type_columns),
                "StructColumn": expr_vec_fmt!(struct_type_columns),
            })
        }
    }
}