fn transform_messages_to_protobuf()

in rust/src/producer.rs [121:223]


    /// Converts a batch of client messages into protobuf messages ready to be sent.
    ///
    /// Every message in the batch must carry the same topic and the same message
    /// group. The message type is derived per message with the following
    /// precedence: transaction, then delay (a delivery timestamp is present),
    /// then FIFO (a message group is present), otherwise normal. Properties that
    /// do not apply to the chosen type are cleared on the protobuf message.
    ///
    /// On success returns `(topic, message_group, pb_messages)`. The returned
    /// `message_group` is the value taken from the messages themselves, even when
    /// the corresponding protobuf field was cleared because the messages are
    /// transactional or delayed.
    ///
    /// # Errors
    ///
    /// Returns a `ClientError` of kind `ErrorKind::InvalidMessage` when:
    /// - `messages` is empty,
    /// - the messages do not all share the same topic,
    /// - the messages do not all share the same message group,
    /// - the shared topic is an empty string.
    fn transform_messages_to_protobuf(
        &self,
        messages: Vec<impl message::Message>,
    ) -> Result<(String, Option<String>, Vec<pb::Message>), ClientError> {
        if messages.is_empty() {
            return Err(ClientError::new(
                ErrorKind::InvalidMessage,
                "no message found",
                Self::OPERATION_SEND_MESSAGE,
            ));
        }

        // Best effort: if the system clock is before the Unix epoch the born
        // timestamp is simply omitted. Only whole seconds are recorded.
        let born_timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .ok()
            .map(|duration| Timestamp {
                seconds: duration.as_secs() as i64,
                nanos: 0,
            });

        let mut pb_messages = Vec::with_capacity(messages.len());
        // Topic and message group of the first message; every later message is
        // validated against them.
        let mut batch_topic: Option<String> = None;
        let mut batch_message_group: Option<Option<String>> = None;

        for mut message in messages {
            // Take the topic exactly once per message and reuse it for both the
            // validation and the protobuf resource, so the result does not depend
            // on whether `take_topic` leaves the value in place.
            let topic = message.take_topic();
            if let Some(expected_topic) = &batch_topic {
                if *expected_topic != topic {
                    return Err(ClientError::new(
                        ErrorKind::InvalidMessage,
                        "Not all messages have the same topic.",
                        Self::OPERATION_SEND_MESSAGE,
                    ));
                }
            } else {
                batch_topic = Some(topic.clone());
            }

            let mut message_group = message.take_message_group();
            if let Some(expected_group) = &batch_message_group {
                if *expected_group != message_group {
                    return Err(ClientError::new(
                        ErrorKind::InvalidMessage,
                        "not all messages have the same message group",
                        Self::OPERATION_SEND_MESSAGE,
                    ));
                }
            } else {
                batch_message_group = Some(message_group.clone());
            }

            let mut delivery_timestamp = message
                .take_delivery_timestamp()
                .map(|seconds| Timestamp { seconds, nanos: 0 });

            // Pick the message type and drop the properties that do not apply to it.
            let message_type = if message.transaction_enabled() {
                message_group = None;
                delivery_timestamp = None;
                MessageType::Transaction as i32
            } else if delivery_timestamp.is_some() {
                message_group = None;
                MessageType::Delay as i32
            } else if message_group.is_some() {
                // `delivery_timestamp` is already `None` on this branch.
                MessageType::Fifo as i32
            } else {
                MessageType::Normal as i32
            };

            pb_messages.push(pb::Message {
                topic: Some(Resource {
                    name: topic,
                    resource_namespace: self.option.namespace().to_string(),
                }),
                user_properties: message.take_properties(),
                system_properties: Some(SystemProperties {
                    tag: message.take_tag(),
                    keys: message.take_keys(),
                    message_id: message.take_message_id(),
                    message_group,
                    delivery_timestamp,
                    message_type,
                    born_host: HOST_NAME.clone(),
                    born_timestamp: born_timestamp.clone(),
                    body_digest: None,
                    body_encoding: Encoding::Identity as i32,
                    ..SystemProperties::default()
                }),
                body: message.take_body(),
            });
        }

        // `messages` was checked to be non-empty and every early exit above
        // returns, so the first iteration has recorded both values.
        let topic = batch_topic.expect("non-empty batch always records a topic");
        // The emptiness check stays after the loop so that a topic mismatch is
        // still reported in preference to an empty topic.
        if topic.is_empty() {
            return Err(ClientError::new(
                ErrorKind::InvalidMessage,
                "message topic is empty",
                Self::OPERATION_SEND_MESSAGE,
            ));
        }

        let message_group =
            batch_message_group.expect("non-empty batch always records a message group");
        Ok((topic, message_group, pb_messages))
    }