content/releases/content/1.10.0/FlumeUserGuide.html [4768:6117]: - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

Kite Dataset Sink

Experimental sink that writes events to a Kite Dataset. This sink will deserialize the body of each incoming event and store the resulting record in a Kite Dataset. It determines target Dataset by loading a dataset by URI.

The only supported serialization is avro, and the record schema must be passed in the event headers, using either flume.avro.schema.literal with the JSON schema representation or flume.avro.schema.url with a URL where the schema may be found (hdfs:/... URIs are supported). This is compatible with the Log4jAppender flume client and the spooling directory source’s Avro deserializer using deserializer.schemaType = LITERAL.

Note 1: The flume.avro.schema.hash header is not supported. Note 2: In some cases, file rolling may occur slightly after the roll interval has been exceeded. However, this delay will not exceed 5 seconds. In most cases, the delay is neglegible.

Property Name Default Description
channel  
type Must be org.apache.flume.sink.kite.DatasetSink
kite.dataset.uri URI of the dataset to open
kite.repo.uri URI of the repository to open (deprecated; use kite.dataset.uri instead)
kite.dataset.namespace Namespace of the Dataset where records will be written (deprecated; use kite.dataset.uri instead)
kite.dataset.name Name of the Dataset where records will be written (deprecated; use kite.dataset.uri instead)
kite.batchSize 100 Number of records to process in each batch
kite.rollInterval 30 Maximum wait time (seconds) before data files are released
kite.flushable.commitOnBatch true If true, the Flume transaction will be commited and the writer will be flushed on each batch of kite.batchSize records. This setting only applies to flushable datasets. When true, it’s possible for temp files with commited data to be left in the dataset directory. These files need to be recovered by hand for the data to be visible to DatasetReaders.
kite.syncable.syncOnBatch true Controls whether the sink will also sync data when committing the transaction. This setting only applies to syncable datasets. Syncing gaurentees that data will be written on stable storage on the remote system while flushing only gaurentees that data has left Flume’s client buffers. When the kite.flushable.commitOnBatch property is set to false, this property must also be set to false.
kite.entityParser avro Parser that turns Flume Events into Kite entities. Valid values are avro and the fully-qualified class name of an implementation of the EntityParser.Builder interface.
kite.failurePolicy retry Policy that handles non-recoverable errors such as a missing Schema in the Event header. The default value, retry, will fail the current batch and try again which matches the old behavior. Other valid values are save, which will write the raw Event to the kite.error.dataset.uri dataset, and the fully-qualified class name of an implementation of the FailurePolicy.Builder interface.
kite.error.dataset.uri URI of the dataset where failed events are saved when kite.failurePolicy is set to save. Required when the kite.failurePolicy is set to save.
auth.kerberosPrincipal Kerberos user principal for secure authentication to HDFS
auth.kerberosKeytab Kerberos keytab location (local FS) for the principal
auth.proxyUser The effective user for HDFS actions, if different from the kerberos principal

Kafka Sink

This is a Flume Sink implementation that can publish data to a Kafka topic. One of the objective is to integrate Flume with Kafka so that pull based processing systems can process the data coming through various Flume sources.

This currently supports Kafka server releases 0.10.1.0 or higher. Testing was done up to 2.0.1 that was the highest avilable version at the time of the release.

Required properties are marked in bold font.

Property Name Default Description
type Must be set to org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers List of brokers Kafka-Sink will connect to, to get the list of topic partitions This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port
kafka.topic default-flume-topic The topic in Kafka to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a “topic” field, the event will be published to that topic overriding the topic configured here. Arbitrary header substitution is supported, eg. %{header} is replaced with value of event header named “header”. (If using the substitution, it is recommended to set “auto.create.topics.enable” property of Kafka broker to true.)
flumeBatchSize 100 How many messages to process in one batch. Larger batches improve throughput while adding latency.
kafka.producer.acks 1 How many replicas must acknowledge a message before its considered successfully written. Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas) Set this to -1 to avoid data loss in some cases of leader failure.
useFlumeEventFormat false By default events are put as bytes onto the Kafka topic directly from the event body. Set to true to store events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel this will preserve any Flume headers for the producing side.
defaultPartitionId Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overriden by partitionIdHeader. By default, if this property is not set, events will be distributed by the Kafka Producer’s partitioner - including by key if specified (or by a partitioner specified by kafka.partitioner.class).
partitionIdHeader When set, the sink will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition, an EventDeliveryException will be thrown. If the header value is present then this setting overrides defaultPartitionId.
allowTopicOverride true When set, the sink will allow a message to be produced into a topic specified by the topicHeader property (if provided).
topicHeader topic When set in conjunction with allowTopicOverride will produce a message into the value of the header named using the value of this property. Care should be taken when using in conjunction with the Kafka Source topicHeader property to avoid creating a loopback.
kafka.producer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
more producer security props   If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on producer.
Other Kafka Producer Properties These properties are used to configure the Kafka Producer. Any producer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.producer. For example: kafka.producer.linger.ms

Note

Kafka Sink uses the topic and key properties from the FlumeEvent headers to send events to Kafka. If topic exists in the headers, the event will be sent to that specific topic, overriding the topic configured for the Sink. If key exists in the headers, the key will used by Kafka to partition the data between the topic partitions. Events with same key will be sent to the same partition. If the key is null, events will be sent to random partitions.

The Kafka sink also provides defaults for the key.serializer(org.apache.kafka.common.serialization.StringSerializer) and value.serializer(org.apache.kafka.common.serialization.ByteArraySerializer). Modification of these parameters is not recommended.

Deprecated Properties

Property Name Default Description
brokerList Use kafka.bootstrap.servers
topic default-flume-topic Use kafka.topic
batchSize 100 Use kafka.flumeBatchSize
requiredAcks 1 Use kafka.producer.acks

An example configuration of a Kafka sink is given below. Properties starting with the prefix kafka.producer the Kafka producer. The properties that are passed when creating the Kafka producer are not limited to the properties given in this example. Also it is possible to include your custom properties here and access them inside the preprocessor through the Flume Context object passed in as a method argument.

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

Security and Kafka Sink:

Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka. For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.

As of now data encryption is solely provided by SSL/TLS.

Setting kafka.producer.security.protocol to any of the following value means:

Warning

There is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation. Reference: Kafka security overview and the jira for tracking this issue: KAFKA-2561

