in rust/src/producer.rs [121:223]
/// Converts a batch of messages into their protobuf representation for sending.
///
/// On success, returns a tuple of:
/// 1. the topic shared by every message in the batch,
/// 2. the message group shared by every message in the batch, exactly as the
///    messages reported it (i.e. before it is cleared for transaction or delay
///    messages on the protobuf side),
/// 3. the converted protobuf messages, in input order.
///
/// The message type is chosen per message with this precedence: transaction,
/// then delay (a delivery timestamp is present), then FIFO (a message group is
/// present), otherwise normal. The message group and delivery timestamp are
/// only kept on the protobuf message when they belong to the chosen type.
///
/// # Errors
///
/// Returns an `ErrorKind::InvalidMessage` error when `messages` is empty, when
/// the messages do not all share the same topic, when they do not all share the
/// same message group, or when the shared topic is empty.
fn transform_messages_to_protobuf(
    &self,
    messages: Vec<impl message::Message>,
) -> Result<(String, Option<String>, Vec<pb::Message>), ClientError> {
    if messages.is_empty() {
        return Err(ClientError::new(
            ErrorKind::InvalidMessage,
            "no message found",
            Self::OPERATION_SEND_MESSAGE,
        ));
    }

    // A single born timestamp (second precision) is shared by the whole batch.
    // It is left unset when the system clock reads earlier than the Unix epoch.
    let born_timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .ok()
        .map(|elapsed| Timestamp {
            seconds: elapsed.as_secs() as i64,
            nanos: 0,
        });

    let mut pb_messages = Vec::with_capacity(messages.len());
    let mut batch_topic: Option<String> = None;
    let mut batch_message_group: Option<Option<String>> = None;

    for mut message in messages {
        // Take the topic exactly once and reuse it below. Calling `take_topic`
        // a second time would hand an empty name to the protobuf resource if
        // the implementation moves the value out (NOTE(review): the `Message`
        // trait is not visible here — confirm), and costs an extra allocation
        // if it clones.
        let topic = message.take_topic();
        if let Some(expected) = batch_topic.as_deref() {
            if expected != topic.as_str() {
                return Err(ClientError::new(
                    ErrorKind::InvalidMessage,
                    "Not all messages have the same topic.",
                    Self::OPERATION_SEND_MESSAGE,
                ));
            }
        } else {
            batch_topic = Some(topic.clone());
        }

        let message_group = message.take_message_group();
        if let Some(expected) = batch_message_group.as_ref() {
            if *expected != message_group {
                return Err(ClientError::new(
                    ErrorKind::InvalidMessage,
                    "not all messages have the same message group",
                    Self::OPERATION_SEND_MESSAGE,
                ));
            }
        } else {
            batch_message_group = Some(message_group.clone());
        }

        let delivery_timestamp = message
            .take_delivery_timestamp()
            .map(|seconds| Timestamp { seconds, nanos: 0 });

        // Pick the message type and, in the same step, keep only the fields
        // that belong to that type.
        let (message_type, message_group, delivery_timestamp) =
            if message.transaction_enabled() {
                (MessageType::Transaction, None, None)
            } else if delivery_timestamp.is_some() {
                (MessageType::Delay, None, delivery_timestamp)
            } else if message_group.is_some() {
                (MessageType::Fifo, message_group, None)
            } else {
                (MessageType::Normal, None, None)
            };

        pb_messages.push(pb::Message {
            topic: Some(Resource {
                name: topic,
                resource_namespace: self.option.namespace().to_string(),
            }),
            user_properties: message.take_properties(),
            system_properties: Some(SystemProperties {
                tag: message.take_tag(),
                keys: message.take_keys(),
                message_id: message.take_message_id(),
                message_group,
                delivery_timestamp,
                message_type: message_type as i32,
                born_host: HOST_NAME.clone(),
                born_timestamp: born_timestamp.clone(),
                body_digest: None,
                body_encoding: Encoding::Identity as i32,
                ..SystemProperties::default()
            }),
            body: message.take_body(),
        });
    }

    // The batch is non-empty, so the first loop iteration populated both
    // values; the fallback arm only exists to avoid a panicking `unwrap`.
    let (topic, message_group) = match (batch_topic, batch_message_group) {
        (Some(topic), Some(message_group)) => (topic, message_group),
        _ => {
            return Err(ClientError::new(
                ErrorKind::InvalidMessage,
                "no message found",
                Self::OPERATION_SEND_MESSAGE,
            ))
        }
    };

    if topic.is_empty() {
        return Err(ClientError::new(
            ErrorKind::InvalidMessage,
            "message topic is empty",
            Self::OPERATION_SEND_MESSAGE,
        ));
    }

    Ok((topic, message_group, pb_messages))
}