in libminifi/src/sitetosite/SiteToSiteClient.cpp [520:636]
/**
 * Receives the header of the next flow record of a RECEIVE transaction.
 *
 * The attributes read from the transaction stream are stored in
 * packet->_attributes and the announced content length in packet->_size.
 * Only the length is read here; this function does not consume the content
 * bytes themselves.
 *
 * @param transactionID identifier looked up in known_transactions_.
 * @param packet destination for the attributes and the content length. It is
 *        only dereferenced once a record is actually read; if it is null at
 *        that point the call fails instead of dereferencing it.
 * @param eof set to true when there is nothing (more) to receive. It is never
 *        reset to false here, so the caller is expected to initialise it.
 * @return true when a record header was received, or when the end of the data
 *         was reached (eof is then true). false when the peer is not READY
 *         even after bootstrap(), the transaction is unknown, it is in the
 *         wrong state or direction, the peer answered with an unexpected
 *         response code, packet is null, or a stream read failed.
 */
bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacket *packet, bool &eof) {
  // Try to (re-)establish the connection once before giving up.
  if (peer_state_ != READY) {
    bootstrap();
  }
  if (peer_state_ != READY) {
    return false;
  }

  auto it = this->known_transactions_.find(transactionID);
  if (it == known_transactions_.end()) {
    return false;
  }
  // Hold our own reference for the duration of the call.
  std::shared_ptr<Transaction> transaction = it->second;

  const auto state = transaction->getState();
  if (state != TRANSACTION_STARTED && state != DATA_EXCHANGED) {
    logger_->log_warn("Site2Site transaction %s is not at started or exchanged state", transactionID.to_string());
    return false;
  }
  if (transaction->getDirection() != RECEIVE) {
    logger_->log_warn("Site2Site transaction %s direction is wrong", transactionID.to_string());
    return false;
  }

  if (!transaction->isDataAvailable()) {
    eof = true;
    return true;
  }

  if (transaction->current_transfers_ > 0) {
    // At least one record was already transferred: the peer now tells us
    // whether another record follows or the transaction is finished.
    RespondCode code;
    std::string message;
    if (readResponse(transaction, code, message) <= 0) {
      return false;
    }
    if (code == CONTINUE_TRANSACTION) {
      logger_->log_debug("Site2Site transaction %s peer indicate continue transaction", transactionID.to_string());
      transaction->_dataAvailable = true;
    } else if (code == FINISH_TRANSACTION) {
      logger_->log_debug("Site2Site transaction %s peer indicate finish transaction", transactionID.to_string());
      transaction->_dataAvailable = false;
      eof = true;
      return true;
    } else {
      logger_->log_debug("Site2Site transaction %s peer indicate wrong respond code %d", transactionID.to_string(), code);
      return false;
    }
  }

  // Re-check after the response handling above.
  // NOTE(review): presumably isDataAvailable() mirrors _dataAvailable, which would
  // make this branch unreachable; kept because the accessor is not visible here.
  if (!transaction->isDataAvailable()) {
    logger_->log_debug("No data is available");
    eof = true;
    return true;
  }

  // From here on the packet is written to. Fail before consuming any record
  // bytes from the stream rather than dereferencing a null pointer below.
  if (packet == nullptr) {
    logger_->log_warn("Site2Site transaction %s cannot receive into a null packet", transactionID.to_string());
    return false;
  }

  // Start to read the packet: attribute count first.
  uint32_t numAttributes = 0;
  {
    const auto ret = transaction->getStream().read(numAttributes);
    // Reject counts above the limit before looping over peer-supplied data.
    if (ret == 0 || io::isError(ret) || numAttributes > MAX_NUM_ATTRIBUTES) {
      return false;
    }
  }

  // Read the attributes as key/value string pairs.
  logger_->log_debug("Site2Site transaction %s receives attribute key %d", transactionID.to_string(), numAttributes);
  for (uint32_t i = 0; i < numAttributes; ++i) {
    std::string key;
    std::string value;
    {
      const auto ret = transaction->getStream().read(key, true);
      if (ret == 0 || io::isError(ret)) {
        return false;
      }
    }
    {
      const auto ret = transaction->getStream().read(value, true);
      if (ret == 0 || io::isError(ret)) {
        return false;
      }
    }
    packet->_attributes[key] = value;
    logger_->log_debug("Site2Site transaction %s receives attribute key %s value %s", transactionID.to_string(), key, value);
  }

  // Content length of the record.
  uint64_t len = 0;
  {
    const auto ret = transaction->getStream().read(len);
    if (ret == 0 || io::isError(ret)) {
      return false;
    }
  }
  packet->_size = len;

  if (len > 0 || numAttributes > 0) {
    transaction->current_transfers_++;
    transaction->total_transfers_++;
  } else {
    // A record with neither content nor attributes is treated as end of data.
    logger_->log_warn("Site2Site transaction %s empty flow file without attribute", transactionID.to_string());
    transaction->_dataAvailable = false;
    eof = true;
    return true;
  }

  transaction->_state = DATA_EXCHANGED;
  transaction->_bytes += len;
  core::logging::LOG_INFO(logger_) << "Site to Site transaction " << transactionID.to_string() << " received flow record " << transaction->total_transfers_
      << ", total length " << transaction->_bytes << ", added " << len;
  return true;
}