TLS and Kafka Sink:

Please read the steps described in Configuring Kafka Clients SSL to learn about additional configuration settings for fine tuning for example any of the following: security provider, cipher suites, enabled protocols, truststore or keystore types.

Example configuration with server side authentication and data encryption.

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

Specyfing the truststore is optional here, the global truststore can be used instead. For more details about the global SSL setup, see the SSL/TLS support section.

Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following properties

a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS

Once enabled, clients will verify the server’s fully qualified domain name (FQDN) against one of the following two fields:

  1. Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
  2. Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6

If client side authentication is also required then additionally the following needs to be added to Flume agent configuration or the global SSL setup can be used (see SSL/TLS support section). Each Flume agent has to have its client certificate which has to be trusted by Kafka brokers either individually or by their signature chain. Common example is to sign each client certificate by a single Root CA which in turn is trusted by Kafka brokers.

# optional, the global keystore can be used alternatively
a1.sinks.sink1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
a1.sinks.sink1.kafka.producer.ssl.keystore.password = <password to access the keystore>

If keystore and key use different password protection then ssl.key.password property will provide the required additional secret for producer keystore:

a1.sinks.sink1.kafka.producer.ssl.key.password = <password to access the key>

Kerberos and Kafka Sink:

To use Kafka sink with a Kafka cluster secured with Kerberos, set the producer.security.protocol property noted above for producer. The Kerberos keytab and principal to be used with Kafka brokers is specified in a JAAS file’s “KafkaClient” section. “Client” section describes the Zookeeper connection if needed. See Kafka doc for information on the JAAS file contents. The location of this JAAS file and optionally the system wide kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

Example secure configuration using SASL_PLAINTEXT:

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka

Example secure configuration using SASL_SSL:

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

Sample JAAS file. For reference of its content please see client config sections of the desired authentication mechanism (GSSAPI/PLAIN) in Kafka documentation of SASL configuration. Unlike the Kafka Source or Kafka Channel a “Client” section is not required, unless it is needed by other connecting components. Also please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

HTTP Sink

Behaviour of this sink is that it will take events from the channel, and send those events to a remote service using an HTTP POST request. The event content is sent as the POST body.

Error handling behaviour of this sink depends on the HTTP response returned by the target server. The sink backoff/ready status is configurable, as is the transaction commit/rollback result and whether the event contributes to the successful event drain count.

Any malformed HTTP response returned by the server where the status code is not readable will result in a backoff signal and the event is not consumed from the channel.

Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be http.
endpoint The fully qualified URL endpoint to POST to
connectTimeout 5000 The socket connection timeout in milliseconds
requestTimeout 5000 The maximum request processing time in milliseconds
contentTypeHeader text/plain The HTTP Content-Type header
acceptHeader text/plain The HTTP Accept header value
defaultBackoff true Whether to backoff by default on receiving all HTTP status codes
defaultRollback true Whether to rollback by default on receiving all HTTP status codes
defaultIncrementMetrics false Whether to increment metrics by default on receiving all HTTP status codes
backoff.CODE Configures a specific backoff for an individual (i.e. 200) code or a group (i.e. 2XX) code
rollback.CODE Configures a specific rollback for an individual (i.e. 200) code or a group (i.e. 2XX) code
incrementMetrics.CODE Configures a specific metrics increment for an individual (i.e. 200) code or a group (i.e. 2XX) code

Note that the most specific HTTP status code match is used for the backoff, rollback and incrementMetrics configuration options. If there are configuration values for both 2XX and 200 status codes, then 200 HTTP codes will use the 200 value, and all other HTTP codes in the 201-299 range will use the 2XX value.

Any empty or null events are consumed without any request being made to the HTTP endpoint.

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = http
a1.sinks.k1.channel = c1
a1.sinks.k1.endpoint = http://localhost:8080/someuri
a1.sinks.k1.connectTimeout = 2000
a1.sinks.k1.requestTimeout = 2000
a1.sinks.k1.acceptHeader = application/json
a1.sinks.k1.contentTypeHeader = application/json
a1.sinks.k1.defaultBackoff = true
a1.sinks.k1.defaultRollback = true
a1.sinks.k1.defaultIncrementMetrics = false
a1.sinks.k1.backoff.4XX = false
a1.sinks.k1.rollback.4XX = false
a1.sinks.k1.incrementMetrics.4XX = true
a1.sinks.k1.backoff.200 = false
a1.sinks.k1.rollback.200 = false
a1.sinks.k1.incrementMetrics.200 = true

Custom Sink

A custom sink is your own implementation of the Sink interface. A custom sink’s class and its dependencies must be included in the agent’s classpath when starting the Flume agent. The type of the custom sink is its FQCN. Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be your FQCN

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.example.MySink
a1.sinks.k1.channel = c1

Flume Channels

Channels are the repositories where the events are staged on a agent. Source adds the events and Sink removes it.

Memory Channel

The events are stored in an in-memory queue with configurable max size. It’s ideal for flows that need higher throughput and are prepared to lose the staged data in the event of a agent failures. Required properties are in bold.

Property Name Default Description
type The component type name, needs to be memory
capacity 100 The maximum number of events stored in the channel
transactionCapacity 100 The maximum number of events the channel will take from a source or give to a sink per transaction
keep-alive 3 Timeout in seconds for adding or removing an event
byteCapacityBufferPercentage 20 Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below.
byteCapacity see description Maximum total bytes of memory allowed as a sum of all events in this channel. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB.

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

JDBC Channel

The events are stored in a persistent storage that’s backed by a database. The JDBC channel currently supports embedded Derby. This is a durable channel that’s ideal for flows where recoverability is important. Required properties are in bold.

Property Name Default Description
type The component type name, needs to be jdbc
db.type DERBY Database vendor, needs to be DERBY.
driver.class org.apache.derby.jdbc.EmbeddedDriver Class for vendor’s JDBC driver
driver.url (constructed from other properties) JDBC connection URL
db.username “sa” User id for db connection
db.password password for db connection
connection.properties.file JDBC Connection property file path
create.schema true If true, then creates db schema if not there
create.index true Create indexes to speed up lookups
create.foreignkey true  
transaction.isolation “READ_COMMITTED” Isolation level for db session READ_UNCOMMITTED, READ_COMMITTED, SERIALIZABLE, REPEATABLE_READ
maximum.connections 10 Max connections allowed to db
maximum.capacity 0 (unlimited) Max number of events in the channel
sysprop.*   DB Vendor specific properties
sysprop.user.home   Home path to store embedded Derby database

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = jdbc

