in core/sdk/src/binary/mapper.rs [677:740]
/// Decodes a single [`Topic`] from `payload`, starting at byte offset `position`.
///
/// Layout read by this function, relative to `position` (all multi-byte
/// integers are little-endian):
///
/// | offset | size          | field                      |
/// |--------|---------------|----------------------------|
/// | 0      | 4             | `id` (`u32`)               |
/// | 4      | 8             | `created_at` (`u64`)       |
/// | 12     | 4             | `partitions_count` (`u32`) |
/// | 16     | 8             | `message_expiry` (`u64`)   |
/// | 24     | 1             | compression algorithm code |
/// | 25     | 8             | `max_topic_size` (`u64`)   |
/// | 33     | 1             | `replication_factor`       |
/// | 34     | 8             | `size` in bytes (`u64`)    |
/// | 42     | 8             | `messages_count` (`u64`)   |
/// | 50     | 1             | `name_length`              |
/// | 51     | `name_length` | `name` (UTF-8)             |
///
/// On success returns the topic together with the number of bytes consumed
/// (`51 + name_length`).
///
/// # Errors
///
/// * [`IggyError::InvalidNumberEncoding`] if `position` is past the end of
///   `payload` or the payload is too short to hold the fixed-size fields.
/// * [`IggyError::InvalidUtf8`] if the name bytes are missing (payload ends
///   before `name_length` bytes are available) or are not valid UTF-8.
/// * Whatever error `CompressionAlgorithm::from_code` returns for the
///   compression byte.
fn map_to_topic(payload: Bytes, position: usize) -> Result<(Topic, usize), IggyError> {
    /// Number of bytes preceding the name: every fixed-size field plus the
    /// one-byte name length.
    const NAME_OFFSET: usize = 4 + 8 + 4 + 8 + 1 + 8 + 1 + 8 + 8 + 1;

    /// Reads one byte at `offset`, failing instead of panicking when out of range.
    fn read_u8(bytes: &[u8], offset: usize) -> Result<u8, IggyError> {
        bytes
            .get(offset)
            .copied()
            .ok_or(IggyError::InvalidNumberEncoding)
    }

    /// Reads a little-endian `u32` at `offset`, failing when out of range.
    fn read_u32(bytes: &[u8], offset: usize) -> Result<u32, IggyError> {
        let raw: [u8; 4] = bytes
            .get(offset..offset + 4)
            .ok_or(IggyError::InvalidNumberEncoding)?
            .try_into()
            .map_err(|_| IggyError::InvalidNumberEncoding)?;
        Ok(u32::from_le_bytes(raw))
    }

    /// Reads a little-endian `u64` at `offset`, failing when out of range.
    fn read_u64(bytes: &[u8], offset: usize) -> Result<u64, IggyError> {
        let raw: [u8; 8] = bytes
            .get(offset..offset + 8)
            .ok_or(IggyError::InvalidNumberEncoding)?
            .try_into()
            .map_err(|_| IggyError::InvalidNumberEncoding)?;
        Ok(u64::from_le_bytes(raw))
    }

    // Work on the tail that starts at `position`. Using `get` (rather than
    // `payload[position..]`) turns an out-of-range position into an error, and
    // every later offset is a small constant, so no `position + N` arithmetic
    // can overflow.
    let bytes: &[u8] = &payload;
    let bytes = bytes
        .get(position..)
        .ok_or(IggyError::InvalidNumberEncoding)?;

    let id = read_u32(bytes, 0)?;
    let created_at = read_u64(bytes, 4)?.into();
    let partitions_count = read_u32(bytes, 12)?;
    // A raw expiry of zero means the messages never expire.
    let message_expiry = match read_u64(bytes, 16)? {
        0 => IggyExpiry::NeverExpire,
        message_expiry => message_expiry.into(),
    };
    let compression_algorithm = CompressionAlgorithm::from_code(read_u8(bytes, 24)?)?;
    let max_topic_size: MaxTopicSize = read_u64(bytes, 25)?.into();
    let replication_factor = read_u8(bytes, 33)?;
    let size_bytes = IggyByteSize::from(read_u64(bytes, 34)?);
    let messages_count = read_u64(bytes, 42)?;
    let name_length = read_u8(bytes, 50)? as usize;

    // The name is the only variable-length field; a payload that ends before
    // `name_length` bytes are available is rejected rather than panicking.
    let name_bytes = bytes
        .get(NAME_OFFSET..NAME_OFFSET + name_length)
        .ok_or(IggyError::InvalidUtf8)?;
    let name = from_utf8(name_bytes)
        .map_err(|_| IggyError::InvalidUtf8)?
        .to_string();

    let read_bytes = NAME_OFFSET + name_length;
    Ok((
        Topic {
            id,
            created_at,
            name,
            partitions_count,
            size: size_bytes,
            messages_count,
            message_expiry,
            compression_algorithm,
            max_topic_size,
            replication_factor,
        },
        read_bytes,
    ))
}