in src/kudu/tools/tool_action_txn.cc [318:441]
// Prints details about a single transaction to stdout.
//
// The transaction ID is read from the required argument 'kTxnIdArg'. The
// transaction status table is scanned for rows whose transaction ID column
// equals that ID:
//   - TRANSACTION entries are rendered into a table of the requested
//     transaction fields;
//   - PARTICIPANT entries contribute their identifier to a list of
//     participants that are subsequently queried with a GET_METADATA
//     participant op, one op per participant.
//
// Which columns are shown is determined by GetFields() (defined elsewhere;
// presumably driven by command-line flags -- confirm against its definition).
// If no transaction fields were requested the transaction table is not
// printed; if no participant fields were requested the participants are not
// contacted at all.
//
// Returns InvalidArgument if the supplied transaction ID is not a
// non-negative integer; otherwise propagates the first non-OK status returned
// by any of the client, scanner, parsing, or printing calls below.
Status ShowTxn(const RunnerContext& context) {
  const auto& txn_id_str = FindOrDie(context.required_args, kTxnIdArg);
  int64_t txn_id = -1;
  if (!SimpleAtoi(txn_id_str, &txn_id) || txn_id < 0) {
    return Status::InvalidArgument(
        Substitute("Must supply a valid transaction ID: $0", txn_id_str));
  }

  vector<string> txn_field_names;
  vector<ListTxnsField> txn_fields;
  vector<string> participant_field_names;
  vector<ParticipantField> participant_fields;
  RETURN_NOT_OK(GetFields(&txn_field_names, &txn_fields,
                          &participant_field_names, &participant_fields));
  // Each list of column names is expected to line up one-to-one with its list
  // of field enums; the emptiness checks below rely on that.
  DCHECK_EQ(txn_field_names.size(), txn_fields.size());
  DCHECK_EQ(participant_field_names.size(), participant_fields.size());

  // First set up our clients so we can be sure we can connect to the cluster.
  vector<string> master_addresses;
  RETURN_NOT_OK(ParseMasterAddresses(context, kMasterAddressesArg, &master_addresses));
  vector<HostPort> master_hps;
  master_hps.reserve(master_addresses.size());
  for (const auto& m : master_addresses) {
    HostPort hp;
    RETURN_NOT_OK(hp.ParseString(m, master::Master::kDefaultPort));
    master_hps.emplace_back(std::move(hp));
  }
  std::unique_ptr<TxnSystemClient> txn_client;
  RETURN_NOT_OK(TxnSystemClient::Create(master_hps,
                                        FLAGS_sasl_protocol_name,
                                        &txn_client));
  RETURN_NOT_OK(txn_client->OpenTxnStatusTable());
  shared_ptr<KuduClient> client;
  RETURN_NOT_OK(CreateKuduClient(context, &client));

  // Scan the transaction status table for its table entries, restricted to
  // the rows of the requested transaction.
  shared_ptr<KuduTable> table;
  RETURN_NOT_OK(client->OpenTable(TxnStatusTablet::kTxnStatusTableName, &table));
  KuduScanner scanner(table.get());
  RETURN_NOT_OK(scanner.SetFaultTolerant());
  RETURN_NOT_OK(scanner.SetSelection(KuduClient::LEADER_ONLY));
  auto* pred = table->NewComparisonPredicate(
      TxnStatusTablet::kTxnIdColName, KuduPredicate::EQUAL, KuduValue::FromInt(txn_id));
  RETURN_NOT_OK(scanner.AddConjunctPredicate(pred));
  RETURN_NOT_OK(scanner.Open());

  // Extract transaction statuses and participant IDs from the results.
  KuduScanBatch batch;
  vector<string> participant_ids;
  DataTable txn_table(txn_field_names);
  while (scanner.HasMoreRows()) {
    RETURN_NOT_OK(scanner.NextBatch(&batch));
    for (const auto& row : batch) {
      Slice metadata_bytes;
      RETURN_NOT_OK(row.GetString(TxnStatusTablet::kMetadataColName, &metadata_bytes));
      int8_t entry_type = 0;
      RETURN_NOT_OK(row.GetInt8(TxnStatusTablet::kEntryTypeColName, &entry_type));
      if (entry_type == TxnStatusTablet::TRANSACTION && !txn_fields.empty()) {
        // The metadata column of a TRANSACTION entry is parsed as a
        // serialized TxnStatusEntryPB.
        TxnStatusEntryPB txn_entry_pb;
        RETURN_NOT_OK(pb_util::ParseFromArray(&txn_entry_pb, metadata_bytes.data(),
                                              metadata_bytes.size()));
        int64_t fetched_txn_id = -1;
        RETURN_NOT_OK(row.GetInt64(TxnStatusTablet::kTxnIdColName, &fetched_txn_id));
        // The scan predicate should only ever let the requested ID through.
        DCHECK_EQ(txn_id, fetched_txn_id);
        AddTxnStatusRow(txn_fields, fetched_txn_id, txn_entry_pb, &txn_table);
      } else if (entry_type == TxnStatusTablet::PARTICIPANT && !participant_fields.empty()) {
        // Only remember the participant for now; its metadata is fetched
        // after the scan has completed.
        Slice participant_id;
        RETURN_NOT_OK(row.GetString(TxnStatusTablet::kIdentifierColName, &participant_id));
        participant_ids.emplace_back(participant_id.ToString());
      }
    }
  }

  if (!txn_fields.empty()) {
    RETURN_NOT_OK(txn_table.PrintTo(std::cout));
    // std::endl (rather than '\n') flushes the transaction table out before
    // the participant RPCs below are issued.
    std::cout << std::endl;
  }
  if (participant_field_names.empty()) {
    return Status::OK();
  }

  // Get further details from the participants.
  DataTable participant_table(participant_field_names);
  for (const auto& id : participant_ids) {
    TxnMetadataPB meta_pb;
    ParticipantOpPB pb;
    pb.set_txn_id(txn_id);
    pb.set_type(ParticipantOpPB::GET_METADATA);
    // The deadline is computed per participant, so each op gets the full
    // FLAGS_timeout_ms budget.
    RETURN_NOT_OK(txn_client->ParticipateInTransaction(
        id, pb, MonoTime::Now() + MonoDelta::FromMilliseconds(FLAGS_timeout_ms),
        /*begin_commit_timestamp=*/nullptr, &meta_pb));

    // Build one output row, with columns in the order the fields were
    // requested.
    vector<string> col_vals;
    col_vals.reserve(participant_fields.size());
    for (const auto& field : participant_fields) {
      switch (field) {
        case ParticipantField::kTabletId:
          col_vals.emplace_back(id);
          break;
        case ParticipantField::kAborted:
          col_vals.emplace_back(meta_pb.aborted() ? "true" : "false");
          break;
        case ParticipantField::kFlushedCommittedMrs:
          col_vals.emplace_back(meta_pb.flushed_committed_mrs() ? "true" : "false");
          break;
        // Timestamps that are not set in the returned metadata are shown as
        // "<none>".
        case ParticipantField::kBeginCommitDateTime:
          col_vals.emplace_back(meta_pb.has_commit_mvcc_op_timestamp() ?
              HybridTimeToDateTime(meta_pb.commit_mvcc_op_timestamp()) : "<none>");
          break;
        case ParticipantField::kBeginCommitHybridTime:
          col_vals.emplace_back(meta_pb.has_commit_mvcc_op_timestamp() ?
              HybridClock::StringifyTimestamp(Timestamp(meta_pb.commit_mvcc_op_timestamp())) :
              "<none>");
          break;
        case ParticipantField::kCommitDateTime:
          col_vals.emplace_back(meta_pb.has_commit_timestamp() ?
              HybridTimeToDateTime(meta_pb.commit_timestamp()) : "<none>");
          break;
        case ParticipantField::kCommitHybridTime:
          col_vals.emplace_back(meta_pb.has_commit_timestamp() ?
              HybridClock::StringifyTimestamp(Timestamp(meta_pb.commit_timestamp())) : "<none>");
          break;
      }
    }
    participant_table.AddRow(std::move(col_vals));
  }
  return participant_table.PrintTo(std::cout);
}