content/releases/content/1.10.0/FlumeUserGuide.html [5724:6778]:

Since the Kafka Source may also connect to Zookeeper for offset migration, the “Client” section was also added to this example. This won’t be needed unless you require offset migration, or you require this section for other secure components. Also please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

File Channel

Required properties are in bold.

Property Name Default Description  
type The component type name, needs to be file.
checkpointDir ~/.flume/file-channel/checkpoint The directory where the checkpoint file will be stored
useDualCheckpoints false Backup the checkpoint. If this is set to true, backupCheckpointDir must be set
backupCheckpointDir The directory where the checkpoint is backed up to. This directory must not be the same as the data directories or the checkpoint directory
dataDirs ~/.flume/file-channel/data Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel performance
transactionCapacity 10000 The maximum size of transaction supported by the channel
checkpointInterval 30000 Amount of time (in millis) between checkpoints
maxFileSize 2146435071 Max size (in bytes) of a single log file
minimumRequiredSpace 524288000 Minimum Required free space (in bytes). To avoid data corruption, File Channel stops accepting take/put requests when free space drops below this value
capacity 1000000 Maximum capacity of the channel
keep-alive 3 Amount of time (in sec) to wait for a put operation
use-log-replay-v1 false Expert: Use old replay logic
use-fast-replay false Expert: Replay without using queue
checkpointOnClose true Controls if a checkpoint is created when the channel is closed. Creating a checkpoint on close speeds up subsequent startup of the file channel by avoiding replay.
encryption.activeKey Key name used to encrypt new data
encryption.cipherProvider Cipher provider type, supported types: AESCTRNOPADDING
encryption.keyProvider Key provider type, supported types: JCEKSFILE
encryption.keyProvider.keyStoreFile Path to the keystore file
encryption.keyProvider.keyStorePasswordFile Path to the keystore password file
encryption.keyProvider.keys List of all keys (e.g. history of the activeKey setting)
encryption.keyProvider.keys.*.passwordFile Path to the optional key password file

Note

By default the File Channel uses paths for checkpoint and data directories that are within the user home as specified above. As a result, if you have more than one File Channel instance active within the agent, only one will be able to lock the directories, and initialization of the other channels will fail. It is therefore necessary to provide explicit paths for all configured channels, preferably on different disks. Furthermore, because the file channel syncs to disk after every commit, coupling it with a sink/source that batches events together may be necessary to provide good performance where multiple disks are not available for checkpoint and data directories.

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

Encryption

Below are a few sample configurations:

Generating a key with a password separate from the key store password:

keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
  -keysize 128 -validity 9000 -keystore test.keystore \
  -storetype jceks -storepass keyStorePassword

Generating a key with the password the same as the key store password:

keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
  -keystore src/test/resources/test.keystore -storetype jceks \
  -storepass keyStorePassword

a1.channels.c1.encryption.activeKey = key-0
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0

Let’s say you have aged key-0 out and new files should be encrypted with key-1:

a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1

The same scenario as above, however key-0 has its own password:

a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
a1.channels.c1.encryption.keyProvider.keys.key-0.passwordFile = /path/to/key-0.password

Spillable Memory Channel

The events are stored in an in-memory queue and on disk. The in-memory queue serves as the primary store and the disk as overflow. The disk store is managed using an embedded File channel. When the in-memory queue is full, additional incoming events are stored in the file channel. This channel is ideal for flows that need the high throughput of the memory channel during normal operation, but at the same time need the larger capacity of the file channel for better tolerance of intermittent sink-side outages or drops in drain rate. Throughput drops to approximately file channel speeds during such abnormal situations. In case of an agent crash or restart, only the events stored on disk are recovered when the agent comes online. This channel is currently experimental and not recommended for use in production.

Required properties are in bold. Please refer to file channel for additional required properties.

Property Name Default Description
type The component type name, needs to be SPILLABLEMEMORY
memoryCapacity 10000 Maximum number of events stored in memory queue. To disable use of in-memory queue, set this to zero.
overflowCapacity 100000000 Maximum number of events stored in overflow disk (i.e. File channel). To disable use of overflow, set this to zero.
overflowTimeout 3 The number of seconds to wait before enabling disk overflow when memory fills up.
byteCapacityBufferPercentage 20 Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below.
byteCapacity see description Maximum bytes of memory allowed as a sum of all events in the memory queue. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB.
avgEventSize 500 Estimated average size of events, in bytes, going into the channel
<file channel properties> see file channel Any file channel property with the exception of ‘keep-alive’ and ‘capacity’ can be used. The keep-alive of file channel is managed by Spillable Memory Channel. Use ‘overflowCapacity’ to set the File channel’s capacity.

The in-memory queue is considered full if either the memoryCapacity or the byteCapacity limit is reached.

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
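
As a rough illustration of the "queue is full" rule stated before this example, the following Python sketch models the check with the example's values (memoryCapacity = 10000, byteCapacity = 800000) and the default byteCapacityBufferPercentage of 20. It assumes the buffer percentage is simply subtracted from byteCapacity to obtain the bytes usable for event bodies. The function names are hypothetical, and this is a simplified model rather than Flume's implementation.

```python
def usable_byte_capacity(byte_capacity, buffer_percentage=20):
    """Bytes left for event bodies after reserving the header buffer (assumed model)."""
    return byte_capacity * (100 - buffer_percentage) // 100


def memory_queue_is_full(event_count, body_bytes, memory_capacity=10000,
                         byte_capacity=800000, buffer_percentage=20):
    """The queue counts as full as soon as either limit is reached."""
    byte_limit = usable_byte_capacity(byte_capacity, buffer_percentage)
    return event_count >= memory_capacity or body_bytes >= byte_limit
```

Under these assumptions, a byteCapacity of 800000 leaves 640000 bytes for event bodies, so the queue can fill up on bytes long before it holds 10000 events if the events are large.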

To disable the use of the in-memory queue and function like a file channel:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 0
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

To disable the use of overflow disk and function purely as an in-memory channel:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 100000
a1.channels.c1.overflowCapacity = 0

Pseudo Transaction Channel

Warning

The Pseudo Transaction Channel is only for unit testing purposes and is NOT meant for production use.

Required properties are in bold.

Property Name Default Description
type The component type name, needs to be org.apache.flume.channel.PseudoTxnMemoryChannel
capacity 50 The max number of events stored in the channel
keep-alive 3 Timeout in seconds for adding or removing an event

Custom Channel

A custom channel is your own implementation of the Channel interface. A custom channel’s class and its dependencies must be included in the agent’s classpath when starting the Flume agent. The type of the custom channel is its FQCN. Required properties are in bold.

Property Name Default Description
type The component type name, needs to be a FQCN

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = org.example.MyChannel

Flume Channel Selectors

If the type is not specified, it defaults to “replicating”.

Replicating Channel Selector (default)

Required properties are in bold.

Property Name Default Description
selector.type replicating The component type name, needs to be replicating
selector.optional Set of channels to be marked as optional

Example for agent named a1 and its source called r1:

a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3

In the above configuration, c3 is an optional channel. Failure to write to c3 is simply ignored. Since c1 and c2 are not marked optional, failure to write to those channels will cause the transaction to fail.
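
The difference between required and optional channels can be sketched in a few lines of Python. This is only a model of the behaviour described above, not Flume code: the `replicate` function and the representation of channels as a dict of callables are assumptions made for illustration.

```python
def replicate(event, channels, optional=()):
    """Write the event to every channel.

    channels: dict mapping a channel name to a callable that stores the event
              and raises an exception on failure (hypothetical stand-in).
    optional: names of channels whose failures are ignored.
    """
    for name, put in channels.items():
        try:
            put(event)
        except Exception:
            if name not in optional:
                raise  # a required channel failed: the transaction fails
            # an optional channel failed: the error is simply ignored
```

With c3 marked optional, an exception raised while writing to c3 is swallowed, while the same exception from c1 or c2 reaches the caller.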

Load Balancing Channel Selector

Load balancing channel selector provides the ability to load-balance flow over multiple channels. This effectively allows the incoming data to be processed on multiple threads. It maintains an indexed list of active channels on which the load must be distributed. The implementation supports distributing load using either round_robin or random selection mechanisms. The choice of selection mechanism defaults to round_robin, but can be overridden via configuration.

Required properties are in bold.

Property Name Default Description
selector.type replicating The component type name, needs to be load_balancing
selector.policy round_robin Selection mechanism. Must be either round_robin or random.

Example for agent named a1 and its source called r1:

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = load_balancing
a1.sources.r1.selector.policy = round_robin
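
The two selection policies can be pictured with the following Python sketch. It is a simplified model under the assumption that each event is handed to exactly one channel; `make_selector` is a hypothetical helper, not part of Flume.

```python
import itertools
import random


def make_selector(channels, policy="round_robin"):
    """Return a function that picks one channel name per call."""
    if policy == "round_robin":
        cycle = itertools.cycle(channels)
        return lambda: next(cycle)
    if policy == "random":
        return lambda: random.choice(channels)
    raise ValueError("policy must be round_robin or random")
```

With the four channels from the example and the round_robin policy, successive picks walk through c1, c2, c3, c4 and then start again at c1.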

Multiplexing Channel Selector

Required properties are in bold.

Property Name Default Description
selector.type replicating The component type name, needs to be multiplexing
selector.header flume.selector.header  
selector.default  
selector.mapping.*  

Example for agent named a1 and its source called r1:

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
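
Since the property table gives no descriptions, here is a hedged Python sketch of how the configuration above is commonly understood to route events: the value of the configured header is looked up in the mapping, and events with no matching value go to the default channel. The function name and the dict-based event headers are assumptions for illustration only.

```python
def select_channels(headers, header_name, mapping, default):
    """Return the channel names an event is routed to, based on one header value."""
    value = headers.get(header_name)
    return mapping.get(value, default)


# Mirrors the example: header "state", CZ -> c1, US -> c2 c3, everything else -> c4
MAPPING = {"CZ": ["c1"], "US": ["c2", "c3"]}
DEFAULT = ["c4"]
```

For instance, `select_channels({"state": "US"}, "state", MAPPING, DEFAULT)` yields `["c2", "c3"]`, while an event without a state header falls through to `["c4"]`.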

Custom Channel Selector

A custom channel selector is your own implementation of the ChannelSelector interface. A custom channel selector’s class and its dependencies must be included in the agent’s classpath when starting the Flume agent. The type of the custom channel selector is its FQCN.

Property Name Default Description
selector.type The component type name, needs to be your FQCN

Example for agent named a1 and its source called r1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.selector.type = org.example.MyChannelSelector

Flume Sink Processors

Sink groups allow users to group multiple sinks into one entity. Sink processors can be used to provide load balancing capabilities over all sinks inside the group or to achieve failover from one sink to another in case of temporary failure.

Required properties are in bold.

Property Name Default Description
sinks Space-separated list of sinks that are participating in the group
processor.type default The component type name, needs to be default, failover or load_balance

Example for agent named a1:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance

Default Sink Processor

The default sink processor accepts only a single sink. Users are not required to create a processor (sink group) for a single sink. Instead, they can follow the source - channel - sink pattern that was explained above in this user guide.

Failover Sink Processor

Failover Sink Processor maintains a prioritized list of sinks, guaranteeing that, so long as one is available, events will be processed (delivered).

The failover mechanism works by relegating failed sinks to a pool where they are assigned a cool-down period, which increases with sequential failures, before they are retried. Once a sink successfully sends an event, it is restored to the live pool. Each sink has a priority associated with it: the larger the number, the higher the priority. If a sink fails while sending an event, the sink with the next highest priority is tried next. For example, a sink with priority 100 is activated before a sink with priority 80. If no priority is specified, the priority is determined by the order in which the sinks are specified in the configuration.

To configure, set a sink group's processor to failover and set priorities for all individual sinks. All specified priorities must be unique. Furthermore, an upper limit on the failover time can be set (in milliseconds) using the maxpenalty property.

Required properties are in bold.

Property Name Default Description
sinks Space-separated list of sinks that are participating in the group
processor.type default The component type name, needs to be failover
processor.priority.<sinkName> Priority value. <sinkName> must be one of the sink instances associated with the current sink group. A sink with a higher priority value is activated earlier. A larger absolute value indicates higher priority
processor.maxpenalty 30000 The maximum backoff period for the failed Sink (in millis)

Example for agent named a1:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
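
To make the interplay of priorities, cool-down and maxpenalty more concrete, the following Python sketch models the mechanism described in this section. It is not Flume's implementation: the class name, the starting cool-down of 1000 ms and the doubling per consecutive failure are assumptions chosen for illustration, and time is passed in explicitly so the behaviour is easy to follow.

```python
class FailoverSketch:
    """Toy model: pick the highest-priority sink that is not cooling down."""

    def __init__(self, priorities, max_penalty_ms=30000, base_penalty_ms=1000):
        self.priorities = dict(priorities)      # sink name -> priority
        self.max_penalty_ms = max_penalty_ms    # corresponds to processor.maxpenalty
        self.base_penalty_ms = base_penalty_ms  # assumed first cool-down period
        self.failures = {}                      # sink name -> consecutive failures
        self.retry_at = {}                      # sink name -> earliest retry time (ms)

    def pick(self, now_ms):
        live = [s for s in self.priorities if self.retry_at.get(s, 0) <= now_ms]
        if not live:
            return None
        return max(live, key=self.priorities.get)

    def report_failure(self, sink, now_ms):
        count = self.failures.get(sink, 0) + 1
        self.failures[sink] = count
        # Cool-down grows with consecutive failures but never exceeds the cap.
        penalty = min(self.max_penalty_ms, self.base_penalty_ms * 2 ** (count - 1))
        self.retry_at[sink] = now_ms + penalty

    def report_success(self, sink):
        # A successful send restores the sink to the live pool.
        self.failures.pop(sink, None)
        self.retry_at.pop(sink, None)
```

With the priorities from the example (k1 = 5, k2 = 10), k2 is picked first. After k2 fails, k1 takes over until k2's cool-down has elapsed.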

Load balancing Sink Processor

Load balancing sink processor provides the ability to load-balance flow over multiple sinks. It maintains an indexed list of active sinks on which the load must be distributed. The implementation supports distributing load using either round_robin or random selection mechanisms. The choice of selection mechanism defaults to round_robin, but can be overridden via configuration. Custom selection mechanisms are supported via custom classes that inherit from AbstractSinkSelector.

When invoked, this selector picks the next sink using its configured selection mechanism and invokes it. For round_robin and random, if the selected sink fails to deliver the event, the processor picks the next available sink via its configured selection mechanism. This implementation does not blacklist the failing sink and instead continues to optimistically attempt every available sink. If all sink invocations result in failure, the selector propagates the failure to the sink runner.

If backoff is enabled, the sink processor will blacklist sinks that fail, removing them from selection for a given timeout. When the timeout ends, if the sink is still unresponsive, the timeout is increased exponentially to avoid potentially getting stuck in long waits on unresponsive sinks. With backoff disabled, in round-robin mode all of a failed sink's load is passed to the next sink in line and is thus not evenly balanced.

Required properties are in bold.

Property Name Default Description
sinks Space-separated list of sinks that are participating in the group
processor.type default The component type name, needs to be load_balance
processor.backoff false Should failed sinks be backed off exponentially.
processor.selector round_robin Selection mechanism. Must be either round_robin, random or FQCN of custom class that inherits from AbstractSinkSelector
processor.selector.maxTimeOut 30000 Used by backoff selectors to limit exponential backoff (in milliseconds)

Example for agent named a1:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
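
The effect of the backoff setting can be sketched in Python as follows. This is a simplified model, not Flume code: sinks are tried once each in round-robin order, a fixed timeout stands in for the exponentially growing one, and the `process` function, its parameters and the tuple representation of sinks are all hypothetical.

```python
def process(event, sinks, start, blacklist, now_ms, timeout_ms=None):
    """Try each sink once, starting at index `start`, and return the name of
    the sink that accepted the event.

    sinks:      list of (name, send) pairs; send raises an exception on failure
    blacklist:  dict mapping a sink name to the time (ms) until which it is skipped
    timeout_ms: None models backoff = false, so failing sinks are never skipped
    """
    for offset in range(len(sinks)):
        name, send = sinks[(start + offset) % len(sinks)]
        if timeout_ms is not None and blacklist.get(name, 0) > now_ms:
            continue  # sink is still backed off
        try:
            send(event)
            return name
        except Exception:
            if timeout_ms is not None:
                blacklist[name] = now_ms + timeout_ms
    # Every attempt failed: the failure is propagated to the caller.
    raise RuntimeError("all sinks failed")
```

Without a timeout, a broken sink is attempted on every call and its share of the load always lands on the next sink in line, which is the imbalance described above. With a timeout, the broken sink is skipped until its blacklist entry expires.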

Custom Sink Processor

Custom sink processors are not supported at the moment.

Event Serializers

The file_roll sink and the hdfs sink both support the EventSerializer interface. Details of the EventSerializers that ship with Flume are provided below.

Body Text Serializer

Alias: text. This serializer writes the body of the event to an output stream without any transformation or modification. The event headers are ignored. Configuration options are as follows:

Property Name Default Description
appendNewline true Whether a newline will be appended to each event at write time. The default of true assumes that events do not contain newlines, for legacy reasons.

Example for agent named a1:

a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false

“Flume Event” Avro Event Serializer

Alias: avro_event.

This serializer writes Flume events into an Avro container file. The schema used is the same schema used for Flume events in the Avro RPC mechanism.

This serializer inherits from the AbstractAvroEventSerializer class.

Configuration options are as follows:

Property Name Default Description
syncIntervalBytes 2048000 Avro sync interval, in approximate bytes.
compressionCodec null Avro compression codec. For supported codecs, see Avro’s CodecFactory docs.

Example for agent named a1:

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer.compressionCodec = snappy

Avro Event Serializer

Alias: This serializer does not have an alias, and must be specified using the fully-qualified class name.

This serializes Flume events into an Avro container file like the “Flume Event” Avro Event Serializer, however the record schema is configurable. The record schema may be specified either as a Flume configuration property or passed in an event header.

To pass the record schema as part of the Flume configuration, use the property schemaURL as listed below.

To pass the record schema in an event header, specify either the event header flume.avro.schema.literal containing a JSON-format representation of the schema or flume.avro.schema.url with a URL where the schema may be found (hdfs:/... URIs are supported).

This serializer inherits from the AbstractAvroEventSerializer class.

Configuration options are as follows:

Property Name Default Description
syncIntervalBytes 2048000 Avro sync interval, in approximate bytes.
compressionCodec null Avro compression codec. For supported codecs, see Avro’s CodecFactory docs.
schemaURL null Avro schema URL. Schemas specified in the header override this option.

Example for agent named a1:

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
a1.sinks.k1.serializer.compressionCodec = snappy
a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc

Flume Interceptors

Flume has the capability to modify/drop events in-flight. This is done with the help of interceptors. Interceptors are classes that implement the org.apache.flume.interceptor.Interceptor interface. An interceptor can modify or even drop events based on any criteria chosen by the developer of the interceptor. Flume supports chaining of interceptors. This is made possible by specifying the list of interceptor builder class names in the configuration. Interceptors are specified as a whitespace-separated list in the source configuration. The order in which the interceptors are specified is the order in which they are invoked. The list of events returned by one interceptor is passed to the next interceptor in the chain. If an interceptor needs to drop events, it simply omits them from the list that it returns. If it is to drop all events, it returns an empty list. Interceptors are named components; here is an example of how they are created through configuration:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
a1.sinks.k1.channel = c1

Note that the interceptor builders are passed to the type config parameter. The interceptors are themselves configurable and can be passed configuration values just like they are passed to any other configurable component. In the above example, events are passed to the HostInterceptor first and the events returned by the HostInterceptor are then passed along to the TimestampInterceptor. You can specify either the fully qualified class name (FQCN) or the alias timestamp. If you have multiple collectors writing to the same HDFS path, then you could also use the HostInterceptor.
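
The chaining rule described above (each interceptor receives the list returned by the previous one, and dropping an event means leaving it out of the returned list) can be modelled with a short Python sketch. This is not the Flume API: events are plain dicts, and `run_chain`, `add_host` and `drop_empty` are hypothetical names, with a placeholder host value.

```python
def run_chain(events, interceptors):
    """Pass the event list through each interceptor in configuration order."""
    for intercept in interceptors:
        events = intercept(events)
    return events


def add_host(events):
    # Adds a "hostname" header to every event; "agent-host" is a placeholder.
    return [dict(e, headers=dict(e["headers"], hostname="agent-host")) for e in events]


def drop_empty(events):
    # Drops events by not including them in the returned list.
    return [e for e in events if e["body"]]
```

Running `run_chain(events, [add_host, drop_empty])` first tags every event and then filters out those with an empty body. An interceptor that returns an empty list drops the whole batch.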

Timestamp Interceptor

This interceptor inserts into the event headers the time in milliseconds at which it processes the event. It inserts a header with key timestamp (or as specified by the headerName property) whose value is the relevant timestamp. It can be configured to preserve an existing timestamp if one is already present in the event.

Property Name Default Description
type The component type name, has to be timestamp or the FQCN
headerName timestamp The name of the header in which to place the generated timestamp.
preserveExisting false If the timestamp already exists, should it be preserved - true or false

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

Host Interceptor

This interceptor inserts the hostname or IP address of the host that this agent is running on. It inserts a header with key host or a configured key whose value is the hostname or IP address of the host, based on configuration.

Property Name Default Description
type The component type name, has to be host
preserveExisting false If the host header already exists, should it be preserved - true or false
useIP true Use the IP Address if true, else use hostname.
hostHeader host The header key to be used.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host

Static Interceptor

The static interceptor allows the user to append a static header with a static value to all events.

The current implementation does not allow specifying multiple headers at one time. Instead, users can chain multiple static interceptors, each defining one static header.

Property Name Default Description
type The component type name, has to be static
preserveExisting true If configured header already exists, should it be preserved - true or false
key key Name of header that should be created
value value Static value that should be created

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK

Remove Header Interceptor

This interceptor manipulates Flume event headers, by removing one or many headers. It can remove a statically defined header, headers based on a regular expression or headers in a list. If none of these is defined, or if no header matches the criteria, the Flume events are not modified.

Note that if only one header needs to be removed, specifying it by name provides performance benefits over the other two methods.

Property Name Default Description
type The component type name has to be remove_header
withName Name of the header to remove
fromList List of headers to remove, separated with the separator specified by fromListSeparator
fromListSeparator \s*,\s* Regular expression used to separate multiple header names in the list specified by fromList. Default is a comma surrounded by any number of whitespace characters
matching All the headers whose names match this regular expression are removed
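
The three removal methods can be illustrated with the following Python sketch, which uses the default fromListSeparator from the table. It is a model rather than Flume code: headers are a plain dict, `remove_headers` is a hypothetical function, and it assumes the matching expression must match the entire header name.

```python
import re


def remove_headers(headers, with_name=None, from_list=None,
                   from_list_separator=r"\s*,\s*", matching=None):
    """Return a copy of headers without the entries selected for removal."""
    to_remove = set()
    if with_name is not None:
        to_remove.add(with_name)
    if from_list:
        # Split the list on the separator regex (a comma with optional whitespace).
        to_remove.update(re.split(from_list_separator, from_list.strip()))
    pattern = re.compile(matching) if matching else None
    return {
        name: value
        for name, value in headers.items()
        if name not in to_remove and not (pattern and pattern.fullmatch(name))
    }
```

For example, `fromList = "a , b"` together with `matching = "tmp_.*"` removes the headers a and b plus every header whose name starts with tmp_. With no option set, the headers are returned unchanged.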

UUID Interceptor

This interceptor sets a universally unique identifier on all events that are intercepted. An example UUID is b5755073-77a9-43c1-8fad-b7a586fc1b97, which represents a 128-bit value.

Consider using UUIDInterceptor to automatically assign a UUID to an event if no application level unique key for the event is available. It can be important to assign UUIDs to events as soon as they enter the Flume network; that is, in the first Flume Source of the flow. This enables subsequent deduplication of events in the face of replication and redelivery in a Flume network that is designed for high availability and high performance. If an application level key is available, this is preferable over an auto-generated UUID because it enables subsequent updates and deletes of events in data stores using said well known application level key.

Property Name Default Description
type The component type name has to be org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
headerName id The name of the Flume header to modify
preserveExisting true If the UUID header already exists, should it be preserved - true or false
prefix “” The prefix string constant to prepend to each generated UUID
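
A minimal Python sketch of the behaviour implied by the table, assuming a randomly generated UUID and plain-dict headers. `add_uuid` is a hypothetical helper for illustration and not the interceptor's actual code.

```python
import uuid


def add_uuid(headers, header_name="id", preserve_existing=True, prefix=""):
    """Return headers with a UUID stored under header_name."""
    if preserve_existing and header_name in headers:
        return headers  # an existing identifier is kept as-is
    result = dict(headers)
    result[header_name] = prefix + str(uuid.uuid4())
    return result
```

Because preserveExisting defaults to true, an event that already carries an id header (for example one assigned by an upstream agent) keeps it, which is what makes later deduplication possible.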

Morphline Interceptor

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - content/releases/content/1.11.0/FlumeUserGuide.html [5992:7046]: - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - Since the Kafka Source may also connect to Zookeeper for offset migration, the “Client” section was also added to this example. This won’t be needed unless you require offset migration, or you require this section for other secure components. Also please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

File Channel

Required properties are in bold.

Property Name Default Description  
type The component type name, needs to be file.
checkpointDir ~/.flume/file-channel/checkpoint The directory where checkpoint file will be stored
useDualCheckpoints false Backup the checkpoint. If this is set to true, backupCheckpointDir must be set
backupCheckpointDir The directory where the checkpoint is backed up to. This directory must not be the same as the data directories or the checkpoint directory
dataDirs ~/.flume/file-channel/data Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
transactionCapacity 10000 The maximum size of transaction supported by the channel
checkpointInterval 30000 Amount of time (in millis) between checkpoints
maxFileSize 2146435071 Max size (in bytes) of a single log file
minimumRequiredSpace 524288000 Minimum Required free space (in bytes). To avoid data corruption, File Channel stops accepting take/put requests when free space drops below this value
capacity 1000000 Maximum capacity of the channel
keep-alive 3 Amount of time (in sec) to wait for a put operation
use-log-replay-v1 false Expert: Use old replay logic
use-fast-replay false Expert: Replay without using queue
checkpointOnClose true Controls if a checkpoint is created when the channel is closed. Creating a checkpoint on close speeds up subsequent startup of the file channel by avoiding replay.
encryption.activeKey Key name used to encrypt new data
encryption.cipherProvider Cipher provider type, supported types: AESCTRNOPADDING
encryption.keyProvider Key provider type, supported types: JCEKSFILE
encryption.keyProvider.keyStoreFile Path to the keystore file
encrpytion.keyProvider.keyStorePasswordFile Path to the keystore password file
encryption.keyProvider.keys List of all keys (e.g. history of the activeKey setting)
encyption.keyProvider.keys.*.passwordFile Path to the optional key password file

Note

By default the File Channel uses paths for checkpoint and data directories that are within the user home as specified above. As a result if you have more than one File Channel instances active within the agent, only one will be able to lock the directories and cause the other channel initialization to fail. It is therefore necessary that you provide explicit paths to all the configured channels, preferably on different disks. Furthermore, as file channel will sync to disk after every commit, coupling it with a sink/source that batches events together may be necessary to provide good performance where multiple disks are not available for checkpoint and data directories.

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

Encryption

Below is a few sample configurations:

Generating a key with a password seperate from the key store password:

keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
  -keysize 128 -validity 9000 -keystore test.keystore \
  -storetype jceks -storepass keyStorePassword

Generating a key with the password the same as the key store password:

keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
  -keystore src/test/resources/test.keystore -storetype jceks \
  -storepass keyStorePassword

A channel configuration that encrypts new data with key-0:

a1.channels.c1.encryption.activeKey = key-0
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0

Let’s say you have aged key-0 out and new files should be encrypted with key-1:

a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1

The same scenario as above, however key-0 has its own password:

a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
a1.channels.c1.encryption.keyProvider.keys.key-0.passwordFile = /path/to/key-0.password

Spillable Memory Channel

The events are stored in an in-memory queue and on disk. The in-memory queue serves as the primary store and the disk as overflow. The disk store is managed using an embedded file channel. When the in-memory queue is full, additional incoming events are stored in the file channel. This channel is ideal for flows that need the high throughput of a memory channel during normal operation, but at the same time need the larger capacity of the file channel for better tolerance of intermittent sink-side outages or drops in drain rate. Throughput falls to approximately file channel speeds during such abnormal situations. In case of an agent crash or restart, only the events stored on disk are recovered when the agent comes back online. This channel is currently experimental and not recommended for use in production.

Required properties are in bold. Please refer to file channel for additional required properties.

Property Name Default Description
type The component type name, needs to be SPILLABLEMEMORY
memoryCapacity 10000 Maximum number of events stored in memory queue. To disable use of in-memory queue, set this to zero.
overflowCapacity 100000000 Maximum number of events stored in overflow disk (i.e. the file channel). To disable use of overflow, set this to zero.
overflowTimeout 3 The number of seconds to wait before enabling disk overflow when memory fills up.
byteCapacityBufferPercentage 20 Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below.
byteCapacity see description Maximum bytes of memory allowed as a sum of all events in the memory queue. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB.
avgEventSize 500 Estimated average size of events, in bytes, going into the channel
<file channel properties> see file channel Any file channel property with the exception of ‘keep-alive’ and ‘capacity’ can be used. The keep-alive of file channel is managed by Spillable Memory Channel. Use ‘overflowCapacity’ to set the File channel’s capacity.

The in-memory queue is considered full if either the memoryCapacity or the byteCapacity limit is reached.

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

To disable the use of the in-memory queue and function like a file channel:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 0
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

To disable the use of the overflow disk and function purely as an in-memory channel:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 100000
a1.channels.c1.overflowCapacity = 0

Pseudo Transaction Channel

Warning

The Pseudo Transaction Channel is only for unit testing purposes and is NOT meant for production use.

Required properties are in bold.

Property Name Default Description
type The component type name, needs to be org.apache.flume.channel.PseudoTxnMemoryChannel
capacity 50 The max number of events stored in the channel
keep-alive 3 Timeout in seconds for adding or removing an event
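
Example for agent named a1. This is a sketch for a unit-test setup only; the capacity and keep-alive values are illustrative assumptions, not recommendations:

# For unit tests only. The two values below are arbitrary overrides of
# the defaults (50 events, 3 seconds) listed in the table above.
a1.channels = c1
a1.channels.c1.type = org.apache.flume.channel.PseudoTxnMemoryChannel
a1.channels.c1.capacity = 100
a1.channels.c1.keep-alive = 5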

Custom Channel

A custom channel is your own implementation of the Channel interface. A custom channel’s class and its dependencies must be included in the agent’s classpath when starting the Flume agent. The type of the custom channel is its FQCN. Required properties are in bold.

Property Name Default Description
type The component type name, needs to be a FQCN

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = org.example.MyChannel

Flume Channel Selectors

If the type is not specified, it defaults to “replicating”.

Replicating Channel Selector (default)

Required properties are in bold.

Property Name Default Description
selector.type replicating The component type name, needs to be replicating
selector.optional Set of channels to be marked as optional

Example for agent named a1 and its source called r1:

a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3

In the above configuration, c3 is an optional channel. Failure to write to c3 is simply ignored. Since c1 and c2 are not marked optional, failure to write to those channels will cause the transaction to fail.

Load Balancing Channel Selector

The load balancing channel selector provides the ability to load-balance flow over multiple channels. This effectively allows the incoming data to be processed on multiple threads. It maintains an indexed list of active channels on which the load must be distributed. The implementation supports distributing load using either round_robin or random selection mechanisms. The selection mechanism defaults to round_robin, but can be overridden via configuration.

Required properties are in bold.

Property Name Default Description
selector.type replicating The component type name, needs to be load_balancing
selector.policy round_robin Selection mechanism. Must be either round_robin or random.

Example for agent named a1 and its source called r1:

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = load_balancing
a1.sources.r1.selector.policy = round_robin
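
The other documented policy can be selected the same way. A minimal sketch, reusing the hypothetical channel names from the example above:

# Same layout as above, but each event goes to a randomly chosen channel
# instead of cycling through c1..c4 in order.
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = load_balancing
a1.sources.r1.selector.policy = random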

Multiplexing Channel Selector

Required properties are in bold.

Property Name Default Description
selector.type replicating The component type name, needs to be multiplexing
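selector.header flume.selector.header Name of the event header whose value is used to select the channels
selector.default Channel(s) used when the header is missing or its value matches no mapping
selector.mapping.* Channel(s) to write to when the header value equals the mapping suffix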

Example for agent named a1 and its source called r1:

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

Custom Channel Selector

A custom channel selector is your own implementation of the ChannelSelector interface. A custom channel selector’s class and its dependencies must be included in the agent’s classpath when starting the Flume agent. The type of the custom channel selector is its FQCN.

Property Name Default Description
selector.type The component type name, needs to be your FQCN

Example for agent named a1 and its source called r1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.selector.type = org.example.MyChannelSelector

Flume Sink Processors

Sink groups allow users to group multiple sinks into one entity. Sink processors can be used to provide load balancing over all sinks inside the group or to achieve failover from one sink to another in case of temporary failure.

Required properties are in bold.

Property Name Default Description
sinks Space-separated list of sinks that are participating in the group
processor.type default The component type name, needs to be default, failover or load_balance

Example for agent named a1:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance

Default Sink Processor

The default sink processor accepts only a single sink. Users are not required to create a processor (sink group) for a single sink; instead, they can follow the source - channel - sink pattern explained earlier in this user guide.
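
A minimal sketch of that pattern for agent a1, with no sink group defined. The sink name k1, the channel name c1 and the logger sink type are assumptions chosen for illustration:

# No a1.sinkgroups entry: the single sink k1 is wired straight to its
# channel and is handled by the default sink processor.
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1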

Failover Sink Processor

Failover Sink Processor maintains a prioritized list of sinks, guaranteeing that so long as one is available events will be processed (delivered).

The failover mechanism works by relegating failed sinks to a pool where they are assigned a cool-down period, which increases with sequential failures, before they are retried. Once a sink successfully sends an event, it is restored to the live pool. Each sink has a priority associated with it: the larger the number, the higher the priority. If a sink fails while sending an event, the sink with the next-highest priority is tried next. For example, a sink with priority 100 is activated before a sink with priority 80. If no priority is specified, the priority is determined by the order in which the sinks are specified in the configuration.

To configure, set a sink group's processor to failover and set priorities for all individual sinks. All specified priorities must be unique. Furthermore, an upper limit on the failover time can be set (in milliseconds) using the maxpenalty property.

Required properties are in bold.

Property Name Default Description
sinks Space-separated list of sinks that are participating in the group
processor.type default The component type name, needs to be failover
processor.priority.<sinkName> Priority value. <sinkName> must be one of the sink instances associated with the current sink group. A sink with a higher priority value is activated earlier. A larger absolute value indicates higher priority
processor.maxpenalty 30000 The maximum backoff period for the failed Sink (in millis)

Example for agent named a1:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
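
The same idea extends to more sinks. A sketch with three sinks, where the names k1, k2 and k3 and the priority values are illustrative assumptions; note that every priority is unique, as required above:

# k1 (highest value) is tried first, then k2, then k3.
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 100
a1.sinkgroups.g1.processor.priority.k2 = 80
a1.sinkgroups.g1.processor.priority.k3 = 10
# A failed sink is kept out of rotation for at most 10 seconds.
a1.sinkgroups.g1.processor.maxpenalty = 10000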

Load balancing Sink Processor

The load balancing sink processor provides the ability to load-balance flow over multiple sinks. It maintains an indexed list of active sinks on which the load must be distributed. The implementation supports distributing load using either round_robin or random selection mechanisms. The selection mechanism defaults to round_robin, but can be overridden via configuration. Custom selection mechanisms are supported via custom classes that inherit from AbstractSinkSelector.

When invoked, this selector picks the next sink using its configured selection mechanism and invokes it. For round_robin and random, if the selected sink fails to deliver the event, the processor picks the next available sink via its configured selection mechanism. This implementation does not blacklist the failing sink and instead continues to optimistically attempt every available sink. If all sink invocations result in failure, the selector propagates the failure to the sink runner.

If backoff is enabled, the sink processor will blacklist sinks that fail, removing them from selection for a given timeout. When the timeout ends, if the sink is still unresponsive, the timeout is increased exponentially to avoid potentially getting stuck in long waits on unresponsive sinks. With backoff disabled, in round-robin mode all of a failed sink's load is passed to the next sink in line and is thus not evenly balanced.

Required properties are in bold.

Property Name Default Description
sinks Space-separated list of sinks that are participating in the group
processor.type default The component type name, needs to be load_balance
processor.backoff false Should failed sinks be backed off exponentially.
processor.selector round_robin Selection mechanism. Must be either round_robin, random or FQCN of custom class that inherits from AbstractSinkSelector
processor.selector.maxTimeOut 30000 Used by backoff selectors to limit exponential backoff (in milliseconds)

Example for agent named a1:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
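
A variant of the example above that keeps the default round_robin selector and caps the exponential backoff. The 10000 ms cap is an illustrative assumption:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = round_robin
# Failing sinks are temporarily removed from selection...
a1.sinkgroups.g1.processor.backoff = true
# ...with the backoff limited to 10 seconds (default is 30000 ms).
a1.sinkgroups.g1.processor.selector.maxTimeOut = 10000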

Custom Sink Processor

Custom sink processors are not supported at the moment.

Event Serializers

The file_roll sink and the hdfs sink both support the EventSerializer interface. Details of the EventSerializers that ship with Flume are provided below.

Body Text Serializer

Alias: text. This serializer writes the body of the event to an output stream without any transformation or modification. The event headers are ignored. Configuration options are as follows:

Property Name Default Description
appendNewline true Whether a newline will be appended to each event at write time. The default of true assumes that events do not contain newlines, for legacy reasons.

Example for agent named a1:

a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false

“Flume Event” Avro Event Serializer

Alias: avro_event.

This serializer writes Flume events into an Avro container file. The schema used is the same schema used for Flume events in the Avro RPC mechanism.

This serializer inherits from the AbstractAvroEventSerializer class.

Configuration options are as follows:

Property Name Default Description
syncIntervalBytes 2048000 Avro sync interval, in approximate bytes.
compressionCodec null Avro compression codec. For supported codecs, see Avro’s CodecFactory docs.

Example for agent named a1:

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer.compressionCodec = snappy

Avro Event Serializer

Alias: This serializer does not have an alias, and must be specified using the fully-qualified class name.

This serializes Flume events into an Avro container file like the “Flume Event” Avro Event Serializer, however the record schema is configurable. The record schema may be specified either as a Flume configuration property or passed in an event header.

To pass the record schema as part of the Flume configuration, use the property schemaURL as listed below.

To pass the record schema in an event header, specify either the event header flume.avro.schema.literal containing a JSON-format representation of the schema or flume.avro.schema.url with a URL where the schema may be found (hdfs:/... URIs are supported).

This serializer inherits from the AbstractAvroEventSerializer class.

Configuration options are as follows:

Property Name Default Description
syncIntervalBytes 2048000 Avro sync interval, in approximate bytes.
compressionCodec null Avro compression codec. For supported codecs, see Avro’s CodecFactory docs.
schemaURL null Avro schema URL. Schemas specified in the header override this option.

Example for agent named a1:

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
a1.sinks.k1.serializer.compressionCodec = snappy
a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc

Flume Interceptors

Flume has the capability to modify or drop events in-flight. This is done with the help of interceptors, which are classes that implement the org.apache.flume.interceptor.Interceptor interface. An interceptor can modify or even drop events based on any criteria chosen by the developer of the interceptor. Flume supports chaining of interceptors. This is made possible by specifying the list of interceptor builder class names in the configuration. Interceptors are specified as a whitespace-separated list in the source configuration. The order in which the interceptors are specified is the order in which they are invoked. The list of events returned by one interceptor is passed to the next interceptor in the chain. To drop an event, an interceptor simply omits it from the list that it returns; to drop all events, it returns an empty list. Interceptors are named components. Here is an example of how they are created through configuration:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{hostname}.%Y-%m-%d
a1.sinks.k1.channel = c1

Note that the interceptor builders are passed to the type config parameter. The interceptors are themselves configurable and can be passed configuration values just like they are passed to any other configurable component. In the above example, events are passed to the HostInterceptor first and the events returned by the HostInterceptor are then passed along to the TimestampInterceptor. You can specify either the fully qualified class name (FQCN) or the alias timestamp. If you have multiple collectors writing to the same HDFS path, then you could also use the HostInterceptor.

Timestamp Interceptor

This interceptor inserts into the event headers the time, in milliseconds, at which it processes the event. It inserts a header with key timestamp (or the key specified by the headerName property) whose value is the relevant timestamp. The interceptor can preserve an existing timestamp if one is already present in the event and preserveExisting is set to true.

Property Name Default Description
type The component type name, has to be timestamp or the FQCN
headerName timestamp The name of the header in which to place the generated timestamp.
preserveExisting false If the timestamp already exists, should it be preserved - true or false

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
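
To keep a timestamp that an upstream component has already set, preserveExisting can be enabled. A minimal sketch using only the properties from the table above:

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
# Leave the header untouched when the event already carries a timestamp.
a1.sources.r1.interceptors.i1.preserveExisting = true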

Host Interceptor

This interceptor inserts the hostname or IP address of the host that this agent is running on. It inserts a header with key host or a configured key whose value is the hostname or IP address of the host, based on configuration.

Property Name Default Description
type The component type name, has to be host
preserveExisting false If the host header already exists, should it be preserved - true or false
useIP true Use the IP Address if true, else use hostname.
hostHeader host The header key to be used.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
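
A sketch that records the hostname instead of the IP address under a custom header key. The header name hostname is an illustrative choice:

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
# Write the hostname rather than the IP address (useIP defaults to true).
a1.sources.r1.interceptors.i1.useIP = false
# Store it under "hostname" instead of the default key "host".
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i1.preserveExisting = true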

Static Interceptor

The static interceptor allows the user to append a static header with a static value to all events.

The current implementation does not allow specifying multiple headers at one time. Instead, the user can chain multiple static interceptors, each defining one static header.

Property Name Default Description
type The component type name, has to be static
preserveExisting true If configured header already exists, should it be preserved - true or false
key key Name of header that should be created
value value Static value that should be created

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK
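
Because each static interceptor sets a single header, several headers require a chain. A sketch with two chained interceptors, where the second header (environment = production) is an illustrative assumption:

# i1 and i2 run in the order listed; each adds exactly one header.
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK
a1.sources.r1.interceptors.i2.type = static
a1.sources.r1.interceptors.i2.key = environment
a1.sources.r1.interceptors.i2.value = production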

Remove Header Interceptor

This interceptor manipulates Flume event headers, by removing one or many headers. It can remove a statically defined header, headers based on a regular expression or headers in a list. If none of these is defined, or if no header matches the criteria, the Flume events are not modified.

Note that if only one header needs to be removed, specifying it by name provides performance benefits over the other 2 methods.

Property Name Default Description
type The component type name has to be remove_header
withName Name of the header to remove
fromList List of headers to remove, separated with the separator specified by fromListSeparator
fromListSeparator \s*,\s* Regular expression used to separate multiple header names in the list specified by fromList. Default is a comma surrounded by any number of whitespace characters
matching All the headers whose names match this regular expression are removed
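
Example for agent named a1. This is a sketch showing the three removal methods side by side; the header names (debug-id, trace-id, span-id) and the tmp- prefix are hypothetical:

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1 i2 i3
# i1: remove a single header by name (the cheapest option).
a1.sources.r1.interceptors.i1.type = remove_header
a1.sources.r1.interceptors.i1.withName = debug-id
# i2: remove several headers given as a list (default separator: comma).
a1.sources.r1.interceptors.i2.type = remove_header
a1.sources.r1.interceptors.i2.fromList = trace-id, span-id
# i3: remove every header whose name starts with "tmp-".
a1.sources.r1.interceptors.i3.type = remove_header
a1.sources.r1.interceptors.i3.matching = ^tmp-.*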

UUID Interceptor

This interceptor sets a universally unique identifier on all events that are intercepted. An example UUID is b5755073-77a9-43c1-8fad-b7a586fc1b97, which represents a 128-bit value.

Consider using UUIDInterceptor to automatically assign a UUID to an event if no application level unique key for the event is available. It can be important to assign UUIDs to events as soon as they enter the Flume network; that is, in the first Flume Source of the flow. This enables subsequent deduplication of events in the face of replication and redelivery in a Flume network that is designed for high availability and high performance. If an application level key is available, this is preferable over an auto-generated UUID because it enables subsequent updates and deletes of events in data stores using said well known application level key.

Property Name Default Description
type The component type name has to be org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
headerName id The name of the Flume header to modify
preserveExisting true If the UUID header already exists, should it be preserved - true or false
prefix “” The prefix string constant to prepend to each generated UUID
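
Example for agent named a1. This is a sketch using the properties from the table above; the header name eventId and the prefix a1- are illustrative assumptions:

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
# Store the UUID under "eventId" instead of the default header "id".
a1.sources.r1.interceptors.i1.headerName = eventId
# Keep an ID assigned earlier in the flow rather than overwriting it.
a1.sources.r1.interceptors.i1.preserveExisting = true
# Prepend a constant so the generating agent can be recognized.
a1.sources.r1.interceptors.i1.prefix = a1-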

Morphline Interceptor

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -