in extensions/librdkafka/PublishKafka.cpp [652:809]
/**
 * @brief Pulls a batch of FlowFiles from the session and publishes each one to Kafka.
 *
 * Batching: up to batch_size_ FlowFiles are collected, stopping early once
 * target_batch_payload_size_ bytes have been gathered (if that limit is non-zero).
 * Each FlowFile is produced to its topic (resolved per-FlowFile via expression
 * language), then the method blocks for delivery confirmation and finally routes
 * every FlowFile to Success or Failure.
 *
 * @param context process context; used to resolve per-FlowFile (EL) properties
 * @param session process session; FlowFiles are read from and routed on it
 */
void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
// Check whether we have been interrupted
if (interrupted_) {
logger_->log_info("The processor has been interrupted, not running onTrigger");
context->yield();
return;
}
// Serialize the whole trigger against concurrent use of conn_.
// NOTE(review): presumably this guards against reconfiguration/notifyStop
// touching the connection concurrently -- confirm against the rest of the class.
std::lock_guard<std::mutex> lock_connection(connection_mutex_);
logger_->log_debug("PublishKafka onTrigger");
// Collect FlowFiles to process
uint64_t actual_bytes = 0U;
std::vector<std::shared_ptr<core::FlowFile>> flowFiles;
for (uint32_t i = 0; i < batch_size_; i++) {
std::shared_ptr<core::FlowFile> flowFile = session->get();
if (flowFile == nullptr) {
// Incoming queue exhausted; process what we have so far.
break;
}
actual_bytes += flowFile->getSize();
flowFiles.emplace_back(std::move(flowFile));
// Stop early once the (optional) payload-size target for one batch is reached.
if (target_batch_payload_size_ != 0U && actual_bytes >= target_batch_payload_size_) {
break;
}
}
if (flowFiles.empty()) {
// Nothing to do; yield so the framework backs off instead of spinning.
context->yield();
return;
}
logger_->log_debug("Processing %lu flow files with a total size of %llu B", flowFiles.size(), actual_bytes);
// Shared result tracker: one FlowFileResult per FlowFile, each holding the
// per-segment delivery statuses filled in asynchronously by librdkafka callbacks.
auto messages = std::make_shared<Messages>(logger_);
// We must add this to the messages set, so that it will be interrupted when notifyStop is called
{
std::lock_guard<std::mutex> lock(messages_mutex_);
messages_set_.emplace(messages);
}
// We also have to ensure that it will be removed once we are done with it
// (gsl::finally runs on every exit path, exception or not).
const auto messagesSetGuard = gsl::finally([&]() {
std::lock_guard<std::mutex> lock(messages_mutex_);
messages_set_.erase(messages);
});
// Process FlowFiles
for (auto& flowFile : flowFiles) {
// Index of this FlowFile's slot in `messages`; used by the read callback
// and by the routing pass below.
size_t flow_file_index = messages->addFlowFile();
// Get Topic (FlowFile-dependent EL property)
std::string topic;
if (context->getProperty(Topic, topic, flowFile)) {
logger_->log_debug("PublishKafka: topic for flow file %s is '%s'", flowFile->getUUIDStr(), topic);
} else {
// No usable topic: mark the slot as errored so the routing pass sends
// this FlowFile to Failure, and skip producing it.
logger_->log_error("Flow file %s does not have a valid Topic", flowFile->getUUIDStr());
messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
flow_file_result.flow_file_error = true;
});
continue;
}
// Add topic to the connection if needed
if (!conn_->hasTopic(topic)) {
if (!createNewTopic(context, topic, flowFile)) {
logger_->log_error("Failed to add topic %s", topic);
messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
flow_file_result.flow_file_error = true;
});
continue;
}
}
// Message key defaults to the FlowFile UUID when the property is unset/empty.
std::string kafkaKey;
if (!context->getProperty(KafkaKey, kafkaKey, flowFile) || kafkaKey.empty()) {
kafkaKey = flowFile->getUUIDStr();
}
logger_->log_debug("PublishKafka: Message Key [%s]", kafkaKey);
auto thisTopic = conn_->getTopic(topic);
if (thisTopic == nullptr) {
logger_->log_error("Topic %s is invalid", topic);
messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
flow_file_result.flow_file_error = true;
});
continue;
}
bool failEmptyFlowFiles = true;
context->getProperty(FailEmptyFlowFiles, failEmptyFlowFiles);
// The callback segments the content (max_flow_seg_size_) and produces each
// segment to Kafka, recording per-segment status into `messages`.
ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn_->getConnection(), *flowFile,
attributeNameRegex_, messages, flow_file_index, failEmptyFlowFiles, logger_);
session->read(flowFile, std::ref(callback));
if (!callback.called_) {
// workaround: call callback since ProcessSession doesn't do so for empty flow files without resource claims
callback(nullptr);
}
if (flowFile->getSize() == 0 && failEmptyFlowFiles) {
logger_->log_debug("Deprecated behavior, use connections to drop empty flow files! Failing empty flow file with uuid: %s", flowFile->getUUIDStr());
messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
flow_file_result.flow_file_error = true;
});
// NOTE(review): intentionally no `continue` here -- the status check below
// still runs for the failed empty flow file.
}
if (callback.status_ < 0) {
logger_->log_error("Failed to send flow to kafka topic %s, error: %s", topic, callback.error_);
messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
flow_file_result.flow_file_error = true;
});
// NOTE(review): redundant -- this is already the last statement of the loop body.
continue;
}
}
logger_->log_trace("PublishKafka::onTrigger waitForCompletion start");
// Block until every produced message has a delivery report, or until
// notifyStop interrupts the wait via messages_set_.
messages->waitForCompletion();
if (messages->wasInterrupted()) {
logger_->log_warn("Waiting for delivery confirmation was interrupted, some flow files might be routed to Failure, even if they were successfully delivered.");
}
logger_->log_trace("PublishKafka::onTrigger waitForCompletion finish");
// Routing pass: a FlowFile succeeds only if it had no produce-time error AND
// every one of its segments reports MessageStatus::Success.
messages->iterateFlowFiles([&](size_t index, const FlowFileResult& flow_file) {
bool success;
if (flow_file.flow_file_error) {
success = false;
} else {
success = true;
for (size_t segment_num = 0; segment_num < flow_file.messages.size(); segment_num++) {
const auto& message = flow_file.messages[segment_num];
switch (message.status) {
case MessageStatus::InFlight:
// No delivery report arrived (wait was interrupted); treat as failure.
success = false;
logger_->log_error("Waiting for delivery confirmation was interrupted for flow file %s segment %zu",
flowFiles[index]->getUUIDStr(),
segment_num);
break;
case MessageStatus::Error:
success = false;
logger_->log_error("Failed to deliver flow file %s segment %zu, error: %s",
flowFiles[index]->getUUIDStr(),
segment_num,
rd_kafka_err2str(message.err_code));
break;
case MessageStatus::Success:
logger_->log_debug("Successfully delivered flow file %s segment %zu",
flowFiles[index]->getUUIDStr(),
segment_num);
break;
}
}
}
if (success) {
session->transfer(flowFiles[index], Success);
} else {
// Penalize before routing to Failure so retries are delayed.
session->penalize(flowFiles[index]);
session->transfer(flowFiles[index], Failure);
}
});
}