core/sdk/src/models/messaging/messages_batch.rs (280 lines of code) (raw):
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use super::{IggyIndexes, IggyMessage, IggyMessageView, IggyMessageViewIterator};
use crate::{
error::IggyError,
models::messaging::{INDEX_SIZE, MAX_PAYLOAD_SIZE},
prelude::{BytesSerializable, IggyByteSize, Sizeable, Validatable},
};
use bytes::{BufMut, Bytes, BytesMut};
use std::ops::{Deref, Index};
/// A batch of messages stored back-to-back in a single byte buffer, together with
/// the index entries used to locate each message inside that buffer.
///
/// The message bytes are held in a `Bytes` value and are only read by this type;
/// the indexes can be moved out with `take_indexes`.
#[derive(Clone, Debug, PartialEq)]
pub struct IggyMessagesBatch {
/// The number of messages in the batch.
/// Stored as supplied by the creator; `validate` checks it against the indexes.
count: u32,
/// The index entries for the messages in the buffer, kept as raw bytes.
/// When a batch is built from a message slice, each entry is written as a
/// `u32` offset, a `u32` position (cumulative end of the message in the buffer)
/// and a `u64` timestamp. Positions are interpreted relative to the indexes' base position.
indexes: IggyIndexes,
/// The buffer containing the serialized messages, one after another.
messages: Bytes,
}
impl IggyMessagesBatch {
/// Create a batch from indexes buffer and messages buffer
pub fn new(indexes: IggyIndexes, messages: Bytes, count: u32) -> Self {
Self {
count,
indexes,
messages,
}
}
/// Creates a empty messages batch
pub fn empty() -> Self {
Self::new(IggyIndexes::empty(), BytesMut::new().freeze(), 0)
}
/// Create iterator over messages
pub fn iter(&self) -> IggyMessageViewIterator {
IggyMessageViewIterator::new(&self.messages)
}
/// Get the number of messages
pub fn count(&self) -> u32 {
self.count
}
/// Check if the batch is empty
pub fn is_empty(&self) -> bool {
self.count() == 0
}
/// Get the total size of all messages in bytes
pub fn size(&self) -> u32 {
self.messages.len() as u32
}
/// Get access to the underlying buffer
pub fn buffer(&self) -> &[u8] {
&self.messages
}
/// Get the indexes slice
pub fn indexes_slice(&self) -> &[u8] {
&self.indexes
}
/// Take the indexes from the batch
pub fn take_indexes(&mut self) -> IggyIndexes {
std::mem::take(&mut self.indexes)
}
/// Decompose the batch into its components
pub fn decompose(self) -> (u32, IggyIndexes, Bytes) {
(self.count, self.indexes, self.messages)
}
/// Get index of first message
pub fn first_offset(&self) -> Option<u64> {
self.iter().next().map(|msg| msg.header().offset())
}
/// Get timestamp of first message
pub fn first_timestamp(&self) -> Option<u64> {
self.iter().next().map(|msg| msg.header().timestamp())
}
/// Get offset of last message
pub fn last_offset(&self) -> Option<u64> {
self.iter().last().map(|msg| msg.header().offset())
}
/// Get timestamp of last message
pub fn last_timestamp(&self) -> Option<u64> {
self.iter().last().map(|msg| msg.header().timestamp())
}
/// Calculates the start position of a message at the given index in the buffer
fn message_start_position(&self, index: usize) -> usize {
if index == 0 {
0
} else {
self.position_at(index as u32 - 1) as usize - self.indexes.base_position() as usize
}
}
/// Calculates the end position of a message at the given index in the buffer
fn message_end_position(&self, index: usize) -> usize {
if index >= self.count as usize - 1 {
self.messages.len()
} else {
self.position_at(index as u32) as usize - self.indexes.base_position() as usize
}
}
/// Gets the byte range for a message at the given index
fn get_message_boundaries(&self, index: usize) -> Option<(usize, usize)> {
if index >= self.count as usize {
return None;
}
let start = self.message_start_position(index);
let end = self.message_end_position(index);
if start > self.messages.len() || end > self.messages.len() || start > end {
return None;
}
Some((start, end))
}
/// Helper method to read a position (u32) from the byte array at the given index
fn position_at(&self, position_index: u32) -> u32 {
if let Some(index) = self.indexes.get(position_index) {
index.position()
} else {
0
}
}
/// Get the message at the specified index.
/// Returns None if the index is out of bounds.
pub fn get(&self, index: usize) -> Option<IggyMessageView> {
if let Some((start, end)) = self.get_message_boundaries(index) {
Some(IggyMessageView::new(&self.messages[start..end]))
} else {
None
}
}
}
/// Indexing a batch by message number yields the raw bytes of that message.
///
/// # Panics
///
/// Panics when `index` is not smaller than the message count, or when the
/// boundaries of the requested message cannot be resolved.
impl Index<usize> for IggyMessagesBatch {
    type Output = [u8];

