void PublishKafka::onTrigger()

in extensions/kafka/PublishKafka.cpp [612:776]
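
Batches up to batch_size_ flow files per trigger, produces each one to its
(possibly flow-file-dependent) Kafka topic, waits for delivery confirmation
from librdkafka, and routes each flow file to Success or Failure based on the
aggregated per-segment results.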


void PublishKafka::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
  // Check whether we have been interrupted
  if (interrupted_) {
    logger_->log_info("The processor has been interrupted, not running onTrigger");
    context.yield();
    return;
  }

  std::lock_guard<std::mutex> lock_connection(connection_mutex_);
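  // Hold the connection lock for the rest of the trigger so that the Kafka
  // connection (conn_) and its topics cannot be torn down mid-batch.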
  logger_->log_debug("PublishKafka onTrigger");

  // Collect up to batch_size_ flow files, stopping early once the accumulated
  // payload reaches target_batch_payload_size_ bytes (0 disables the size cap)
  uint64_t actual_bytes = 0U;
  std::vector<std::shared_ptr<core::FlowFile>> flowFiles;
  for (uint64_t i = 0; i < batch_size_; i++) {
    std::shared_ptr<core::FlowFile> flowFile = session.get();
    if (flowFile == nullptr) { break; }
    actual_bytes += flowFile->getSize();
    flowFiles.emplace_back(std::move(flowFile));
    if (target_batch_payload_size_ != 0U && actual_bytes >= target_batch_payload_size_) { break; }
  }
  if (flowFiles.empty()) {
    context.yield();
    return;
  }
  logger_->log_debug("Processing {} flow files with a total size of {} B", flowFiles.size(), actual_bytes);

  auto messages = std::make_shared<Messages>(logger_);
  // We must add this to the messages set, so that it will be interrupted when notifyStop is called
  {
    std::lock_guard<std::mutex> lock(messages_mutex_);
    messages_set_.emplace(messages);
  }
  // We also have to ensure that it will be removed once we are done with it
  const auto messagesSetGuard = gsl::finally([&]() {
    std::lock_guard<std::mutex> lock(messages_mutex_);
    messages_set_.erase(messages);
  });

  // Process FlowFiles
  for (auto& flowFile : flowFiles) {
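    // Reserve a per-flow-file slot in the shared result tracker; all segment
    // delivery results for this flow file are recorded under this index.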
    const size_t flow_file_index = messages->addFlowFile();

    // Get Topic (FlowFile-dependent EL property)
    const auto topic = context.getProperty(Topic, flowFile.get());
    if (topic) {
      logger_->log_debug("PublishKafka: topic for flow file {} is '{}'", flowFile->getUUIDStr(), *topic);
    } else {
      logger_->log_error("Flow file {} does not have a valid Topic", flowFile->getUUIDStr());
      messages->modifyResult(flow_file_index,
          [](FlowFileResult& flow_file_result) { flow_file_result.flow_file_error = true; });
      continue;
    }

    // Add topic to the connection if needed
    if (!conn_->hasTopic(*topic)) {
      if (!createNewTopic(context, *topic, flowFile)) {
        logger_->log_error("Failed to add topic {}", *topic);
        messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
          flow_file_result.flow_file_error = true;
        });
        continue;
      }
    }

    std::string kafkaKey = context.getProperty(KafkaKey, flowFile.get()).value_or(flowFile->getUUIDStr());

    logger_->log_debug("PublishKafka: Message Key [{}]", kafkaKey);

    auto thisTopic = conn_->getTopic(*topic);
    if (thisTopic == nullptr) {
      logger_->log_error("Topic {} is invalid", *topic);
      messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
        flow_file_result.flow_file_error = true;
      });
      continue;
    }

    bool failEmptyFlowFiles = utils::parseBoolProperty(context, FailEmptyFlowFiles);

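    // ReadCallback streams the flow file's content into librdkafka, splitting
    // it into segments of at most max_flow_seg_size_ bytes and registering
    // each produced segment with the shared Messages tracker.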
    ReadCallback callback(max_flow_seg_size_,
        kafkaKey,
        thisTopic->getTopic(),
        conn_->getConnection(),
        *flowFile,
        attributeNameRegex_,
        messages,
        flow_file_index,
        failEmptyFlowFiles,
        logger_);
    session.read(flowFile, std::ref(callback));

    if (!callback.called_) {
      // workaround: call callback since ProcessSession doesn't do so for empty flow files without resource claims
      callback(nullptr);
    }

    if (flowFile->getSize() == 0 && failEmptyFlowFiles) {
      logger_->log_debug(
          "Deprecated behavior, use connections to drop empty flow files! "
          "Failing empty flow file with uuid: {}",
          flowFile->getUUIDStr());
      messages->modifyResult(flow_file_index,
          [](FlowFileResult& flow_file_result) { flow_file_result.flow_file_error = true; });
    }

    if (callback.status_ < 0) {
      logger_->log_error("Failed to send flow to kafka topic {}, error: {}", *topic, callback.error_);
      messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
        flow_file_result.flow_file_error = true;
      });
      continue;
    }
  }

  logger_->log_trace("PublishKafka::onTrigger waitForCompletion start");
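  // Block until librdkafka has reported delivery (success or error) for every
  // queued segment, or until notifyStop() interrupts the wait.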
  messages->waitForCompletion();
  if (messages->wasInterrupted()) {
    logger_->log_warn(
        "Waiting for delivery confirmation was interrupted, some flow files "
        "might be routed to Failure, even if they were successfully "
        "delivered.");
  }
  logger_->log_trace("PublishKafka::onTrigger waitForCompletion finish");

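  // Route each flow file based on its aggregated segment results: Success only
  // if every segment was confirmed delivered; otherwise penalize and route to
  // Failure.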
  messages->iterateFlowFiles([&](size_t index, const FlowFileResult& flow_file) {
    bool success = !flow_file.flow_file_error;
    if (success) {
      for (size_t segment_num = 0; segment_num < flow_file.messages.size(); segment_num++) {
        const auto& message = flow_file.messages[segment_num];
        switch (message.status) {
          case MessageStatus::InFlight:
            success = false;
            logger_->log_error(
                "Waiting for delivery confirmation was interrupted for flow "
                "file {} segment {}",
                flowFiles[index]->getUUIDStr(),
                segment_num);
            break;
          case MessageStatus::Error:
            success = false;
            logger_->log_error("Failed to deliver flow file {} segment {}, error: {}",
                flowFiles[index]->getUUIDStr(),
                segment_num,
                rd_kafka_err2str(message.err_code));
            break;
          case MessageStatus::Success:
            logger_->log_debug("Successfully delivered flow file {} segment {}",
                flowFiles[index]->getUUIDStr(),
                segment_num);
            break;
        }
      }
    }
    if (success) {
      session.transfer(flowFiles[index], Success);
    } else {
      session.penalize(flowFiles[index]);
      session.transfer(flowFiles[index], Failure);
    }
  });
}
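
The cross-thread handshake above (messages_set_, waitForCompletion(),
wasInterrupted()) is easier to see in isolation. The following is a minimal,
standalone sketch of that contract with invented names (MessagesSketch,
addInFlight, complete), not the actual Messages class: each produced segment
starts InFlight, a delivery-report callback marks it Success or Error, and
waitForCompletion() blocks until nothing is in flight or interrupt() is
called, which is presumably what notifyStop() does for every tracker
registered in messages_set_.

#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

enum class MessageStatus { InFlight, Error, Success };

// Illustrative sketch only; the real Messages class additionally records
// per-segment rd_kafka_resp_err_t codes and the flow-file-level error flag
// consumed by iterateFlowFiles() above.
class MessagesSketch {
 public:
  // Called when a segment is handed to the producer.
  size_t addInFlight() {
    std::lock_guard<std::mutex> lock(mutex_);
    statuses_.push_back(MessageStatus::InFlight);
    return statuses_.size() - 1;
  }

  // Called from the delivery-report callback on the producer's poll thread.
  void complete(size_t index, MessageStatus status) {
    std::lock_guard<std::mutex> lock(mutex_);
    statuses_[index] = status;
    cv_.notify_all();
  }

  // Called from the stop path; wakes a blocked waitForCompletion().
  void interrupt() {
    std::lock_guard<std::mutex> lock(mutex_);
    interrupted_ = true;
    cv_.notify_all();
  }

  void waitForCompletion() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return interrupted_ || noneInFlight(); });
  }

  bool wasInterrupted() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return interrupted_;
  }

 private:
  bool noneInFlight() const {
    for (const auto status : statuses_) {
      if (status == MessageStatus::InFlight) { return false; }
    }
    return true;
  }

  mutable std::mutex mutex_;
  std::condition_variable cv_;
  std::vector<MessageStatus> statuses_;
  bool interrupted_{false};
};

int main() {
  MessagesSketch messages;
  const size_t index = messages.addInFlight();
  // Simulate a delivery report arriving from another thread.
  std::thread producer([&] { messages.complete(index, MessageStatus::Success); });
  messages.waitForCompletion();
  producer.join();
  std::printf("interrupted: %s\n", messages.wasInterrupted() ? "yes" : "no");
}

Keeping the tracker in messages_set_ for the whole trigger, and removing it
with gsl::finally, is what lets a concurrent stop request interrupt the wait
without leaking the entry.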