in core/server/src/state/command.rs [110:165]
fn from_bytes(bytes: Bytes) -> Result<Self, IggyError>
where
Self: Sized,
{
let code = bytes.slice(0..4).get_u32_le();
let length = bytes.slice(4..8).get_u32_le();
let payload = bytes.slice(8..8 + length as usize);
match code {
CREATE_STREAM_CODE => Ok(EntryCommand::CreateStream(CreateStreamWithId::from_bytes(
payload,
)?)),
UPDATE_STREAM_CODE => Ok(EntryCommand::UpdateStream(UpdateStream::from_bytes(
payload,
)?)),
DELETE_STREAM_CODE => Ok(EntryCommand::DeleteStream(DeleteStream::from_bytes(
payload,
)?)),
PURGE_STREAM_CODE => Ok(EntryCommand::PurgeStream(PurgeStream::from_bytes(payload)?)),
CREATE_TOPIC_CODE => Ok(EntryCommand::CreateTopic(CreateTopicWithId::from_bytes(
payload,
)?)),
UPDATE_TOPIC_CODE => Ok(EntryCommand::UpdateTopic(UpdateTopic::from_bytes(payload)?)),
DELETE_TOPIC_CODE => Ok(EntryCommand::DeleteTopic(DeleteTopic::from_bytes(payload)?)),
PURGE_TOPIC_CODE => Ok(EntryCommand::PurgeTopic(PurgeTopic::from_bytes(payload)?)),
CREATE_PARTITIONS_CODE => Ok(EntryCommand::CreatePartitions(
CreatePartitions::from_bytes(payload)?,
)),
DELETE_PARTITIONS_CODE => Ok(EntryCommand::DeletePartitions(
DeletePartitions::from_bytes(payload)?,
)),
CREATE_CONSUMER_GROUP_CODE => Ok(EntryCommand::CreateConsumerGroup(
CreateConsumerGroupWithId::from_bytes(payload)?,
)),
DELETE_CONSUMER_GROUP_CODE => Ok(EntryCommand::DeleteConsumerGroup(
DeleteConsumerGroup::from_bytes(payload)?,
)),
CREATE_USER_CODE => Ok(EntryCommand::CreateUser(CreateUserWithId::from_bytes(
payload,
)?)),
UPDATE_USER_CODE => Ok(EntryCommand::UpdateUser(UpdateUser::from_bytes(payload)?)),
DELETE_USER_CODE => Ok(EntryCommand::DeleteUser(DeleteUser::from_bytes(payload)?)),
CHANGE_PASSWORD_CODE => Ok(EntryCommand::ChangePassword(ChangePassword::from_bytes(
payload,
)?)),
UPDATE_PERMISSIONS_CODE => Ok(EntryCommand::UpdatePermissions(
UpdatePermissions::from_bytes(payload)?,
)),
CREATE_PERSONAL_ACCESS_TOKEN_CODE => Ok(EntryCommand::CreatePersonalAccessToken(
CreatePersonalAccessTokenWithHash::from_bytes(payload)?,
)),
DELETE_PERSONAL_ACCESS_TOKEN_CODE => Ok(EntryCommand::DeletePersonalAccessToken(
DeletePersonalAccessToken::from_bytes(payload)?,
)),
_ => Err(IggyError::InvalidCommand),
}
}