fn validate()

in core/sdk/src/models/messaging/messages_batch.rs [209:297]


    fn validate(&self) -> Result<(), IggyError> {
        if self.is_empty() {
            return Err(IggyError::InvalidMessagesCount);
        }

        let indexes_count = self.indexes.count();
        let indexes_size = self.indexes.size();

        if indexes_size % INDEX_SIZE as u32 != 0 {
            tracing::error!(
                "Indexes size {} is not a multiple of index size {}",
                indexes_size,
                INDEX_SIZE
            );
            return Err(IggyError::InvalidIndexesByteSize(indexes_size));
        }

        if indexes_count != self.count() {
            tracing::error!(
                "Indexes count {} does not match messages count {}",
                indexes_count,
                self.count()
            );
            return Err(IggyError::InvalidIndexesCount(indexes_count, self.count()));
        }

        let mut messages_count = 0;
        let mut messages_size = 0;

        for i in 0..self.count() {
            if let Some(index_view) = self.indexes.get(i) {
                if index_view.offset() != 0 {
                    tracing::error!("Non-zero offset {} at index: {}", index_view.offset(), i);
                    return Err(IggyError::NonZeroOffset(index_view.offset() as u64, i));
                }
                if index_view.timestamp() != 0 {
                    tracing::error!(
                        "Non-zero timestamp {} at index: {}",
                        index_view.timestamp(),
                        i
                    );
                    return Err(IggyError::NonZeroTimestamp(index_view.timestamp(), i));
                }
            } else {
                tracing::error!("Index {} is missing", i);
                return Err(IggyError::MissingIndex(i));
            }

            if let Some(message) = self.get(i as usize) {
                if message.payload().len() as u32 > MAX_PAYLOAD_SIZE {
                    tracing::error!(
                        "Message payload size {} exceeds maximum payload size {}",
                        message.payload().len(),
                        MAX_PAYLOAD_SIZE
                    );
                    return Err(IggyError::TooBigMessagePayload);
                }

                messages_size += message.size();
                messages_count += 1;
            } else {
                tracing::error!("Missing index {}", i);
                return Err(IggyError::MissingIndex(i));
            }
        }

        if indexes_count != messages_count {
            tracing::error!(
                "Indexes count {} does not match messages count {}",
                indexes_count,
                messages_count
            );
            return Err(IggyError::InvalidMessagesCount);
        }

        if messages_size != self.messages.len() {
            tracing::error!(
                "Messages size {} does not match messages buffer size {}",
                messages_size,
                self.messages.len() as u64
            );
            return Err(IggyError::InvalidMessagesSize(
                messages_size as u32,
                self.messages.len() as u32,
            ));
        }

        Ok(())
    }