Kafka Channel

The events are stored in a Kafka cluster (must be installed separately). Kafka provides high availability and replication, so in case an agent or a kafka broker crashes, the events are immediately available to other sinks

The Kafka channel can be used for multiple scenarios:

  1. With Flume source and sink - it provides a reliable and highly available channel for events
  2. With Flume source and interceptor but no sink - it allows writing Flume events into a Kafka topic, for use by other apps
  3. With Flume sink, but no source - it is a low-latency, fault tolerant way to send events from Kafka to Flume sinks such as HDFS, HBase or Solr

This currently supports Kafka server releases 0.10.1.0 or higher. Testing was done up to 2.0.1 that was the highest avilable version at the time of the release.

The configuration parameters are organized as such:

  1. Configuration values related to the channel generically are applied at the channel config level, eg: a1.channel.k1.type =
  2. Configuration values related to Kafka or how the Channel operates are prefixed with “kafka.”, (this are analgous to CommonClient Configs) eg: a1.channels.k1.kafka.topic and a1.channels.k1.kafka.bootstrap.servers. This is not dissimilar to how the hdfs sink operates
  3. Properties specific to the producer/consumer are prefixed by kafka.producer or kafka.consumer
  4. Where possible, the Kafka paramter names are used, eg: bootstrap.servers and acks

This version of flume is backwards-compatible with previous versions, however deprecated properties are indicated in the table below and a warning message is logged on startup when they are present in the configuration file.

Required properties are in bold.

Property Name Default Description
type The component type name, needs to be org.apache.flume.channel.kafka.KafkaChannel
kafka.bootstrap.servers List of brokers in the Kafka cluster used by the channel This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port
kafka.topic flume-channel Kafka topic which the channel will use
kafka.consumer.group.id flume Consumer group ID the channel uses to register with Kafka. Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data Note that having non-channel consumers with the same ID can lead to data loss.
parseAsFlumeEvent true Expecting Avro datums with FlumeEvent schema in the channel. This should be true if Flume source is writing to the channel and false if other producers are writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact
pollTimeout 500 The amount of time(in milliseconds) to wait in the “poll()” call of the consumer. https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
defaultPartitionId Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overriden by partitionIdHeader. By default, if this property is not set, events will be distributed by the Kafka Producer’s partitioner - including by key if specified (or by a partitioner specified by kafka.partitioner.class).
partitionIdHeader When set, the producer will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition the event will not be accepted into the channel. If the header value is present then this setting overrides defaultPartitionId.
kafka.consumer.auto.offset.reset latest What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): earliest: automatically reset the offset to the earliest offset latest: automatically reset the offset to the latest offset none: throw exception to the consumer if no previous offset is found for the consumer’s group anything else: throw exception to the consumer.
kafka.producer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
kafka.consumer.security.protocol PLAINTEXT Same as kafka.producer.security.protocol but for reading/consuming from Kafka.
more producer/consumer security props   If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on producer/consumer.

Deprecated Properties

Property Name Default Description
brokerList List of brokers in the Kafka cluster used by the channel This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port
topic flume-channel Use kafka.topic
groupId flume Use kafka.consumer.group.id
readSmallestOffset false Use kafka.consumer.auto.offset.reset
migrateZookeeperOffsets true When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set to false, though that should generally not be required. If no Zookeeper offset is found the kafka.consumer.auto.offset.reset configuration defines how offsets are handled.

Note

Due to the way the channel is load balanced, there may be duplicate events when the agent first starts up

Example for agent named a1:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer

Security and Kafka Channel:

Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka. For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.

As of now data encryption is solely provided by SSL/TLS.

Setting kafka.producer|consumer.security.protocol to any of the following value means:

Warning

There is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation. Reference: Kafka security overview and the jira for tracking this issue: KAFKA-2561

TLS and Kafka Channel:

Please read the steps described in Configuring Kafka Clients SSL to learn about additional configuration settings for fine tuning for example any of the following: security provider, cipher suites, enabled protocols, truststore or keystore types.

Example configuration with server side authentication and data encryption.

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
a1.channels.channel1.kafka.consumer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>

Specyfing the truststore is optional here, the global truststore can be used instead. For more details about the global SSL setup, see the SSL/TLS support section.

Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following properties

a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS

Once enabled, clients will verify the server’s fully qualified domain name (FQDN) against one of the following two fields:

  1. Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
  2. Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6

If client side authentication is also required then additionally the following needs to be added to Flume agent configuration or the global SSL setup can be used (see SSL/TLS support section). Each Flume agent has to have its client certificate which has to be trusted by Kafka brokers either individually or by their signature chain. Common example is to sign each client certificate by a single Root CA which in turn is trusted by Kafka brokers.

# optional, the global keystore can be used alternatively
a1.channels.channel1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
a1.channels.channel1.kafka.producer.ssl.keystore.password = <password to access the keystore>
# optional, the global keystore can be used alternatively
a1.channels.channel1.kafka.consumer.ssl.keystore.location = /path/to/client.keystore.jks
a1.channels.channel1.kafka.consumer.ssl.keystore.password = <password to access the keystore>

If keystore and key use different password protection then ssl.key.password property will provide the required additional secret for both consumer and producer keystores:

a1.channels.channel1.kafka.producer.ssl.key.password = <password to access the key>
a1.channels.channel1.kafka.consumer.ssl.key.password = <password to access the key>

Kerberos and Kafka Channel:

To use Kafka channel with a Kafka cluster secured with Kerberos, set the producer/consumer.security.protocol properties noted above for producer and/or consumer. The Kerberos keytab and principal to be used with Kafka brokers is specified in a JAAS file’s “KafkaClient” section. “Client” section describes the Zookeeper connection if needed. See Kafka doc for information on the JAAS file contents. The location of this JAAS file and optionally the system wide kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

Example secure configuration using SASL_PLAINTEXT:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
a1.channels.channel1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka

Example secure configuration using SASL_SSL:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SASL_SSL
a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
a1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL
a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>

