Channels are the repositories where events are staged on an agent.
Sources add events to a channel and sinks remove them.
Kafka Channel
The events are stored in a Kafka cluster (which must be installed separately). Kafka provides high availability and
replication, so if an agent or a Kafka broker crashes, the events are immediately available to other sinks.
The Kafka channel can be used for multiple scenarios:
- With Flume source and sink - it provides a reliable and highly available channel for events
- With Flume source and interceptor but no sink - it allows writing Flume events into a Kafka topic, for use by other apps
- With Flume sink, but no source - it is a low-latency, fault-tolerant way to send events from Kafka to Flume sinks such as HDFS, HBase or Solr
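As a minimal sketch of the third scenario (Flume sink, no source), the channel below reads a topic that other applications write to, so parseAsFlumeEvent (described in the property table below) is set to false. The topic name orders, the group ID flume-hdfs and the broker host names are hypothetical placeholders; the sink that drains the channel is configured as usual and is omitted here.

```properties
# Hypothetical example: Kafka channel used without a Flume source.
# Non-Flume producers write plain records into the topic, so the channel
# must not try to decode them as Avro FlumeEvent datums.
a1.channels = channel1
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092
a1.channels.channel1.kafka.topic = orders
a1.channels.channel1.kafka.consumer.group.id = flume-hdfs
a1.channels.channel1.parseAsFlumeEvent = false
```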
This version of Flume requires Kafka version 0.9 or greater due to its reliance on the Kafka clients shipped with that version. The configuration of
the channel has changed compared to previous Flume versions.
The configuration parameters are organized as follows:
- Configuration values related to the channel generically are applied at the channel config level, e.g.: a1.channels.k1.type =
- Configuration values related to Kafka or how the channel operates are prefixed with “kafka.” (these are analogous to CommonClient Configs), e.g.: a1.channels.k1.kafka.topic and a1.channels.k1.kafka.bootstrap.servers. This is similar to how the HDFS sink operates.
- Properties specific to the producer/consumer are prefixed by kafka.producer or kafka.consumer.
- Where possible, the Kafka parameter names are used, e.g.: bootstrap.servers and acks.
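A minimal sketch of the naming levels listed above, for a channel named k1. The broker host names are placeholders, and the acks and auto.offset.reset values are illustrative choices, not recommendations; they are passed through under their native Kafka names.

```properties
# Channel-level (generic) setting
a1.channels.k1.type = org.apache.flume.channel.kafka.KafkaChannel
# Kafka-related settings for the channel as a whole: "kafka." prefix
a1.channels.k1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092
a1.channels.k1.kafka.topic = flume-channel
# Producer-only setting, using the native Kafka name "acks"
a1.channels.k1.kafka.producer.acks = all
# Consumer-only setting, using the native Kafka name "auto.offset.reset"
a1.channels.k1.kafka.consumer.auto.offset.reset = earliest
```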
This version of Flume is backwards-compatible with previous versions; however, deprecated properties are indicated in the table below, and a warning message
is logged on startup when they are present in the configuration file.
Required properties are marked with (required).
Property Name | Default | Description
type (required) | – | The component type name; needs to be org.apache.flume.channel.kafka.KafkaChannel.
kafka.bootstrap.servers (required) | – | List of brokers in the Kafka cluster used by the channel. This can be a partial list of brokers, but we recommend at least two for HA. The format is a comma-separated list of hostname:port.
kafka.topic | flume-channel | Kafka topic which the channel will use.
kafka.consumer.group.id | flume | Consumer group ID the channel uses to register with Kafka. Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data. Note that having non-channel consumers with the same ID can lead to data loss.
parseAsFlumeEvent | true | Expecting Avro datums with FlumeEvent schema in the channel. This should be true if a Flume source is writing to the channel and false if other producers are writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using org.apache.flume.source.avro.AvroFlumeEvent, provided by the flume-ng-sdk artifact.
migrateZookeeperOffsets | true | When no Kafka-stored offset is found, look up the offsets in ZooKeeper and commit them to Kafka. This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated, this can be set to false, though that should generally not be required. If no ZooKeeper offset is found, the kafka.consumer.auto.offset.reset configuration defines how offsets are handled.
pollTimeout | 500 | The amount of time (in milliseconds) to wait in the poll() call of the consumer. See https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
defaultPartitionId | – | Specifies a Kafka partition ID (integer) to which all events in this channel are sent, unless overridden by partitionIdHeader. By default, if this property is not set, events are distributed by the Kafka producer’s partitioner, including by key if specified (or by a partitioner specified by kafka.partitioner.class).
partitionIdHeader | – | When set, the producer takes the value of the event header whose name is given by this property and sends the message to that partition of the topic. If the value represents an invalid partition, the event will not be accepted into the channel. If the header value is present, this setting overrides defaultPartitionId.
kafka.consumer.auto.offset.reset | latest | What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server (e.g. because that data has been deleted): earliest: automatically reset the offset to the earliest offset; latest: automatically reset the offset to the latest offset; none: throw an exception to the consumer if no previous offset is found for the consumer’s group; anything else: throw an exception to the consumer.
kafka.producer.security.protocol | PLAINTEXT | Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
kafka.consumer.security.protocol | PLAINTEXT | Same as kafka.producer.security.protocol but for reading/consuming from Kafka.
more producer/consumer security props | | If using SASL_PLAINTEXT, SASL_SSL or SSL, refer to Kafka security for additional properties that need to be set on the producer/consumer.
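A minimal sketch of the two partitioning properties from the table above. The header name partition is a hypothetical choice; any header name works as long as the events carry an integer partition ID under it.

```properties
# Send every event to partition 0 unless the event carries a partition header
a1.channels.channel1.defaultPartitionId = 0
# Hypothetical header name: events whose "partition" header holds a valid
# partition ID are sent to that partition instead (overriding defaultPartitionId)
a1.channels.channel1.partitionIdHeader = partition
```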
Deprecated Properties
Property Name | Default | Description
brokerList | – | List of brokers in the Kafka cluster used by the channel. This can be a partial list of brokers, but we recommend at least two for HA. The format is a comma-separated list of hostname:port. Use kafka.bootstrap.servers
topic | flume-channel | Use kafka.topic
groupId | flume | Use kafka.consumer.group.id
readSmallestOffset | false | Use kafka.consumer.auto.offset.reset
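A sketch of migrating a configuration from the deprecated names to the current ones. It assumes that readSmallestOffset = true corresponds to starting from the earliest available offset, as its name suggests; broker and topic names are placeholders.

```properties
# Deprecated form (still accepted, but a warning is logged on startup)
# a1.channels.channel1.brokerList = kafka-1:9092,kafka-2:9092
# a1.channels.channel1.topic = channel1
# a1.channels.channel1.groupId = flume-consumer
# a1.channels.channel1.readSmallestOffset = true

# Assumed equivalent using the current property names
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.consumer.auto.offset.reset = earliest
```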
Note
Due to the way the channel is load balanced, there may be duplicate events when the agent first starts up.
Example for agent named a1:
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
Security and Kafka Channel:
Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka.
For secure authentication, SASL/GSSAPI (Kerberos V5) or SSL (although the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0 onwards.
Currently, data encryption is provided only by SSL/TLS.
Setting kafka.producer|consumer.security.protocol to one of the following values means:
- SASL_PLAINTEXT - Kerberos or plaintext authentication with no data encryption
- SASL_SSL - Kerberos or plaintext authentication with data encryption
- SSL - TLS based encryption with optional authentication.
Warning
There is a performance degradation when SSL is enabled,
the magnitude of which depends on the CPU type and the JVM implementation.
For reference, see the Kafka security overview
and the JIRA issue tracking this problem:
KAFKA-2561
TLS and Kafka Channel:
Please read the steps described in Configuring Kafka Clients SSL
to learn about additional configuration settings for fine-tuning, such as the
security provider, cipher suites, enabled protocols, and truststore or keystore types.
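As an illustrative sketch of such fine-tuning, the lines below pass two standard Kafka client SSL settings through the kafka.producer. and kafka.consumer. prefixes. The chosen values (TLSv1.2 only, JKS truststore) are assumptions for the example, not recommendations; check the Kafka client documentation for the values your versions support.

```properties
# Restrict the TLS versions the channel's producer and consumer will negotiate
a1.channels.channel1.kafka.producer.ssl.enabled.protocols = TLSv1.2
a1.channels.channel1.kafka.consumer.ssl.enabled.protocols = TLSv1.2
# State the truststore format explicitly (JKS is the Kafka client default)
a1.channels.channel1.kafka.producer.ssl.truststore.type = JKS
a1.channels.channel1.kafka.consumer.ssl.truststore.type = JKS
```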
Example configuration with server-side authentication and data encryption.
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SSL
a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
a1.channels.channel1.kafka.consumer.security.protocol = SSL
a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
Note: By default, the property ssl.endpoint.identification.algorithm
is not defined, so hostname verification is not performed.
To enable hostname verification, set the following properties:
a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS
Once enabled, clients will verify the server’s fully qualified domain name (FQDN)
against one of the following two fields:
- Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
- Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6
If client-side authentication is also required, add the following to the Flume agent configuration.
Each Flume agent must have its own client certificate, which has to be trusted by the Kafka brokers either
individually or through its signature chain. A common approach is to sign each client certificate with a single root CA
which in turn is trusted by the Kafka brokers.
a1.channels.channel1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
a1.channels.channel1.kafka.producer.ssl.keystore.password = <password to access the keystore>
a1.channels.channel1.kafka.consumer.ssl.keystore.location = /path/to/client.keystore.jks
a1.channels.channel1.kafka.consumer.ssl.keystore.password = <password to access the keystore>
If the keystore and the key are protected by different passwords, the ssl.key.password property
provides the additional secret required for both the consumer and producer keystores:
a1.channels.channel1.kafka.producer.ssl.key.password = <password to access the key>
a1.channels.channel1.kafka.consumer.ssl.key.password = <password to access the key>
Kerberos and Kafka Channel:
To use the Kafka channel with a Kafka cluster secured with Kerberos, set the kafka.producer.security.protocol and/or kafka.consumer.security.protocol properties noted above.
The Kerberos keytab and principal to be used with Kafka brokers are specified in the “KafkaClient” section of a JAAS file. The “Client” section describes the ZooKeeper connection, if needed.
See the Kafka documentation
for information on the JAAS file contents. The location of this JAAS file and, optionally, the system-wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:
JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
Example secure configuration using SASL_PLAINTEXT:
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
a1.channels.channel1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
Example secure configuration using SASL_SSL:
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SASL_SSL
a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
a1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL
a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
Sample JAAS file. For reference on its contents, see the client configuration sections for the desired authentication mechanism (GSSAPI/PLAIN)
in the Kafka documentation on SASL configuration.
Since the Kafka channel may also connect to ZooKeeper for offset migration, the “Client” section was also added to this example.
It is not needed unless you require offset migration, or you need this section for other secure components.
Also, make sure that the operating system user of the Flume processes has read privileges on the JAAS and keytab files.
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
File Channel
Required properties are marked with (required).
Property Name | Default | Description
type (required) | – | The component type name; needs to be file.
checkpointDir | ~/.flume/file-channel/checkpoint | The directory where the checkpoint file will be stored.
useDualCheckpoints | false | Back up the checkpoint. If this is set to true, backupCheckpointDir must be set.
backupCheckpointDir | – | The directory where the checkpoint is backed up to. This directory must not be the same as the data directories or the checkpoint directory.
dataDirs | ~/.flume/file-channel/data | Comma-separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel performance.
transactionCapacity | 10000 | The maximum number of events in a single transaction supported by the channel.
checkpointInterval | 30000 | Amount of time (in milliseconds) between checkpoints.
maxFileSize | 2146435071 | Maximum size (in bytes) of a single log file.
minimumRequiredSpace | 524288000 | Minimum required free space (in bytes). To avoid data corruption, the File Channel stops accepting take/put requests when free space drops below this value.
capacity | 1000000 | Maximum capacity of the channel (number of events).
keep-alive | 3 | Amount of time (in seconds) to wait for a put operation.
use-log-replay-v1 | false | Expert: use old replay logic.
use-fast-replay | false | Expert: replay without using a queue.
checkpointOnClose | true | Controls whether a checkpoint is created when the channel is closed. Creating a checkpoint on close speeds up subsequent startup of the file channel by avoiding replay.
encryption.activeKey | – | Key name used to encrypt new data.
encryption.cipherProvider | – | Cipher provider type; supported types: AESCTRNOPADDING.
encryption.keyProvider | – | Key provider type; supported types: JCEKSFILE.
encryption.keyProvider.keyStoreFile | – | Path to the keystore file.
encryption.keyProvider.keyStorePasswordFile | – | Path to the keystore password file.
encryption.keyProvider.keys | – | List of all keys (e.g. history of the activeKey setting).
encryption.keyProvider.keys.*.passwordFile | – | Path to the optional key password file.
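A sketch combining the durability and performance options from the table above: a backed-up checkpoint plus log files spread over several disks. The /mnt/diskN mount points are hypothetical; the only constraint taken from the table is that backupCheckpointDir differs from checkpointDir and from every data directory.

```properties
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/disk1/flume/checkpoint
# Keep a second copy of the checkpoint in a separate directory
a1.channels.c1.useDualCheckpoints = true
a1.channels.c1.backupCheckpointDir = /mnt/disk2/flume/checkpoint-backup
# Spread log files over directories on separate disks
a1.channels.c1.dataDirs = /mnt/disk2/flume/data,/mnt/disk3/flume/data
```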
Note
By default the File Channel uses paths for checkpoint and data
directories that are within the user home as specified above.
As a result, if you have more than one File Channel instance
active within the agent, only one will be able to lock the
directories, causing the other channels' initialization to fail.
It is therefore necessary that you provide explicit paths to
all the configured channels, preferably on different disks.
Furthermore, as the file channel syncs to disk after every commit,
coupling it with a sink/source that batches events together may
be necessary to provide good performance where multiple disks are
not available for checkpoint and data directories.
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
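Following the note above, a sketch of two File Channels in the same agent, each with its own explicit checkpoint and data directories so that neither fails to acquire its directory lock. The paths are hypothetical and place each channel on a different disk.

```properties
a1.channels = c1 c2
# First channel: all of its files live on disk1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/disk1/flume/c1/checkpoint
a1.channels.c1.dataDirs = /mnt/disk1/flume/c1/data
# Second channel: distinct directories on disk2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /mnt/disk2/flume/c2/checkpoint
a1.channels.c2.dataDirs = /mnt/disk2/flume/c2/data
```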
Encryption
Below are a few sample configurations:
Generating a key with a password separate from the key store password:
keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
-keysize 128 -validity 9000 -keystore test.keystore \
-storetype jceks -storepass keyStorePassword
Generating a key with the password the same as the key store password:
keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
-keystore src/test/resources/test.keystore -storetype jceks \
-storepass keyStorePassword
a1.channels.c1.encryption.activeKey = key-0
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0
Let’s say you have aged key-0 out and new files should be encrypted with key-1:
a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
The same scenario as above, but key-0 has its own password:
a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
a1.channels.c1.encryption.keyProvider.keys.key-0.passwordFile = /path/to/key-0.password
Spillable Memory Channel
The events are stored in an in-memory queue and on disk. The in-memory queue serves as the primary store and the disk as overflow.
The disk store is managed using an embedded File channel. When the in-memory queue is full, additional incoming events are stored in
the file channel. This channel is ideal for flows that need the high throughput of a memory channel during normal operation, but at the
same time need the larger capacity of the file channel for better tolerance of intermittent sink-side outages or drops in drain rate.
During such abnormal situations, throughput drops to roughly file channel speeds. In case of an agent crash or restart,
only the events stored on disk are recovered when the agent comes online. This channel is currently experimental and
not recommended for use in production.
Required properties are marked with (required). Please refer to the file channel for additional required properties.
Property Name | Default | Description
type (required) | – | The component type name; needs to be SPILLABLEMEMORY.
memoryCapacity | 10000 | Maximum number of events stored in the memory queue. To disable use of the in-memory queue, set this to zero.
overflowCapacity | 100000000 | Maximum number of events stored in the overflow disk (i.e. File channel). To disable use of overflow, set this to zero.
overflowTimeout | 3 | The number of seconds to wait before enabling disk overflow when memory fills up.
byteCapacityBufferPercentage | 20 | Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below.
byteCapacity | see description | Maximum bytes of memory allowed as a sum of all events in the memory queue. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source), then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB.
avgEventSize | 500 | Estimated average size of events, in bytes, going into the channel.
<file channel properties> | see file channel | Any file channel property with the exception of ‘keep-alive’ and ‘capacity’ can be used. The keep-alive of the file channel is managed by the Spillable Memory Channel. Use ‘overflowCapacity’ to set the File channel’s capacity.
The in-memory queue is considered full if either the memoryCapacity or the byteCapacity limit is reached.
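As a rough worked example, assume (as the byteCapacityBufferPercentage description suggests) that the buffer percentage p is subtracted from byteCapacity to leave room for headers. For the first example below (byteCapacity = 800000, default p = 20), the budget left for event bodies would be:

$$
B_{\text{body}} = \text{byteCapacity} \times \left(1 - \frac{p}{100}\right) = 800000 \times (1 - 0.20) = 640000 \ \text{bytes}
$$

With the default avgEventSize of 500 bytes, that is roughly 640000 / 500 = 1280 average-sized events, so under this assumption the byteCapacity limit, not memoryCapacity = 10000, is the one that would normally mark the in-memory queue as full in that example.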
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
To disable the use of the in-memory queue and function like a file channel:
a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 0
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
To disable the use of the overflow disk and function purely as an in-memory channel:
a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 100000
a1.channels.c1.overflowCapacity = 0