Status ShowTxn()

in src/kudu/tools/tool_action_txn.cc [318:441]


// Shows the details of a single transaction.
//
// The transaction ID is taken from the required argument 'kTxnIdArg' and must
// parse as a non-negative integer; otherwise InvalidArgument is returned. The
// set of columns to display comes from GetFields().
//
// Output goes to stdout in two parts:
//   1. a table built from the transaction's TRANSACTION entries in the
//      transaction status table (only if at least one transaction field was
//      selected), followed by a blank line;
//   2. a table with one row per participant found in the transaction status
//      table, filled in from the metadata each participant returns for a
//      GET_METADATA op (only if at least one participant field was selected).
//
// Any non-OK status from setting up the clients, scanning the table, or
// querying a participant is propagated to the caller via RETURN_NOT_OK.
Status ShowTxn(const RunnerContext& context) {
  const auto& txn_id_str = FindOrDie(context.required_args, kTxnIdArg);
  int64_t txn_id = -1;
  if (!SimpleAtoi(txn_id_str, &txn_id) || txn_id < 0) {
    return Status::InvalidArgument(
        Substitute("Must supply a valid transaction ID: $0", txn_id_str));
  }
  vector<string> txn_field_names;
  vector<ListTxnsField> txn_fields;
  vector<string> participant_field_names;
  vector<ParticipantField> participant_fields;
  RETURN_NOT_OK(GetFields(&txn_field_names, &txn_fields,
                          &participant_field_names, &participant_fields));
  // The name and field vectors are parallel; the emptiness checks below rely
  // on that by testing whichever of the two is at hand.
  DCHECK_EQ(txn_field_names.size(), txn_fields.size());
  DCHECK_EQ(participant_field_names.size(), participant_fields.size());

  // First set up our clients so we can be sure we can connect to the cluster.
  std::unique_ptr<TxnSystemClient> txn_client;
  vector<string> master_addresses;
  RETURN_NOT_OK(ParseMasterAddresses(context, kMasterAddressesArg, &master_addresses));
  vector<HostPort> master_hps;
  master_hps.reserve(master_addresses.size());
  for (const auto& m : master_addresses) {
    HostPort hp;
    RETURN_NOT_OK(hp.ParseString(m, master::Master::kDefaultPort));
    master_hps.emplace_back(std::move(hp));
  }
  RETURN_NOT_OK(TxnSystemClient::Create(master_hps,
                                        FLAGS_sasl_protocol_name,
                                        &txn_client));
  RETURN_NOT_OK(txn_client->OpenTxnStatusTable());
  shared_ptr<KuduClient> client;
  RETURN_NOT_OK(CreateKuduClient(context, &client));

  // Scan the transaction status table for its table entries, restricted to
  // rows whose transaction ID column equals the requested ID.
  shared_ptr<KuduTable> table;
  RETURN_NOT_OK(client->OpenTable(TxnStatusTablet::kTxnStatusTableName, &table));
  KuduScanner scanner(table.get());
  RETURN_NOT_OK(scanner.SetFaultTolerant());
  RETURN_NOT_OK(scanner.SetSelection(KuduClient::LEADER_ONLY));
  auto* pred = table->NewComparisonPredicate(
      TxnStatusTablet::kTxnIdColName, KuduPredicate::EQUAL, KuduValue::FromInt(txn_id));
  RETURN_NOT_OK(scanner.AddConjunctPredicate(pred));
  RETURN_NOT_OK(scanner.Open());
  KuduScanBatch batch;

  // Extract transaction statuses and participant IDs from the results.
  vector<string> participant_ids;
  DataTable txn_table(txn_field_names);
  while (scanner.HasMoreRows()) {
    RETURN_NOT_OK(scanner.NextBatch(&batch));
    for (const auto& row : batch) {
      Slice metadata_bytes;
      RETURN_NOT_OK(row.GetString(TxnStatusTablet::kMetadataColName, &metadata_bytes));
      int8_t entry_type = 0;
      RETURN_NOT_OK(row.GetInt8(TxnStatusTablet::kEntryTypeColName, &entry_type));
      if (entry_type == TxnStatusTablet::TRANSACTION && !txn_fields.empty()) {
        // A transaction entry: decode its metadata and add a row to the
        // transaction table.
        TxnStatusEntryPB txn_entry_pb;
        RETURN_NOT_OK(pb_util::ParseFromArray(&txn_entry_pb, metadata_bytes.data(),
                                              metadata_bytes.size()));
        int64_t fetched_txn_id = -1;
        RETURN_NOT_OK(row.GetInt64(TxnStatusTablet::kTxnIdColName, &fetched_txn_id));
        DCHECK_EQ(txn_id, fetched_txn_id);
        AddTxnStatusRow(txn_fields, fetched_txn_id, txn_entry_pb, &txn_table);
      } else if (entry_type == TxnStatusTablet::PARTICIPANT && !participant_fields.empty()) {
        // A participant entry: remember its identifier so the participant can
        // be queried once the scan is done.
        Slice participant_id;
        RETURN_NOT_OK(row.GetString(TxnStatusTablet::kIdentifierColName, &participant_id));
        participant_ids.emplace_back(participant_id.ToString());
      }
    }
  }
  if (!txn_fields.empty()) {
    RETURN_NOT_OK(txn_table.PrintTo(std::cout));
    // std::endl also flushes, so the transaction table is written out before
    // any participant is contacted below.
    std::cout << std::endl;
  }
  if (participant_field_names.empty()) {
    return Status::OK();
  }

  // Get further details from the participants.
  DataTable participant_table(participant_field_names);
  for (const auto& id : participant_ids) {
    TxnMetadataPB meta_pb;
    ParticipantOpPB pb;
    pb.set_txn_id(txn_id);
    pb.set_type(ParticipantOpPB::GET_METADATA);
    // The deadline is computed per participant, so each request gets the
    // full FLAGS_timeout_ms.
    RETURN_NOT_OK(txn_client->ParticipateInTransaction(
        id, pb, MonoTime::Now() + MonoDelta::FromMilliseconds(FLAGS_timeout_ms),
        /*begin_commit_timestamp=*/nullptr, &meta_pb));
    vector<string> col_vals;
    col_vals.reserve(participant_fields.size());
    for (const auto& field : participant_fields) {
      switch (field) {
        case ParticipantField::kTabletId:
          col_vals.emplace_back(id);
          break;
        case ParticipantField::kAborted:
          col_vals.emplace_back(meta_pb.aborted() ? "true" : "false");
          break;
        case ParticipantField::kFlushedCommittedMrs:
          col_vals.emplace_back(meta_pb.flushed_committed_mrs() ? "true" : "false");
          break;
        // The "begin commit" columns are backed by commit_mvcc_op_timestamp;
        // "<none>" is shown when the participant has not set it.
        case ParticipantField::kBeginCommitDateTime:
          col_vals.emplace_back(meta_pb.has_commit_mvcc_op_timestamp() ?
              HybridTimeToDateTime(meta_pb.commit_mvcc_op_timestamp()) : "<none>");
          break;
        case ParticipantField::kBeginCommitHybridTime:
          col_vals.emplace_back(meta_pb.has_commit_mvcc_op_timestamp() ?
              HybridClock::StringifyTimestamp(Timestamp(meta_pb.commit_mvcc_op_timestamp())) :
              "<none>");
          break;
        // The "commit" columns are backed by commit_timestamp; "<none>" is
        // shown when the participant has not set it.
        case ParticipantField::kCommitDateTime:
          col_vals.emplace_back(meta_pb.has_commit_timestamp() ?
              HybridTimeToDateTime(meta_pb.commit_timestamp()) : "<none>");
          break;
        case ParticipantField::kCommitHybridTime:
          col_vals.emplace_back(meta_pb.has_commit_timestamp() ?
              HybridClock::StringifyTimestamp(Timestamp(meta_pb.commit_timestamp())) : "<none>");
          break;
      }
    }
    participant_table.AddRow(std::move(col_vals));
  }
  return participant_table.PrintTo(std::cout);
}