Sample JAAS file. For reference of its content please see client config sections of the desired authentication mechanism (GSSAPI/PLAIN) in Kafka documentation of SASL configuration. Since the Kafka Source may also connect to Zookeeper for offset migration, the “Client” section was also added to this example. This won’t be needed unless you require offset migration, or you require this section for other secure components. Also please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

File Channel

Required properties are in bold.

Property Name Default Description  
type The component type name, needs to be file.
checkpointDir ~/.flume/file-channel/checkpoint The directory where checkpoint file will be stored
useDualCheckpoints false Backup the checkpoint. If this is set to true, backupCheckpointDir must be set
backupCheckpointDir The directory where the checkpoint is backed up to. This directory must not be the same as the data directories or the checkpoint directory
dataDirs ~/.flume/file-channel/data Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
transactionCapacity 10000 The maximum size of transaction supported by the channel
checkpointInterval 30000 Amount of time (in millis) between checkpoints
maxFileSize 2146435071 Max size (in bytes) of a single log file
minimumRequiredSpace 524288000 Minimum Required free space (in bytes). To avoid data corruption, File Channel stops accepting take/put requests when free space drops below this value
capacity 1000000 Maximum capacity of the channel
keep-alive 3 Amount of time (in sec) to wait for a put operation
use-log-replay-v1 false Expert: Use old replay logic
use-fast-replay false Expert: Replay without using queue
checkpointOnClose true Controls if a checkpoint is created when the channel is closed. Creating a checkpoint on close speeds up subsequent startup of the file channel by avoiding replay.
encryption.activeKey Key name used to encrypt new data
encryption.cipherProvider Cipher provider type, supported types: AESCTRNOPADDING
encryption.keyProvider Key provider type, supported types: JCEKSFILE
encryption.keyProvider.keyStoreFile Path to the keystore file
encrpytion.keyProvider.keyStorePasswordFile Path to the keystore password file
encryption.keyProvider.keys List of all keys (e.g. history of the activeKey setting)
encyption.keyProvider.keys.*.passwordFile Path to the optional key password file

Note

By default the File Channel uses paths for checkpoint and data directories that are within the user home as specified above. As a result if you have more than one File Channel instances active within the agent, only one will be able to lock the directories and cause the other channel initialization to fail. It is therefore necessary that you provide explicit paths to all the configured channels, preferably on different disks. Furthermore, as file channel will sync to disk after every commit, coupling it with a sink/source that batches events together may be necessary to provide good performance where multiple disks are not available for checkpoint and data directories.

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

Encryption

Below is a few sample configurations:

Generating a key with a password seperate from the key store password:

keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
  -keysize 128 -validity 9000 -keystore test.keystore \
  -storetype jceks -storepass keyStorePassword

Generating a key with the password the same as the key store password:

keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
  -keystore src/test/resources/test.keystore -storetype jceks \
  -storepass keyStorePassword
a1.channels.c1.encryption.activeKey = key-0
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = key-provider-0
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0

Let’s say you have aged key-0 out and new files should be encrypted with key-1:

a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1

The same scenerio as above, however key-0 has its own password:

a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
a1.channels.c1.encryption.keyProvider.keys.key-0.passwordFile = /path/to/key-0.password

Spillable Memory Channel

The events are stored in an in-memory queue and on disk. The in-memory queue serves as the primary store and the disk as overflow. The disk store is managed using an embedded File channel. When the in-memory queue is full, additional incoming events are stored in the file channel. This channel is ideal for flows that need high throughput of memory channel during normal operation, but at the same time need the larger capacity of the file channel for better tolerance of intermittent sink side outages or drop in drain rates. The throughput will reduce approximately to file channel speeds during such abnormal situations. In case of an agent crash or restart, only the events stored on disk are recovered when the agent comes online. This channel is currently experimental and not recommended for use in production.

Required properties are in bold. Please refer to file channel for additional required properties.

Property Name Default Description
type The component type name, needs to be SPILLABLEMEMORY
memoryCapacity 10000 Maximum number of events stored in memory queue. To disable use of in-memory queue, set this to zero.
overflowCapacity 100000000 Maximum number of events stored in overflow disk (i.e File channel). To disable use of overflow, set this to zero.
overflowTimeout 3 The number of seconds to wait before enabling disk overflow when memory fills up.
byteCapacityBufferPercentage 20 Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below.
byteCapacity see description Maximum bytes of memory allowed as a sum of all events in the memory queue. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB.
avgEventSize 500 Estimated average size of events, in bytes, going into the channel
<file channel properties> see file channel Any file channel property with the exception of ‘keep-alive’ and ‘capacity’ can be used. The keep-alive of file channel is managed by Spillable Memory Channel. Use ‘overflowCapacity’ to set the File channel’s capacity.

In-memory queue is considered full if either memoryCapacity or byteCapacity limit is reached.

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

To disable the use of the in-memory queue and function like a file channel:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 0
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

To disable the use of overflow disk and function purely as a in-memory channel:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 100000
a1.channels.c1.overflowCapacity = 0

Pseudo Transaction Channel

Warning

The Pseudo Transaction Channel is only for unit testing purposes and is NOT meant for production use.

Required properties are in bold.

Property Name Default Description
type The component type name, needs to be org.apache.flume.channel.PseudoTxnMemoryChannel
capacity 50 The max number of events stored in the channel
keep-alive 3 Timeout in seconds for adding or removing an event

Custom Channel

A custom channel is your own implementation of the Channel interface. A custom channel’s class and its dependencies must be included in the agent’s classpath when starting the Flume agent. The type of the custom channel is its FQCN. Required properties are in bold.

Property Name Default Description
type The component type name, needs to be a FQCN

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = org.example.MyChannel

Flume Channel Selectors

If the type is not specified, then defaults to “replicating”.

Replicating Channel Selector (default)

Required properties are in bold.

Property Name Default Description
selector.type replicating The component type name, needs to be replicating
selector.optional Set of channels to be marked as optional

Example for agent named a1 and it’s source called r1:

a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3

In the above configuration, c3 is an optional channel. Failure to write to c3 is simply ignored. Since c1 and c2 are not marked optional, failure to write to those channels will cause the transaction to fail.

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - content/releases/content/1.9.0/FlumeUserGuide.html [4777:6126]: - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

Kite Dataset Sink

Experimental sink that writes events to a Kite Dataset. This sink will deserialize the body of each incoming event and store the resulting record in a Kite Dataset. It determines target Dataset by loading a dataset by URI.

