in core/sdk/src/topics/create_topic.rs [133:186]
/// Deserializes a `CreateTopic` command from its binary representation.
///
/// The payload starts with the stream identifier, followed by a fixed-size
/// section (all integers little-endian) and the topic name:
///
/// | offset | size | field                                  |
/// |--------|------|----------------------------------------|
/// | 0      | 4    | topic id (`0` is decoded as `None`)    |
/// | 4      | 4    | partitions count                       |
/// | 8      | 1    | compression algorithm code             |
/// | 9      | 8    | message expiry                         |
/// | 17     | 8    | max topic size                         |
/// | 25     | 1    | replication factor (`0` means `None`)  |
/// | 26     | 1    | name length in bytes                   |
/// | 27     | n    | name (UTF-8)                           |
///
/// Offsets are relative to the end of the stream identifier.
///
/// # Errors
///
/// * `IggyError::InvalidCommand` when the payload is too short to contain
///   the fixed-size section or the declared name.
/// * `IggyError::InvalidNumberEncoding` when an integer field cannot be decoded.
/// * `IggyError::InvalidUtf8` when the name is not valid UTF-8.
/// * Any error returned by `Identifier::from_bytes` or
///   `CompressionAlgorithm::from_code`.
fn from_bytes(bytes: Bytes) -> std::result::Result<CreateTopic, IggyError> {
    // Size of the fixed-width section that follows the stream identifier.
    const FIXED_SECTION_LEN: usize = 27;

    if bytes.len() < 18 {
        return Err(IggyError::InvalidCommand);
    }

    let stream_id = Identifier::from_bytes(bytes.clone())?;
    let position = stream_id.get_size_bytes().as_bytes_usize();

    // The identifier is variable-sized, so the up-front length check cannot
    // guarantee the remaining fields are present. Use checked slicing so a
    // truncated payload yields an error instead of a panic.
    let data: &[u8] = &bytes;
    let fixed = data
        .get(position..position + FIXED_SECTION_LEN)
        .ok_or(IggyError::InvalidCommand)?;

    let topic_id = u32::from_le_bytes(
        fixed[0..4]
            .try_into()
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
    );
    let topic_id = if topic_id == 0 { None } else { Some(topic_id) };
    let partitions_count = u32::from_le_bytes(
        fixed[4..8]
            .try_into()
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
    );
    let compression_algorithm = CompressionAlgorithm::from_code(fixed[8])?;
    let message_expiry = u64::from_le_bytes(
        fixed[9..17]
            .try_into()
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
    );
    let message_expiry: IggyExpiry = message_expiry.into();
    let max_topic_size = u64::from_le_bytes(
        fixed[17..25]
            .try_into()
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
    );
    let max_topic_size: MaxTopicSize = max_topic_size.into();
    let replication_factor = match fixed[25] {
        0 => None,
        factor => Some(factor),
    };

    let name_length = fixed[26] as usize;
    let name_start = position + FIXED_SECTION_LEN;
    // Checked slice: the declared name length must fit inside the payload.
    // Because the slice is exactly `name_length` bytes long, the decoded
    // string is guaranteed to have that byte length as well.
    let name_bytes = data
        .get(name_start..name_start + name_length)
        .ok_or(IggyError::InvalidCommand)?;
    let name = from_utf8(name_bytes)
        .map_err(|_| IggyError::InvalidUtf8)?
        .to_string();

    let command = CreateTopic {
        stream_id,
        topic_id,
        partitions_count,
        compression_algorithm,
        message_expiry,
        max_topic_size,
        replication_factor,
        name,
    };
    Ok(command)
}