fn deserialize()

in core/sdk/src/messages/send_messages.rs [222:372]


    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        enum Field {
            Partitioning,
            Messages,
        }

        impl<'de> Deserialize<'de> for Field {
            fn deserialize<D>(deserializer: D) -> Result<Field, D::Error>
            where
                D: Deserializer<'de>,
            {
                struct FieldVisitor;

                impl Visitor<'_> for FieldVisitor {
                    type Value = Field;

                    fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
                        formatter.write_str("`partitioning` or `messages`")
                    }

                    fn visit_str<E>(self, value: &str) -> Result<Field, E>
                    where
                        E: de::Error,
                    {
                        match value {
                            "partitioning" => Ok(Field::Partitioning),
                            "messages" => Ok(Field::Messages),
                            _ => Err(de::Error::unknown_field(
                                value,
                                &["partitioning", "messages"],
                            )),
                        }
                    }
                }

                deserializer.deserialize_identifier(FieldVisitor)
            }
        }

        struct SendMessagesVisitor;

        impl<'de> Visitor<'de> for SendMessagesVisitor {
            type Value = SendMessages;

            fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
                formatter.write_str("struct SendMessages")
            }

            fn visit_map<V>(self, mut map: V) -> Result<SendMessages, V::Error>
            where
                V: MapAccess<'de>,
            {
                let mut partitioning = None;
                let mut messages = None;

                while let Some(key) = map.next_key()? {
                    match key {
                        Field::Partitioning => {
                            if partitioning.is_some() {
                                return Err(de::Error::duplicate_field("partitioning"));
                            }
                            partitioning = Some(map.next_value()?);
                        }
                        Field::Messages => {
                            if messages.is_some() {
                                return Err(de::Error::duplicate_field("messages"));
                            }

                            let message_data: Vec<serde_json::Value> = map.next_value()?;
                            let mut iggy_messages = Vec::new();

                            for msg in message_data {
                                let id =
                                    msg.get("id").and_then(|v| v.as_u64()).unwrap_or(0) as u128;

                                let payload = msg
                                    .get("payload")
                                    .and_then(|v| v.as_str())
                                    .ok_or_else(|| de::Error::missing_field("payload"))?;
                                let payload_bytes = BASE64
                                    .decode(payload)
                                    .map_err(|_| de::Error::custom("Invalid base64 payload"))?;

                                let headers_map = if let Some(headers) = msg.get("headers") {
                                    if headers.is_null() {
                                        None
                                    } else {
                                        Some(serde_json::from_value(headers.clone()).map_err(
                                            |_| de::Error::custom("Invalid headers format"),
                                        )?)
                                    }
                                } else {
                                    None
                                };

                                let iggy_message = if let Some(headers) = headers_map {
                                    IggyMessage::builder()
                                        .id(id)
                                        .payload(payload_bytes.into())
                                        .user_headers(headers)
                                        .build()
                                        .map_err(|e| {
                                            de::Error::custom(format!(
                                                "Failed to create message with headers: {e}"
                                            ))
                                        })?
                                } else {
                                    IggyMessage::builder()
                                        .id(id)
                                        .payload(payload_bytes.into())
                                        .build()
                                        .map_err(|e| {
                                            de::Error::custom(format!(
                                                "Failed to create message: {e}"
                                            ))
                                        })?
                                };

                                iggy_messages.push(iggy_message);
                            }

                            messages = Some(iggy_messages);
                        }
                    }
                }

                let partitioning =
                    partitioning.ok_or_else(|| de::Error::missing_field("partitioning"))?;
                let messages = messages.ok_or_else(|| de::Error::missing_field("messages"))?;

                let batch = IggyMessagesBatch::from(&messages);

                Ok(SendMessages {
                    metadata_length: 0, // this field is used only for TCP/QUIC
                    stream_id: Identifier::default(),
                    topic_id: Identifier::default(),
                    partitioning,
                    batch,
                })
            }
        }

        deserializer.deserialize_struct(
            "SendMessages",
            &["partitioning", "messages"],
            SendMessagesVisitor,
        )
    }