The only supported serialization is avro, and the record schema must be passed in the event headers, using either flume.avro.schema.literal with the JSON schema representation or flume.avro.schema.url with a URL where the schema may be found (hdfs:/... URIs are supported). This is compatible with the Log4jAppender flume client and the spooling directory source’s Avro deserializer using deserializer.schemaType = LITERAL.

Note 1: The flume.avro.schema.hash header is not supported. Note 2: In some cases, file rolling may occur slightly after the roll interval has been exceeded. However, this delay will not exceed 5 seconds. In most cases, the delay is neglegible.

Property Name Default Description
channel  
type Must be org.apache.flume.sink.kite.DatasetSink
kite.dataset.uri URI of the dataset to open
kite.repo.uri URI of the repository to open (deprecated; use kite.dataset.uri instead)
kite.dataset.namespace Namespace of the Dataset where records will be written (deprecated; use kite.dataset.uri instead)
kite.dataset.name Name of the Dataset where records will be written (deprecated; use kite.dataset.uri instead)
kite.batchSize 100 Number of records to process in each batch
kite.rollInterval 30 Maximum wait time (seconds) before data files are released
kite.flushable.commitOnBatch true If true, the Flume transaction will be commited and the writer will be flushed on each batch of kite.batchSize records. This setting only applies to flushable datasets. When true, it’s possible for temp files with commited data to be left in the dataset directory. These files need to be recovered by hand for the data to be visible to DatasetReaders.
kite.syncable.syncOnBatch true Controls whether the sink will also sync data when committing the transaction. This setting only applies to syncable datasets. Syncing gaurentees that data will be written on stable storage on the remote system while flushing only gaurentees that data has left Flume’s client buffers. When the kite.flushable.commitOnBatch property is set to false, this property must also be set to false.
kite.entityParser avro Parser that turns Flume Events into Kite entities. Valid values are avro and the fully-qualified class name of an implementation of the EntityParser.Builder interface.
kite.failurePolicy retry Policy that handles non-recoverable errors such as a missing Schema in the Event header. The default value, retry, will fail the current batch and try again which matches the old behavior. Other valid values are save, which will write the raw Event to the kite.error.dataset.uri dataset, and the fully-qualified class name of an implementation of the FailurePolicy.Builder interface.
kite.error.dataset.uri URI of the dataset where failed events are saved when kite.failurePolicy is set to save. Required when the kite.failurePolicy is set to save.
auth.kerberosPrincipal Kerberos user principal for secure authentication to HDFS
auth.kerberosKeytab Kerberos keytab location (local FS) for the principal
auth.proxyUser The effective user for HDFS actions, if different from the kerberos principal

Kafka Sink

This is a Flume Sink implementation that can publish data to a Kafka topic. One of the objective is to integrate Flume with Kafka so that pull based processing systems can process the data coming through various Flume sources.

This currently supports Kafka server releases 0.10.1.0 or higher. Testing was done up to 2.0.1 that was the highest avilable version at the time of the release.

Required properties are marked in bold font.

Property Name Default Description
type Must be set to org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers List of brokers Kafka-Sink will connect to, to get the list of topic partitions This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port
kafka.topic default-flume-topic The topic in Kafka to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a “topic” field, the event will be published to that topic overriding the topic configured here. Arbitrary header substitution is supported, eg. %{header} is replaced with value of event header named “header”. (If using the substitution, it is recommended to set “auto.create.topics.enable” property of Kafka broker to true.)
flumeBatchSize 100 How many messages to process in one batch. Larger batches improve throughput while adding latency.
kafka.producer.acks 1 How many replicas must acknowledge a message before its considered successfully written. Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas) Set this to -1 to avoid data loss in some cases of leader failure.
useFlumeEventFormat false By default events are put as bytes onto the Kafka topic directly from the event body. Set to true to store events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel this will preserve any Flume headers for the producing side.
defaultPartitionId Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overriden by partitionIdHeader. By default, if this property is not set, events will be distributed by the Kafka Producer’s partitioner - including by key if specified (or by a partitioner specified by kafka.partitioner.class).
partitionIdHeader When set, the sink will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition, an EventDeliveryException will be thrown. If the header value is present then this setting overrides defaultPartitionId.
allowTopicOverride true When set, the sink will allow a message to be produced into a topic specified by the topicHeader property (if provided).
topicHeader topic When set in conjunction with allowTopicOverride will produce a message into the value of the header named using the value of this property. Care should be taken when using in conjunction with the Kafka Source topicHeader property to avoid creating a loopback.
kafka.producer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
more producer security props   If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on producer.
Other Kafka Producer Properties These properties are used to configure the Kafka Producer. Any producer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.producer. For example: kafka.producer.linger.ms

Note

Kafka Sink uses the topic and key properties from the FlumeEvent headers to send events to Kafka. If topic exists in the headers, the event will be sent to that specific topic, overriding the topic configured for the Sink. If key exists in the headers, the key will used by Kafka to partition the data between the topic partitions. Events with same key will be sent to the same partition. If the key is null, events will be sent to random partitions.

The Kafka sink also provides defaults for the key.serializer(org.apache.kafka.common.serialization.StringSerializer) and value.serializer(org.apache.kafka.common.serialization.ByteArraySerializer). Modification of these parameters is not recommended.

Deprecated Properties

Property Name Default Description
brokerList Use kafka.bootstrap.servers
topic default-flume-topic Use kafka.topic
batchSize 100 Use kafka.flumeBatchSize
requiredAcks 1 Use kafka.producer.acks

An example configuration of a Kafka sink is given below. Properties starting with the prefix kafka.producer the Kafka producer. The properties that are passed when creating the Kafka producer are not limited to the properties given in this example. Also it is possible to include your custom properties here and access them inside the preprocessor through the Flume Context object passed in as a method argument.

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

Security and Kafka Sink:

Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka. For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.

As of now data encryption is solely provided by SSL/TLS.

Setting kafka.producer.security.protocol to any of the following value means:

  • SASL_PLAINTEXT - Kerberos or plaintext authentication with no data encryption
  • SASL_SSL - Kerberos or plaintext authentication with data encryption
  • SSL - TLS based encryption with optional authentication.

Warning

There is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation. Reference: Kafka security overview and the jira for tracking this issue: KAFKA-2561

