fn try_encode()

in src/codec.rs [101:148]


    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
        if let Some(reader) = node.as_any().downcast_ref::<DFRayStageReaderExec>() {
            let schema: protobuf::Schema = reader.schema().try_into()?;
            let partitioning: protobuf::Partitioning = serialize_partitioning(
                reader.properties().output_partitioning(),
                &DefaultPhysicalExtensionCodec {},
            )?;

            let pb = DfRayStageReaderExecNode {
                schema: Some(schema),
                partitioning: Some(partitioning),
                stage_id: reader.stage_id as u64,
            };

            pb.encode(buf)
                .map_err(|e| internal_datafusion_err!("can't encode ray stage reader pb: {e}"))?;
            Ok(())
        } else if let Some(pi) = node.as_any().downcast_ref::<PartitionIsolatorExec>() {
            let pb = PartitionIsolatorExecNode {
                dummy: 0.0,
                partition_count: pi.partition_count as u64,
            };

            pb.encode(buf)
                .map_err(|e| internal_datafusion_err!("can't encode partition isolator pb: {e}"))?;

            Ok(())
        } else if let Some(max) = node.as_any().downcast_ref::<MaxRowsExec>() {
            let pb = MaxRowsExecNode {
                max_rows: max.max_rows as u64,
            };
            pb.encode(buf)
                .map_err(|e| internal_datafusion_err!("can't encode max rows pb: {e}"))?;

            Ok(())
        } else if let Some(pre) = node.as_any().downcast_ref::<PrefetchExec>() {
            let pb = PrefetchExecNode {
                dummy: 0,
                buf_size: pre.buf_size as u64,
            };
            pb.encode(buf)
                .map_err(|e| internal_datafusion_err!("can't encode prefetch pb: {e}"))?;

            Ok(())
        } else {
            internal_err!("Not supported")
        }
    }