bool SiteToSiteClient::confirm()

in libminifi/src/sitetosite/SiteToSiteClient.cpp [191:295]


bool SiteToSiteClient::confirm(const utils::Identifier& transactionID) {
  int ret;
  std::shared_ptr<Transaction> transaction;

  if (peer_state_ != READY) {
    bootstrap();
  }

  if (peer_state_ != READY) {
    return false;
  }

  auto it = this->known_transactions_.find(transactionID);

  if (it == known_transactions_.end()) {
    return false;
  }
  transaction = it->second;


  if (transaction->getState() == TRANSACTION_STARTED && !transaction->isDataAvailable() &&
      transaction->getDirection() == RECEIVE) {
    transaction->_state = TRANSACTION_CONFIRMED;
    return true;
  }

  if (transaction->getState() != DATA_EXCHANGED) {
    return false;
  }

  if (transaction->getDirection() == RECEIVE) {
    if (transaction->isDataAvailable()) {
      return false;
    }
    // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
    // to peer so that we can verify that the connection is still open. This is a two-phase commit,
    // which helps to prevent the chances of data duplication. Without doing this, we may commit the
    // session and then when we send the response back to the peer, the peer may have timed out and may not
    // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
    // Critical Section involved in this transaction so that rather than the Critical Section being the
    // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
    uint64_t crcValue = transaction->getCRC();
    std::string crc = std::to_string(crcValue);
    logger_->log_debug("Site2Site Receive confirm with CRC %llu to transaction %s", crcValue, transactionID.to_string());
    ret = writeResponse(transaction, CONFIRM_TRANSACTION, crc);
    if (ret <= 0)
      return false;
    RespondCode code;
    std::string message;
    readResponse(transaction, code, message);
    if (ret <= 0)
      return false;

    if (code == CONFIRM_TRANSACTION) {
      logger_->log_debug("Site2Site transaction %s peer confirm transaction", transactionID.to_string());
      transaction->_state = TRANSACTION_CONFIRMED;
      return true;
    } else if (code == BAD_CHECKSUM) {
      logger_->log_debug("Site2Site transaction %s peer indicate bad checksum", transactionID.to_string());
      return false;
    } else {
      logger_->log_debug("Site2Site transaction %s peer unknown response code %d", transactionID.to_string(), code);
      return false;
    }
  } else {
    logger_->log_debug("Site2Site Send FINISH TRANSACTION for transaction %s", transactionID.to_string());
    ret = writeResponse(transaction, FINISH_TRANSACTION, "FINISH_TRANSACTION");
    if (ret <= 0) {
      return false;
    }
    RespondCode code;
    std::string message;
    readResponse(transaction, code, message);

    // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
    if (code == CONFIRM_TRANSACTION) {
      logger_->log_debug("Site2Site transaction %s peer confirm transaction with CRC %s", transactionID.to_string(), message);
      if (this->_currentVersion > 3) {
        uint64_t crcValue = transaction->getCRC();
        std::string crc = std::to_string(crcValue);
        if (message == crc) {
          logger_->log_debug("Site2Site transaction %s CRC matched", transactionID.to_string());
          ret = writeResponse(transaction, CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
          if (ret <= 0)
            return false;
          transaction->_state = TRANSACTION_CONFIRMED;
          return true;
        } else {
          logger_->log_debug("Site2Site transaction %s CRC not matched %s", transactionID.to_string(), crc);
          writeResponse(transaction, BAD_CHECKSUM, "BAD_CHECKSUM");
          return false;
        }
      }
      ret = writeResponse(transaction, CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
      if (ret <= 0)
        return false;
      transaction->_state = TRANSACTION_CONFIRMED;
      return true;
    } else {
      logger_->log_debug("Site2Site transaction %s peer unknown respond code %d", transactionID.to_string(), code);
      return false;
    }
    return false;
  }
}