TLS and Kafka Sink:

Please read the steps described in Configuring Kafka Clients SSL to learn about additional configuration settings for fine tuning for example any of the following: security provider, cipher suites, enabled protocols, truststore or keystore types.

Example configuration with server side authentication and data encryption.

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

Specyfing the truststore is optional here, the global truststore can be used instead. For more details about the global SSL setup, see the SSL/TLS support section.

Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following properties

a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS

Once enabled, clients will verify the server’s fully qualified domain name (FQDN) against one of the following two fields:

  1. Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
  2. Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6

If client side authentication is also required then additionally the following needs to be added to Flume agent configuration or the global SSL setup can be used (see SSL/TLS support section). Each Flume agent has to have its client certificate which has to be trusted by Kafka brokers either individually or by their signature chain. Common example is to sign each client certificate by a single Root CA which in turn is trusted by Kafka brokers.

# optional, the global keystore can be used alternatively
a1.sinks.sink1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
a1.sinks.sink1.kafka.producer.ssl.keystore.password = <password to access the keystore>

If keystore and key use different password protection then ssl.key.password property will provide the required additional secret for producer keystore:

a1.sinks.sink1.kafka.producer.ssl.key.password = <password to access the key>

Kerberos and Kafka Sink:

To use Kafka sink with a Kafka cluster secured with Kerberos, set the producer.security.protocol property noted above for producer. The Kerberos keytab and principal to be used with Kafka brokers is specified in a JAAS file’s “KafkaClient” section. “Client” section describes the Zookeeper connection if needed. See Kafka doc for information on the JAAS file contents. The location of this JAAS file and optionally the system wide kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

Example secure configuration using SASL_PLAINTEXT:

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka

Example secure configuration using SASL_SSL:

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

Sample JAAS file. For reference of its content please see client config sections of the desired authentication mechanism (GSSAPI/PLAIN) in Kafka documentation of SASL configuration. Unlike the Kafka Source or Kafka Channel a “Client” section is not required, unless it is needed by other connecting components. Also please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

HTTP Sink

Behaviour of this sink is that it will take events from the channel, and send those events to a remote service using an HTTP POST request. The event content is sent as the POST body.

Error handling behaviour of this sink depends on the HTTP response returned by the target server. The sink backoff/ready status is configurable, as is the transaction commit/rollback result and whether the event contributes to the successful event drain count.

Any malformed HTTP response returned by the server where the status code is not readable will result in a backoff signal and the event is not consumed from the channel.

Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be http.
endpoint The fully qualified URL endpoint to POST to
connectTimeout 5000 The socket connection timeout in milliseconds
requestTimeout 5000 The maximum request processing time in milliseconds
contentTypeHeader text/plain The HTTP Content-Type header
acceptHeader text/plain The HTTP Accept header value
defaultBackoff true Whether to backoff by default on receiving all HTTP status codes
defaultRollback true Whether to rollback by default on receiving all HTTP status codes
defaultIncrementMetrics false Whether to increment metrics by default on receiving all HTTP status codes
backoff.CODE Configures a specific backoff for an individual (i.e. 200) code or a group (i.e. 2XX) code
rollback.CODE Configures a specific rollback for an individual (i.e. 200) code or a group (i.e. 2XX) code
incrementMetrics.CODE Configures a specific metrics increment for an individual (i.e. 200) code or a group (i.e. 2XX) code

Note that the most specific HTTP status code match is used for the backoff, rollback and incrementMetrics configuration options. If there are configuration values for both 2XX and 200 status codes, then 200 HTTP codes will use the 200 value, and all other HTTP codes in the 201-299 range will use the 2XX value.

Any empty or null events are consumed without any request being made to the HTTP endpoint.

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = http
a1.sinks.k1.channel = c1
a1.sinks.k1.endpoint = http://localhost:8080/someuri
a1.sinks.k1.connectTimeout = 2000
a1.sinks.k1.requestTimeout = 2000
a1.sinks.k1.acceptHeader = application/json
a1.sinks.k1.contentTypeHeader = application/json
a1.sinks.k1.defaultBackoff = true
a1.sinks.k1.defaultRollback = true
a1.sinks.k1.defaultIncrementMetrics = false
a1.sinks.k1.backoff.4XX = false
a1.sinks.k1.rollback.4XX = false
a1.sinks.k1.incrementMetrics.4XX = true
a1.sinks.k1.backoff.200 = false
a1.sinks.k1.rollback.200 = false
a1.sinks.k1.incrementMetrics.200 = true

Custom Sink

A custom sink is your own implementation of the Sink interface. A custom sink’s class and its dependencies must be included in the agent’s classpath when starting the Flume agent. The type of the custom sink is its FQCN. Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be your FQCN

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.example.MySink
a1.sinks.k1.channel = c1

Flume Channels

Channels are the repositories where the events are staged on a agent. Source adds the events and Sink removes it.

Memory Channel

The events are stored in an in-memory queue with configurable max size. It’s ideal for flows that need higher throughput and are prepared to lose the staged data in the event of a agent failures. Required properties are in bold.

Property Name Default Description
type The component type name, needs to be memory
capacity 100 The maximum number of events stored in the channel
transactionCapacity 100 The maximum number of events the channel will take from a source or give to a sink per transaction
keep-alive 3 Timeout in seconds for adding or removing an event
byteCapacityBufferPercentage 20 Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below.
byteCapacity see description Maximum total bytes of memory allowed as a sum of all events in this channel. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB.

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

JDBC Channel

The events are stored in a persistent storage that’s backed by a database. The JDBC channel currently supports embedded Derby. This is a durable channel that’s ideal for flows where recoverability is important. Required properties are in bold.

Property Name Default Description
type The component type name, needs to be jdbc
db.type DERBY Database vendor, needs to be DERBY.
driver.class org.apache.derby.jdbc.EmbeddedDriver Class for vendor’s JDBC driver
driver.url (constructed from other properties) JDBC connection URL
db.username “sa” User id for db connection
db.password password for db connection
connection.properties.file JDBC Connection property file path
create.schema true If true, then creates db schema if not there
create.index true Create indexes to speed up lookups
create.foreignkey true  
transaction.isolation “READ_COMMITTED” Isolation level for db session READ_UNCOMMITTED, READ_COMMITTED, SERIALIZABLE, REPEATABLE_READ
maximum.connections 10 Max connections allowed to db
maximum.capacity 0 (unlimited) Max number of events in the channel
sysprop.*   DB Vendor specific properties
sysprop.user.home   Home path to store embedded Derby database

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = jdbc

