in src/kudu/tools/tool_action_table.cc [682:851]
Status LocateRow(const RunnerContext& context) {
client::sp::shared_ptr<KuduClient> client;
RETURN_NOT_OK(CreateKuduClient(context, &client));
const string& table_name = FindOrDie(context.required_args, kTableNameArg);
client::sp::shared_ptr<KuduTable> table;
RETURN_NOT_OK(client->OpenTable(table_name, &table));
// Create an equality predicate for each primary key column.
const string& row_str = FindOrDie(context.required_args, kKeyArg);
JsonReader reader(row_str);
RETURN_NOT_OK(reader.Init());
vector<const rapidjson::Value*> values;
RETURN_NOT_OK(reader.ExtractObjectArray(reader.root(),
/*field=*/nullptr,
&values));
const auto& schema = table->schema();
vector<int> key_indexes;
schema.GetPrimaryKeyColumnIndexes(&key_indexes);
if (values.size() != key_indexes.size()) {
return Status::InvalidArgument(
Substitute("wrong number of key columns specified: expected $0 but received $1",
key_indexes.size(),
values.size()));
}
vector<unique_ptr<KuduPredicate>> predicates;
for (int i = 0; i < values.size(); i++) {
const auto key_index = key_indexes[i];
const auto& column = schema.Column(key_index);
const auto& col_name = column.name();
const auto type = column.type();
switch (type) {
case KuduColumnSchema::INT8:
case KuduColumnSchema::INT16:
case KuduColumnSchema::INT32:
case KuduColumnSchema::INT64:
case KuduColumnSchema::DATE:
case KuduColumnSchema::SERIAL:
case KuduColumnSchema::UNIXTIME_MICROS: {
int64_t value;
RETURN_NOT_OK_PREPEND(
reader.ExtractInt64(values[i], /*field=*/nullptr, &value),
Substitute("unable to parse value for column '$0' of type $1",
col_name,
KuduColumnSchema::DataTypeToString(type)));
predicates.emplace_back(
table->NewComparisonPredicate(col_name,
client::KuduPredicate::EQUAL,
client::KuduValue::FromInt(value)));
break;
}
case KuduColumnSchema::BINARY:
case KuduColumnSchema::STRING:
case KuduColumnSchema::VARCHAR: {
string value;
RETURN_NOT_OK_PREPEND(
reader.ExtractString(values[i], /*field=*/nullptr, &value),
Substitute("unable to parse value for column '$0' of type $1",
col_name,
KuduColumnSchema::DataTypeToString(type)));
predicates.emplace_back(
table->NewComparisonPredicate(col_name,
client::KuduPredicate::EQUAL,
client::KuduValue::CopyString(value)));
break;
}
case KuduColumnSchema::BOOL: {
// As of the writing of this tool, BOOL is not a supported key column
// type, but just in case it becomes one, we pre-load support for it.
bool value;
RETURN_NOT_OK_PREPEND(
reader.ExtractBool(values[i], /*field=*/nullptr, &value),
Substitute("unable to parse value for column '$0' of type $1",
col_name,
KuduColumnSchema::DataTypeToString(type)));
predicates.emplace_back(
table->NewComparisonPredicate(col_name,
client::KuduPredicate::EQUAL,
client::KuduValue::FromBool(value)));
break;
}
case KuduColumnSchema::FLOAT:
case KuduColumnSchema::DOUBLE: {
// Like BOOL, as of the writing of this tool, floating point types are
// not supported for key columns, but we can pre-load support for them
// in case they become supported.
double value;
RETURN_NOT_OK_PREPEND(
reader.ExtractDouble(values[i], /*field=*/nullptr, &value),
Substitute("unable to parse value for column '$0' of type $1",
col_name,
KuduColumnSchema::DataTypeToString(type)));
predicates.emplace_back(
table->NewComparisonPredicate(col_name,
client::KuduPredicate::EQUAL,
client::KuduValue::FromDouble(value)));
break;
}
case KuduColumnSchema::DECIMAL:
return Status::NotSupported(
Substitute("unsupported type $0 for key column '$1': "
"$0 key columns are not supported by this tool",
KuduColumnSchema::DataTypeToString(type),
col_name));
default:
return Status::NotSupported(
Substitute("unsupported type $0 for key column '$1': "
"is this tool out of date?",
KuduColumnSchema::DataTypeToString(type),
col_name));
}
}
// Find the tablet by constructing scan tokens for a scan with equality
// predicates on all key columns. At most one tablet will match, so there
// will be at most one token, and we can report the id of its tablet.
vector<KuduScanToken*> tokens;
ElementDeleter deleter(&tokens);
KuduScanTokenBuilder builder(table.get());
// In case we go on to check for existence of the row.
RETURN_NOT_OK(builder.SetSelection(KuduClient::ReplicaSelection::LEADER_ONLY));
for (auto& predicate : predicates) {
RETURN_NOT_OK(builder.AddConjunctPredicate(predicate.release()));
}
RETURN_NOT_OK(builder.Build(&tokens));
if (tokens.empty()) {
// Must be in a non-covered range partition.
return Status::NotFound("row does not belong to any currently existing tablet");
}
if (tokens.size() > 1) {
// This should be impossible. But if it does happen, we'd like to know what
// all the matching tablets were.
for (const auto& token : tokens) {
cerr << token->tablet().id() << endl;
}
return Status::IllegalState(Substitute(
"all primary key columns specified but found $0 matching tablets!",
tokens.size()));
}
cout << tokens[0]->tablet().id() << endl;
if (FLAGS_check_row_existence) {
KuduScanner* scanner_ptr;
RETURN_NOT_OK(tokens[0]->IntoKuduScanner(&scanner_ptr));
unique_ptr<KuduScanner> scanner(scanner_ptr);
RETURN_NOT_OK(scanner->Open());
vector<string> row_str;
client::KuduScanBatch batch;
while (scanner->HasMoreRows()) {
RETURN_NOT_OK(scanner->NextBatch(&batch));
for (const auto& row : batch) {
row_str.emplace_back(row.ToString());
}
}
if (row_str.empty()) {
return Status::NotFound("row does not exist");
}
// There should be exactly one result, but if somehow there are more, print
// them all before returning an error.
cout << JoinStrings(row_str, "\n") << endl;
if (row_str.size() != 1) {
// This should be impossible.
return Status::IllegalState(
Substitute("expected 1 row but received $0", row_str.size()));
}
}
return Status::OK();
}