in source/extensions/filters/network/redis_proxy/codec_impl.cc [129:337]
void DecoderImpl::parseSlice(const Buffer::RawSlice& slice) {
const char* buffer = reinterpret_cast<const char*>(slice.mem_);
uint64_t remaining = slice.len_;
while (remaining || state_ == State::ValueComplete) {
ENVOY_LOG(trace, "parse slice: {} remaining", remaining);
switch (state_) {
case State::ValueRootStart: {
ENVOY_LOG(trace, "parse slice: ValueRootStart");
pending_value_root_ = std::make_unique<RespValue>();
pending_value_stack_.push_front({pending_value_root_.get(), 0});
state_ = State::ValueStart;
break;
}
case State::ValueStart: {
ENVOY_LOG(trace, "parse slice: ValueStart: {}", buffer[0]);
pending_integer_.reset();
switch (buffer[0]) {
case '*': {
state_ = State::IntegerStart;
pending_value_stack_.front().value_->type(RespType::Array);
break;
}
case '$': {
state_ = State::IntegerStart;
pending_value_stack_.front().value_->type(RespType::BulkString);
break;
}
case '-': {
state_ = State::SimpleString;
pending_value_stack_.front().value_->type(RespType::Error);
break;
}
case '+': {
state_ = State::SimpleString;
pending_value_stack_.front().value_->type(RespType::SimpleString);
break;
}
case ':': {
state_ = State::IntegerStart;
pending_value_stack_.front().value_->type(RespType::Integer);
break;
}
default: { throw ProtocolError("invalid value type"); }
}
remaining--;
buffer++;
break;
}
case State::IntegerStart: {
ENVOY_LOG(trace, "parse slice: IntegerStart: {}", buffer[0]);
if (buffer[0] == '-') {
pending_integer_.negative_ = true;
remaining--;
buffer++;
}
state_ = State::Integer;
break;
}
case State::Integer: {
ENVOY_LOG(trace, "parse slice: Integer: {}", buffer[0]);
char c = buffer[0];
if (buffer[0] == '\r') {
state_ = State::IntegerLF;
} else {
if (c < '0' || c > '9') {
throw ProtocolError("invalid integer character");
} else {
pending_integer_.integer_ = (pending_integer_.integer_ * 10) + (c - '0');
}
}
remaining--;
buffer++;
break;
}
case State::IntegerLF: {
if (buffer[0] != '\n') {
throw ProtocolError("expected new line");
}
ENVOY_LOG(trace, "parse slice: IntegerLF: {}", pending_integer_.integer_);
remaining--;
buffer++;
PendingValue& current_value = pending_value_stack_.front();
if (current_value.value_->type() == RespType::Array) {
if (pending_integer_.negative_) {
// Null array. Convert to null.
current_value.value_->type(RespType::Null);
state_ = State::ValueComplete;
} else if (pending_integer_.integer_ == 0) {
state_ = State::ValueComplete;
} else {
std::vector<RespValue> values(pending_integer_.integer_);
current_value.value_->asArray().swap(values);
pending_value_stack_.push_front({¤t_value.value_->asArray()[0], 0});
state_ = State::ValueStart;
}
} else if (current_value.value_->type() == RespType::Integer) {
if (pending_integer_.integer_ == 0 || !pending_integer_.negative_) {
current_value.value_->asInteger() = pending_integer_.integer_;
} else {
// By subtracting 1 (and later correcting) we ensure that we remain within the int64_t
// range to allow a valid static_cast. This is an issue when we have a value of -2^63,
// which cannot be represented as 2^63 in the intermediate int64_t.
current_value.value_->asInteger() =
static_cast<int64_t>(pending_integer_.integer_ - 1) * -1 - 1;
}
state_ = State::ValueComplete;
} else {
ASSERT(current_value.value_->type() == RespType::BulkString);
if (!pending_integer_.negative_) {
// TODO(mattklein123): reserve and define max length since we don't stream currently.
state_ = State::BulkStringBody;
} else {
// Null bulk string. Switch type to null and move to value complete.
current_value.value_->type(RespType::Null);
state_ = State::ValueComplete;
}
}
break;
}
case State::BulkStringBody: {
ASSERT(!pending_integer_.negative_);
uint64_t length_to_copy =
std::min(static_cast<uint64_t>(pending_integer_.integer_), remaining);
pending_value_stack_.front().value_->asString().append(buffer, length_to_copy);
pending_integer_.integer_ -= length_to_copy;
remaining -= length_to_copy;
buffer += length_to_copy;
if (pending_integer_.integer_ == 0) {
ENVOY_LOG(trace, "parse slice: BulkStringBody complete: {}",
pending_value_stack_.front().value_->asString());
state_ = State::CR;
}
break;
}
case State::CR: {
ENVOY_LOG(trace, "parse slice: CR");
if (buffer[0] != '\r') {
throw ProtocolError("expected carriage return");
}
remaining--;
buffer++;
state_ = State::LF;
break;
}
case State::LF: {
ENVOY_LOG(trace, "parse slice: LF");
if (buffer[0] != '\n') {
throw ProtocolError("expected new line");
}
remaining--;
buffer++;
state_ = State::ValueComplete;
break;
}
case State::SimpleString: {
ENVOY_LOG(trace, "parse slice: SimpleString: {}", buffer[0]);
if (buffer[0] == '\r') {
state_ = State::LF;
} else {
pending_value_stack_.front().value_->asString().push_back(buffer[0]);
}
remaining--;
buffer++;
break;
}
case State::ValueComplete: {
ENVOY_LOG(trace, "parse slice: ValueComplete");
ASSERT(!pending_value_stack_.empty());
pending_value_stack_.pop_front();
if (pending_value_stack_.empty()) {
callbacks_.onRespValue(std::move(pending_value_root_));
state_ = State::ValueRootStart;
} else {
PendingValue& current_value = pending_value_stack_.front();
ASSERT(current_value.value_->type() == RespType::Array);
if (current_value.current_array_element_ < current_value.value_->asArray().size() - 1) {
current_value.current_array_element_++;
pending_value_stack_.push_front(
{¤t_value.value_->asArray()[current_value.current_array_element_], 0});
state_ = State::ValueStart;
}
}
break;
}
}
}
}