bool SiteToSiteClient::receive()

in libminifi/src/sitetosite/SiteToSiteClient.cpp [525:642]


bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacket *packet, bool &eof) {
  std::shared_ptr<Transaction> transaction;

  if (peer_state_ != READY) {
    bootstrap();
  }

  if (peer_state_ != READY) {
    return false;
  }

  auto it = this->known_transactions_.find(transactionID);

  if (it == known_transactions_.end()) {
    return false;
  }

  transaction = it->second;

  if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) {
    logger_->log_warn("Site2Site transaction {} is not at started or exchanged state", transactionID.to_string());
    return false;
  }

  if (transaction->getDirection() != RECEIVE) {
    logger_->log_warn("Site2Site transaction {} direction is wrong", transactionID.to_string());
    return false;
  }

  if (!transaction->isDataAvailable()) {
    eof = true;
    return true;
  }

  if (transaction->current_transfers_ > 0) {
    // if we already has transfer before, check to see whether another one is available
    RespondCode code = RESERVED;
    std::string message;

    if (readResponse(transaction, code, message) <= 0) {
      return false;
    }
    if (code == CONTINUE_TRANSACTION) {
      logger_->log_debug("Site2Site transaction {} peer indicate continue transaction", transactionID.to_string());
      transaction->_dataAvailable = true;
    } else if (code == FINISH_TRANSACTION) {
      logger_->log_debug("Site2Site transaction {} peer indicate finish transaction", transactionID.to_string());
      transaction->_dataAvailable = false;
      eof = true;
      return true;
    } else {
      logger_->log_debug("Site2Site transaction {} peer indicate wrong respond code {}", transactionID.to_string(), magic_enum::enum_underlying(code));
      return false;
    }
  }

  if (!transaction->isDataAvailable()) {
    logger_->log_debug("No data is available");
    eof = true;
    return true;
  }

  // start to read the packet
  uint32_t numAttributes = 0;
  {
    const auto ret = transaction->getStream().read(numAttributes);
    if (ret == 0 || io::isError(ret) || numAttributes > MAX_NUM_ATTRIBUTES) {
      return false;
    }
  }

  // read the attributes
  logger_->log_debug("Site2Site transaction {} receives attribute key {}", transactionID.to_string(), numAttributes);
  for (unsigned int i = 0; i < numAttributes; i++) {
    std::string key;
    std::string value;
    {
      const auto ret = transaction->getStream().read(key, true);
      if (ret == 0 || io::isError(ret)) {
        return false;
      }
    }
    {
      const auto ret = transaction->getStream().read(value, true);
      if (ret == 0 || io::isError(ret)) {
        return false;
      }
    }
    packet->_attributes[key] = value;
    logger_->log_debug("Site2Site transaction {} receives attribute key {} value {}", transactionID.to_string(), key, value);
  }

  uint64_t len = 0;
  {
    const auto ret = transaction->getStream().read(len);
    if (ret == 0 || io::isError(ret)) {
      return false;
    }
  }

  packet->_size = len;
  if (len > 0 || numAttributes > 0) {
    transaction->current_transfers_++;
    transaction->total_transfers_++;
  } else {
    logger_->log_warn("Site2Site transaction {} empty flow file without attribute", transactionID.to_string());
    transaction->_dataAvailable = false;
    eof = true;
    return true;
  }
  transaction->_state = DATA_EXCHANGED;
  transaction->_bytes += len;

  logger_->log_info("Site to Site transaction {} received flow record {}, total length {}, added {}",
      transactionID.to_string(), transaction->total_transfers_, transaction->_bytes, len);

  return true;
}