in src/codec.rs [101:148]
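/// Encode this crate's custom `ExecutionPlan` nodes (stage reader, partition
/// isolator, max-rows, prefetch) into their protobuf messages so physical
/// plans can be serialized and sent between processes.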
fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
    if let Some(reader) = node.as_any().downcast_ref::<DFRayStageReaderExec>() {
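        // Schema and output partitioning are converted with the proto
        // helpers from datafusion-proto; stage_id identifies the stage
        // this reader pulls from, so the node can be rebuilt on decode.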
        let schema: protobuf::Schema = reader.schema().try_into()?;
        let partitioning: protobuf::Partitioning = serialize_partitioning(
            reader.properties().output_partitioning(),
            &DefaultPhysicalExtensionCodec {},
        )?;
        let pb = DfRayStageReaderExecNode {
            schema: Some(schema),
            partitioning: Some(partitioning),
            stage_id: reader.stage_id as u64,
        };
        pb.encode(buf)
            .map_err(|e| internal_datafusion_err!("can't encode ray stage reader pb: {e}"))?;
        Ok(())
    } else if let Some(pi) = node.as_any().downcast_ref::<PartitionIsolatorExec>() {
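        // `dummy` is a placeholder field on the message; only
        // `partition_count` carries information.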
        let pb = PartitionIsolatorExecNode {
            dummy: 0.0,
            partition_count: pi.partition_count as u64,
        };
        pb.encode(buf)
            .map_err(|e| internal_datafusion_err!("can't encode partition isolator pb: {e}"))?;
        Ok(())
    } else if let Some(max) = node.as_any().downcast_ref::<MaxRowsExec>() {
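        // MaxRowsExec is fully described by its row cap.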
        let pb = MaxRowsExecNode {
            max_rows: max.max_rows as u64,
        };
        pb.encode(buf)
            .map_err(|e| internal_datafusion_err!("can't encode max rows pb: {e}"))?;
        Ok(())
    } else if let Some(pre) = node.as_any().downcast_ref::<PrefetchExec>() {
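        // As with the isolator node, `dummy` is a placeholder; `buf_size`
        // is the prefetch buffer size to restore on decode.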
        let pb = PrefetchExecNode {
            dummy: 0,
            buf_size: pre.buf_size as u64,
        };
        pb.encode(buf)
            .map_err(|e| internal_datafusion_err!("can't encode prefetch pb: {e}"))?;
        Ok(())
    } else {
        internal_err!("unsupported execution plan node in try_encode")
    }
}
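
// A minimal sketch (not from the original file) of how a codec like this is
// exercised end to end: datafusion-proto's byte helpers walk the plan and
// fall back to the extension codec for any node the built-in codec does not
// recognize. `round_trip` is an illustrative name, and this assumes the
// imports already present in this file (`Arc`, `ExecutionPlan`, `Result`);
// only the datafusion-proto API below is otherwise assumed.
#[allow(dead_code)]
fn round_trip(
    plan: Arc<dyn ExecutionPlan>,
    ctx: &datafusion::prelude::SessionContext,
    codec: &dyn datafusion_proto::physical_plan::PhysicalExtensionCodec,
) -> Result<Arc<dyn ExecutionPlan>> {
    // Serialize, dispatching to `try_encode` above for the custom nodes...
    let bytes = datafusion_proto::bytes::physical_plan_to_bytes_with_extension_codec(plan, codec)?;
    // ...then rebuild the plan, which exercises the matching `try_decode`.
    datafusion_proto::bytes::physical_plan_from_bytes_with_extension_codec(&bytes, ctx, codec)
}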