in trace_replay/trace_replay.cc [129:341]
// Decodes one serialized trace entry into a typed TraceRecord.
//
// @param trace  Entry to decode; must not be null. For trace_file_version >= 2
//               the fixed 64-bit payload bitmap at the front of
//               trace->payload is parsed and stored in trace->payload_map.
// @param trace_file_version  Format version of the trace being read. Versions
//               below 2 use the legacy payload layout (no bitmap); MultiGet
//               entries are rejected for those versions.
// @param record Optional output. Reset to null on entry and set to the decoded
//               record on success. When null, the payload is still parsed and
//               validated but no record object is created.
// @return OK on success; Corruption when the payload is truncated or a
//         MultiGet entry appears in a pre-v2 trace; InvalidArgument for a
//         MultiGet with zero keys; NotSupported for any other trace type.
//
// In the v2 layout each set bit of the bitmap announces one field, and the
// fields are read from the payload in ascending bit order. The bitmap is
// walked as an unsigned value so that bit 63 cannot trigger signed overflow.
Status TracerHelper::DecodeTraceRecord(Trace* trace, int trace_file_version,
                                       std::unique_ptr<TraceRecord>* record) {
  assert(trace != nullptr);
  if (record != nullptr) {
    record->reset(nullptr);
  }
  switch (trace->type) {
    // Write
    case kTraceWrite: {
      PinnableSlice rep;
      if (trace_file_version < 2) {
        rep.PinSelf(trace->payload);
      } else {
        Slice buf(trace->payload);
        if (!GetFixed64(&buf, &trace->payload_map)) {
          return Status::Corruption(
              "Truncated payload map in Write trace record.");
        }
        Slice write_batch_data;
        uint64_t pending = trace->payload_map;
        // Visit set bits from least to most significant.
        for (uint32_t set_pos = 0; pending != 0; ++set_pos, pending >>= 1) {
          if ((pending & 1) == 0) {
            continue;
          }
          switch (set_pos) {
            case TracePayloadType::kWriteBatchData: {
              if (!GetLengthPrefixedSlice(&buf, &write_batch_data)) {
                return Status::Corruption(
                    "Truncated write batch in Write trace record.");
              }
              break;
            }
            default: {
              // Unknown payload bit: debug builds abort, release builds skip.
              assert(false);
              break;
            }
          }
        }
        rep.PinSelf(write_batch_data);
      }
      if (record != nullptr) {
        record->reset(new WriteQueryTraceRecord(std::move(rep), trace->ts));
      }
      return Status::OK();
    }
    // Get
    case kTraceGet: {
      uint32_t cf_id = 0;
      Slice get_key;
      if (trace_file_version < 2) {
        DecodeCFAndKey(trace->payload, &cf_id, &get_key);
      } else {
        Slice buf(trace->payload);
        if (!GetFixed64(&buf, &trace->payload_map)) {
          return Status::Corruption(
              "Truncated payload map in Get trace record.");
        }
        uint64_t pending = trace->payload_map;
        // Visit set bits from least to most significant.
        for (uint32_t set_pos = 0; pending != 0; ++set_pos, pending >>= 1) {
          if ((pending & 1) == 0) {
            continue;
          }
          switch (set_pos) {
            case TracePayloadType::kGetCFID: {
              if (!GetFixed32(&buf, &cf_id)) {
                return Status::Corruption(
                    "Truncated column family id in Get trace record.");
              }
              break;
            }
            case TracePayloadType::kGetKey: {
              if (!GetLengthPrefixedSlice(&buf, &get_key)) {
                return Status::Corruption(
                    "Truncated key in Get trace record.");
              }
              break;
            }
            default: {
              // Unknown payload bit: debug builds abort, release builds skip.
              assert(false);
              break;
            }
          }
        }
      }
      if (record != nullptr) {
        PinnableSlice ps;
        ps.PinSelf(get_key);
        record->reset(new GetQueryTraceRecord(cf_id, std::move(ps), trace->ts));
      }
      return Status::OK();
    }
    // Iterator Seek and SeekForPrev
    case kTraceIteratorSeek:
    case kTraceIteratorSeekForPrev: {
      uint32_t cf_id = 0;
      Slice iter_key;
      Slice lower_bound;
      Slice upper_bound;
      if (trace_file_version < 2) {
        DecodeCFAndKey(trace->payload, &cf_id, &iter_key);
      } else {
        Slice buf(trace->payload);
        if (!GetFixed64(&buf, &trace->payload_map)) {
          return Status::Corruption(
              "Truncated payload map in iterator trace record.");
        }
        uint64_t pending = trace->payload_map;
        // Visit set bits from least to most significant.
        for (uint32_t set_pos = 0; pending != 0; ++set_pos, pending >>= 1) {
          if ((pending & 1) == 0) {
            continue;
          }
          switch (set_pos) {
            case TracePayloadType::kIterCFID: {
              if (!GetFixed32(&buf, &cf_id)) {
                return Status::Corruption(
                    "Truncated column family id in iterator trace record.");
              }
              break;
            }
            case TracePayloadType::kIterKey: {
              if (!GetLengthPrefixedSlice(&buf, &iter_key)) {
                return Status::Corruption(
                    "Truncated key in iterator trace record.");
              }
              break;
            }
            case TracePayloadType::kIterLowerBound: {
              if (!GetLengthPrefixedSlice(&buf, &lower_bound)) {
                return Status::Corruption(
                    "Truncated lower bound in iterator trace record.");
              }
              break;
            }
            case TracePayloadType::kIterUpperBound: {
              if (!GetLengthPrefixedSlice(&buf, &upper_bound)) {
                return Status::Corruption(
                    "Truncated upper bound in iterator trace record.");
              }
              break;
            }
            default: {
              // Unknown payload bit: debug builds abort, release builds skip.
              assert(false);
              break;
            }
          }
        }
      }
      if (record != nullptr) {
        PinnableSlice ps_key;
        ps_key.PinSelf(iter_key);
        PinnableSlice ps_lower;
        ps_lower.PinSelf(lower_bound);
        PinnableSlice ps_upper;
        ps_upper.PinSelf(upper_bound);
        record->reset(new IteratorSeekQueryTraceRecord(
            static_cast<IteratorSeekQueryTraceRecord::SeekType>(trace->type),
            cf_id, std::move(ps_key), std::move(ps_lower), std::move(ps_upper),
            trace->ts));
      }
      return Status::OK();
    }
    // MultiGet
    case kTraceMultiGet: {
      if (trace_file_version < 2) {
        return Status::Corruption("MultiGet is not supported.");
      }
      uint32_t multiget_size = 0;
      std::vector<uint32_t> cf_ids;
      std::vector<PinnableSlice> multiget_keys;
      Slice cfids_payload;
      Slice keys_payload;
      Slice buf(trace->payload);
      if (!GetFixed64(&buf, &trace->payload_map)) {
        return Status::Corruption(
            "Truncated payload map in MultiGet trace record.");
      }
      uint64_t pending = trace->payload_map;
      // Visit set bits from least to most significant.
      for (uint32_t set_pos = 0; pending != 0; ++set_pos, pending >>= 1) {
        if ((pending & 1) == 0) {
          continue;
        }
        switch (set_pos) {
          case TracePayloadType::kMultiGetSize: {
            if (!GetFixed32(&buf, &multiget_size)) {
              return Status::Corruption(
                  "Truncated size in MultiGet trace record.");
            }
            break;
          }
          case TracePayloadType::kMultiGetCFIDs: {
            if (!GetLengthPrefixedSlice(&buf, &cfids_payload)) {
              return Status::Corruption(
                  "Truncated cf_ids in MultiGet trace record.");
            }
            break;
          }
          case TracePayloadType::kMultiGetKeys: {
            if (!GetLengthPrefixedSlice(&buf, &keys_payload)) {
              return Status::Corruption(
                  "Truncated keys in MultiGet trace record.");
            }
            break;
          }
          default: {
            // Unknown payload bit: debug builds abort, release builds skip.
            assert(false);
            break;
          }
        }
      }
      if (multiget_size == 0) {
        return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
      }
      // multiget_size comes from the trace file. Make sure the cf_id payload
      // really holds that many fixed 32-bit ids before sizing the vectors, so
      // a corrupt size cannot drive an enormous allocation.
      if (cfids_payload.size() / sizeof(uint32_t) < multiget_size) {
        return Status::Corruption(
            "MultiGet trace record has fewer cf_ids than its declared size.");
      }
      // Decode the cfids_payload and keys_payload
      cf_ids.reserve(multiget_size);
      multiget_keys.reserve(multiget_size);
      for (uint32_t i = 0; i < multiget_size; i++) {
        uint32_t tmp_cfid = 0;
        Slice tmp_key;
        if (!GetFixed32(&cfids_payload, &tmp_cfid) ||
            !GetLengthPrefixedSlice(&keys_payload, &tmp_key)) {
          return Status::Corruption(
              "Truncated cf_id or key in MultiGet trace record.");
        }
        cf_ids.push_back(tmp_cfid);
        PinnableSlice ps;
        ps.PinSelf(tmp_key);
        multiget_keys.push_back(std::move(ps));
      }
      if (record != nullptr) {
        record->reset(new MultiGetQueryTraceRecord(
            std::move(cf_ids), std::move(multiget_keys), trace->ts));
      }
      return Status::OK();
    }
    default:
      return Status::NotSupported("Unsupported trace type.");
  }
}