in core/sdk/src/messages/polled_messages.rs [90:130]
fn messages_from_bytes_and_count(buffer: Bytes, count: u32) -> Result<Vec<IggyMessage>, IggyError> {
let mut messages = Vec::with_capacity(count as usize);
let mut position = 0;
let buf_len = buffer.len();
while position < buf_len {
if position + IGGY_MESSAGE_HEADER_SIZE > buf_len {
break;
}
let header_bytes = buffer.slice(position..position + IGGY_MESSAGE_HEADER_SIZE);
let header = match IggyMessageHeader::from_bytes(header_bytes) {
Ok(h) => h,
Err(e) => {
error!("Failed to deserialize message header: {}", e);
return Err(e);
}
};
position += IGGY_MESSAGE_HEADER_SIZE;
let payload_end = position + header.payload_length as usize;
if payload_end > buf_len {
break;
}
let payload = buffer.slice(position..payload_end);
position = payload_end;
let user_headers = if header.user_headers_length > 0 {
Some(buffer.slice(position..position + header.user_headers_length as usize))
} else {
None
};
position += header.user_headers_length as usize;
messages.push(IggyMessage {
header,
payload,
user_headers,
});
}
Ok(messages)
}