Status TracerHelper::DecodeTraceRecord()

in trace_replay/trace_replay.cc [129:341]


// Decodes the payload of `trace` into a typed TraceRecord.
//
// For trace_file_version < 2 the payload is interpreted directly: it is the
// write batch itself for writes, and is handed to DecodeCFAndKey() for Get
// and iterator seeks. For trace_file_version >= 2 the payload starts with a
// fixed64 bitmap, which is stored into trace->payload_map. Its set bits are
// visited from the least significant one upwards, and each bit position is a
// TracePayloadType naming the next field encoded in the payload.
//
// trace:              the trace to decode; must not be nullptr. Its
//                     payload_map member is overwritten when
//                     trace_file_version >= 2.
// trace_file_version: version of the trace file the trace was read from.
// record:             optional output. When non-null it is reset up front
//                     and receives the decoded record on success.
//
// Returns Status::OK() on success, Status::Corruption() when the payload is
// truncated or inconsistent (or for a MultiGet in a pre-v2 trace),
// Status::InvalidArgument() for a MultiGet of size 0, and
// Status::NotSupported() for any other trace type.
Status TracerHelper::DecodeTraceRecord(Trace* trace, int trace_file_version,
                                       std::unique_ptr<TraceRecord>* record) {
  assert(trace != nullptr);

  if (record != nullptr) {
    record->reset(nullptr);
  }

  switch (trace->type) {
    // Write
    case kTraceWrite: {
      PinnableSlice rep;
      if (trace_file_version < 2) {
        rep.PinSelf(trace->payload);
      } else {
        Slice buf(trace->payload);
        if (!GetFixed64(&buf, &trace->payload_map)) {
          return Status::Corruption(
              "Failed to decode the payload map of a Write trace.");
        }
        Slice write_batch_data;
        // Walk the bitmap with unsigned arithmetic, lowest bit first. This
        // stays well defined even when bit 63 is set.
        uint64_t remaining = trace->payload_map;
        for (uint32_t set_pos = 0; remaining != 0;
             ++set_pos, remaining >>= 1) {
          if ((remaining & 1) == 0) {
            continue;
          }
          switch (set_pos) {
            case TracePayloadType::kWriteBatchData: {
              if (!GetLengthPrefixedSlice(&buf, &write_batch_data)) {
                return Status::Corruption(
                    "Failed to decode the write batch of a Write trace.");
              }
              break;
            }
            default: {
              // Unknown field: flagged in debug builds, ignored otherwise.
              assert(false);
              break;
            }
          }
        }
        rep.PinSelf(write_batch_data);
      }

      if (record != nullptr) {
        record->reset(new WriteQueryTraceRecord(std::move(rep), trace->ts));
      }

      return Status::OK();
    }
    // Get
    case kTraceGet: {
      uint32_t cf_id = 0;
      Slice get_key;

      if (trace_file_version < 2) {
        DecodeCFAndKey(trace->payload, &cf_id, &get_key);
      } else {
        Slice buf(trace->payload);
        if (!GetFixed64(&buf, &trace->payload_map)) {
          return Status::Corruption(
              "Failed to decode the payload map of a Get trace.");
        }
        uint64_t remaining = trace->payload_map;
        for (uint32_t set_pos = 0; remaining != 0;
             ++set_pos, remaining >>= 1) {
          if ((remaining & 1) == 0) {
            continue;
          }
          switch (set_pos) {
            case TracePayloadType::kGetCFID: {
              if (!GetFixed32(&buf, &cf_id)) {
                return Status::Corruption(
                    "Failed to decode the cf_id of a Get trace.");
              }
              break;
            }
            case TracePayloadType::kGetKey: {
              if (!GetLengthPrefixedSlice(&buf, &get_key)) {
                return Status::Corruption(
                    "Failed to decode the key of a Get trace.");
              }
              break;
            }
            default: {
              // Unknown field: flagged in debug builds, ignored otherwise.
              assert(false);
              break;
            }
          }
        }
      }

      if (record != nullptr) {
        PinnableSlice ps;
        ps.PinSelf(get_key);
        record->reset(new GetQueryTraceRecord(cf_id, std::move(ps), trace->ts));
      }

      return Status::OK();
    }
    // Iterator Seek and SeekForPrev
    case kTraceIteratorSeek:
    case kTraceIteratorSeekForPrev: {
      uint32_t cf_id = 0;
      Slice iter_key;
      Slice lower_bound;
      Slice upper_bound;

      if (trace_file_version < 2) {
        DecodeCFAndKey(trace->payload, &cf_id, &iter_key);
      } else {
        Slice buf(trace->payload);
        if (!GetFixed64(&buf, &trace->payload_map)) {
          return Status::Corruption(
              "Failed to decode the payload map of an iterator trace.");
        }
        uint64_t remaining = trace->payload_map;
        for (uint32_t set_pos = 0; remaining != 0;
             ++set_pos, remaining >>= 1) {
          if ((remaining & 1) == 0) {
            continue;
          }
          switch (set_pos) {
            case TracePayloadType::kIterCFID: {
              if (!GetFixed32(&buf, &cf_id)) {
                return Status::Corruption(
                    "Failed to decode the cf_id of an iterator trace.");
              }
              break;
            }
            case TracePayloadType::kIterKey: {
              if (!GetLengthPrefixedSlice(&buf, &iter_key)) {
                return Status::Corruption(
                    "Failed to decode the key of an iterator trace.");
              }
              break;
            }
            case TracePayloadType::kIterLowerBound: {
              if (!GetLengthPrefixedSlice(&buf, &lower_bound)) {
                return Status::Corruption(
                    "Failed to decode the lower bound of an iterator trace.");
              }
              break;
            }
            case TracePayloadType::kIterUpperBound: {
              if (!GetLengthPrefixedSlice(&buf, &upper_bound)) {
                return Status::Corruption(
                    "Failed to decode the upper bound of an iterator trace.");
              }
              break;
            }
            default: {
              // Unknown field: flagged in debug builds, ignored otherwise.
              assert(false);
              break;
            }
          }
        }
      }

      if (record != nullptr) {
        PinnableSlice ps_key;
        ps_key.PinSelf(iter_key);
        PinnableSlice ps_lower;
        ps_lower.PinSelf(lower_bound);
        PinnableSlice ps_upper;
        ps_upper.PinSelf(upper_bound);
        record->reset(new IteratorSeekQueryTraceRecord(
            static_cast<IteratorSeekQueryTraceRecord::SeekType>(trace->type),
            cf_id, std::move(ps_key), std::move(ps_lower), std::move(ps_upper),
            trace->ts));
      }

      return Status::OK();
    }
    // MultiGet
    case kTraceMultiGet: {
      if (trace_file_version < 2) {
        return Status::Corruption("MultiGet is not supported.");
      }

      uint32_t multiget_size = 0;
      std::vector<uint32_t> cf_ids;
      std::vector<PinnableSlice> multiget_keys;

      Slice cfids_payload;
      Slice keys_payload;
      Slice buf(trace->payload);
      if (!GetFixed64(&buf, &trace->payload_map)) {
        return Status::Corruption(
            "Failed to decode the payload map of a MultiGet trace.");
      }
      uint64_t remaining = trace->payload_map;
      for (uint32_t set_pos = 0; remaining != 0; ++set_pos, remaining >>= 1) {
        if ((remaining & 1) == 0) {
          continue;
        }
        switch (set_pos) {
          case TracePayloadType::kMultiGetSize: {
            if (!GetFixed32(&buf, &multiget_size)) {
              return Status::Corruption(
                  "Failed to decode the size of a MultiGet trace.");
            }
            break;
          }
          case TracePayloadType::kMultiGetCFIDs: {
            if (!GetLengthPrefixedSlice(&buf, &cfids_payload)) {
              return Status::Corruption(
                  "Failed to decode the cf_ids of a MultiGet trace.");
            }
            break;
          }
          case TracePayloadType::kMultiGetKeys: {
            if (!GetLengthPrefixedSlice(&buf, &keys_payload)) {
              return Status::Corruption(
                  "Failed to decode the keys of a MultiGet trace.");
            }
            break;
          }
          default: {
            // Unknown field: flagged in debug builds, ignored otherwise.
            assert(false);
            break;
          }
        }
      }
      if (multiget_size == 0) {
        return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
      }
      // The loop below reads one fixed32 cf_id and one length-prefixed key
      // (at least one byte) per entry. Reject a size the payloads cannot
      // back before it is used to reserve memory.
      if (multiget_size > cfids_payload.size() / sizeof(uint32_t) ||
          multiget_size > keys_payload.size()) {
        return Status::Corruption(
            "MultiGet size exceeds the encoded cf_ids or keys.");
      }

      // Decode the cfids_payload and keys_payload
      cf_ids.reserve(multiget_size);
      multiget_keys.reserve(multiget_size);
      for (uint32_t i = 0; i < multiget_size; i++) {
        uint32_t tmp_cfid = 0;
        Slice tmp_key;
        if (!GetFixed32(&cfids_payload, &tmp_cfid) ||
            !GetLengthPrefixedSlice(&keys_payload, &tmp_key)) {
          return Status::Corruption(
              "Failed to decode a cf_id or key of a MultiGet trace.");
        }
        cf_ids.push_back(tmp_cfid);
        PinnableSlice ps;
        ps.PinSelf(tmp_key);
        multiget_keys.push_back(std::move(ps));
      }

      if (record != nullptr) {
        record->reset(new MultiGetQueryTraceRecord(
            std::move(cf_ids), std::move(multiget_keys), trace->ts));
      }

      return Status::OK();
    }
    default:
      return Status::NotSupported("Unsupported trace type.");
  }
}