in core/sdk/src/messages/send_messages.rs [222:372]
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
enum Field {
Partitioning,
Messages,
}
impl<'de> Deserialize<'de> for Field {
fn deserialize<D>(deserializer: D) -> Result<Field, D::Error>
where
D: Deserializer<'de>,
{
struct FieldVisitor;
impl Visitor<'_> for FieldVisitor {
type Value = Field;
fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
formatter.write_str("`partitioning` or `messages`")
}
fn visit_str<E>(self, value: &str) -> Result<Field, E>
where
E: de::Error,
{
match value {
"partitioning" => Ok(Field::Partitioning),
"messages" => Ok(Field::Messages),
_ => Err(de::Error::unknown_field(
value,
&["partitioning", "messages"],
)),
}
}
}
deserializer.deserialize_identifier(FieldVisitor)
}
}
struct SendMessagesVisitor;
impl<'de> Visitor<'de> for SendMessagesVisitor {
type Value = SendMessages;
fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
formatter.write_str("struct SendMessages")
}
fn visit_map<V>(self, mut map: V) -> Result<SendMessages, V::Error>
where
V: MapAccess<'de>,
{
let mut partitioning = None;
let mut messages = None;
while let Some(key) = map.next_key()? {
match key {
Field::Partitioning => {
if partitioning.is_some() {
return Err(de::Error::duplicate_field("partitioning"));
}
partitioning = Some(map.next_value()?);
}
Field::Messages => {
if messages.is_some() {
return Err(de::Error::duplicate_field("messages"));
}
let message_data: Vec<serde_json::Value> = map.next_value()?;
let mut iggy_messages = Vec::new();
for msg in message_data {
let id =
msg.get("id").and_then(|v| v.as_u64()).unwrap_or(0) as u128;
let payload = msg
.get("payload")
.and_then(|v| v.as_str())
.ok_or_else(|| de::Error::missing_field("payload"))?;
let payload_bytes = BASE64
.decode(payload)
.map_err(|_| de::Error::custom("Invalid base64 payload"))?;
let headers_map = if let Some(headers) = msg.get("headers") {
if headers.is_null() {
None
} else {
Some(serde_json::from_value(headers.clone()).map_err(
|_| de::Error::custom("Invalid headers format"),
)?)
}
} else {
None
};
let iggy_message = if let Some(headers) = headers_map {
IggyMessage::builder()
.id(id)
.payload(payload_bytes.into())
.user_headers(headers)
.build()
.map_err(|e| {
de::Error::custom(format!(
"Failed to create message with headers: {e}"
))
})?
} else {
IggyMessage::builder()
.id(id)
.payload(payload_bytes.into())
.build()
.map_err(|e| {
de::Error::custom(format!(
"Failed to create message: {e}"
))
})?
};
iggy_messages.push(iggy_message);
}
messages = Some(iggy_messages);
}
}
}
let partitioning =
partitioning.ok_or_else(|| de::Error::missing_field("partitioning"))?;
let messages = messages.ok_or_else(|| de::Error::missing_field("messages"))?;
let batch = IggyMessagesBatch::from(&messages);
Ok(SendMessages {
metadata_length: 0, // this field is used only for TCP/QUIC
stream_id: Identifier::default(),
topic_id: Identifier::default(),
partitioning,
batch,
})
}
}
deserializer.deserialize_struct(
"SendMessages",
&["partitioning", "messages"],
SendMessagesVisitor,
)
}