in libminifi/src/sitetosite/SiteToSiteClient.cpp [191:295]
/**
 * Runs the confirmation phase of a site-to-site transaction.
 *
 * If the peer is not in the READY state, bootstrap() is attempted first.
 * The transaction is looked up in known_transactions_ and then handled
 * according to its direction:
 *
 *  - RECEIVE: if the transaction was started but has no data available it is
 *    marked confirmed right away. Otherwise, once data has been exchanged and
 *    no more data is available, our CRC is sent with CONFIRM_TRANSACTION and
 *    the peer's reply decides the outcome.
 *  - otherwise (send): FINISH_TRANSACTION is sent and the peer is expected to
 *    answer with CONFIRM_TRANSACTION; for protocol versions above 3 the
 *    message of that reply is compared against our CRC before acknowledging.
 *
 * @param transactionID identifier of the transaction to confirm.
 * @return true when the transaction reached TRANSACTION_CONFIRMED; false when
 *         the peer is not ready, the transaction is unknown or in the wrong
 *         state, a read/write on the peer failed, or the peer did not confirm.
 */
bool SiteToSiteClient::confirm(const utils::Identifier& transactionID) {
  if (peer_state_ != READY) {
    bootstrap();
  }
  if (peer_state_ != READY) {
    return false;
  }

  auto it = known_transactions_.find(transactionID);
  if (it == known_transactions_.end()) {
    return false;
  }
  // Hold our own reference for the duration of the exchange.
  std::shared_ptr<Transaction> transaction = it->second;

  // A receive transaction that started but has nothing to deliver can be
  // confirmed without any round trip to the peer.
  if (transaction->getState() == TRANSACTION_STARTED && !transaction->isDataAvailable() &&
      transaction->getDirection() == RECEIVE) {
    transaction->_state = TRANSACTION_CONFIRMED;
    return true;
  }

  if (transaction->getState() != DATA_EXCHANGED) {
    return false;
  }

  if (transaction->getDirection() == RECEIVE) {
    if (transaction->isDataAvailable()) {
      return false;
    }
    // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
    // to peer so that we can verify that the connection is still open. This is a two-phase commit,
    // which helps to prevent the chances of data duplication. Without doing this, we may commit the
    // session and then when we send the response back to the peer, the peer may have timed out and may not
    // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
    // Critical Section involved in this transaction so that rather than the Critical Section being the
    // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
    uint64_t crcValue = transaction->getCRC();
    std::string crc = std::to_string(crcValue);
    logger_->log_debug("Site2Site Receive confirm with CRC %llu to transaction %s", crcValue, transactionID.to_string());
    if (writeResponse(transaction, CONFIRM_TRANSACTION, crc) <= 0) {
      return false;
    }

    RespondCode code{};
    std::string message;
    // The result of the read itself must be checked: previously the stale
    // result of the preceding write was re-tested, so a failed read went
    // unnoticed and `code` was inspected while uninitialized.
    if (readResponse(transaction, code, message) <= 0) {
      return false;
    }

    if (code == CONFIRM_TRANSACTION) {
      logger_->log_debug("Site2Site transaction %s peer confirm transaction", transactionID.to_string());
      transaction->_state = TRANSACTION_CONFIRMED;
      return true;
    }
    if (code == BAD_CHECKSUM) {
      logger_->log_debug("Site2Site transaction %s peer indicate bad checksum", transactionID.to_string());
      return false;
    }
    logger_->log_debug("Site2Site transaction %s peer unknown response code %d", transactionID.to_string(), code);
    return false;
  }

  // Send direction.
  logger_->log_debug("Site2Site Send FINISH TRANSACTION for transaction %s", transactionID.to_string());
  if (writeResponse(transaction, FINISH_TRANSACTION, "FINISH_TRANSACTION") <= 0) {
    return false;
  }

  RespondCode code{};
  std::string message;
  // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
  // A failed read must not fall through to inspecting `code`.
  if (readResponse(transaction, code, message) <= 0) {
    return false;
  }

  if (code != CONFIRM_TRANSACTION) {
    logger_->log_debug("Site2Site transaction %s peer unknown respond code %d", transactionID.to_string(), code);
    return false;
  }

  logger_->log_debug("Site2Site transaction %s peer confirm transaction with CRC %s", transactionID.to_string(), message);
  if (this->_currentVersion > 3) {
    // For these protocol versions the reply message is compared with our CRC.
    uint64_t crcValue = transaction->getCRC();
    std::string crc = std::to_string(crcValue);
    if (message != crc) {
      logger_->log_debug("Site2Site transaction %s CRC not matched %s", transactionID.to_string(), crc);
      // Best effort notification; the confirmation fails regardless of whether this write succeeds.
      writeResponse(transaction, BAD_CHECKSUM, "BAD_CHECKSUM");
      return false;
    }
    logger_->log_debug("Site2Site transaction %s CRC matched", transactionID.to_string());
  }

  if (writeResponse(transaction, CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION") <= 0) {
    return false;
  }
  transaction->_state = TRANSACTION_CONFIRMED;
  return true;
}