bool SiteToSiteClient::receive()

in libminifi/src/sitetosite/SiteToSiteClient.cpp [520:636]


/**
 * Reads the header of the next flow record of a RECEIVE transaction into `packet`.
 *
 * The function bootstraps the peer if it is not READY, looks the transaction up in
 * known_transactions_, and validates its state and direction. If at least one record
 * has already been transferred in this transaction, it first reads the peer's response
 * code to learn whether another record follows. It then reads the attribute count, the
 * attribute key/value pairs and the 64-bit content length from the transaction stream.
 * The content bytes themselves are not consumed by this function.
 *
 * @param transactionID identifier of a transaction previously registered in known_transactions_
 * @param packet        destination for the attributes (`_attributes`) and content length (`_size`);
 *                      must not be null when a record is actually read
 * @param eof           set to true when the transaction has no more records; it is never reset to
 *                      false here, so callers must initialize it to false before the call
 * @return true if a record header was read or the end of the transaction was reached (check `eof`),
 *         false on any failure (peer not ready, unknown transaction, wrong state/direction,
 *         unexpected response code, stream read error, attribute count above MAX_NUM_ATTRIBUTES,
 *         or a null `packet` when a record has to be read)
 */
bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacket *packet, bool &eof) {
  if (peer_state_ != READY) {
    bootstrap();
  }

  // bootstrap() may have failed to bring the peer up; give up in that case
  if (peer_state_ != READY) {
    return false;
  }

  const auto it = known_transactions_.find(transactionID);
  if (it == known_transactions_.end()) {
    return false;
  }

  // keep our own reference to the transaction for the duration of the call
  std::shared_ptr<Transaction> transaction = it->second;

  const auto state = transaction->getState();
  if (state != TRANSACTION_STARTED && state != DATA_EXCHANGED) {
    logger_->log_warn("Site2Site transaction %s is not at started or exchanged state", transactionID.to_string());
    return false;
  }

  if (transaction->getDirection() != RECEIVE) {
    logger_->log_warn("Site2Site transaction %s direction is wrong", transactionID.to_string());
    return false;
  }

  if (!transaction->isDataAvailable()) {
    eof = true;
    return true;
  }

  if (transaction->current_transfers_ > 0) {
    // a record was already transferred: ask the peer whether another one is available
    RespondCode code;
    std::string message;

    if (readResponse(transaction, code, message) <= 0) {
      return false;
    }
    if (code == CONTINUE_TRANSACTION) {
      logger_->log_debug("Site2Site transaction %s peer indicate continue transaction", transactionID.to_string());
      transaction->_dataAvailable = true;
    } else if (code == FINISH_TRANSACTION) {
      logger_->log_debug("Site2Site transaction %s peer indicate finish transaction", transactionID.to_string());
      transaction->_dataAvailable = false;
      eof = true;
      return true;
    } else {
      logger_->log_debug("Site2Site transaction %s peer indicate wrong respond code %d", transactionID.to_string(), code);
      return false;
    }
  }

  if (!transaction->isDataAvailable()) {
    logger_->log_debug("No data is available");
    eof = true;
    return true;
  }

  // From here on the packet is written to. Reject a null packet before touching the stream,
  // so that nothing is consumed from it and no null pointer is dereferenced.
  if (packet == nullptr) {
    logger_->log_warn("Site2Site transaction %s receive called without a packet to fill", transactionID.to_string());
    return false;
  }

  // start to read the packet: first the number of attributes
  uint32_t numAttributes = 0;
  {
    const auto ret = transaction->getStream().read(numAttributes);
    if (ret == 0 || io::isError(ret) || numAttributes > MAX_NUM_ATTRIBUTES) {
      return false;
    }
  }

  // read the attributes (the value logged below is the attribute count)
  logger_->log_debug("Site2Site transaction %s receives attribute key %d", transactionID.to_string(), numAttributes);
  for (uint32_t i = 0; i < numAttributes; ++i) {
    std::string key;
    std::string value;
    {
      const auto ret = transaction->getStream().read(key, true);
      if (ret == 0 || io::isError(ret)) {
        return false;
      }
    }
    {
      const auto ret = transaction->getStream().read(value, true);
      if (ret == 0 || io::isError(ret)) {
        return false;
      }
    }
    packet->_attributes[key] = value;
    logger_->log_debug("Site2Site transaction %s receives attribute key %s value %s", transactionID.to_string(), key, value);
  }

  // content length of the record
  uint64_t len = 0;
  {
    const auto ret = transaction->getStream().read(len);
    if (ret == 0 || io::isError(ret)) {
      return false;
    }
  }

  packet->_size = len;
  if (len == 0 && numAttributes == 0) {
    // a record with neither content nor attributes is treated as the end of the transaction
    logger_->log_warn("Site2Site transaction %s empty flow file without attribute", transactionID.to_string());
    transaction->_dataAvailable = false;
    eof = true;
    return true;
  }

  ++transaction->current_transfers_;
  ++transaction->total_transfers_;
  transaction->_state = DATA_EXCHANGED;
  transaction->_bytes += len;
  core::logging::LOG_INFO(logger_) << "Site to Site transaction " << transactionID.to_string() << " received flow record " << transaction->total_transfers_
                             << ", total length " << transaction->_bytes << ", added " << len;

  return true;
}