Kafka Channel

The events are stored in a Kafka cluster (must be installed separately). Kafka provides high availability and replication, so in case an agent or a kafka broker crashes, the events are immediately available to other sinks

The Kafka channel can be used for multiple scenarios:

  1. With Flume source and sink - it provides a reliable and highly available channel for events
  2. With Flume source and interceptor but no sink - it allows writing Flume events into a Kafka topic, for use by other apps
  3. With Flume sink, but no source - it is a low-latency, fault tolerant way to send events from Kafka to Flume sinks such as HDFS, HBase or Solr

This currently supports Kafka server releases 0.10.1.0 or higher. Testing was done up to 2.0.1 that was the highest avilable version at the time of the release.

The configuration parameters are organized as such:

  1. Configuration values related to the channel generically are applied at the channel config level, eg: a1.channel.k1.type =
  2. Configuration values related to Kafka or how the Channel operates are prefixed with “kafka.”, (this are analgous to CommonClient Configs) eg: a1.channels.k1.kafka.topic and a1.channels.k1.kafka.bootstrap.servers. This is not dissimilar to how the hdfs sink operates
  3. Properties specific to the producer/consumer are prefixed by kafka.producer or kafka.consumer
  4. Where possible, the Kafka paramter names are used, eg: bootstrap.servers and acks

This version of flume is backwards-compatible with previous versions, however deprecated properties are indicated in the table below and a warning message is logged on startup when they are present in the configuration file.

Required properties are in bold.

Property Name Default Description
type The component type name, needs to be org.apache.flume.channel.kafka.KafkaChannel
kafka.bootstrap.servers List of brokers in the Kafka cluster used by the channel This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port
kafka.topic flume-channel Kafka topic which the channel will use
kafka.consumer.group.id flume Consumer group ID the channel uses to register with Kafka. Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data Note that having non-channel consumers with the same ID can lead to data loss.
parseAsFlumeEvent true Expecting Avro datums with FlumeEvent schema in the channel. This should be true if Flume source is writing to the channel and false if other producers are writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact
pollTimeout 500 The amount of time(in milliseconds) to wait in the “poll()” call of the consumer. https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
defaultPartitionId Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overriden by partitionIdHeader. By default, if this property is not set, events will be distributed by the Kafka Producer’s partitioner - including by key if specified (or by a partitioner specified by kafka.partitioner.class).
partitionIdHeader When set, the producer will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition the event will not be accepted into the channel. If the header value is present then this setting overrides defaultPartitionId.
kafka.consumer.auto.offset.reset latest What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): earliest: automatically reset the offset to the earliest offset latest: automatically reset the offset to the latest offset none: throw exception to the consumer if no previous offset is found for the consumer’s group anything else: throw exception to the consumer.
kafka.producer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
kafka.consumer.security.protocol PLAINTEXT Same as kafka.producer.security.protocol but for reading/consuming from Kafka.
more producer/consumer security props   If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on producer/consumer.

Deprecated Properties

Property Name Default Description
brokerList List of brokers in the Kafka cluster used by the channel This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port
topic flume-channel Use kafka.topic
groupId flume Use kafka.consumer.group.id
readSmallestOffset false Use kafka.consumer.auto.offset.reset
migrateZookeeperOffsets true When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set to false, though that should generally not be required. If no Zookeeper offset is found the kafka.consumer.auto.offset.reset configuration defines how offsets are handled.

Note

Due to the way the channel is load balanced, there may be duplicate events when the agent first starts up

Example for agent named a1:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer

Security and Kafka Channel:

Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka. For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.

As of now data encryption is solely provided by SSL/TLS.

Setting kafka.producer|consumer.security.protocol to any of the following value means:

  • SASL_PLAINTEXT - Kerberos or plaintext authentication with no data encryption
  • SASL_SSL - Kerberos or plaintext authentication with data encryption
  • SSL - TLS based encryption with optional authentication.

Warning

There is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation. Reference: Kafka security overview and the jira for tracking this issue: KAFKA-2561

TLS and Kafka Channel:

Please read the steps described in Configuring Kafka Clients SSL to learn about additional configuration settings for fine tuning for example any of the following: security provider, cipher suites, enabled protocols, truststore or keystore types.

Example configuration with server side authentication and data encryption.

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
a1.channels.channel1.kafka.consumer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>

Specyfing the truststore is optional here, the global truststore can be used instead. For more details about the global SSL setup, see the SSL/TLS support section.

Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following properties

a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS

Once enabled, clients will verify the server’s fully qualified domain name (FQDN) against one of the following two fields:

  1. Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
  2. Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6

If client side authentication is also required then additionally the following needs to be added to Flume agent configuration or the global SSL setup can be used (see SSL/TLS support section). Each Flume agent has to have its client certificate which has to be trusted by Kafka brokers either individually or by their signature chain. Common example is to sign each client certificate by a single Root CA which in turn is trusted by Kafka brokers.

# optional, the global keystore can be used alternatively
a1.channels.channel1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
a1.channels.channel1.kafka.producer.ssl.keystore.password = <password to access the keystore>
# optional, the global keystore can be used alternatively
a1.channels.channel1.kafka.consumer.ssl.keystore.location = /path/to/client.keystore.jks
a1.channels.channel1.kafka.consumer.ssl.keystore.password = <password to access the keystore>

If keystore and key use different password protection then ssl.key.password property will provide the required additional secret for both consumer and producer keystores:

a1.channels.channel1.kafka.producer.ssl.key.password = <password to access the key>
a1.channels.channel1.kafka.consumer.ssl.key.password = <password to access the key>

Kerberos and Kafka Channel:

To use Kafka channel with a Kafka cluster secured with Kerberos, set the producer/consumer.security.protocol properties noted above for producer and/or consumer. The Kerberos keytab and principal to be used with Kafka brokers is specified in a JAAS file’s “KafkaClient” section. “Client” section describes the Zookeeper connection if needed. See Kafka doc for information on the JAAS file contents. The location of this JAAS file and optionally the system wide kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

Example secure configuration using SASL_PLAINTEXT:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
a1.channels.channel1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka

