in src/codec.rs [34:99]
/// Decode `buf` into one of this codec's custom `ExecutionPlan` nodes.
///
/// Each known protobuf message type is tried in turn; the first that
/// decodes successfully determines which plan node is built. Note that
/// prost decoding is structural, so the ordering of these attempts is
/// significant for ambiguous buffers.
///
/// * `buf` - the protobuf-encoded physical plan node
/// * `inputs` - the already-decoded child plans of this node
/// * `registry` - function registry used to resolve partitioning expressions
///
/// # Errors
/// Returns an error if the buffer matches none of the known node types,
/// if a required proto field (schema, partitioning) is missing, or if
/// `inputs` has the wrong arity for the decoded node.
fn try_decode(
    &self,
    buf: &[u8],
    inputs: &[Arc<dyn ExecutionPlan>],
    registry: &dyn FunctionRegistry,
) -> Result<Arc<dyn ExecutionPlan>> {
    // Every wrapper node below takes exactly one child; share the arity
    // check (and its error message) instead of repeating it per branch.
    let one_input = |name: &str| -> Result<Arc<dyn ExecutionPlan>> {
        match inputs {
            [only] => Ok(only.clone()),
            _ => Err(internal_datafusion_err!(
                "{} requires one input, got {}",
                name,
                inputs.len()
            )),
        }
    };

    if let Ok(node) = PartitionIsolatorExecNode::decode(buf) {
        Ok(Arc::new(PartitionIsolatorExec::new(
            one_input("PartitionIsolatorExec")?,
            node.partition_count as usize,
        )))
    } else if let Ok(node) = DfRayStageReaderExecNode::decode(buf) {
        // Leaf node: rebuild the schema and partitioning from the proto.
        // `ok_or_else` avoids constructing the error on the success path.
        let schema: Schema = node
            .schema
            .as_ref()
            .ok_or_else(|| internal_datafusion_err!("missing schema in proto"))?
            .try_into()?;
        let part = parse_protobuf_partitioning(
            node.partitioning.as_ref(),
            registry,
            &schema,
            &DefaultPhysicalExtensionCodec {},
        )?
        .ok_or_else(|| internal_datafusion_err!("missing partitioning in proto"))?;
        Ok(Arc::new(DFRayStageReaderExec::try_new(
            part,
            Arc::new(schema),
            node.stage_id as usize,
        )?))
    } else if let Ok(node) = MaxRowsExecNode::decode(buf) {
        Ok(Arc::new(MaxRowsExec::new(
            one_input("MaxRowsExec")?,
            node.max_rows as usize,
        )))
    } else if let Ok(node) = PrefetchExecNode::decode(buf) {
        // Fixed: this branch previously reported "MaxRowsExec" in its
        // arity error message (copy-paste); it now names PrefetchExec.
        Ok(Arc::new(PrefetchExec::new(
            one_input("PrefetchExec")?,
            node.buf_size as usize,
        )))
    } else {
        internal_err!("Should not reach this point")
    }
}