in core/sdk/src/messages/poll_messages.rs [155:209]
fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> {
if bytes.len() < 29 {
return Err(IggyError::InvalidCommand);
}
let mut position = 0;
let consumer_kind = ConsumerKind::from_code(bytes[0])?;
let consumer_id = Identifier::from_bytes(bytes.slice(1..))?;
position += 1 + consumer_id.get_size_bytes().as_bytes_usize();
let consumer = Consumer {
kind: consumer_kind,
id: consumer_id,
};
let stream_id = Identifier::from_bytes(bytes.slice(position..))?;
position += stream_id.get_size_bytes().as_bytes_usize();
let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
position += topic_id.get_size_bytes().as_bytes_usize();
let partition_id = u32::from_le_bytes(
bytes[position..position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let partition_id = match partition_id {
0 => None,
partition_id => Some(partition_id),
};
let polling_kind = PollingKind::from_code(bytes[position + 4])?;
position += 5;
let value = u64::from_le_bytes(
bytes[position..position + 8]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let strategy = PollingStrategy {
kind: polling_kind,
value,
};
let count = u32::from_le_bytes(
bytes[position + 8..position + 12]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let auto_commit = bytes[position + 12];
let auto_commit = matches!(auto_commit, 1);
let command = PollMessages {
consumer,
stream_id,
topic_id,
partition_id,
strategy,
count,
auto_commit,
};
Ok(command)
}