extensions/kafka/PublishKafka.h (253 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <array>
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <set>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include "KafkaConnection.h"
#include "KafkaProcessorBase.h"
#include "controllers/SSLContextService.h"
#include "core/Core.h"
#include "core/FlowFile.h"
#include "core/ProcessSession.h"
#include "core/PropertyDefinition.h"
#include "core/PropertyDefinitionBuilder.h"
#include "minifi-cpp/core/PropertyValidator.h"
#include "core/RelationshipDefinition.h"
#include "core/logging/Logger.h"
#include "core/logging/LoggerFactory.h"
#include "rdkafka.h"
#include "utils/ArrayUtils.h"
#include "utils/RegexUtils.h"
namespace org::apache::nifi::minifi::processors {
/**
 * Processor declaration for publishing FlowFile content to an Apache Kafka topic
 * (see Description below for the user-facing summary).
 *
 * This header carries the processor's static metadata (property definitions,
 * relationships, scheduling flags) and the declarations of its lifecycle hooks;
 * the hook implementations live outside this header.
 *
 * The class is neither copyable nor movable.
 */
class PublishKafka final : public KafkaProcessorBase {
 public:
  /// Enumerators whose names are used verbatim as the allowed values (and default) of the "Compress Codec" property.
  enum class CompressionCodecEnum { none, gzip, snappy, lz4, zstd };

  EXTENSIONAPI static constexpr const char* Description =
      "This Processor puts the contents of a FlowFile to a Topic in Apache "
      "Kafka. "
      "The content of a FlowFile becomes the contents of a Kafka message. "
      "This message is optionally assigned a key by using the <Kafka Key> "
      "Property.";

  // ---------------------------------------------------------------------------
  // Property definitions.
  // The string passed to createProperty() is the user-visible property name and
  // must stay stable; descriptions are documentation text only.
  // ---------------------------------------------------------------------------

  EXTENSIONAPI static constexpr auto SeedBrokers =
      core::PropertyDefinitionBuilder<>::createProperty("Known Brokers")
          .withDescription(
              "A comma-separated list of known Kafka Brokers in the format "
              "<host>:<port>")
          .isRequired(true)
          .supportsExpressionLanguage(true)
          .build();

  EXTENSIONAPI static constexpr auto Topic =
      core::PropertyDefinitionBuilder<>::createProperty("Topic Name")
          .withDescription("The Kafka Topic of interest")
          .isRequired(true)
          .supportsExpressionLanguage(true)
          .build();

  EXTENSIONAPI static constexpr auto DeliveryGuarantee =
      core::PropertyDefinitionBuilder<>::createProperty("Delivery Guarantee")
          .withDescription(
              "Specifies the requirement for guaranteeing that a message is "
              "sent to Kafka. "
              "Valid values are 0 (do not wait for acks), "
              "-1 or all (block until message is committed by all in sync "
              "replicas) "
              "or any concrete number of nodes.")
          .isRequired(false)
          .supportsExpressionLanguage(true)
          .withDefaultValue("1")
          .build();

  // NOTE(review): unlike the other size-like properties, this one has no validator and no default.
  EXTENSIONAPI static constexpr auto MaxMessageSize =
      core::PropertyDefinitionBuilder<>::createProperty("Max Request Size")
          .withDescription("Maximum Kafka protocol request message size")
          .isRequired(false)
          .build();

  EXTENSIONAPI static constexpr auto RequestTimeOut =
      core::PropertyDefinitionBuilder<>::createProperty("Request Timeout")
          .withDescription("The ack timeout of the producer request")
          .isRequired(false)
          .withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR)
          .withDefaultValue("10 sec")
          .build();

  EXTENSIONAPI static constexpr auto MessageTimeOut =
      core::PropertyDefinitionBuilder<>::createProperty("Message Timeout")
          .withDescription("The total time sending a message could take")
          .isRequired(false)
          .withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR)
          .withDefaultValue("30 sec")
          .build();

  EXTENSIONAPI static constexpr auto ClientName =
      core::PropertyDefinitionBuilder<>::createProperty("Client Name")
          .withDescription("Client Name to use when communicating with Kafka")
          .isRequired(true)
          .supportsExpressionLanguage(true)
          .build();

  EXTENSIONAPI static constexpr auto BatchSize =
      core::PropertyDefinitionBuilder<>::createProperty("Batch Size")
          .withDescription("Maximum number of messages batched in one MessageSet")
          .isRequired(false)
          .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)
          .withDefaultValue("10")
          .build();

  EXTENSIONAPI static constexpr auto TargetBatchPayloadSize =
      core::PropertyDefinitionBuilder<>::createProperty("Target Batch Payload Size")
          .withDescription(
              "The target total payload size for a batch. 0 B means unlimited "
              "(Batch Size is still applied).")
          .isRequired(false)
          .withValidator(core::StandardPropertyValidators::DATA_SIZE_VALIDATOR)
          .withDefaultValue("512 KB")
          .build();

  EXTENSIONAPI static constexpr auto AttributeNameRegex =
      core::PropertyDefinitionBuilder<>::createProperty("Attributes to Send as Headers")
          .withDescription(
              "Any attribute whose name matches the regex will be added to the "
              "Kafka messages as a Header")
          .build();

  EXTENSIONAPI static constexpr auto QueueBufferMaxTime =
      core::PropertyDefinitionBuilder<>::createProperty("Queue Buffering Max Time")
          .withDescription(
              "Delay to wait for messages in the producer queue to accumulate "
              "before constructing message batches")
          .isRequired(false)
          .withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR)
          .withDefaultValue("5 millis")
          .build();

  EXTENSIONAPI static constexpr auto QueueBufferMaxSize =
      core::PropertyDefinitionBuilder<>::createProperty("Queue Max Buffer Size")
          .withDescription("Maximum total message size sum allowed on the producer queue")
          .isRequired(false)
          .withValidator(core::StandardPropertyValidators::DATA_SIZE_VALIDATOR)
          .withDefaultValue("1 MB")
          .build();

  EXTENSIONAPI static constexpr auto QueueBufferMaxMessage =
      core::PropertyDefinitionBuilder<>::createProperty("Queue Max Message")
          .withDescription("Maximum number of messages allowed on the producer queue")
          .isRequired(false)
          .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)
          .withDefaultValue("1000")
          .build();

  // The builder's template argument is the number of allowed values, taken from the enum so the two cannot drift.
  EXTENSIONAPI static constexpr auto CompressCodec =
      core::PropertyDefinitionBuilder<magic_enum::enum_count<CompressionCodecEnum>()>::createProperty("Compress Codec")
          .withDescription("compression codec to use for compressing message sets")
          .isRequired(false)
          .withDefaultValue(magic_enum::enum_name(CompressionCodecEnum::none))
          .withAllowedValues(magic_enum::enum_names<CompressionCodecEnum>())
          .build();

  EXTENSIONAPI static constexpr auto MaxFlowSegSize =
      core::PropertyDefinitionBuilder<>::createProperty("Max Flow Segment Size")
          .withDescription(
              "Maximum flow content payload segment size for the kafka record. "
              "0 B means unlimited.")
          .isRequired(false)
          .withValidator(core::StandardPropertyValidators::DATA_SIZE_VALIDATOR)
          .withDefaultValue("0 B")
          .build();

  EXTENSIONAPI static constexpr auto SecurityCA =
      core::PropertyDefinitionBuilder<>::createProperty("Security CA")
          .withDescription(
              "DEPRECATED in favor of SSL Context Service. File or directory "
              "path to CA certificate(s) for verifying the broker's key")
          .build();

  EXTENSIONAPI static constexpr auto SecurityCert =
      core::PropertyDefinitionBuilder<>::createProperty("Security Cert")
          .withDescription(
              "DEPRECATED in favor of SSL Context Service. Path to client's "
              "public key (PEM) used for authentication")
          .build();

  EXTENSIONAPI static constexpr auto SecurityPrivateKey =
      core::PropertyDefinitionBuilder<>::createProperty("Security Private Key")
          .withDescription(
              "DEPRECATED in favor of SSL Context Service. Path to client's "
              "private key (PEM) used for authentication")
          .build();

  EXTENSIONAPI static constexpr auto SecurityPrivateKeyPassWord =
      core::PropertyDefinitionBuilder<>::createProperty("Security Pass Phrase")
          .withDescription(
              "DEPRECATED in favor of SSL Context Service. Private key "
              "passphrase")
          .isSensitive(true)
          .build();

  EXTENSIONAPI static constexpr auto KafkaKey =
      core::PropertyDefinitionBuilder<>::createProperty("Kafka Key")
          .withDescription(
              "The key to use for the message. If not specified, the UUID of "
              "the flow file is used as the message key.")
          .supportsExpressionLanguage(true)
          .build();

  EXTENSIONAPI static constexpr auto MessageKeyField =
      core::PropertyDefinitionBuilder<>::createProperty("Message Key Field")
          .withDescription("DEPRECATED, does not work -- use Kafka Key instead")
          .build();

  EXTENSIONAPI static constexpr auto DebugContexts =
      core::PropertyDefinitionBuilder<>::createProperty("Debug contexts")
          .withDescription(
              "A comma-separated list of debug contexts to enable. "
              "Including: generic, broker, topic, metadata, feature, queue, "
              "msg, protocol, cgrp, security, fetch, interceptor, plugin, "
              "consumer, "
              "admin, eos, all")
          .build();

  EXTENSIONAPI static constexpr auto FailEmptyFlowFiles =
      core::PropertyDefinitionBuilder<>::createProperty("Fail empty flow files")
          .withDescription(
              "Keep backwards compatibility with <=0.7.0 bug which caused flow "
              "files with empty content to not be published to Kafka and "
              "forwarded "
              "to failure. The old behavior is "
              "deprecated. Use connections to drop empty flow files!")
          .isRequired(false)
          .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
          .withDefaultValue("true")
          .build();

  /// All supported properties: the ones inherited from KafkaProcessorBase followed by the ones declared above.
  EXTENSIONAPI static constexpr auto Properties = utils::array_cat(KafkaProcessorBase::Properties,
      std::to_array<core::PropertyReference>({SeedBrokers, Topic, DeliveryGuarantee, MaxMessageSize, RequestTimeOut,
          MessageTimeOut, ClientName, BatchSize, TargetBatchPayloadSize, AttributeNameRegex, QueueBufferMaxTime,
          QueueBufferMaxSize, QueueBufferMaxMessage, CompressCodec, MaxFlowSegSize, SecurityCA, SecurityCert,
          SecurityPrivateKey, SecurityPrivateKeyPassWord, KafkaKey, MessageKeyField, DebugContexts, FailEmptyFlowFiles}));

  // ---------------------------------------------------------------------------
  // Relationships and processor-level flags.
  // ---------------------------------------------------------------------------

  EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success",
      "Any FlowFile that is successfully sent to Kafka will be routed to "
      "this Relationship"};
  EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure",
      "Any FlowFile that cannot be sent to Kafka will be routed to this "
      "Relationship"};
  EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure};

  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = true;
  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
  EXTENSIONAPI static constexpr auto InputRequirement = core::annotation::Input::INPUT_REQUIRED;
  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;

  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS

  /**
   * Constructs the processor and hands the base class a logger obtained for this type and uuid.
   * @param name processor name, forwarded to KafkaProcessorBase
   * @param uuid processor identifier, forwarded to KafkaProcessorBase; defaults to a value-initialized Identifier
   */
  explicit PublishKafka(const std::string_view name, const utils::Identifier& uuid = {})
      : KafkaProcessorBase(name, uuid, core::logging::LoggerFactory<PublishKafka>::getLogger(uuid)) {}

  // Copying and moving are disabled.
  PublishKafka(const PublishKafka&) = delete;
  PublishKafka(PublishKafka&&) = delete;
  PublishKafka& operator=(const PublishKafka&) = delete;
  PublishKafka& operator=(PublishKafka&&) = delete;
  ~PublishKafka() override = default;

  // Lifecycle hooks overridden from the base class; their definitions are not part of this header.
  void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
  void initialize() override;
  void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& sessionFactory) override;
  void notifyStop() override;

  /// Forward declaration; instances are tracked in messages_set_. The definition is not visible in this header.
  class Messages;

 protected:
  // NOTE(review): the bool results presumably signal success -- confirm against the definitions.
  bool configureNewConnection(core::ProcessContext& context);
  bool createNewTopic(
      core::ProcessContext& context, const std::string& topic_name, const std::shared_ptr<core::FlowFile>& flow_file) const;

  /// Override of the base-class hook; may return std::nullopt.
  std::optional<utils::net::SslData> getSslData(core::ProcessContext& context) const override;

 private:
  KafkaConnectionKey key_;
  std::unique_ptr<KafkaConnection> conn_;
  std::mutex connection_mutex_;

  // Value-initialized to 0.
  // NOTE(review): presumably filled from the Batch Size, Target Batch Payload Size and
  // Max Flow Segment Size properties -- confirm in onSchedule.
  uint64_t batch_size_{};
  uint64_t target_batch_payload_size_{};
  uint64_t max_flow_seg_size_{};

  // Empty unless a regex has been set.
  // NOTE(review): presumably built from the "Attributes to Send as Headers" property -- confirm.
  std::optional<utils::Regex> attributeNameRegex_;

  std::atomic<bool> interrupted_{false};

  // Lock ordering: if both connection_mutex_ and messages_mutex_ are needed,
  // always take connection_mutex_ first to avoid deadlock.
  std::mutex messages_mutex_;
  std::set<std::shared_ptr<Messages>> messages_set_;
};
} // namespace org::apache::nifi::minifi::processors