in extensions/kafka/PublishKafka.cpp [612:776]
/**
 * Collects up to batch_size_ flow files (or fewer, once the accumulated payload
 * reaches target_batch_payload_size_ bytes), publishes each one to Kafka as one
 * or more message segments, waits for delivery confirmation, and finally routes
 * every collected flow file to Success or Failure.
 *
 * Locking/interruption contract (as visible in this function):
 *  - the whole trigger runs under connection_mutex_;
 *  - the shared Messages object is registered in messages_set_ (guarded by
 *    messages_mutex_) so that notifyStop() can interrupt waitForCompletion(),
 *    and is unconditionally unregistered via the gsl::finally guard.
 */
void PublishKafka::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
  // Check whether we have been interrupted
  if (interrupted_) {
    logger_->log_info("The processor has been interrupted, not running onTrigger");
    context.yield();
    return;
  }
  std::lock_guard<std::mutex> lock_connection(connection_mutex_);
  logger_->log_debug("PublishKafka onTrigger");

  // Collect FlowFiles to process: stop at batch_size_ files, or earlier once the
  // accumulated size reaches target_batch_payload_size_ (0 disables the size cap).
  uint64_t actual_bytes = 0U;
  std::vector<std::shared_ptr<core::FlowFile>> flowFiles;
  for (uint64_t i = 0; i < batch_size_; i++) {
    std::shared_ptr<core::FlowFile> flowFile = session.get();
    if (flowFile == nullptr) { break; }
    actual_bytes += flowFile->getSize();
    flowFiles.emplace_back(std::move(flowFile));
    if (target_batch_payload_size_ != 0U && actual_bytes >= target_batch_payload_size_) { break; }
  }
  if (flowFiles.empty()) {
    context.yield();
    return;
  }
  logger_->log_debug("Processing {} flow files with a total size of {} B", flowFiles.size(), actual_bytes);

  auto messages = std::make_shared<Messages>(logger_);
  // We must add this to the messages set, so that it will be interrupted when notifyStop is called
  {
    std::lock_guard<std::mutex> lock(messages_mutex_);
    messages_set_.emplace(messages);
  }
  // We also have to ensure that it will be removed once we are done with it
  const auto messagesSetGuard = gsl::finally([&]() {
    std::lock_guard<std::mutex> lock(messages_mutex_);
    messages_set_.erase(messages);
  });

  // FIX: FailEmptyFlowFiles is parsed from the context alone (no flow file is
  // passed), so it is loop-invariant — parse it once instead of on every
  // iteration of the per-flow-file loop below.
  const bool failEmptyFlowFiles = utils::parseBoolProperty(context, FailEmptyFlowFiles);

  // Marks a flow file's result as failed; shared by all error paths below.
  const auto markFlowFileError = [](FlowFileResult& flow_file_result) {
    flow_file_result.flow_file_error = true;
  };

  // Process FlowFiles
  for (auto& flowFile : flowFiles) {
    const size_t flow_file_index = messages->addFlowFile();

    // Get Topic (FlowFile-dependent EL property)
    const auto topic = context.getProperty(Topic, flowFile.get());
    if (topic) {
      logger_->log_debug("PublishKafka: topic for flow file {} is '{}'", flowFile->getUUIDStr(), *topic);
    } else {
      logger_->log_error("Flow file {} does not have a valid Topic", flowFile->getUUIDStr());
      messages->modifyResult(flow_file_index, markFlowFileError);
      continue;
    }

    // Add topic to the connection if needed
    if (!conn_->hasTopic(*topic)) {
      if (!createNewTopic(context, *topic, flowFile)) {
        logger_->log_error("Failed to add topic {}", *topic);
        messages->modifyResult(flow_file_index, markFlowFileError);
        continue;
      }
    }

    // The Kafka message key defaults to the flow file UUID when the property is unset.
    std::string kafkaKey = context.getProperty(KafkaKey, flowFile.get()).value_or(flowFile->getUUIDStr());
    logger_->log_debug("PublishKafka: Message Key [{}]", kafkaKey);

    auto thisTopic = conn_->getTopic(*topic);
    if (thisTopic == nullptr) {
      logger_->log_error("Topic {} is invalid", *topic);
      messages->modifyResult(flow_file_index, markFlowFileError);
      continue;
    }

    ReadCallback callback(max_flow_seg_size_,
                          kafkaKey,
                          thisTopic->getTopic(),
                          conn_->getConnection(),
                          *flowFile,
                          attributeNameRegex_,
                          messages,
                          flow_file_index,
                          failEmptyFlowFiles,
                          logger_);
    session.read(flowFile, std::ref(callback));

    if (!callback.called_) {
      // workaround: call callback since ProcessSession doesn't do so for empty flow files without resource claims
      callback(nullptr);
    }

    if (flowFile->getSize() == 0 && failEmptyFlowFiles) {
      logger_->log_debug(
          "Deprecated behavior, use connections to drop empty flow files! "
          "Failing empty flow file with uuid: {}",
          flowFile->getUUIDStr());
      messages->modifyResult(flow_file_index, markFlowFileError);
    }

    if (callback.status_ < 0) {
      logger_->log_error("Failed to send flow to kafka topic {}, error: {}", *topic, callback.error_);
      messages->modifyResult(flow_file_index, markFlowFileError);
      // FIX: removed a redundant `continue` that was already the last statement of the loop body.
    }
  }

  logger_->log_trace("PublishKafka::onTrigger waitForCompletion start");
  messages->waitForCompletion();
  if (messages->wasInterrupted()) {
    logger_->log_warn(
        "Waiting for delivery confirmation was interrupted, some flow files "
        "might be routed to Failure, even if they were successfully "
        "delivered.");
  }
  logger_->log_trace("PublishKafka::onTrigger waitForCompletion finish");

  // Route each collected flow file: Success only if no error was flagged and
  // every message segment reports MessageStatus::Success; otherwise penalize
  // and route to Failure.
  messages->iterateFlowFiles([&](size_t index, const FlowFileResult& flow_file) {
    bool success = false;
    if (flow_file.flow_file_error) {
      success = false;
    } else {
      success = true;
      for (size_t segment_num = 0; segment_num < flow_file.messages.size(); segment_num++) {
        const auto& message = flow_file.messages[segment_num];
        switch (message.status) {
          case MessageStatus::InFlight:
            success = false;
            logger_->log_error(
                "Waiting for delivery confirmation was interrupted for flow "
                "file {} segment {}",
                flowFiles[index]->getUUIDStr(),
                segment_num);
            break;
          case MessageStatus::Error:
            success = false;
            logger_->log_error("Failed to deliver flow file {} segment {}, error: {}",
                               flowFiles[index]->getUUIDStr(),
                               segment_num,
                               rd_kafka_err2str(message.err_code));
            break;
          case MessageStatus::Success:
            logger_->log_debug("Successfully delivered flow file {} segment {}",
                               flowFiles[index]->getUUIDStr(),
                               segment_num);
            break;
        }
      }
    }
    if (success) {
      session.transfer(flowFiles[index], Success);
    } else {
      session.penalize(flowFiles[index]);
      session.transfer(flowFiles[index], Failure);
    }
  });
}