content/releases/content/1.5.0.1/FlumeUserGuide.html [3647:4483]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Custom Channel
A custom channel is your own implementation of the Channel interface. A
custom channel’s class and its dependencies must be included in the agent’s
classpath when starting the Flume agent. The type of the custom channel is
its FQCN.
Required properties are in bold.
Property Name | Default | Description
type | – | The component type name, needs to be a FQCN
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = org.example.MyChannel
Flume Channel Selectors
If the type is not specified, it defaults to “replicating”.
Replicating Channel Selector (default)
Required properties are in bold.
Property Name | Default | Description
selector.type | replicating | The component type name, needs to be replicating
selector.optional | – | Set of channels to be marked as optional
Example for agent named a1 and its source called r1:
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3
In the above configuration, c3 is an optional channel. Failure to write to c3 is
simply ignored. Since c1 and c2 are not marked optional, failure to write to
those channels will cause the transaction to fail.
Multiplexing Channel Selector
Required properties are in bold.
Property Name | Default | Description
selector.type | replicating | The component type name, needs to be multiplexing
selector.header | flume.selector.header |
selector.default | – |
selector.mapping.* | – |
Example for agent named a1 and its source called r1:
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
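In the configuration above, an event whose state header is CZ is written to c1, one whose state header is US is written to both c2 and c3, and any other event goes to c4. The selector only reads the header, so something upstream has to set it. One way, sketched below under the assumption that every event from r1 originates in the US, is to stamp the header with a static interceptor (described later in this guide) on the same source:

```properties
# Sketch only: tag every event from r1 with state=US, then route on that header.
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.channels = c1 c2 c3 c4
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.interceptors.i1.value = US
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
```

Because the static interceptor preserves existing headers by default, events that already carry a state header keep their value and are routed accordingly.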
Custom Channel Selector
A custom channel selector is your own implementation of the ChannelSelector
interface. A custom channel selector’s class and its dependencies must be
included in the agent’s classpath when starting the Flume agent. The type of
the custom channel selector is its FQCN.
Property Name | Default | Description
selector.type | – | The component type name, needs to be your FQCN
Example for agent named a1 and its source called r1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.selector.type = org.example.MyChannelSelector
Flume Sink Processors
Sink groups allow users to group multiple sinks into one entity.
Sink processors can be used to provide load balancing capabilities over all
sinks inside the group or to achieve fail over from one sink to another in
case of temporary failure.
Required properties are in bold.
Property Name | Default | Description
sinks | – | Space-separated list of sinks that are participating in the group
processor.type | default | The component type name, needs to be default, failover or load_balance
Example for agent named a1:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
Default Sink Processor
The default sink processor accepts only a single sink. The user is not required
to create a processor (sink group) for a single sink. Instead, the user can follow
the source - channel - sink pattern that was explained earlier in this user
guide.
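As a minimal sketch of that pattern, assuming a hypothetical agent a1 with a sequence generator source, a memory channel and a logger sink, no sinkgroups entry is needed at all:

```properties
# Sketch only: one source, one channel, one sink, no sink group.
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
```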
Failover Sink Processor
Failover Sink Processor maintains a prioritized list of sinks, guaranteeing
that so long as one is available events will be processed (delivered).
The failover mechanism works by relegating failed sinks to a pool where
they are assigned a cool down period, increasing with sequential failures
before they are retried. Once a sink successfully sends an event, it is
restored to the live pool.
To configure, set a sink group's processor to failover and set
priorities for all individual sinks. All specified priorities must
be unique. Furthermore, an upper limit on the failover time can be set
(in milliseconds) using the maxpenalty property.
Required properties are in bold.
Property Name | Default | Description
sinks | – | Space-separated list of sinks that are participating in the group
processor.type | default | The component type name, needs to be failover
processor.priority.<sinkName> | – | <sinkName> must be one of the sink instances associated with the current sink group
processor.maxpenalty | 30000 | (in millis)
Example for agent named a1:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
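The same idea extends to more sinks. The sketch below assumes three hypothetical sinks k1, k2 and k3 that are defined elsewhere in the agent configuration. To the best of our understanding of the stock failover processor, the live sink with the largest priority value is tried first, so the comments describe that assumed order:

```properties
# Sketch only: k3 is assumed to be preferred, then k2, then k1.
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 1
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.priority.k3 = 10
# Cap the cool down period of a failed sink at 20 seconds.
a1.sinkgroups.g1.processor.maxpenalty = 20000
```

Each priority must be unique within the group, as stated above.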
Load balancing Sink Processor
Load balancing sink processor provides the ability to load-balance flow over
multiple sinks. It maintains an indexed list of active sinks on which the
load must be distributed. The implementation supports distributing load using
either round_robin or random selection mechanisms.
The choice of selection mechanism defaults to round_robin,
but can be overridden via configuration. Custom selection mechanisms are
supported via custom classes that inherit from AbstractSinkSelector.
When invoked, this selector picks the next sink using its configured selection
mechanism and invokes it. For round_robin and random, if the selected sink
fails to deliver the event, the processor picks the next available sink via
its configured selection mechanism. This implementation does not blacklist
the failing sink and instead continues to optimistically attempt every
available sink. If all sink invocations result in failure, the selector
propagates the failure to the sink runner.
If backoff is enabled, the sink processor will blacklist
sinks that fail, removing them from selection for a given timeout. When the
timeout ends, if the sink is still unresponsive, the timeout is increased
exponentially to avoid potentially getting stuck in long waits on unresponsive
sinks. With backoff disabled, in round-robin mode all of a failed sink's load is
passed to the next sink in line and is thus not evenly balanced.
Required properties are in bold.
Property Name | Default | Description
sinks | – | Space-separated list of sinks that are participating in the group
processor.type | default | The component type name, needs to be load_balance
processor.backoff | false | Should failed sinks be backed off exponentially.
processor.selector | round_robin | Selection mechanism. Must be either round_robin, random or FQCN of custom class that inherits from AbstractSinkSelector
processor.selector.maxTimeOut | 30000 | Used by backoff selectors to limit exponential backoff (in milliseconds)
Example for agent named a1:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
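A variant of the example above, as a minimal sketch: it assumes a third hypothetical sink k3, keeps the default round_robin selector, and uses the maxTimeOut property from the table to cap how long a failing sink stays out of rotation.

```properties
# Sketch only: round-robin over three sinks with capped exponential backoff.
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.backoff = true
# Upper bound on the backoff timeout, in milliseconds.
a1.sinkgroups.g1.processor.selector.maxTimeOut = 10000
```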
Custom Sink Processor
Custom sink processors are not supported at the moment.
Event Serializers
The file_roll sink and the hdfs sink both support the
EventSerializer interface. Details of the EventSerializers that ship with
Flume are provided below.
Body Text Serializer
Alias: text. This serializer writes the body of the event to an output
stream without any transformation or modification. The event headers are
ignored. Configuration options are as follows:
Property Name | Default | Description
appendNewline | true | Whether a newline will be appended to each event at write time. The default of true assumes that events do not contain newlines, for legacy reasons.
Example for agent named a1:
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false
Avro Event Serializer
Alias: avro_event. This serializer writes Flume events into an Avro
container file. The schema used is the same schema used for Flume events
in the Avro RPC mechanism. This serializer inherits from the
AbstractAvroEventSerializer class. Configuration options are as follows:
Property Name | Default | Description
syncIntervalBytes | 2048000 | Avro sync interval, in approximate bytes.
compressionCodec | null | Avro compression codec. For supported codecs, see Avro’s CodecFactory docs.
Example for agent named a1:
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer.compressionCodec = snappy
Flume Interceptors
Flume has the capability to modify/drop events in-flight. This is done with the help of interceptors. Interceptors
are classes that implement the org.apache.flume.interceptor.Interceptor interface. An interceptor can
modify or even drop events based on any criteria chosen by the developer of the interceptor. Flume supports
chaining of interceptors. This is made possible by specifying the list of interceptor builder class names
in the configuration. Interceptors are specified as a whitespace-separated list in the source configuration.
The order in which the interceptors are specified is the order in which they are invoked.
The list of events returned by one interceptor is passed to the next interceptor in the chain. Interceptors
can modify or drop events. If an interceptor needs to drop events, it simply omits those events from
the list that it returns. If it is to drop all events, then it simply returns an empty list. Interceptors
are named components; here is an example of how they are created through configuration:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
a1.sinks.k1.channel = c1
Note that the interceptor builders are passed to the type config parameter. The interceptors are themselves
configurable and can be passed configuration values just like they are passed to any other configurable component.
In the above example, events are passed to the HostInterceptor first and the events returned by the HostInterceptor
are then passed along to the TimestampInterceptor. You can specify either the fully qualified class name (FQCN)
or the alias timestamp. If you have multiple collectors writing to the same HDFS path, then you could also use
the HostInterceptor.
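The same chain can also be written with the short aliases that the interceptor tables in this guide list (host and timestamp) instead of the builder FQCNs. A minimal sketch, assuming the same agent a1 and source r1 as above:

```properties
# Sketch only: i1 runs first (host), then i2 (timestamp), in the listed order.
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = timestamp
```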
Timestamp Interceptor
This interceptor inserts into the event headers the time in milliseconds at which it processes the event. It
inserts a header with key timestamp whose value is the relevant timestamp. This interceptor
can preserve an existing timestamp header if it is configured to do so (see preserveExisting).
Property Name | Default | Description
type | – | The component type name, has to be timestamp or the FQCN
preserveExisting | false | If the timestamp already exists, should it be preserved - true or false
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
Host Interceptor
This interceptor inserts the hostname or IP address of the host that this agent is running on. It inserts a header
with key host or a configured key whose value is the hostname or IP address of the host, based on configuration.
Property Name | Default | Description
type | – | The component type name, has to be host
preserveExisting | false | If the host header already exists, should it be preserved - true or false
useIP | true | Use the IP Address if true, else use hostname.
hostHeader | host | The header key to be used.
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i1.hostHeader = hostname
Static Interceptor
The static interceptor allows the user to append a static header with a static value to all events.
The current implementation does not allow specifying multiple headers at one time. Instead, the user can chain
multiple static interceptors, each defining one static header.
Property Name | Default | Description
type | – | The component type name, has to be static
preserveExisting | true | If configured header already exists, should it be preserved - true or false
key | key | Name of header that should be created
value | value | Static value that should be created
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK
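To attach more than one static header, chain several static interceptors as described above. A minimal sketch, in which the second header name and value (environment, production) are purely illustrative:

```properties
# Sketch only: two static interceptors, each adding one header.
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK
a1.sources.r1.interceptors.i2.type = static
a1.sources.r1.interceptors.i2.key = environment
a1.sources.r1.interceptors.i2.value = production
```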
UUID Interceptor
This interceptor sets a universally unique identifier on all events that are intercepted. An example UUID is b5755073-77a9-43c1-8fad-b7a586fc1b97, which represents a 128-bit value.
Consider using UUIDInterceptor to automatically assign a UUID to an event if no application-level unique key for the event is available. It can be important to assign UUIDs to events as soon as they enter the Flume network; that is, in the first Flume Source of the flow. This enables subsequent deduplication of events in the face of replication and redelivery in a Flume network that is designed for high availability and high performance. If an application-level key is available, it is preferable to an auto-generated UUID because it enables subsequent updates and deletes of events in data stores using that well-known application-level key.
Property Name | Default | Description
type | – | The component type name has to be org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
headerName | id | The name of the Flume header to modify
preserveExisting | true | If the UUID header already exists, should it be preserved - true or false
prefix | “” | The prefix string constant to prepend to each generated UUID
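The guide gives no example for this interceptor, so the following is a minimal sketch built only from the properties in the table. The agent and source names and the evt- prefix are assumptions for illustration; the builder class is assumed to be on the agent's classpath.

```properties
# Sketch only: assign a UUID to the "id" header at the first source of the flow.
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.r1.interceptors.i1.headerName = id
# Keep an id that an upstream hop has already assigned.
a1.sources.r1.interceptors.i1.preserveExisting = true
# Illustrative prefix prepended to each generated UUID.
a1.sources.r1.interceptors.i1.prefix = evt-
```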
Morphline Interceptor
This interceptor filters the events through a morphline configuration file that defines a chain of transformation commands that pipe records from one command to another.
For example the morphline can ignore certain events or alter or insert certain event headers via regular expression based pattern matching, or it can auto-detect and set a MIME type via Apache Tika on events that are intercepted. For example, this kind of packet sniffing can be used for content based dynamic routing in a Flume topology.
MorphlineInterceptor can also help to implement dynamic routing to multiple Apache Solr collections (e.g. for multi-tenancy).
Currently, there is a restriction in that the morphline of an interceptor must not generate more than one output record for each input event. This interceptor is not intended for heavy duty ETL processing - if you need this consider moving ETL processing from the Flume Source to a Flume Sink, e.g. to a MorphlineSolrSink.
Required properties are in bold.
Property Name | Default | Description
type | – | The component type name has to be org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
morphlineFile | – | The relative or absolute path on the local file system to the morphline configuration file. Example: /etc/flume-ng/conf/morphline.conf
morphlineId | null | Optional name used to identify a morphline if there are multiple morphlines in a morphline config file
Sample flume.conf file:
a1.sources.avroSrc.interceptors = morphlineinterceptor
a1.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1
Regex Filtering Interceptor
This interceptor filters events selectively by interpreting the event body as text and matching the text against a configured regular expression.
The supplied regular expression can be used to include events or exclude events.
Property Name | Default | Description
type | – | The component type name has to be regex_filter
regex | ".*" | Regular expression for matching against events
excludeEvents | false | If true, regex determines events to exclude, otherwise regex determines events to include.
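A minimal sketch of a filtering setup, assuming a hypothetical agent a1 whose source r1 receives log lines as event bodies. The pattern ^DEBUG.* is an illustrative choice, not taken from this guide:

```properties
# Sketch only: drop events whose body starts with DEBUG, pass everything else.
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^DEBUG.*
a1.sources.r1.interceptors.i1.excludeEvents = true
```

Setting excludeEvents to false with the same pattern would invert the behaviour and keep only the matching events.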
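Regex Extractor Interceptor
The examples below configure the Regex Extractor Interceptor, which copies regex match groups into event headers through named serializers.
Example 1:
If the Flume event body contained 1:2:3.4foobar5 and the following configuration was used: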
a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three
The extracted event will contain the same body, but the following headers will have been added: one=>1, two=>2, three=>3
Example 2:
If the Flume event body contained 2012-10-18 18:47:57,614 some log line and the following configuration was used:
a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm
The extracted event will contain the same body, but the following header will have been added: timestamp=>1350611220000
Log4J Appender
Appends Log4j events to a flume agent’s avro source. A client using this
appender must have the flume-ng-sdk in the classpath (eg,
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
content/releases/content/1.5.0/FlumeUserGuide.html [3646:4482]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Pseudo Transaction Channel
Warning
The Pseudo Transaction Channel is only for unit testing purposes
and is NOT meant for production use.
Required properties are in bold.
Property Name | Default | Description
type | – | The component type name, needs to be org.apache.flume.channel.PseudoTxnMemoryChannel
capacity | 50 | The max number of events stored in the channel
keep-alive | 3 | Timeout in seconds for adding or removing an event
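For a unit-test agent, a configuration along the following lines could be used. This is a minimal sketch built from the properties in the table; the agent name and the capacity and keep-alive values are illustrative.

```properties
# Sketch only: test-only channel, not for production use.
a1.channels = c1
a1.channels.c1.type = org.apache.flume.channel.PseudoTxnMemoryChannel
a1.channels.c1.capacity = 100
a1.channels.c1.keep-alive = 5
```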
Custom Channel
A custom channel is your own implementation of the Channel interface. A
custom channel’s class and its dependencies must be included in the agent’s
classpath when starting the Flume agent. The type of the custom channel is
its FQCN.
Required properties are in bold.
Property Name | Default | Description
type | – | The component type name, needs to be a FQCN
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = org.example.MyChannel
Flume Channel Selectors
If the type is not specified, it defaults to “replicating”.
Replicating Channel Selector (default)
Required properties are in bold.
Property Name | Default | Description
selector.type | replicating | The component type name, needs to be replicating
selector.optional | – | Set of channels to be marked as optional
Example for agent named a1 and its source called r1:
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3
In the above configuration, c3 is an optional channel. Failure to write to c3 is
simply ignored. Since c1 and c2 are not marked optional, failure to write to
those channels will cause the transaction to fail.
Multiplexing Channel Selector
Required properties are in bold.
Property Name | Default | Description
selector.type | replicating | The component type name, needs to be multiplexing
selector.header | flume.selector.header |
selector.default | – |
selector.mapping.* | – |
Example for agent named a1 and its source called r1:
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
Custom Channel Selector
A custom channel selector is your own implementation of the ChannelSelector
interface. A custom channel selector’s class and its dependencies must be
included in the agent’s classpath when starting the Flume agent. The type of
the custom channel selector is its FQCN.
Property Name | Default | Description
selector.type | – | The component type name, needs to be your FQCN
Example for agent named a1 and its source called r1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.selector.type = org.example.MyChannelSelector
Flume Sink Processors
Sink groups allow users to group multiple sinks into one entity.
Sink processors can be used to provide load balancing capabilities over all
sinks inside the group or to achieve fail over from one sink to another in
case of temporary failure.
Required properties are in bold.
Property Name | Default | Description
sinks | – | Space-separated list of sinks that are participating in the group
processor.type | default | The component type name, needs to be default, failover or load_balance
Example for agent named a1:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
Default Sink Processor
The default sink processor accepts only a single sink. The user is not required
to create a processor (sink group) for a single sink. Instead, the user can follow
the source - channel - sink pattern that was explained earlier in this user
guide.
Failover Sink Processor
Failover Sink Processor maintains a prioritized list of sinks, guaranteeing
that so long as one is available events will be processed (delivered).
The failover mechanism works by relegating failed sinks to a pool where
they are assigned a cool down period, increasing with sequential failures
before they are retried. Once a sink successfully sends an event, it is
restored to the live pool.
To configure, set a sink group's processor to failover and set
priorities for all individual sinks. All specified priorities must
be unique. Furthermore, an upper limit on the failover time can be set
(in milliseconds) using the maxpenalty property.
Required properties are in bold.
Property Name | Default | Description
sinks | – | Space-separated list of sinks that are participating in the group
processor.type | default | The component type name, needs to be failover
processor.priority.<sinkName> | – | <sinkName> must be one of the sink instances associated with the current sink group
processor.maxpenalty | 30000 | (in millis)
Example for agent named a1:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
Load balancing Sink Processor
Load balancing sink processor provides the ability to load-balance flow over
multiple sinks. It maintains an indexed list of active sinks on which the
load must be distributed. The implementation supports distributing load using
either round_robin or random selection mechanisms.
The choice of selection mechanism defaults to round_robin,
but can be overridden via configuration. Custom selection mechanisms are
supported via custom classes that inherit from AbstractSinkSelector.
When invoked, this selector picks the next sink using its configured selection
mechanism and invokes it. For round_robin and random, if the selected sink
fails to deliver the event, the processor picks the next available sink via
its configured selection mechanism. This implementation does not blacklist
the failing sink and instead continues to optimistically attempt every
available sink. If all sink invocations result in failure, the selector
propagates the failure to the sink runner.
If backoff is enabled, the sink processor will blacklist
sinks that fail, removing them from selection for a given timeout. When the
timeout ends, if the sink is still unresponsive, the timeout is increased
exponentially to avoid potentially getting stuck in long waits on unresponsive
sinks. With backoff disabled, in round-robin mode all of a failed sink's load is
passed to the next sink in line and is thus not evenly balanced.
Required properties are in bold.
Property Name | Default | Description
sinks | – | Space-separated list of sinks that are participating in the group
processor.type | default | The component type name, needs to be load_balance
processor.backoff | false | Should failed sinks be backed off exponentially.
processor.selector | round_robin | Selection mechanism. Must be either round_robin, random or FQCN of custom class that inherits from AbstractSinkSelector
processor.selector.maxTimeOut | 30000 | Used by backoff selectors to limit exponential backoff (in milliseconds)
Example for agent named a1:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
Custom Sink Processor
Custom sink processors are not supported at the moment.
Event Serializers
The file_roll sink and the hdfs sink both support the
EventSerializer interface. Details of the EventSerializers that ship with
Flume are provided below.
Body Text Serializer
Alias: text. This serializer writes the body of the event to an output
stream without any transformation or modification. The event headers are
ignored. Configuration options are as follows:
Property Name | Default | Description
appendNewline | true | Whether a newline will be appended to each event at write time. The default of true assumes that events do not contain newlines, for legacy reasons.
Example for agent named a1:
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false
Avro Event Serializer
Alias: avro_event. This serializer writes Flume events into an Avro
container file. The schema used is the same schema used for Flume events
in the Avro RPC mechanism. This serializer inherits from the
AbstractAvroEventSerializer class. Configuration options are as follows:
Property Name | Default | Description
syncIntervalBytes | 2048000 | Avro sync interval, in approximate bytes.
compressionCodec | null | Avro compression codec. For supported codecs, see Avro’s CodecFactory docs.
Example for agent named a1:
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer.compressionCodec = snappy
Flume Interceptors
Flume has the capability to modify/drop events in-flight. This is done with the help of interceptors. Interceptors
are classes that implement the org.apache.flume.interceptor.Interceptor interface. An interceptor can
modify or even drop events based on any criteria chosen by the developer of the interceptor. Flume supports
chaining of interceptors. This is made possible by specifying the list of interceptor builder class names
in the configuration. Interceptors are specified as a whitespace-separated list in the source configuration.
The order in which the interceptors are specified is the order in which they are invoked.
The list of events returned by one interceptor is passed to the next interceptor in the chain. Interceptors
can modify or drop events. If an interceptor needs to drop events, it simply omits those events from
the list that it returns. If it is to drop all events, then it simply returns an empty list. Interceptors
are named components; here is an example of how they are created through configuration:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
a1.sinks.k1.channel = c1
Note that the interceptor builders are passed to the type config parameter. The interceptors are themselves
configurable and can be passed configuration values just like they are passed to any other configurable component.
In the above example, events are passed to the HostInterceptor first and the events returned by the HostInterceptor
are then passed along to the TimestampInterceptor. You can specify either the fully qualified class name (FQCN)
or the alias timestamp. If you have multiple collectors writing to the same HDFS path, then you could also use
the HostInterceptor.
Timestamp Interceptor
This interceptor inserts into the event headers the time in milliseconds at which it processes the event. It
inserts a header with key timestamp whose value is the relevant timestamp. This interceptor
can preserve an existing timestamp header if it is configured to do so (see preserveExisting).
Property Name | Default | Description
type | – | The component type name, has to be timestamp or the FQCN
preserveExisting | false | If the timestamp already exists, should it be preserved - true or false
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
Host Interceptor
This interceptor inserts the hostname or IP address of the host that this agent is running on. It inserts a header
with key host or a configured key whose value is the hostname or IP address of the host, based on configuration.
Property Name | Default | Description
type | – | The component type name, has to be host
preserveExisting | false | If the host header already exists, should it be preserved - true or false
useIP | true | Use the IP Address if true, else use hostname.
hostHeader | host | The header key to be used.
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i1.hostHeader = hostname
Static Interceptor
The static interceptor allows the user to append a static header with a static value to all events.
The current implementation does not allow specifying multiple headers at one time. Instead, the user can chain
multiple static interceptors, each defining one static header.
Property Name | Default | Description
type | – | The component type name, has to be static
preserveExisting | true | If configured header already exists, should it be preserved - true or false
key | key | Name of header that should be created
value | value | Static value that should be created
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK
UUID Interceptor
This interceptor sets a universally unique identifier on all events that are intercepted. An example UUID is b5755073-77a9-43c1-8fad-b7a586fc1b97, which represents a 128-bit value.
Consider using UUIDInterceptor to automatically assign a UUID to an event if no application-level unique key for the event is available. It can be important to assign UUIDs to events as soon as they enter the Flume network; that is, in the first Flume Source of the flow. This enables subsequent deduplication of events in the face of replication and redelivery in a Flume network that is designed for high availability and high performance. If an application-level key is available, it is preferable to an auto-generated UUID because it enables subsequent updates and deletes of events in data stores using that well-known application-level key.
Property Name |
Default |
Description |
type |
– |
The component type name has to be org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder |
headerName |
id |
The name of the Flume header to modify |
preserveExisting |
true |
If the UUID header already exists, whether it should be preserved (true or false) |
prefix |
“” |
The prefix string constant to prepend to each generated UUID |
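How headerName, preserveExisting and prefix interact can be sketched with java.util.UUID. This is only an illustration of the behaviour described in the table above, not the code of UUIDInterceptor; the class UuidHeaderSketch and the prefix "evt-" are hypothetical, and headers are modelled as a plain map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of the header assignment described above; not a Flume class.
final class UuidHeaderSketch {

    // Stores prefix + random UUID under headerName, unless an existing
    // value is present and preserveExisting asks to keep it.
    static Map<String, String> assignId(Map<String, String> headers,
                                        String headerName,
                                        String prefix,
                                        boolean preserveExisting) {
        if (preserveExisting && headers.containsKey(headerName)) {
            return headers;
        }
        headers.put(headerName, prefix + UUID.randomUUID());
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        assignId(headers, "id", "evt-", true);
        String first = headers.get("id");

        // A second pass (e.g. after redelivery) keeps the original id,
        // which is what makes later deduplication possible.
        assignId(headers, "id", "evt-", true);
        System.out.println(first.equals(headers.get("id"))); // true
    }
}
```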
Morphline Interceptor
This interceptor filters the events through a morphline configuration file that defines a chain of transformation commands that pipe records from one command to another.
For example, the morphline can ignore certain events, alter or insert certain event headers via regular-expression-based pattern matching, or auto-detect and set a MIME type via Apache Tika on events that are intercepted. This kind of content sniffing can be used for content-based dynamic routing in a Flume topology.
MorphlineInterceptor can also help to implement dynamic routing to multiple Apache Solr collections (e.g. for multi-tenancy).
Currently, the morphline of an interceptor must not generate more than one output record for each input event. This interceptor is not intended for heavy-duty ETL processing. If you need that, consider moving ETL processing from the Flume Source to a Flume Sink, e.g. to a MorphlineSolrSink.
Required properties are in bold.
Property Name |
Default |
Description |
type |
– |
The component type name has to be org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder |
morphlineFile |
– |
The relative or absolute path on the local file system to the morphline configuration file. Example: /etc/flume-ng/conf/morphline.conf |
morphlineId |
null |
Optional name used to identify a morphline if there are multiple morphlines in a morphline config file |
Sample flume.conf file:
a1.sources.avroSrc.interceptors = morphlineinterceptor
a1.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1
Regex Filtering Interceptor
This interceptor filters events selectively by interpreting the event body as text and matching the text against a configured regular expression.
The supplied regular expression can be used to include events or exclude events.
Property Name |
Default |
Description |
type |
– |
The component type name has to be regex_filter |
regex |
".*" |
Regular expression for matching against events |
excludeEvents |
false |
If true, regex determines events to exclude, otherwise regex determines events to include. |
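The include/exclude decision can be sketched with java.util.regex. This is a minimal sketch under two assumptions that the table does not state: the body is decoded as UTF-8, and a partial match (Matcher.find) counts as a match. The class RegexFilterSketch and the sample log lines are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

// Hypothetical sketch of the filtering decision; not a Flume class.
final class RegexFilterSketch {

    // Returns true when the event should be passed on, false when it is dropped.
    static boolean keep(byte[] body, Pattern regex, boolean excludeEvents) {
        // Assumption: the body is UTF-8 text and a partial match is sufficient.
        String text = new String(body, StandardCharsets.UTF_8);
        boolean matches = regex.matcher(text).find();
        return excludeEvents ? !matches : matches;
    }

    public static void main(String[] args) {
        Pattern errors = Pattern.compile("^ERROR");
        byte[] error = "ERROR disk full".getBytes(StandardCharsets.UTF_8);
        byte[] info = "INFO all good".getBytes(StandardCharsets.UTF_8);

        System.out.println(keep(error, errors, false)); // true: included by the regex
        System.out.println(keep(info, errors, false));  // false: not matched, dropped
        System.out.println(keep(error, errors, true));  // false: matched, so excluded
    }
}
```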
Example 1 (uses the serializers of the Regex Extractor Interceptor, type regex_extractor):
If the Flume event body contained 1:2:3.4foobar5 and the following configuration was used
a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three
The extracted event will contain the same body, with the following headers added: one=>1, two=>2, three=>3
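The mapping from match groups to header names in Example 1 can be reproduced with java.util.regex. This is a sketch of the idea only, not Flume's code; the class RegexExtractSketch is hypothetical and headers are modelled as a plain map.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: capture group i becomes the header named names[i - 1].
final class RegexExtractSketch {

    static Map<String, String> extract(String body, Pattern regex, String... names) {
        Map<String, String> headers = new LinkedHashMap<>();
        Matcher m = regex.matcher(body);
        if (m.find()) {
            // Only pair up groups that have a corresponding name.
            int groups = Math.min(m.groupCount(), names.length);
            for (int i = 0; i < groups; i++) {
                headers.put(names[i], m.group(i + 1));
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        Pattern regex = Pattern.compile("(\\d):(\\d):(\\d)");
        Map<String, String> headers =
                extract("1:2:3.4foobar5", regex, "one", "two", "three");
        System.out.println(headers); // {one=1, two=2, three=3}
    }
}
```

In the Java string literal, "(\\d)" denotes the regex (\d). The doubled backslashes in the properties file serve the same purpose, because the properties format also treats the backslash as an escape character.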
Example 2 (also for the Regex Extractor Interceptor):
If the Flume event body contained 2012-10-18 18:47:57,614 some log line and the following configuration was used
a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm
The extracted event will contain the same body, with the following header added: timestamp=>1350611220000
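The millisecond value in Example 2 depends on the time zone in which the captured text is interpreted. The sketch below uses java.time as a stand-in for the serializer's date parsing, which is an assumption; the class MillisSketch is hypothetical. It shows that the value 1350611220000 corresponds to reading 2012-10-18 18:47 in a UTC-7 zone such as America/Los_Angeles.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch: converts captured text to epoch milliseconds
// for a given pattern and time zone. Not Flume's serializer.
final class MillisSketch {

    static long toMillis(String text, String pattern, String zoneId) {
        LocalDateTime local =
                LocalDateTime.parse(text, DateTimeFormatter.ofPattern(pattern));
        return local.atZone(ZoneId.of(zoneId)).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        String captured = "2012-10-18 18:47"; // text matched by the capture group
        String pattern = "yyyy-MM-dd HH:mm";

        System.out.println(toMillis(captured, pattern, "America/Los_Angeles")); // 1350611220000
        System.out.println(toMillis(captured, pattern, "UTC"));                 // 1350586020000
    }
}
```

An agent running in a different time zone would therefore produce a different timestamp header for the same log line.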