in libminifi/src/sitetosite/SiteToSiteClient.cpp [401:523]
/**
 * Sends one data packet (attributes followed by content) on an already created SEND transaction.
 *
 * Order of writes performed on the transaction stream:
 *  - a CONTINUE_TRANSACTION response, only if the transaction has already transferred a packet;
 *  - the attribute count (uint32_t, 4 bytes), then every attribute key and value;
 *  - the content length (uint64_t, 8 bytes) followed by the content itself.
 *
 * The content is chosen as follows:
 *  1. the flow file content, read through `session`, when `flowFile` has an existing resource claim
 *     and `session` is not null;
 *  2. otherwise `packet->payload_`, when it is not empty;
 *  3. otherwise a zero length, when a flow file was given but its claim is missing.
 * NOTE(review): in every other case (e.g. no flow file and an empty payload) no content length is
 * written at all - confirm this matches what the receiving side expects.
 *
 * @param transactionID id of a transaction stored in known_transactions_
 * @param packet packet to send; must not be null, its _size is updated with the bytes sent
 * @param flowFile flow file providing the content, may be null
 * @param session session used to read the flow file content, may be null
 * @return 0 on success; -1 if the peer is not ready, the transaction is unknown or in the wrong
 *         state/direction, the packet is null or a write fails; -2 if the number of content bytes
 *         piped differs from the flow file size
 */
int16_t SiteToSiteClient::send(const utils::Identifier& transactionID, DataPacket* packet, const std::shared_ptr<core::FlowFile> &flowFile, core::ProcessSession* session) {
  // Try to (re)establish the peer once; give up if it is still not ready afterwards.
  if (peer_state_ != READY) {
    bootstrap();
  }
  if (peer_state_ != READY) {
    return -1;
  }

  auto it = this->known_transactions_.find(transactionID);
  if (it == known_transactions_.end()) {
    return -1;
  }
  std::shared_ptr<Transaction> transaction = it->second;

  if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) {
    logger_->log_warn("Site2Site transaction {} is not at started or exchanged state", transactionID.to_string());
    return -1;
  }
  if (transaction->getDirection() != SEND) {
    logger_->log_warn("Site2Site transaction {} direction is wrong", transactionID.to_string());
    return -1;
  }

  // Reject a null packet before anything is written, so a bad call cannot leave a
  // half-written packet on the stream.
  if (packet == nullptr) {
    logger_->log_warn("Site2Site transaction {} cannot send a null data packet", transactionID.to_string());
    return -1;
  }

  // Every packet after the first one is preceded by a CONTINUE_TRANSACTION response.
  if (transaction->current_transfers_ > 0) {
    const auto ret = writeResponse(transaction, CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION");
    if (ret <= 0) {
      return -1;
    }
  }

  // Start writing the packet: attribute count first, then each key/value pair.
  {
    const auto numAttributes = gsl::narrow<uint32_t>(packet->_attributes.size());
    const auto ret = transaction->getStream().write(numAttributes);
    if (ret != 4) {
      return -1;
    }
  }
  for (const auto& [key, value] : packet->_attributes) {
    {
      const auto ret = transaction->getStream().write(key, true);
      if (ret == 0 || io::isError(ret)) {
        return -1;
      }
    }
    {
      const auto ret = transaction->getStream().write(value, true);
      if (ret == 0 || io::isError(ret)) {
        return -1;
      }
    }
    logger_->log_debug("Site2Site transaction {} send attribute key {} value {}", transactionID.to_string(), key, value);
  }

  // A flow file only has sendable content if its resource claim is set and still exists.
  bool flowfile_has_content = (flowFile != nullptr);
  if (flowFile && (flowFile->getResourceClaim() == nullptr || !flowFile->getResourceClaim()->exists())) {
    auto path = flowFile->getResourceClaim() != nullptr ? flowFile->getResourceClaim()->getContentFullPath() : "nullclaim";
    logger_->log_debug("Claim {} does not exist for FlowFile {}", path, flowFile->getUUIDStr());
    flowfile_has_content = false;
  }

  uint64_t len = 0;
  if (flowFile && flowfile_has_content && session) {
    // Content comes from the flow file: length first, then the bytes piped from the session.
    len = flowFile->getSize();
    const auto ret = transaction->getStream().write(len);
    if (ret != 8) {
      logger_->log_debug("Failed to write content size!");
      return -1;
    }
    if (flowFile->getSize() > 0) {
      // Pipe into the stream of the validated transaction, i.e. the same stream the
      // attributes and the length were written to.
      // A failed pipe leaves packet->_size untouched, so the failure is expected to surface
      // through the size comparison below (assumes _size did not already match - TODO confirm).
      session->read(flowFile, [packet, &transaction](const std::shared_ptr<io::InputStream>& input_stream) -> int64_t {
        const auto result = internal::pipe(*input_stream, transaction->getStream());
        if (result == -1) {
          return -1;
        }
        packet->_size = gsl::narrow<size_t>(result);
        return result;
      });
      if (flowFile->getSize() != packet->_size) {
        logger_->log_debug("Mismatched sizes {} {}", flowFile->getSize(), packet->_size);
        return -2;
      }
    }
    if (packet->payload_.length() == 0 && len == 0) {
      if (flowFile->getResourceClaim() == nullptr) {
        logger_->log_trace("no claim");
      } else {
        logger_->log_trace("Flowfile empty {}", flowFile->getResourceClaim()->getContentFullPath());
      }
    }
  } else if (packet->payload_.length() > 0) {
    // Content comes from the in-memory payload of the packet.
    len = packet->payload_.length();
    {
      const auto ret = transaction->getStream().write(len);
      if (ret != 8) {
        logger_->log_debug("Failed to write payload size!");
        return -1;
      }
    }
    {
      const auto ret = transaction->getStream().write(reinterpret_cast<const uint8_t*>(packet->payload_.c_str()), gsl::narrow<size_t>(len));
      if (ret != gsl::narrow<size_t>(len)) {
        logger_->log_debug("Failed to write payload!");
        return -1;
      }
    }
    packet->_size += len;
  } else if (flowFile && !flowfile_has_content) {
    const auto ret = transaction->getStream().write(len);  // Indicate zero length
    if (ret != 8) {
      logger_->log_debug("Failed to write content size (0)!");
      return -1;
    }
  }

  // Book-keeping: the packet counts as transferred and the transaction now carries data.
  transaction->current_transfers_++;
  transaction->total_transfers_++;
  transaction->_state = DATA_EXCHANGED;
  transaction->_bytes += len;
  logger_->log_info("Site to Site transaction {} sent flow {} flow records, with total size {}", transactionID.to_string(), transaction->total_transfers_, transaction->_bytes);
  return 0;
}