in core/sdk/src/models/messaging/messages_batch.rs [209:297]
fn validate(&self) -> Result<(), IggyError> {
if self.is_empty() {
return Err(IggyError::InvalidMessagesCount);
}
let indexes_count = self.indexes.count();
let indexes_size = self.indexes.size();
if indexes_size % INDEX_SIZE as u32 != 0 {
tracing::error!(
"Indexes size {} is not a multiple of index size {}",
indexes_size,
INDEX_SIZE
);
return Err(IggyError::InvalidIndexesByteSize(indexes_size));
}
if indexes_count != self.count() {
tracing::error!(
"Indexes count {} does not match messages count {}",
indexes_count,
self.count()
);
return Err(IggyError::InvalidIndexesCount(indexes_count, self.count()));
}
let mut messages_count = 0;
let mut messages_size = 0;
for i in 0..self.count() {
if let Some(index_view) = self.indexes.get(i) {
if index_view.offset() != 0 {
tracing::error!("Non-zero offset {} at index: {}", index_view.offset(), i);
return Err(IggyError::NonZeroOffset(index_view.offset() as u64, i));
}
if index_view.timestamp() != 0 {
tracing::error!(
"Non-zero timestamp {} at index: {}",
index_view.timestamp(),
i
);
return Err(IggyError::NonZeroTimestamp(index_view.timestamp(), i));
}
} else {
tracing::error!("Index {} is missing", i);
return Err(IggyError::MissingIndex(i));
}
if let Some(message) = self.get(i as usize) {
if message.payload().len() as u32 > MAX_PAYLOAD_SIZE {
tracing::error!(
"Message payload size {} exceeds maximum payload size {}",
message.payload().len(),
MAX_PAYLOAD_SIZE
);
return Err(IggyError::TooBigMessagePayload);
}
messages_size += message.size();
messages_count += 1;
} else {
tracing::error!("Missing index {}", i);
return Err(IggyError::MissingIndex(i));
}
}
if indexes_count != messages_count {
tracing::error!(
"Indexes count {} does not match messages count {}",
indexes_count,
messages_count
);
return Err(IggyError::InvalidMessagesCount);
}
if messages_size != self.messages.len() {
tracing::error!(
"Messages size {} does not match messages buffer size {}",
messages_size,
self.messages.len() as u64
);
return Err(IggyError::InvalidMessagesSize(
messages_size as u32,
self.messages.len() as u32,
));
}
Ok(())
}