    fn index(&self, index: usize) -> &Self::Output {
        assert!(
            index < self.count as usize,
            "Index out of bounds: the len is {} but the index is {}",
            self.count,
            index
        );

        match self.get_message_boundaries(index) {
            Some((start, end)) => &self.messages[start..end],
            None => panic!("Invalid message boundaries"),
        }
    }
}
/// Wire serialization of a batch: a little-endian `u32` message count, followed
/// by the raw index bytes and then the raw message bytes.
impl BytesSerializable for IggyMessagesBatch {
    /// Not supported for batches; always panics. Use `write_to_buffer` instead.
    fn to_bytes(&self) -> Bytes {
        panic!("should not be used");
    }

    /// Not supported for batches; always panics.
    fn from_bytes(_bytes: Bytes) -> Result<Self, IggyError> {
        panic!("don't use");
    }

    /// Appends the count, the indexes and the messages to `buf`, in that order.
    fn write_to_buffer(&self, buf: &mut BytesMut) {
        let Self {
            count,
            indexes,
            messages,
        } = self;
        buf.put_u32_le(*count);
        buf.put_slice(indexes);
        buf.put_slice(messages);
    }

    /// Returns the number of bytes `write_to_buffer` appends.
    fn get_buffer_size(&self) -> usize {
        // The count is written as a single little-endian u32.
        let count_prefix = std::mem::size_of::<u32>();
        count_prefix + self.indexes.len() + self.messages.len()
    }
}
impl Validatable<IggyError> for IggyMessagesBatch {
fn validate(&self) -> Result<(), IggyError> {
if self.is_empty() {
return Err(IggyError::InvalidMessagesCount);
}
let indexes_count = self.indexes.count();
let indexes_size = self.indexes.size();
if indexes_size % INDEX_SIZE as u32 != 0 {
tracing::error!(
"Indexes size {} is not a multiple of index size {}",
indexes_size,
INDEX_SIZE
);
return Err(IggyError::InvalidIndexesByteSize(indexes_size));
}
if indexes_count != self.count() {
tracing::error!(
"Indexes count {} does not match messages count {}",
indexes_count,
self.count()
);
return Err(IggyError::InvalidIndexesCount(indexes_count, self.count()));
}
let mut messages_count = 0;
let mut messages_size = 0;
for i in 0..self.count() {
if let Some(index_view) = self.indexes.get(i) {
if index_view.offset() != 0 {
tracing::error!("Non-zero offset {} at index: {}", index_view.offset(), i);
return Err(IggyError::NonZeroOffset(index_view.offset() as u64, i));
}
if index_view.timestamp() != 0 {
tracing::error!(
"Non-zero timestamp {} at index: {}",
index_view.timestamp(),
i
);
return Err(IggyError::NonZeroTimestamp(index_view.timestamp(), i));
}
} else {
tracing::error!("Index {} is missing", i);
return Err(IggyError::MissingIndex(i));
}
if let Some(message) = self.get(i as usize) {
if message.payload().len() as u32 > MAX_PAYLOAD_SIZE {
tracing::error!(
"Message payload size {} exceeds maximum payload size {}",
message.payload().len(),
MAX_PAYLOAD_SIZE
);
return Err(IggyError::TooBigMessagePayload);
}
messages_size += message.size();
messages_count += 1;
} else {
tracing::error!("Missing index {}", i);
return Err(IggyError::MissingIndex(i));
}
}
if indexes_count != messages_count {
tracing::error!(
"Indexes count {} does not match messages count {}",
indexes_count,
messages_count
);
return Err(IggyError::InvalidMessagesCount);
}
if messages_size != self.messages.len() {
tracing::error!(
"Messages size {} does not match messages buffer size {}",
messages_size,
self.messages.len() as u64
);
return Err(IggyError::InvalidMessagesSize(
messages_size as u32,
self.messages.len() as u32,
));
}
Ok(())
}
}
/// The size of a batch is the length of its messages buffer; the indexes are not counted.
impl Sizeable for IggyMessagesBatch {
    fn get_size_bytes(&self) -> IggyByteSize {
        let messages_len = self.messages.len() as u64;
        IggyByteSize::from(messages_len)
    }
}
/// Dereferencing a batch exposes the raw bytes of its messages buffer.
impl Deref for IggyMessagesBatch {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        &self.messages
    }
}
/// Builds an `IggyMessagesBatch` from a slice of `IggyMessage` values.
///
/// Usage: `let batch = IggyMessagesBatch::from(messages_slice);`
///
/// An empty slice produces `IggyMessagesBatch::empty()`. Otherwise:
///
/// 1. Every message is serialized, in order, into one contiguous buffer.
/// 2. One index entry is written per message, consisting of:
///    - offset: a `u32` set to 0 (will be filled by the server during append)
///    - position: a `u32` holding the cumulative size of all messages up to and
///      including this one, i.e. the byte position where the next message starts
///    - timestamp: a `u64` set to 0 (will be filled by the server during append)
///
/// # Performance note
///
/// This layout is optimized for server-side processing. The server can efficiently:
/// - Allocate offsets sequentially
/// - Assign timestamps
/// - Write the entire message batch and index data to disk without additional allocations
/// - Update the offset and timestamp fields in-place before persistence
impl From<&[IggyMessage]> for IggyMessagesBatch {
    fn from(messages: &[IggyMessage]) -> Self {
        if messages.is_empty() {
            return Self::empty();
        }

        let messages_count = messages.len() as u32;

        // Size both buffers up front so serialization does not have to grow them.
        let payload_capacity: usize = messages
            .iter()
            .map(|message| message.get_size_bytes().as_bytes_usize())
            .sum();
        let mut payload = BytesMut::with_capacity(payload_capacity);
        let mut index_entries = BytesMut::with_capacity(messages_count as usize * INDEX_SIZE);

        // Running end position of the most recently written message.
        let mut end_position = 0;
        for message in messages {
            message.write_to_buffer(&mut payload);
            end_position += message.get_size_bytes().as_bytes_u32();

            index_entries.put_u32_le(0); // offset
            index_entries.put_u32_le(end_position); // position
            index_entries.put_u64_le(0); // timestamp
        }

        Self::new(
            IggyIndexes::new(index_entries.freeze(), 0),
            payload.freeze(),
            messages_count,
        )
    }
}
/// Builds an `IggyMessagesBatch` from a borrowed `Vec<IggyMessage>`.
///
/// Convenience wrapper around the slice conversion, so callers can pass
/// `&messages` directly without slicing it themselves.
impl From<&Vec<IggyMessage>> for IggyMessagesBatch {
    fn from(messages: &Vec<IggyMessage>) -> Self {
        Self::from(&messages[..])
    }
}
/// Builds an `IggyMessagesBatch` from an owned `Vec<IggyMessage>`.
///
/// The vector is only borrowed as a slice for the conversion and dropped afterwards.
impl From<Vec<IggyMessage>> for IggyMessagesBatch {
    fn from(messages: Vec<IggyMessage>) -> Self {
        let slice: &[IggyMessage] = &messages;
        Self::from(slice)
    }
}