utils/kvrocks2redis/parser.cc (137 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "parser.h"
#include <rocksdb/write_batch.h>
#include <charconv>
#include <memory>
#include <system_error>
#include "cluster/redis_slot.h"
#include "db_util.h"
#include "logging.h"
#include "server/redis_reply.h"
#include "storage/redis_metadata.h"
#include "types/redis_string.h"
// Replays the whole database to the writer: every key in the metadata column family whose metadata
// decodes and has not expired is converted into Redis commands. String keys are handled by
// parseSimpleKV, all other types by parseComplexKV. Returns the first error from either helper, or an
// error if the metadata scan itself fails part-way.
Status Parser::ParseFullDB() {
  rocksdb::DB *db = storage_->GetDB();
  rocksdb::ColumnFamilyHandle *metadata_cf_handle = storage_->GetCFHandle(ColumnFamilyID::Metadata);
  // RSI (RocksDB Secondary Instance) does not support snapshot-based reads, so no snapshot is set here.
  // However, until TryCatchUpWithPrimary is invoked explicitly, this replica is read-only, which
  // effectively gives us a consistent view, like a snapshot.
  rocksdb::ReadOptions read_options;
  read_options.fill_cache = false;  // a one-off full scan should not pollute the block cache
  std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(read_options, metadata_cf_handle));
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    Metadata metadata(kRedisNone);
    auto ds = metadata.Decode(iter->value());
    if (!ds.ok()) {
      continue;  // skip entries whose metadata cannot be decoded
    }
    if (metadata.Expired()) {  // ignore the expired key
      continue;
    }
    Status s;
    if (metadata.Type() == kRedisString) {
      s = parseSimpleKV(iter->key(), iter->value(), metadata.expire);
    } else {
      s = parseComplexKV(iter->key(), metadata);
    }
    if (!s.IsOK()) return s;
  }
  // Valid() also becomes false when the scan hits an I/O error or corruption. Without this check a
  // failed scan would be reported as a successful (but truncated) full sync.
  if (!iter->status().ok()) {
    return {Status::NotOK,
            fmt::format("failed to iterate over the metadata column family: {}", iter->status().ToString())};
  }
  return Status::OK();
}
// Emits a SET for a string key, followed by an EXPIREAT when `expire` is non-zero.
// `value` is the raw metadata-column value; the bytes after the encoded flags/expire header are the
// string payload. `expire` is divided by 1000 before being handed to EXPIREAT (which takes seconds),
// i.e. it is treated as milliseconds. Returns the first failing writer status, otherwise OK.
Status Parser::parseSimpleKV(const Slice &ns_key, const Slice &value, uint64_t expire) {
  auto [ns, user_key] = ExtractNamespaceKey<std::string>(ns_key, slot_id_encoded_);

  // Skip the metadata header to get at the user-visible string payload.
  const auto payload_offset = Metadata::GetOffsetAfterExpire(value[0]);
  std::string payload = value.ToString().substr(payload_offset);

  Status set_status = writer_->Write(ns, {redis::ArrayOfBulkStrings({"SET", user_key, payload})});
  if (!set_status.IsOK() || expire == 0) {
    return set_status;
  }

  std::string expire_seconds = std::to_string(expire / 1000);
  return writer_->Write(ns, {redis::ArrayOfBulkStrings({"EXPIREAT", user_key, expire_seconds})});
}
// Replays one non-string key (hash, set, list, zset, bitmap or sortedint) as Redis write commands.
// Every sub-key stored under the key's current version is scanned and turned into the matching
// command (HSET / SADD / RPUSH / ZADD / SETBIT), followed by an EXPIREAT when the key has a TTL.
//   ns_key   - namespaced key as read from the metadata column family
//   metadata - decoded metadata of that key (type, version, expire)
// Returns an error for types outside [kRedisHash, kRedisSortedint], malformed bitmap sub-keys,
// sub-key iterator failures, or the first failed writer call.
Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) {
  RedisType type = metadata.Type();
  if (type < kRedisHash || type > kRedisSortedint) {
    return {Status::NotOK, "unknown metadata type: " + std::to_string(type)};
  }

  auto [ns, user_key] = ExtractNamespaceKey<std::string>(ns_key, slot_id_encoded_);
  // Sub-keys of the current version share `prefix_key`; the next version's prefix is used as the
  // exclusive upper bound so the scan never walks past this key/version.
  std::string prefix_key = InternalKey(ns_key, "", metadata.version, slot_id_encoded_).Encode();
  std::string next_version_prefix_key = InternalKey(ns_key, "", metadata.version + 1, slot_id_encoded_).Encode();
  rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
  rocksdb::Slice upper_bound(next_version_prefix_key);
  read_options.iterate_upper_bound = &upper_bound;

  std::string output;
  auto no_txn_ctx = engine::Context::NoTransactionContext(storage_);
  auto iter = util::UniqueIterator(no_txn_ctx, read_options);
  for (iter->Seek(prefix_key); iter->Valid(); iter->Next()) {
    if (!iter->key().starts_with(prefix_key)) {
      break;
    }
    InternalKey ikey(iter->key(), slot_id_encoded_);
    std::string sub_key = ikey.GetSubKey().ToString();
    std::string value = iter->value().ToString();
    switch (type) {
      case kRedisHash:
        output = redis::ArrayOfBulkStrings({"HSET", user_key, sub_key, value});
        break;
      case kRedisSet:
        output = redis::ArrayOfBulkStrings({"SADD", user_key, sub_key});
        break;
      case kRedisList:
        output = redis::ArrayOfBulkStrings({"RPUSH", user_key, value});
        break;
      case kRedisZSet: {
        // member is the sub-key; the value holds the encoded double score
        double score = DecodeDouble(value.data());
        output = redis::ArrayOfBulkStrings({"ZADD", user_key, util::Float2String(score), sub_key});
        break;
      }
      case kRedisBitmap: {
        // The sub-key is the segment's starting byte offset as a decimal string (parseBitmapSegment
        // multiplies it by 8 to get a bit offset). Parse without exceptions so a malformed sub-key
        // surfaces as a Status instead of an uncaught std::invalid_argument / std::out_of_range.
        int index = 0;
        const char *first = sub_key.data();
        const char *last = first + sub_key.size();
        auto [parse_end, parse_ec] = std::from_chars(first, last, index);
        if (parse_ec != std::errc() || parse_end != last) {
          return {Status::NotOK, fmt::format("invalid bitmap segment index '{}'", sub_key)};
        }
        auto s = Parser::parseBitmapSegment(ns, user_key, index, value);
        if (!s.IsOK()) return s.Prefixed("failed to parse bitmap segment");
        break;
      }
      case kRedisSortedint: {
        // the sub-key is a fixed64-encoded integer used as both score and member
        std::string val = std::to_string(DecodeFixed64(ikey.GetSubKey().data()));
        output = redis::ArrayOfBulkStrings({"ZADD", user_key, val, val});
        break;
      }
      default:
        break;  // should never get here: the type range is validated above
    }
    // Bitmap segments emit their own SETBIT commands inside parseBitmapSegment.
    if (type != kRedisBitmap) {
      auto s = writer_->Write(ns, {output});
      if (!s.IsOK()) return s.Prefixed(fmt::format("failed to write the '{}' command to AOF", output));
    }
  }
  // The loop also ends when the iterator hits an I/O error or corruption; don't emit a partial key
  // (and its EXPIREAT) as if the scan had completed.
  if (!iter->status().ok()) {
    return {Status::NotOK,
            fmt::format("failed to iterate over the sub-keys of '{}': {}", user_key, iter->status().ToString())};
  }

  if (metadata.expire > 0) {
    // EXPIREAT takes seconds; expire is divided by 1000, i.e. treated as milliseconds
    output = redis::ArrayOfBulkStrings({"EXPIREAT", user_key, std::to_string(metadata.expire / 1000)});
    Status s = writer_->Write(ns, {output});
    if (!s.IsOK()) return s.Prefixed("failed to write the EXPIREAT command to AOF");
  }
  return Status::OK();
}
// Emits one `SETBIT <user_key> <offset> 1` per set bit in a bitmap segment.
//   ns       - namespace the commands are written to
//   user_key - user-visible bitmap key
//   index    - byte offset of the segment's first byte within the whole bitmap
//   bitmap   - raw segment bytes; bit j of byte i maps to bit offset (index + i) * 8 + j
// Zero bytes and zero bits are skipped. Returns the first writer error, otherwise OK.
Status Parser::parseBitmapSegment(const Slice &ns, const Slice &user_key, int index, const Slice &bitmap) {
  // Loop-invariant conversions, done once instead of once per set bit.
  const std::string ns_str = ns.ToString();
  const std::string key_str = user_key.ToString();
  // Widen before multiplying: bit offsets go up to 2^32 - 1, so `index * 8` in int overflows (UB)
  // for segments at byte offset >= 2^28 and would produce a bogus SETBIT offset.
  const uint64_t segment_bit_base = static_cast<uint64_t>(index) * 8;
  for (size_t i = 0; i < bitmap.size(); i++) {
    const auto byte = static_cast<unsigned char>(bitmap[i]);
    if (byte == 0) continue;  // ignore zero byte
    for (int j = 0; j < 8; j++) {
      if (!(byte & (1U << j))) continue;  // ignore zero bit
      const uint64_t bit_offset = segment_bit_base + i * 8 + j;
      Status s =
          writer_->Write(ns_str, {redis::ArrayOfBulkStrings({"SETBIT", key_str, std::to_string(bit_offset), "1"})});
      if (!s.IsOK()) return s.Prefixed("failed to write SETBIT command to AOF");
    }
  }
  return Status::OK();
}
// Decodes a serialized RocksDB write batch into per-namespace RESP commands and hands them to the
// writer. Returns an error only if the batch itself cannot be iterated. Write failures for
// individual namespaces are logged and skipped (best-effort), so the function still returns OK.
Status Parser::ParseWriteBatch(const std::string &batch_string) {
  rocksdb::WriteBatch batch(batch_string);
  WriteBatchExtractor extractor(slot_id_encoded_, -1, true);

  if (auto iterate_status = batch.Iterate(&extractor); !iterate_status.ok()) {
    return {Status::NotOK, fmt::format("failed to iterate over the write batch: {}", iterate_status.ToString())};
  }

  auto commands_by_ns = extractor.GetRESPCommands();
  for (const auto &[ns, commands] : *commands_by_ns) {
    if (auto write_status = writer_->Write(ns, commands); !write_status.IsOK()) {
      error("Failed to write to AOF from the write batch. Error: {}", write_status.Msg());
    }
  }
  return Status::OK();
}