Example secure configuration using SASL_SSL:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SASL_SSL
a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
a1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL
a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>

Sample JAAS file. For reference of its content please see client config sections of the desired authentication mechanism (GSSAPI/PLAIN) in Kafka documentation of SASL configuration. Since the Kafka Source may also connect to Zookeeper for offset migration, the “Client” section was also added to this example. This won’t be needed unless you require offset migration, or you require this section for other secure components. Also please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

File Channel

Required properties are in bold.

Property Name Default Description  
type The component type name, needs to be file.
checkpointDir ~/.flume/file-channel/checkpoint The directory where checkpoint file will be stored
useDualCheckpoints false Backup the checkpoint. If this is set to true, backupCheckpointDir must be set
backupCheckpointDir The directory where the checkpoint is backed up to. This directory must not be the same as the data directories or the checkpoint directory
dataDirs ~/.flume/file-channel/data Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
transactionCapacity 10000 The maximum size of transaction supported by the channel
checkpointInterval 30000 Amount of time (in millis) between checkpoints
maxFileSize 2146435071 Max size (in bytes) of a single log file
minimumRequiredSpace 524288000 Minimum Required free space (in bytes). To avoid data corruption, File Channel stops accepting take/put requests when free space drops below this value
capacity 1000000 Maximum capacity of the channel
keep-alive 3 Amount of time (in sec) to wait for a put operation
use-log-replay-v1 false Expert: Use old replay logic
use-fast-replay false Expert: Replay without using queue
checkpointOnClose true Controls if a checkpoint is created when the channel is closed. Creating a checkpoint on close speeds up subsequent startup of the file channel by avoiding replay.
encryption.activeKey Key name used to encrypt new data
encryption.cipherProvider Cipher provider type, supported types: AESCTRNOPADDING
encryption.keyProvider Key provider type, supported types: JCEKSFILE
encryption.keyProvider.keyStoreFile Path to the keystore file
encrpytion.keyProvider.keyStorePasswordFile Path to the keystore password file
encryption.keyProvider.keys List of all keys (e.g. history of the activeKey setting)
encyption.keyProvider.keys.*.passwordFile Path to the optional key password file

Note

By default the File Channel uses paths for checkpoint and data directories that are within the user home as specified above. As a result if you have more than one File Channel instances active within the agent, only one will be able to lock the directories and cause the other channel initialization to fail. It is therefore necessary that you provide explicit paths to all the configured channels, preferably on different disks. Furthermore, as file channel will sync to disk after every commit, coupling it with a sink/source that batches events together may be necessary to provide good performance where multiple disks are not available for checkpoint and data directories.

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

Encryption

Below is a few sample configurations:

Generating a key with a password seperate from the key store password:

keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
  -keysize 128 -validity 9000 -keystore test.keystore \
  -storetype jceks -storepass keyStorePassword

Generating a key with the password the same as the key store password:

keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
  -keystore src/test/resources/test.keystore -storetype jceks \
  -storepass keyStorePassword
a1.channels.c1.encryption.activeKey = key-0
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = key-provider-0
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0

Let’s say you have aged key-0 out and new files should be encrypted with key-1:

a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1

The same scenerio as above, however key-0 has its own password:

a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
a1.channels.c1.encryption.keyProvider.keys.key-0.passwordFile = /path/to/key-0.password

Spillable Memory Channel

The events are stored in an in-memory queue and on disk. The in-memory queue serves as the primary store and the disk as overflow. The disk store is managed using an embedded File channel. When the in-memory queue is full, additional incoming events are stored in the file channel. This channel is ideal for flows that need high throughput of memory channel during normal operation, but at the same time need the larger capacity of the file channel for better tolerance of intermittent sink side outages or drop in drain rates. The throughput will reduce approximately to file channel speeds during such abnormal situations. In case of an agent crash or restart, only the events stored on disk are recovered when the agent comes online. This channel is currently experimental and not recommended for use in production.

Required properties are in bold. Please refer to file channel for additional required properties.

Property Name Default Description
type The component type name, needs to be SPILLABLEMEMORY
memoryCapacity 10000 Maximum number of events stored in memory queue. To disable use of in-memory queue, set this to zero.
overflowCapacity 100000000 Maximum number of events stored in overflow disk (i.e File channel). To disable use of overflow, set this to zero.
overflowTimeout 3 The number of seconds to wait before enabling disk overflow when memory fills up.
byteCapacityBufferPercentage 20 Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below.
byteCapacity see description Maximum bytes of memory allowed as a sum of all events in the memory queue. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB.
avgEventSize 500 Estimated average size of events, in bytes, going into the channel
<file channel properties> see file channel Any file channel property with the exception of ‘keep-alive’ and ‘capacity’ can be used. The keep-alive of file channel is managed by Spillable Memory Channel. Use ‘overflowCapacity’ to set the File channel’s capacity.

In-memory queue is considered full if either memoryCapacity or byteCapacity limit is reached.

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

To disable the use of the in-memory queue and function like a file channel:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 0
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

To disable the use of overflow disk and function purely as a in-memory channel:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 100000
a1.channels.c1.overflowCapacity = 0

Pseudo Transaction Channel

Warning

The Pseudo Transaction Channel is only for unit testing purposes and is NOT meant for production use.

Required properties are in bold.

Property Name Default Description
type The component type name, needs to be org.apache.flume.channel.PseudoTxnMemoryChannel
capacity 50 The max number of events stored in the channel
keep-alive 3 Timeout in seconds for adding or removing an event

Custom Channel

A custom channel is your own implementation of the Channel interface. A custom channel’s class and its dependencies must be included in the agent’s classpath when starting the Flume agent. The type of the custom channel is its FQCN. Required properties are in bold.

Property Name Default Description
type The component type name, needs to be a FQCN

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = org.example.MyChannel

Flume Channel Selectors

If the type is not specified, then defaults to “replicating”.

Replicating Channel Selector (default)

Required properties are in bold.

Property Name Default Description
selector.type replicating The component type name, needs to be replicating
selector.optional Set of channels to be marked as optional

Example for agent named a1 and it’s source called r1:

a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3

In the above configuration, c3 is an optional channel. Failure to write to c3 is simply ignored. Since c1 and c2 are not marked optional, failure to write to those channels will cause the transaction to fail.

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -