void PublishKafka::onTrigger()

in extensions/librdkafka/PublishKafka.cpp [652:809]


void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  // Batches up to batch_size_ flow files, enqueues their content to librdkafka,
  // waits for the asynchronous delivery confirmations, then routes each flow
  // file to Success or Failure based on the per-segment results.
  //
  // Bail out early if notifyStop() has interrupted the processor; yield so the
  // scheduler backs off instead of spinning.
  if (interrupted_) {
    logger_->log_info("The processor has been interrupted, not running onTrigger");
    context->yield();
    return;
  }

  std::lock_guard<std::mutex> lock_connection(connection_mutex_);
  logger_->log_debug("PublishKafka onTrigger");

  // Collect up to batch_size_ flow files, stopping early once the accumulated
  // payload reaches target_batch_payload_size_ (a value of 0 disables the
  // payload-size cap).
  uint64_t actual_bytes = 0U;
  std::vector<std::shared_ptr<core::FlowFile>> flowFiles;
  for (uint32_t i = 0; i < batch_size_; i++) {
    std::shared_ptr<core::FlowFile> flowFile = session->get();
    if (flowFile == nullptr) {
      break;
    }
    actual_bytes += flowFile->getSize();
    flowFiles.emplace_back(std::move(flowFile));
    if (target_batch_payload_size_ != 0U && actual_bytes >= target_batch_payload_size_) {
      break;
    }
  }
  if (flowFiles.empty()) {
    context->yield();
    return;
  }
  logger_->log_debug("Processing %lu flow files with a total size of %llu B", flowFiles.size(), actual_bytes);

  auto messages = std::make_shared<Messages>(logger_);
  // We must add this to the messages set, so that it will be interrupted when notifyStop is called
  {
    std::lock_guard<std::mutex> lock(messages_mutex_);
    messages_set_.emplace(messages);
  }
  // We also have to ensure that it will be removed once we are done with it
  const auto messagesSetGuard = gsl::finally([&]() {
    std::lock_guard<std::mutex> lock(messages_mutex_);
    messages_set_.erase(messages);
  });

  // This property is read without a flow file, so it cannot use flow-file-scoped
  // expression language; it is loop-invariant — read it once instead of once per
  // flow file (previously fetched inside the loop below).
  bool failEmptyFlowFiles = true;
  context->getProperty(FailEmptyFlowFiles, failEmptyFlowFiles);

  // Enqueue each flow file. Delivery results arrive asynchronously and are
  // recorded per segment in `messages`; flow files that fail before enqueueing
  // are marked with flow_file_error so they route to Failure later.
  for (auto& flowFile : flowFiles) {
    size_t flow_file_index = messages->addFlowFile();

    // Topic supports flow-file-dependent expression language, so it must be
    // evaluated per flow file.
    std::string topic;
    if (context->getProperty(Topic, topic, flowFile)) {
      logger_->log_debug("PublishKafka: topic for flow file %s is '%s'", flowFile->getUUIDStr(), topic);
    } else {
      logger_->log_error("Flow file %s does not have a valid Topic", flowFile->getUUIDStr());
      messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
        flow_file_result.flow_file_error = true;
      });
      continue;
    }

    // Lazily register the topic with the connection the first time it is seen.
    if (!conn_->hasTopic(topic)) {
      if (!createNewTopic(context, topic, flowFile)) {
        logger_->log_error("Failed to add topic %s", topic);
        messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
          flow_file_result.flow_file_error = true;
        });
        continue;
      }
    }

    // The Kafka message key defaults to the flow file UUID when the property is
    // unset or evaluates to an empty string.
    std::string kafkaKey;
    if (!context->getProperty(KafkaKey, kafkaKey, flowFile) || kafkaKey.empty()) {
      kafkaKey = flowFile->getUUIDStr();
    }
    logger_->log_debug("PublishKafka: Message Key [%s]", kafkaKey);

    auto thisTopic = conn_->getTopic(topic);
    if (thisTopic == nullptr) {
      logger_->log_error("Topic %s is invalid", topic);
      messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
        flow_file_result.flow_file_error = true;
      });
      continue;
    }

    // The callback splits the content into max_flow_seg_size_ segments and
    // produces them to Kafka; per-segment results are reported into `messages`.
    ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn_->getConnection(), *flowFile,
                          attributeNameRegex_, messages, flow_file_index, failEmptyFlowFiles, logger_);
    session->read(flowFile, std::ref(callback));

    if (!callback.called_) {
      // workaround: call callback since ProcessSession doesn't do so for empty flow files without resource claims
      callback(nullptr);
    }

    if (flowFile->getSize() == 0 && failEmptyFlowFiles) {
      logger_->log_debug("Deprecated behavior, use connections to drop empty flow files! Failing empty flow file with uuid: %s", flowFile->getUUIDStr());
      messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
        flow_file_result.flow_file_error = true;
      });
    }

    // A negative status means enqueueing (not delivery) failed synchronously.
    if (callback.status_ < 0) {
      logger_->log_error("Failed to send flow to kafka topic %s, error: %s", topic, callback.error_);
      messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
        flow_file_result.flow_file_error = true;
      });
      continue;
    }
  }

  // Block until every enqueued message is confirmed (or notifyStop interrupts
  // the wait, in which case still-in-flight segments count as failures below).
  logger_->log_trace("PublishKafka::onTrigger waitForCompletion start");
  messages->waitForCompletion();
  if (messages->wasInterrupted()) {
    logger_->log_warn("Waiting for delivery confirmation was interrupted, some flow files might be routed to Failure, even if they were successfully delivered.");
  }
  logger_->log_trace("PublishKafka::onTrigger waitForCompletion finish");

  // Route each flow file: Success only if every segment was delivered and no
  // earlier step flagged an error; otherwise penalize and route to Failure.
  messages->iterateFlowFiles([&](size_t index, const FlowFileResult& flow_file) {
    bool success;
    if (flow_file.flow_file_error) {
      success = false;
    } else {
      success = true;
      for (size_t segment_num = 0; segment_num < flow_file.messages.size(); segment_num++) {
        const auto& message = flow_file.messages[segment_num];
        switch (message.status) {
          case MessageStatus::InFlight:
            success = false;
            logger_->log_error("Waiting for delivery confirmation was interrupted for flow file %s segment %zu",
                flowFiles[index]->getUUIDStr(),
                segment_num);
          break;
          case MessageStatus::Error:
            success = false;
            logger_->log_error("Failed to deliver flow file %s segment %zu, error: %s",
                flowFiles[index]->getUUIDStr(),
                segment_num,
                rd_kafka_err2str(message.err_code));
          break;
          case MessageStatus::Success:
            logger_->log_debug("Successfully delivered flow file %s segment %zu",
                flowFiles[index]->getUUIDStr(),
                segment_num);
          break;
        }
      }
    }
    if (success) {
      session->transfer(flowFiles[index], Success);
    } else {
      session->penalize(flowFiles[index]);
      session->transfer(flowFiles[index], Failure);
    }
  });
}