fn try_decode()

in src/codec.rs [34:99]


    /// Decodes `buf` into one of the custom execution plan nodes handled by
    /// this codec.
    ///
    /// The buffer is tried against each known node message in a fixed order
    /// (`PartitionIsolatorExecNode`, `DfRayStageReaderExecNode`,
    /// `MaxRowsExecNode`, `PrefetchExecNode`); the first message type that
    /// decodes successfully determines which plan is built.
    ///
    /// * `buf` - serialized node bytes.
    /// * `inputs` - child plans. `PartitionIsolatorExec`, `MaxRowsExec` and
    ///   `PrefetchExec` wrap exactly one child; the stage reader branch does
    ///   not read `inputs`.
    /// * `registry` - function registry passed to the partitioning parser.
    ///
    /// # Errors
    ///
    /// Returns an internal error when:
    /// * a single-child node is given a number of inputs other than one,
    /// * the stage reader node has no schema or no partitioning,
    /// * `buf` does not decode as any of the known node messages.
    ///
    /// Errors from the schema conversion, from the partitioning parser, and
    /// from `DFRayStageReaderExec::try_new` are propagated as-is.
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // NOTE(review): the node type is discovered by trial decoding, so the
        // order of the branches below is significant. Presumably a buffer can
        // decode as more than one message type; an explicit type tag in the
        // encoded form would make this unambiguous -- TODO confirm and clean up.
        if let Ok(node) = PartitionIsolatorExecNode::decode(buf) {
            match inputs {
                [input] => Ok(Arc::new(PartitionIsolatorExec::new(
                    Arc::clone(input),
                    node.partition_count as usize,
                ))),
                _ => Err(internal_datafusion_err!(
                    "PartitionIsolatorExec requires one input"
                )),
            }
        } else if let Ok(node) = DfRayStageReaderExecNode::decode(buf) {
            // Build the error lazily so nothing is constructed on the
            // success path.
            let schema: Schema = node
                .schema
                .as_ref()
                .ok_or_else(|| internal_datafusion_err!("missing schema in proto"))?
                .try_into()?;

            let part = parse_protobuf_partitioning(
                node.partitioning.as_ref(),
                registry,
                &schema,
                &DefaultPhysicalExtensionCodec {},
            )?
            .ok_or_else(|| internal_datafusion_err!("missing partitioning in proto"))?;

            Ok(Arc::new(DFRayStageReaderExec::try_new(
                part,
                Arc::new(schema),
                node.stage_id as usize,
            )?))
        } else if let Ok(node) = MaxRowsExecNode::decode(buf) {
            match inputs {
                [input] => Ok(Arc::new(MaxRowsExec::new(
                    Arc::clone(input),
                    node.max_rows as usize,
                ))),
                _ => Err(internal_datafusion_err!(
                    "MaxRowsExec requires one input, got {}",
                    inputs.len()
                )),
            }
        } else if let Ok(node) = PrefetchExecNode::decode(buf) {
            match inputs {
                [input] => Ok(Arc::new(PrefetchExec::new(
                    Arc::clone(input),
                    node.buf_size as usize,
                ))),
                // Name the operator that actually failed (this message
                // previously said "MaxRowsExec").
                _ => Err(internal_datafusion_err!(
                    "PrefetchExec requires one input, got {}",
                    inputs.len()
                )),
            }
        } else {
            // Reached when `buf` decodes as none of the node types above.
            internal_err!("Should not reach this point")
        }
    }