content/FlumeUserGuide.html [766:2314]: - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - settings.
To define the flow within a single agent, you need to link the sources and sinks via a channel. You need to list the sources, sinks and channels for the given agent, and then point the source and sink to a channel. A source instance can specify multiple channels, but a sink instance can only specify one channel. The format is as follows:
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>
# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>
For example, an agent named agent_foo is reading data from an external avro client and sending it to HDFS via a memory channel. The config file weblog.config could look like:
# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1
# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1
# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1
This will make the events flow from avro-AppSrv-source to hdfs-Cluster1-sink through the memory channel mem-channel-1. When the agent is started with the weblog.config as its config file, it will instantiate that flow.
After defining the flow, you need to set properties of each source, sink and channel. This is done in the same hierarchical namespace fashion where you set the component type and other values for the properties specific to each component:
# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue>
# properties for channels
<Agent>.channels.<Channel>.<someProperty> = <someValue>
# properties for sinks
<Agent>.sinks.<Sink>.<someProperty> = <someValue>
The property “type” needs to be set for each component for Flume to understand what kind of object it needs to be. Each source, sink and channel type has its own set of properties required for it to function as intended. All those need to be set as needed. In the previous example, we have a flow from avro-AppSrv-source to hdfs-Cluster1-sink through the memory channel mem-channel-1. Here’s an example that shows configuration of each of those components:
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1
# set channel for sources, sinks
# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000
# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100
# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata
#...
A single Flume agent can contain several independent flows. You can list multiple sources, sinks and channels in a config. These components can be linked to form multiple flows:
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
Then you can link the sources and sinks to their corresponding channels (for sources) or channel (for sinks) to setup two different flows. For example, if you need to setup two flows in an agent, one going from an external avro client to external HDFS and another from output of a tail to avro sink, then here’s a config to do that:
# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2
# flow #1 configuration
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
# flow #2 configuration
agent_foo.sources.exec-tail-source2.channels = file-channel-2
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2
To setup a multi-tier flow, you need to have an avro/thrift sink of first hop pointing to avro/thrift source of the next hop. This will result in the first Flume agent forwarding events to the next Flume agent. For example, if you are periodically sending files (1 file per event) using avro client to a local Flume agent, then this local agent can forward it to another agent that has the mounted for storage.
Weblog agent config:
# list sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = avro-forward-sink
agent_foo.channels = file-channel
# define the flow
agent_foo.sources.avro-AppSrv-source.channels = file-channel
agent_foo.sinks.avro-forward-sink.channel = file-channel
# avro sink properties
agent_foo.sinks.avro-forward-sink.type = avro
agent_foo.sinks.avro-forward-sink.hostname = 10.1.1.100
agent_foo.sinks.avro-forward-sink.port = 10000
# configure other pieces
#...
HDFS agent config:
# list sources, sinks and channels in the agent
agent_foo.sources = avro-collection-source
agent_foo.sinks = hdfs-sink
agent_foo.channels = mem-channel
# define the flow
agent_foo.sources.avro-collection-source.channels = mem-channel
agent_foo.sinks.hdfs-sink.channel = mem-channel
# avro source properties
agent_foo.sources.avro-collection-source.type = avro
agent_foo.sources.avro-collection-source.bind = 10.1.1.100
agent_foo.sources.avro-collection-source.port = 10000
# configure other pieces
#...
Here we link the avro-forward-sink from the weblog agent to the avro-collection-source of the hdfs agent. This will result in the events coming from the external appserver source eventually getting stored in HDFS.
As discussed in previous section, Flume supports fanning out the flow from one source to multiple channels. There are two modes of fan out, replicating and multiplexing. In the replicating flow, the event is sent to all the configured channels. In case of multiplexing, the event is sent to only a subset of qualifying channels. To fan out the flow, one needs to specify a list of channels for a source and the policy for the fanning it out. This is done by adding a channel “selector” that can be replicating or multiplexing. Then further specify the selection rules if it’s a multiplexer. If you don’t specify a selector, then by default it’s replicating:
# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>
# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>
<Agent>.sources.<Source1>.selector.type = replicating
The multiplexing select has a further set of properties to bifurcate the flow. This requires specifying a mapping of an event attribute to a set for channel. The selector checks for each configured attribute in the event header. If it matches the specified value, then that event is sent to all the channels mapped to that value. If there’s no match, then the event is sent to set of channels configured as default:
# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...
<Agent>.sources.<Source1>.selector.default = <Channel2>
The mapping allows overlapping the channels for each value.
The following example has a single flow that multiplexed to two paths. The agent named agent_foo has a single avro source and two channels linked to two sinks:
# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2
# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2
# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2
# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
The selector checks for a header called “State”. If the value is “CA” then its sent to mem-channel-1, if its “AZ” then it goes to file-channel-2 or if its “NY” then both. If the “State” header is not set or doesn’t match any of the three, then it goes to mem-channel-1 which is designated as ‘default’.
The selector also supports optional channels. To specify optional channels for a header, the config parameter ‘optional’ is used in the following way:
# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
The selector will attempt to write to the required channels first and will fail the transaction if even one of these channels fails to consume the events. The transaction is reattempted on all of the channels. Once all required channels have consumed the events, then the selector will attempt to write to the optional channels. A failure by any of the optional channels to consume the event is simply ignored and not retried.
If there is an overlap between the optional channels and required channels for a specific header, the channel is considered to be required, and a failure in the channel will cause the entire set of required channels to be retried. For instance, in the above example, for the header “CA” mem-channel-1 is considered to be a required channel even though it is marked both as required and optional, and a failure to write to this channel will cause that event to be retried on all channels configured for the selector.
Note that if a header does not have any required channels, then the event will be written to the default channels and will be attempted to be written to the optional channels for that header. Specifying optional channels will still cause the event to be written to the default channels, if no required channels are specified. If no channels are designated as default and there are no required, the selector will attempt to write the events to the optional channels. Any failures are simply ignored in that case.
Several Flume components support the SSL/TLS protocols in order to communicate with other systems securely.
Component | SSL server or client |
---|---|
Avro Source | server |
Avro Sink | client |
Thrift Source | server |
Thrift Sink | client |
Kafka Source | client |
Kafka Channel | client |
Kafka Sink | client |
HTTP Source | server |
JMS Source | client |
Syslog TCP Source | server |
Multiport Syslog TCP Source | server |
The SSL compatible components have several configuration parameters to set up SSL, like enable SSL flag, keystore / truststore parameters (location, password, type) and additional SSL parameters (eg. disabled protocols).
Enabling SSL for a component is always specified at component level in the agent configuration file. So some components may be configured to use SSL while others not (even with the same component type).
The keystore / truststore setup can be specified at component level or globally.
In case of the component level setup, the keystore / truststore is configured in the agent configuration file through component specific parameters. The advantage of this method is that the components can use different keystores (if this would be needed). The disadvantage is that the keystore parameters must be copied for each component in the agent configuration file. The component level setup is optional, but if it is defined, it has higher precedence than the global parameters.
With the global setup, it is enough to define the keystore / truststore parameters once and use the same settings for all components, which means less and more centralized configuration.
The global setup can be configured either through system properties or through environment variables.
System property | Environment variable | Description |
---|---|---|
javax.net.ssl.keyStore | FLUME_SSL_KEYSTORE_PATH | Keystore location |
javax.net.ssl.keyStorePassword | FLUME_SSL_KEYSTORE_PASSWORD | Keystore password |
javax.net.ssl.keyStoreType | FLUME_SSL_KEYSTORE_TYPE | Keystore type (by default JKS) |
javax.net.ssl.trustStore | FLUME_SSL_TRUSTSTORE_PATH | Truststore location |
javax.net.ssl.trustStorePassword | FLUME_SSL_TRUSTSTORE_PASSWORD | Truststore password |
javax.net.ssl.trustStoreType | FLUME_SSL_TRUSTSTORE_TYPE | Truststore type (by default JKS) |
flume.ssl.include.protocols | FLUME_SSL_INCLUDE_PROTOCOLS | Protocols to include when calculating enabled protocols. A comma (,) separated list. Excluded protocols will be excluded from this list if provided. |
flume.ssl.exclude.protocols | FLUME_SSL_EXCLUDE_PROTOCOLS | Protocols to exclude when calculating enabled protocols. A comma (,) separated list. |
flume.ssl.include.cipherSuites | FLUME_SSL_INCLUDE_CIPHERSUITES | Cipher suites to include when calculating enabled cipher suites. A comma (,) separated list. Excluded cipher suites will be excluded from this list if provided. |
flume.ssl.exclude.cipherSuites | FLUME_SSL_EXCLUDE_CIPHERSUITES | Cipher suites to exclude when calculating enabled cipher suites. A comma (,) separated list. |
The SSL system properties can either be passed on the command line or by setting the JAVA_OPTS environment variable in conf/flume-env.sh. (Although, using the command line is inadvisable because the commands including the passwords will be saved to the command history.)
export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStore=/path/to/keystore.jks"
export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStorePassword=password"
Flume uses the system properties defined in JSSE (Java Secure Socket Extension), so this is a standard way for setting up SSL. On the other hand, specifying passwords in system properties means that the passwords can be seen in the process list. For cases where it is not acceptable, it is also be possible to define the parameters in environment variables. Flume initializes the JSSE system properties from the corresponding environment variables internally in this case.
The SSL environment variables can either be set in the shell environment before starting Flume or in conf/flume-env.sh. (Although, using the command line is inadvisable because the commands including the passwords will be saved to the command history.)
export FLUME_SSL_KEYSTORE_PATH=/path/to/keystore.jks
export FLUME_SSL_KEYSTORE_PASSWORD=password
Please note:
Sources and sinks can have a batch size parameter that determines the maximum number of events they process in one batch. This happens within a channel transaction that has an upper limit called transaction capacity. Batch size must be smaller than the channel’s transaction capacity. There is an explicit check to prevent incompatible settings. This check happens whenever the configuration is read.
Listens on Avro port and receives events from external Avro client streams. When paired with the built-in Avro Sink on another (previous hop) Flume agent, it can create tiered collection topologies. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be avro |
bind | – | hostname or IP address to listen on |
port | – | Port # to bind to |
threads | – | Maximum number of worker threads to spawn |
selector.type | ||
selector.* | ||
interceptors | – | Space-separated list of interceptors |
interceptors.* | ||
compression-type | none | This can be “none” or “deflate”. The compression-type must match the compression-type of matching AvroSource |
ssl | false | Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section). |
keystore | – | This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error). |
keystore-password | – | The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error). |
keystore-type | JKS | The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS). |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
include-protocols | – | Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If included-protocols is empty, it includes every supported protocols. |
exclude-cipher-suites | – | Space-separated list of cipher suites to exclude. |
include-cipher-suites | – | Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. If included-cipher-suites is empty, it includes every supported cipher suites. |
ipFilter | false | Set this to true to enable ipFiltering for netty |
ipFilterRules | – | Define N netty ipFilter pattern rules with this config. |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
Example of ipFilterRules
ipFilterRules defines N netty ipFilters separated by a comma a pattern rule must be in this format.
<’allow’ or deny>:<’ip’ or ‘name’ for computer name>:<pattern> or allow/deny:ip/name:pattern
example: ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*
Note that the first rule to match will apply as the example below shows from a client on the localhost
This will Allow the client on localhost be deny clients from any other ip “allow:name:localhost,deny:ip:” This will deny the client on localhost be allow clients from any other ip “deny:name:localhost,allow:ip:“
Listens on Thrift port and receives events from external Thrift client streams. When paired with the built-in ThriftSink on another (previous hop) Flume agent, it can create tiered collection topologies. Thrift source can be configured to start in secure mode by enabling kerberos authentication. agent-principal and agent-keytab are the properties used by the Thrift source to authenticate to the kerberos KDC. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be thrift |
bind | – | hostname or IP address to listen on |
port | – | Port # to bind to |
threads | – | Maximum number of worker threads to spawn |
selector.type | ||
selector.* | ||
interceptors | – | Space separated list of interceptors |
interceptors.* | ||
ssl | false | Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section) |
keystore | – | This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error). |
keystore-password | – | The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error). |
keystore-type | JKS | The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS). |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
include-protocols | – | Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If included-protocols is empty, it includes every supported protocols. |
exclude-cipher-suites | – | Space-separated list of cipher suites to exclude. |
include-cipher-suites | – | Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. |
kerberos | false | Set to true to enable kerberos authentication. In kerberos mode, agent-principal and agent-keytab are required for successful authentication. The Thrift source in secure mode, will accept connections only from Thrift clients that have kerberos enabled and are successfully authenticated to the kerberos KDC. |
agent-principal | – | The kerberos principal used by the Thrift Source to authenticate to the kerberos KDC. |
agent-keytab | —- | The keytab location used by the Thrift Source in combination with the agent-principal to authenticate to the kerberos KDC. |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out (stderr is simply discarded, unless property logStdErr is set to true). If the process exits for any reason, the source also exits and will produce no further data. This means configurations such as cat [named pipe] or tail -F [file] are going to produce the desired results where as date will probably not - the former two commands produce streams of data where as the latter produces a single event and exits.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be exec |
command | – | The command to execute |
shell | – | A shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc. |
restartThrottle | 10000 | Amount of time (in millis) to wait before attempting a restart |
restart | false | Whether the executed cmd should be restarted if it dies |
logStdErr | false | Whether the command’s stderr should be logged |
batchSize | 20 | The max number of lines to read and send to the channel at a time |
batchTimeout | 3000 | Amount of time (in milliseconds) to wait, if the buffer size was not reached, before data is pushed downstream |
selector.type | replicating | replicating or multiplexing |
selector.* | Depends on the selector.type value | |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
Warning
The problem with ExecSource and other asynchronous sources is that the source can not guarantee that if there is a failure to put the event into the Channel the client knows about it. In such cases, the data will be lost. As a for instance, one of the most commonly requested features is the tail -F [file]-like use case where an application writes to a log file on disk and Flume tails the file, sending each line as an event. While this is possible, there’s an obvious problem; what happens if the channel fills up and Flume can’t send an event? Flume has no way of indicating to the application writing the log file that it needs to retain the log or that the event hasn’t been sent, for some reason. If this doesn’t make sense, you need only know this: Your application can never guarantee data has been received when using a unidirectional asynchronous interface such as ExecSource! As an extension of this warning - and to be completely clear - there is absolutely zero guarantee of event delivery when using this source. For stronger reliability guarantees, consider the Spooling Directory Source, Taildir Source or direct integration with Flume via the SDK.
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
The ‘shell’ config is used to invoke the ‘command’ through a command shell (such as Bash or Powershell). The ‘command’ is passed as an argument to ‘shell’ for execution. This allows the ‘command’ to use features from the shell such as wildcards, back ticks, pipes, loops, conditionals etc. In the absence of the ‘shell’ config, the ‘command’ will be invoked directly. Common values for ‘shell’ : ‘/bin/sh -c’, ‘/bin/ksh -c’, ‘cmd /c’, ‘powershell -Command’, etc.
a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
JMS Source reads messages from a JMS destination such as a queue or topic. Being a JMS application it should work with any JMS provider but has only been tested with ActiveMQ. The JMS source provides configurable batch size, message selector, user/pass, and message to flume event converter. Note that the vendor provided JMS jars should be included in the Flume classpath using plugins.d directory (preferred), –classpath on command line, or via FLUME_CLASSPATH variable in flume-env.sh.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be jms |
initialContextFactory | – | Inital Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory |
connectionFactory | – | The JNDI name the connection factory should appear as |
providerURL | – | The JMS provider URL |
destinationName | – | Destination name |
destinationType | – | Destination type (queue or topic) |
messageSelector | – | Message selector to use when creating the consumer |
userName | – | Username for the destination/provider |
passwordFile | – | File containing the password for the destination/provider |
batchSize | 100 | Number of messages to consume in one batch |
converter.type | DEFAULT | Class to use to convert messages to flume events. See below. |
converter.* | – | Converter properties. |
converter.charset | UTF-8 | Default converter only. Charset to use when converting JMS TextMessages to byte arrays. |
createDurableSubscription | false | Whether to create durable subscription. Durable subscription can only be used with destinationType topic. If true, “clientId” and “durableSubscriptionName” have to be specified. |
clientId | – | JMS client identifier set on Connection right after it is created. Required for durable subscriptions. |
durableSubscriptionName | – | Name used to identify the durable subscription. Required for durable subscriptions. |
The JMS source allows pluggable converters, though it’s likely the default converter will work for most purposes. The default converter is able to convert Bytes, Text, and Object messages to FlumeEvents. In all cases, the properties in the message are added as headers to the FlumeEvent.
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE
JMS client implementations typically support to configure SSL/TLS via some Java system properties defined by JSSE (Java Secure Socket Extension). Specifying these system properties for Flume’s JVM, JMS Source (or more precisely the JMS client implementation used by the JMS Source) can connect to the JMS server through SSL (of course only when the JMS server has also been set up to use SSL). It should work with any JMS provider and has been tested with ActiveMQ, IBM MQ and Oracle WebLogic.
The following sections describe the SSL configuration steps needed on the Flume side only. You can find more detailed descriptions about the server side setup of the different JMS providers and also full working configuration examples on Flume Wiki.
SSL transport / server authentication:
If the JMS server uses self-signed certificate or its certificate is signed by a non-trusted CA (eg. the company’s own CA), then a truststore (containing the right certificate) needs to be set up and passed to Flume. It can be done via the global SSL parameters. For more details about the global SSL setup, see the SSL/TLS support section.
Some JMS providers require SSL specific JNDI Initial Context Factory and/or Provider URL settings when using SSL (eg. ActiveMQ uses ssl:// URL prefix instead of tcp://). In this case the source properties (initialContextFactory and/or providerURL) have to be adjusted in the agent config file.
Client certificate authentication (two-way SSL):
JMS Source can authenticate to the JMS server through client certificate authentication instead of the usual user/password login (when SSL is used and the JMS server is configured to accept this kind of authentication).
The keystore containing Flume’s key used for the authentication needs to be configured via the global SSL parameters again. For more details about the global SSL setup, see the SSL/TLS support section.
The keystore should contain only one key (if multiple keys are present, then the first one will be used). The key password must be the same as the keystore password.
In case of client certificate authentication, it is not needed to specify the userName / passwordFile properties for the JMS Source in the Flume agent config file.
Please note:
There are no component level configuration parameters for JMS Source unlike in case of other components. No enable SSL flag either. SSL setup is controlled by JNDI/Provider URL settings (ultimately the JMS server settings) and by the presence / absence of the truststore / keystore.
This source lets you ingest data by placing files to be ingested into a “spooling” directory on disk. This source will watch the specified directory for new files, and will parse events out of new files as they appear. The event parsing logic is pluggable. After a given file has been fully read into the channel, completion by default is indicated by renaming the file or it can be deleted or the trackerDir is used to keep track of processed files.
Unlike the Exec source, this source is reliable and will not miss data, even if Flume is restarted or killed. In exchange for this reliability, only immutable, uniquely-named files must be dropped into the spooling directory. Flume tries to detect these problem conditions and will fail loudly if they are violated:
To avoid the above issues, it may be useful to add a unique identifier (such as a timestamp) to log file names when they are moved into the spooling directory.
Despite the reliability guarantees of this source, there are still cases in which events may be duplicated if certain downstream failures occur. This is consistent with the guarantees offered by other Flume components.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be spooldir. |
spoolDir | – | The directory from which to read files from. |
fileSuffix | .COMPLETED | Suffix to append to completely ingested files |
deletePolicy | never | When to delete completed files: never or immediate |
fileHeader | false | Whether to add a header storing the absolute path filename. |
fileHeaderKey | file | Header key to use when appending absolute path filename to event header. |
basenameHeader | false | Whether to add a header storing the basename of the file. |
basenameHeaderKey | basename | Header Key to use when appending basename of file to event header. |
includePattern | ^.*$ | Regular expression specifying which files to include. It can used together with ignorePattern. If a file matches both ignorePattern and includePattern regex, the file is ignored. |
ignorePattern | ^$ | Regular expression specifying which files to ignore (skip). It can used together with includePattern. If a file matches both ignorePattern and includePattern regex, the file is ignored. |
trackerDir | .flumespool | Directory to store metadata related to processing of files. If this path is not an absolute path, then it is interpreted as relative to the spoolDir. |
trackingPolicy | rename | The tracking policy defines how file processing is tracked. It can be “rename” or “tracker_dir”. This parameter is only effective if the deletePolicy is “never”. “rename” - After processing files they get renamed according to the fileSuffix parameter. “tracker_dir” - Files are not renamed but a new empty file is created in the trackerDir. The new tracker file name is derived from the ingested one plus the fileSuffix. |
consumeOrder | oldest | In which order files in the spooling directory will be consumed oldest, youngest and random. In case of oldest and youngest, the last modified time of the files will be used to compare the files. In case of a tie, the file with smallest lexicographical order will be consumed first. In case of random any file will be picked randomly. When using oldest and youngest the whole directory will be scanned to pick the oldest/youngest file, which might be slow if there are a large number of files, while using random may cause old files to be consumed very late if new files keep coming in the spooling directory. |
pollDelay | 500 | Delay (in milliseconds) used when polling for new files. |
recursiveDirectorySearch | false | Whether to monitor sub directories for new files to read. |
maxBackoff | 4000 | The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, up to the value specified by this parameter. |
batchSize | 100 | Granularity at which to batch transfer to the channel |
inputCharset | UTF-8 | Character set used by deserializers that treat the input file as text. |
decodeErrorPolicy | FAIL | What to do when we see a non-decodable character in the input file. FAIL: Throw an exception and fail to parse the file. REPLACE: Replace the unparseable character with the “replacement character” char, typically Unicode U+FFFD. IGNORE: Drop the unparseable character sequence. |
deserializer | LINE | Specify the deserializer used to parse the file into events. Defaults to parsing each line as an event. The class specified must implement EventDeserializer.Builder. |
deserializer.* | Varies per event deserializer. | |
bufferMaxLines | – | (Obsolete) This option is now ignored. |
bufferMaxLineLength | 5000 | (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead. |
selector.type | replicating | replicating or multiplexing |
selector.* | Depends on the selector.type value | |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
Example for an agent named agent-1:
a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
The following event deserializers ship with Flume.
This deserializer generates one event per line of text input.
Property Name | Default | Description |
---|---|---|
deserializer.maxLineLength | 2048 | Maximum number of characters to include in a single event. If a line exceeds this length, it is truncated, and the remaining characters on the line will appear in a subsequent event. |
deserializer.outputCharset | UTF-8 | Charset to use for encoding events put into the channel. |
This deserializer is able to read an Avro container file, and it generates one event per Avro record in the file. Each event is annotated with a header that indicates the schema used. The body of the event is the binary Avro record data, not including the schema or the rest of the container file elements.
Note that if the spool directory source must retry putting one of these events onto a channel (for example, because the channel is full), then it will reset and retry from the most recent Avro container file sync point. To reduce potential event duplication in such a failure scenario, write sync markers more frequently in your Avro input files.
Property Name | Default | Description |
---|---|---|
deserializer.schemaType | HASH | How the schema is represented. By default, or when the value HASH is specified, the Avro schema is hashed and the hash is stored in every event in the event header “flume.avro.schema.hash”. If LITERAL is specified, the JSON-encoded schema itself is stored in every event in the event header “flume.avro.schema.literal”. Using LITERAL mode is relatively inefficient compared to HASH mode. |
This deserializer reads a Binary Large Object (BLOB) per event, typically one BLOB per file. For example a PDF or JPG file. Note that this approach is not suitable for very large objects because the entire BLOB is buffered in RAM.
Property Name | Default | Description |
---|---|---|
deserializer | – | The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder |
deserializer.maxBlobLength | 100000000 | The maximum number of bytes to read and buffer for a given request |
Note
This source is provided as a preview feature. It does not work on Windows.
Watch the specified files, and tail them in nearly real-time once detected new lines appended to the each files. If the new lines are being written, this source will retry reading them in wait for the completion of the write.
This source is reliable and will not miss data even when the tailing files rotate. It periodically writes the last read position of each files on the given position file in JSON format. If Flume is stopped or down for some reason, it can restart tailing from the position written on the existing position file.
In other use case, this source can also start tailing from the arbitrary position for each files using the given position file. When there is no position file on the specified path, it will start tailing from the first line of each files by default.
Files will be consumed in order of their modification time. File with the oldest modification time will be consumed first.
This source does not rename or delete or do any modifications to the file being tailed. Currently this source does not support tailing binary files. It reads text files line by line.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be TAILDIR. |
filegroups | – | Space-separated list of file groups. Each file group indicates a set of files to be tailed. |
filegroups.<filegroupName> | – | Absolute path of the file group. Regular expression (and not file system patterns) can be used for filename only. |
positionFile | ~/.flume/taildir_position.json | File in JSON format to record the inode, the absolute path and the last position of each tailing file. |
headers.<filegroupName>.<headerKey> | – | Header value which is the set with header key. Multiple headers can be specified for one file group. |
byteOffsetHeader | false | Whether to add the byte offset of a tailed line to a header called ‘byteoffset’. |
skipToEnd | false | Whether to skip the position to EOF in the case of files not written on the position file. |
idleTimeout | 120000 | Time (ms) to close inactive files. If the closed file is appended new lines to, this source will automatically re-open it. |
writePosInterval | 3000 | Interval time (ms) to write the last position of each file on the position file. |
batchSize | 100 | Max number of lines to read and send to the channel at a time. Using the default is usually fine. |
maxBatchCount | Long.MAX_VALUE | Controls the number of batches being read consecutively from the same file. If the source is tailing multiple files and one of them is written at a fast rate, it can prevent other files to be processed, because the busy file would be read in an endless loop. In this case lower this value. |
backoffSleepIncrement | 1000 | The increment for time delay before reattempting to poll for new data, when the last attempt did not find any new data. |
maxBackoffSleep | 5000 | The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data. |
cachePatternMatching | true | Listing directories and applying the filename regex pattern may be time consuming for directories containing thousands of files. Caching the list of matching files can improve performance. The order in which files are consumed will also be cached. Requires that the file system keeps track of modification times with at least a 1-second granularity. |
fileHeader | false | Whether to add a header storing the absolute path filename. |
fileHeaderKey | file | Header key to use when appending absolute path filename to event header. |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000
Warning
This source is highly experimental and may change between minor versions of Flume. Use at your own risk.
Experimental source that connects via Streaming API to the 1% sample twitter firehose, continuously downloads tweets, converts them to Avro format and sends Avro events to a downstream Flume sink. Requires the consumer and access tokens and secrets of a Twitter developer account. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be org.apache.flume.source.twitter.TwitterSource |
consumerKey | – | OAuth consumer key |
consumerSecret | – | OAuth consumer secret |
accessToken | – | OAuth access token |
accessTokenSecret | – | OAuth token secret |
maxBatchSize | 1000 | Maximum number of twitter messages to put in a single batch |
maxBatchDurationMillis | 1000 | Maximum number of milliseconds to wait before closing a batch |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
a1.sources.r1.channels = c1
a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY
a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN
a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
a1.sources.r1.maxBatchSize = 10
a1.sources.r1.maxBatchDurationMillis = 200
Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics. If you have multiple Kafka sources running, you can configure them with the same Consumer Group so each will read a unique set of partitions for the topics. This currently supports Kafka server releases 0.10.1.0 or higher. Testing was done up to 2.0.1 that was the highest avilable version at the time of the release.
Property Name | Default | Description | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
channels | – | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
type | – | The component type name, needs to be org.apache.flume.source.kafka.KafkaSource | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
kafka.bootstrap.servers | – | List of brokers in the Kafka cluster used by the source | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
kafka.consumer.group.id | flume | Unique identified of consumer group. Setting the same id in multiple sources or agents indicates that they are part of the same consumer group | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
kafka.topics | – | Comma-separated list of topics the Kafka consumer will read messages from. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
kafka.topics.regex | – | Regex that defines set of topics the source is subscribed on. This property has higher priority than kafka.topics and overrides kafka.topics if exists. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
batchSize | 1000 | Maximum number of messages written to Channel in one batch | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
batchDurationMillis | 1000 | Maximum time (in ms) before a batch will be written to Channel The batch will be written whenever the first of size and time will be reached. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
backoffSleepIncrement | 1000 | Initial and incremental wait time that is triggered when a Kafka Topic appears to be empty. Wait period will reduce aggressive pinging of an empty Kafka Topic. One second is ideal for ingestion use cases but a lower value may be required for low latency operations with interceptors. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxBackoffSleep | 5000 | Maximum wait time that is triggered when a Kafka Topic appears to be empty. Five seconds is ideal for ingestion use cases but a lower value may be required for low latency operations with interceptors. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
useFlumeEventFormat | false | By default events are taken as bytes from the Kafka topic directly into the event body. Set to true to read events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSink or with the parseAsFlumeEvent property on the Kafka Channel this will preserve any Flume headers sent on the producing side. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
setTopicHeader | true | When set to true, stores the topic of the retrieved message into a header, defined by the topicHeader property. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
topicHeader | topic | Defines the name of the header in which to store the name of the topic the message was received from, if the setTopicHeader property is set to true. Care should be taken if combining with the Kafka Sink topicHeader property so as to avoid sending the message back to the same topic in a loop. | - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - content/releases/content/1.10.0/FlumeUserGuide.html [518:2066]: - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - settings.
Component | SSL server or client |
---|---|
Avro Source | server |
Avro Sink | client |
Thrift Source | server |
Thrift Sink | client |
Kafka Source | client |
Kafka Channel | client |
Kafka Sink | client |
HTTP Source | server |
JMS Source | client |
Syslog TCP Source | server |
Multiport Syslog TCP Source | server |
The SSL compatible components have several configuration parameters to set up SSL, like enable SSL flag, keystore / truststore parameters (location, password, type) and additional SSL parameters (eg. disabled protocols).
Enabling SSL for a component is always specified at component level in the agent configuration file. So some components may be configured to use SSL while others not (even with the same component type).
The keystore / truststore setup can be specified at component level or globally.
In case of the component level setup, the keystore / truststore is configured in the agent configuration file through component specific parameters. The advantage of this method is that the components can use different keystores (if this would be needed). The disadvantage is that the keystore parameters must be copied for each component in the agent configuration file. The component level setup is optional, but if it is defined, it has higher precedence than the global parameters.
With the global setup, it is enough to define the keystore / truststore parameters once and use the same settings for all components, which means less and more centralized configuration.
The global setup can be configured either through system properties or through environment variables.
System property | Environment variable | Description |
---|---|---|
javax.net.ssl.keyStore | FLUME_SSL_KEYSTORE_PATH | Keystore location |
javax.net.ssl.keyStorePassword | FLUME_SSL_KEYSTORE_PASSWORD | Keystore password |
javax.net.ssl.keyStoreType | FLUME_SSL_KEYSTORE_TYPE | Keystore type (by default JKS) |
javax.net.ssl.trustStore | FLUME_SSL_TRUSTSTORE_PATH | Truststore location |
javax.net.ssl.trustStorePassword | FLUME_SSL_TRUSTSTORE_PASSWORD | Truststore password |
javax.net.ssl.trustStoreType | FLUME_SSL_TRUSTSTORE_TYPE | Truststore type (by default JKS) |
flume.ssl.include.protocols | FLUME_SSL_INCLUDE_PROTOCOLS | Protocols to include when calculating enabled protocols. A comma (,) separated list. Excluded protocols will be excluded from this list if provided. |
flume.ssl.exclude.protocols | FLUME_SSL_EXCLUDE_PROTOCOLS | Protocols to exclude when calculating enabled protocols. A comma (,) separated list. |
flume.ssl.include.cipherSuites | FLUME_SSL_INCLUDE_CIPHERSUITES | Cipher suites to include when calculating enabled cipher suites. A comma (,) separated list. Excluded cipher suites will be excluded from this list if provided. |
flume.ssl.exclude.cipherSuites | FLUME_SSL_EXCLUDE_CIPHERSUITES | Cipher suites to exclude when calculating enabled cipher suites. A comma (,) separated list. |
The SSL system properties can either be passed on the command line or by setting the JAVA_OPTS environment variable in conf/flume-env.sh. (Although, using the command line is inadvisable because the commands including the passwords will be saved to the command history.)
export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStore=/path/to/keystore.jks"
export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStorePassword=password"
Flume uses the system properties defined in JSSE (Java Secure Socket Extension), so this is a standard way for setting up SSL. On the other hand, specifying passwords in system properties means that the passwords can be seen in the process list. For cases where it is not acceptable, it is also be possible to define the parameters in environment variables. Flume initializes the JSSE system properties from the corresponding environment variables internally in this case.
The SSL environment variables can either be set in the shell environment before starting Flume or in conf/flume-env.sh. (Although, using the command line is inadvisable because the commands including the passwords will be saved to the command history.)
export FLUME_SSL_KEYSTORE_PATH=/path/to/keystore.jks
export FLUME_SSL_KEYSTORE_PASSWORD=password
Please note:
Sources and sinks can have a batch size parameter that determines the maximum number of events they process in one batch. This happens within a channel transaction that has an upper limit called transaction capacity. Batch size must be smaller than the channel’s transaction capacity. There is an explicit check to prevent incompatible settings. This check happens whenever the configuration is read.
Listens on Avro port and receives events from external Avro client streams. When paired with the built-in Avro Sink on another (previous hop) Flume agent, it can create tiered collection topologies. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be avro |
bind | – | hostname or IP address to listen on |
port | – | Port # to bind to |
threads | – | Maximum number of worker threads to spawn |
selector.type | ||
selector.* | ||
interceptors | – | Space-separated list of interceptors |
interceptors.* | ||
compression-type | none | This can be “none” or “deflate”. The compression-type must match the compression-type of matching AvroSource |
ssl | false | Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section). |
keystore | – | This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error). |
keystore-password | – | The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error). |
keystore-type | JKS | The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS). |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
include-protocols | – | Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If included-protocols is empty, it includes every supported protocols. |
exclude-cipher-suites | – | Space-separated list of cipher suites to exclude. |
include-cipher-suites | – | Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. If included-cipher-suites is empty, it includes every supported cipher suites. |
ipFilter | false | Set this to true to enable ipFiltering for netty |
ipFilterRules | – | Define N netty ipFilter pattern rules with this config. |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
Example of ipFilterRules
ipFilterRules defines N netty ipFilters separated by a comma a pattern rule must be in this format.
<’allow’ or deny>:<’ip’ or ‘name’ for computer name>:<pattern> or allow/deny:ip/name:pattern
example: ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*
Note that the first rule to match will apply as the example below shows from a client on the localhost
This will Allow the client on localhost be deny clients from any other ip “allow:name:localhost,deny:ip:” This will deny the client on localhost be allow clients from any other ip “deny:name:localhost,allow:ip:“
Listens on Thrift port and receives events from external Thrift client streams. When paired with the built-in ThriftSink on another (previous hop) Flume agent, it can create tiered collection topologies. Thrift source can be configured to start in secure mode by enabling kerberos authentication. agent-principal and agent-keytab are the properties used by the Thrift source to authenticate to the kerberos KDC. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be thrift |
bind | – | hostname or IP address to listen on |
port | – | Port # to bind to |
threads | – | Maximum number of worker threads to spawn |
selector.type | ||
selector.* | ||
interceptors | – | Space separated list of interceptors |
interceptors.* | ||
ssl | false | Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section) |
keystore | – | This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error). |
keystore-password | – | The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error). |
keystore-type | JKS | The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS). |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
include-protocols | – | Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If included-protocols is empty, it includes every supported protocols. |
exclude-cipher-suites | – | Space-separated list of cipher suites to exclude. |
include-cipher-suites | – | Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. |
kerberos | false | Set to true to enable kerberos authentication. In kerberos mode, agent-principal and agent-keytab are required for successful authentication. The Thrift source in secure mode, will accept connections only from Thrift clients that have kerberos enabled and are successfully authenticated to the kerberos KDC. |
agent-principal | – | The kerberos principal used by the Thrift Source to authenticate to the kerberos KDC. |
agent-keytab | —- | The keytab location used by the Thrift Source in combination with the agent-principal to authenticate to the kerberos KDC. |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out (stderr is simply discarded, unless property logStdErr is set to true). If the process exits for any reason, the source also exits and will produce no further data. This means configurations such as cat [named pipe] or tail -F [file] are going to produce the desired results where as date will probably not - the former two commands produce streams of data where as the latter produces a single event and exits.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be exec |
command | – | The command to execute |
shell | – | A shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc. |
restartThrottle | 10000 | Amount of time (in millis) to wait before attempting a restart |
restart | false | Whether the executed cmd should be restarted if it dies |
logStdErr | false | Whether the command’s stderr should be logged |
batchSize | 20 | The max number of lines to read and send to the channel at a time |
batchTimeout | 3000 | Amount of time (in milliseconds) to wait, if the buffer size was not reached, before data is pushed downstream |
selector.type | replicating | replicating or multiplexing |
selector.* | Depends on the selector.type value | |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
Warning
The problem with ExecSource and other asynchronous sources is that the source can not guarantee that if there is a failure to put the event into the Channel the client knows about it. In such cases, the data will be lost. As a for instance, one of the most commonly requested features is the tail -F [file]-like use case where an application writes to a log file on disk and Flume tails the file, sending each line as an event. While this is possible, there’s an obvious problem; what happens if the channel fills up and Flume can’t send an event? Flume has no way of indicating to the application writing the log file that it needs to retain the log or that the event hasn’t been sent, for some reason. If this doesn’t make sense, you need only know this: Your application can never guarantee data has been received when using a unidirectional asynchronous interface such as ExecSource! As an extension of this warning - and to be completely clear - there is absolutely zero guarantee of event delivery when using this source. For stronger reliability guarantees, consider the Spooling Directory Source, Taildir Source or direct integration with Flume via the SDK.
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
The ‘shell’ config is used to invoke the ‘command’ through a command shell (such as Bash or Powershell). The ‘command’ is passed as an argument to ‘shell’ for execution. This allows the ‘command’ to use features from the shell such as wildcards, back ticks, pipes, loops, conditionals etc. In the absence of the ‘shell’ config, the ‘command’ will be invoked directly. Common values for ‘shell’ : ‘/bin/sh -c’, ‘/bin/ksh -c’, ‘cmd /c’, ‘powershell -Command’, etc.
a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
JMS Source reads messages from a JMS destination such as a queue or topic. Being a JMS application it should work with any JMS provider but has only been tested with ActiveMQ. The JMS source provides configurable batch size, message selector, user/pass, and message to flume event converter. Note that the vendor provided JMS jars should be included in the Flume classpath using plugins.d directory (preferred), –classpath on command line, or via FLUME_CLASSPATH variable in flume-env.sh.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be jms |
initialContextFactory | – | Inital Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory |
connectionFactory | – | The JNDI name the connection factory should appear as |
providerURL | – | The JMS provider URL |
destinationName | – | Destination name |
destinationType | – | Destination type (queue or topic) |
messageSelector | – | Message selector to use when creating the consumer |
userName | – | Username for the destination/provider |
passwordFile | – | File containing the password for the destination/provider |
batchSize | 100 | Number of messages to consume in one batch |
converter.type | DEFAULT | Class to use to convert messages to flume events. See below. |
converter.* | – | Converter properties. |
converter.charset | UTF-8 | Default converter only. Charset to use when converting JMS TextMessages to byte arrays. |
createDurableSubscription | false | Whether to create durable subscription. Durable subscription can only be used with destinationType topic. If true, “clientId” and “durableSubscriptionName” have to be specified. |
clientId | – | JMS client identifier set on Connection right after it is created. Required for durable subscriptions. |
durableSubscriptionName | – | Name used to identify the durable subscription. Required for durable subscriptions. |
The JMS source allows pluggable converters, though it’s likely the default converter will work for most purposes. The default converter is able to convert Bytes, Text, and Object messages to FlumeEvents. In all cases, the properties in the message are added as headers to the FlumeEvent.
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE
JMS client implementations typically support to configure SSL/TLS via some Java system properties defined by JSSE (Java Secure Socket Extension). Specifying these system properties for Flume’s JVM, JMS Source (or more precisely the JMS client implementation used by the JMS Source) can connect to the JMS server through SSL (of course only when the JMS server has also been set up to use SSL). It should work with any JMS provider and has been tested with ActiveMQ, IBM MQ and Oracle WebLogic.
The following sections describe the SSL configuration steps needed on the Flume side only. You can find more detailed descriptions about the server side setup of the different JMS providers and also full working configuration examples on Flume Wiki.
SSL transport / server authentication:
If the JMS server uses self-signed certificate or its certificate is signed by a non-trusted CA (eg. the company’s own CA), then a truststore (containing the right certificate) needs to be set up and passed to Flume. It can be done via the global SSL parameters. For more details about the global SSL setup, see the SSL/TLS support section.
Some JMS providers require SSL specific JNDI Initial Context Factory and/or Provider URL settings when using SSL (eg. ActiveMQ uses ssl:// URL prefix instead of tcp://). In this case the source properties (initialContextFactory and/or providerURL) have to be adjusted in the agent config file.
Client certificate authentication (two-way SSL):
JMS Source can authenticate to the JMS server through client certificate authentication instead of the usual user/password login (when SSL is used and the JMS server is configured to accept this kind of authentication).
The keystore containing Flume’s key used for the authentication needs to be configured via the global SSL parameters again. For more details about the global SSL setup, see the SSL/TLS support section.
The keystore should contain only one key (if multiple keys are present, then the first one will be used). The key password must be the same as the keystore password.
In case of client certificate authentication, it is not needed to specify the userName / passwordFile properties for the JMS Source in the Flume agent config file.
Please note:
There are no component level configuration parameters for JMS Source unlike in case of other components. No enable SSL flag either. SSL setup is controlled by JNDI/Provider URL settings (ultimately the JMS server settings) and by the presence / absence of the truststore / keystore.
This source lets you ingest data by placing files to be ingested into a “spooling” directory on disk. This source will watch the specified directory for new files, and will parse events out of new files as they appear. The event parsing logic is pluggable. After a given file has been fully read into the channel, completion by default is indicated by renaming the file or it can be deleted or the trackerDir is used to keep track of processed files.
Unlike the Exec source, this source is reliable and will not miss data, even if Flume is restarted or killed. In exchange for this reliability, only immutable, uniquely-named files must be dropped into the spooling directory. Flume tries to detect these problem conditions and will fail loudly if they are violated:
To avoid the above issues, it may be useful to add a unique identifier (such as a timestamp) to log file names when they are moved into the spooling directory.
Despite the reliability guarantees of this source, there are still cases in which events may be duplicated if certain downstream failures occur. This is consistent with the guarantees offered by other Flume components.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be spooldir. |
spoolDir | – | The directory from which to read files from. |
fileSuffix | .COMPLETED | Suffix to append to completely ingested files |
deletePolicy | never | When to delete completed files: never or immediate |
fileHeader | false | Whether to add a header storing the absolute path filename. |
fileHeaderKey | file | Header key to use when appending absolute path filename to event header. |
basenameHeader | false | Whether to add a header storing the basename of the file. |
basenameHeaderKey | basename | Header Key to use when appending basename of file to event header. |
includePattern | ^.*$ | Regular expression specifying which files to include. It can used together with ignorePattern. If a file matches both ignorePattern and includePattern regex, the file is ignored. |
ignorePattern | ^$ | Regular expression specifying which files to ignore (skip). It can used together with includePattern. If a file matches both ignorePattern and includePattern regex, the file is ignored. |
trackerDir | .flumespool | Directory to store metadata related to processing of files. If this path is not an absolute path, then it is interpreted as relative to the spoolDir. |
trackingPolicy | rename | The tracking policy defines how file processing is tracked. It can be “rename” or “tracker_dir”. This parameter is only effective if the deletePolicy is “never”. “rename” - After processing files they get renamed according to the fileSuffix parameter. “tracker_dir” - Files are not renamed but a new empty file is created in the trackerDir. The new tracker file name is derived from the ingested one plus the fileSuffix. |
consumeOrder | oldest | In which order files in the spooling directory will be consumed oldest, youngest and random. In case of oldest and youngest, the last modified time of the files will be used to compare the files. In case of a tie, the file with smallest lexicographical order will be consumed first. In case of random any file will be picked randomly. When using oldest and youngest the whole directory will be scanned to pick the oldest/youngest file, which might be slow if there are a large number of files, while using random may cause old files to be consumed very late if new files keep coming in the spooling directory. |
pollDelay | 500 | Delay (in milliseconds) used when polling for new files. |
recursiveDirectorySearch | false | Whether to monitor sub directories for new files to read. |
maxBackoff | 4000 | The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, up to the value specified by this parameter. |
batchSize | 100 | Granularity at which to batch transfer to the channel |
inputCharset | UTF-8 | Character set used by deserializers that treat the input file as text. |
decodeErrorPolicy | FAIL | What to do when we see a non-decodable character in the input file. FAIL: Throw an exception and fail to parse the file. REPLACE: Replace the unparseable character with the “replacement character” char, typically Unicode U+FFFD. IGNORE: Drop the unparseable character sequence. |
deserializer | LINE | Specify the deserializer used to parse the file into events. Defaults to parsing each line as an event. The class specified must implement EventDeserializer.Builder. |
deserializer.* | Varies per event deserializer. | |
bufferMaxLines | – | (Obsolete) This option is now ignored. |
bufferMaxLineLength | 5000 | (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead. |
selector.type | replicating | replicating or multiplexing |
selector.* | Depends on the selector.type value | |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
Example for an agent named agent-1:
a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
The following event deserializers ship with Flume.
This deserializer generates one event per line of text input.
Property Name | Default | Description |
---|---|---|
deserializer.maxLineLength | 2048 | Maximum number of characters to include in a single event. If a line exceeds this length, it is truncated, and the remaining characters on the line will appear in a subsequent event. |
deserializer.outputCharset | UTF-8 | Charset to use for encoding events put into the channel. |
This deserializer is able to read an Avro container file, and it generates one event per Avro record in the file. Each event is annotated with a header that indicates the schema used. The body of the event is the binary Avro record data, not including the schema or the rest of the container file elements.
Note that if the spool directory source must retry putting one of these events onto a channel (for example, because the channel is full), then it will reset and retry from the most recent Avro container file sync point. To reduce potential event duplication in such a failure scenario, write sync markers more frequently in your Avro input files.
Property Name | Default | Description |
---|---|---|
deserializer.schemaType | HASH | How the schema is represented. By default, or when the value HASH is specified, the Avro schema is hashed and the hash is stored in every event in the event header “flume.avro.schema.hash”. If LITERAL is specified, the JSON-encoded schema itself is stored in every event in the event header “flume.avro.schema.literal”. Using LITERAL mode is relatively inefficient compared to HASH mode. |
This deserializer reads a Binary Large Object (BLOB) per event, typically one BLOB per file. For example a PDF or JPG file. Note that this approach is not suitable for very large objects because the entire BLOB is buffered in RAM.
Property Name | Default | Description |
---|---|---|
deserializer | – | The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder |
deserializer.maxBlobLength | 100000000 | The maximum number of bytes to read and buffer for a given request |
Note
This source is provided as a preview feature. It does not work on Windows.
Watch the specified files, and tail them in nearly real-time once detected new lines appended to the each files. If the new lines are being written, this source will retry reading them in wait for the completion of the write.
This source is reliable and will not miss data even when the tailing files rotate. It periodically writes the last read position of each files on the given position file in JSON format. If Flume is stopped or down for some reason, it can restart tailing from the position written on the existing position file.
In other use case, this source can also start tailing from the arbitrary position for each files using the given position file. When there is no position file on the specified path, it will start tailing from the first line of each files by default.
Files will be consumed in order of their modification time. File with the oldest modification time will be consumed first.
This source does not rename or delete or do any modifications to the file being tailed. Currently this source does not support tailing binary files. It reads text files line by line.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be TAILDIR. |
filegroups | – | Space-separated list of file groups. Each file group indicates a set of files to be tailed. |
filegroups.<filegroupName> | – | Absolute path of the file group. Regular expression (and not file system patterns) can be used for filename only. |
positionFile | ~/.flume/taildir_position.json | File in JSON format to record the inode, the absolute path and the last position of each tailing file. |
headers.<filegroupName>.<headerKey> | – | Header value which is the set with header key. Multiple headers can be specified for one file group. |
byteOffsetHeader | false | Whether to add the byte offset of a tailed line to a header called ‘byteoffset’. |
skipToEnd | false | Whether to skip the position to EOF in the case of files not written on the position file. |
idleTimeout | 120000 | Time (ms) to close inactive files. If the closed file is appended new lines to, this source will automatically re-open it. |
writePosInterval | 3000 | Interval time (ms) to write the last position of each file on the position file. |
batchSize | 100 | Max number of lines to read and send to the channel at a time. Using the default is usually fine. |
maxBatchCount | Long.MAX_VALUE | Controls the number of batches being read consecutively from the same file. If the source is tailing multiple files and one of them is written at a fast rate, it can prevent other files to be processed, because the busy file would be read in an endless loop. In this case lower this value. |
backoffSleepIncrement | 1000 | The increment for time delay before reattempting to poll for new data, when the last attempt did not find any new data. |
maxBackoffSleep | 5000 | The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data. |
cachePatternMatching | true | Listing directories and applying the filename regex pattern may be time consuming for directories containing thousands of files. Caching the list of matching files can improve performance. The order in which files are consumed will also be cached. Requires that the file system keeps track of modification times with at least a 1-second granularity. |
fileHeader | false | Whether to add a header storing the absolute path filename. |
fileHeaderKey | file | Header key to use when appending absolute path filename to event header. |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000
Warning
This source is highly experimental and may change between minor versions of Flume. Use at your own risk.
Experimental source that connects via Streaming API to the 1% sample twitter firehose, continuously downloads tweets, converts them to Avro format and sends Avro events to a downstream Flume sink. Requires the consumer and access tokens and secrets of a Twitter developer account. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be org.apache.flume.source.twitter.TwitterSource |
consumerKey | – | OAuth consumer key |
consumerSecret | – | OAuth consumer secret |
accessToken | – | OAuth access token |
accessTokenSecret | – | OAuth token secret |
maxBatchSize | 1000 | Maximum number of twitter messages to put in a single batch |
maxBatchDurationMillis | 1000 | Maximum number of milliseconds to wait before closing a batch |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
a1.sources.r1.channels = c1
a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY
a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN
a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
a1.sources.r1.maxBatchSize = 10
a1.sources.r1.maxBatchDurationMillis = 200
Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics. If you have multiple Kafka sources running, you can configure them with the same Consumer Group so each will read a unique set of partitions for the topics. This currently supports Kafka server releases 0.10.1.0 or higher. Testing was done up to 2.0.1 that was the highest avilable version at the time of the release.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be org.apache.flume.source.kafka.KafkaSource |
kafka.bootstrap.servers | – | List of brokers in the Kafka cluster used by the source |
kafka.consumer.group.id | flume | Unique identified of consumer group. Setting the same id in multiple sources or agents indicates that they are part of the same consumer group |
kafka.topics | – | Comma-separated list of topics the Kafka consumer will read messages from. |
kafka.topics.regex | – | Regex that defines set of topics the source is subscribed on. This property has higher priority than kafka.topics and overrides kafka.topics if exists. |
batchSize | 1000 | Maximum number of messages written to Channel in one batch |
batchDurationMillis | 1000 | Maximum time (in ms) before a batch will be written to Channel The batch will be written whenever the first of size and time will be reached. |
backoffSleepIncrement | 1000 | Initial and incremental wait time that is triggered when a Kafka Topic appears to be empty. Wait period will reduce aggressive pinging of an empty Kafka Topic. One second is ideal for ingestion use cases but a lower value may be required for low latency operations with interceptors. |
maxBackoffSleep | 5000 | Maximum wait time that is triggered when a Kafka Topic appears to be empty. Five seconds is ideal for ingestion use cases but a lower value may be required for low latency operations with interceptors. |
useFlumeEventFormat | false | By default events are taken as bytes from the Kafka topic directly into the event body. Set to true to read events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSink or with the parseAsFlumeEvent property on the Kafka Channel this will preserve any Flume headers sent on the producing side. |
setTopicHeader | true | When set to true, stores the topic of the retrieved message into a header, defined by the topicHeader property. |
topicHeader | topic | Defines the name of the header in which to store the name of the topic the message was received from, if the setTopicHeader property is set to true. Care should be taken if combining with the Kafka Sink topicHeader property so as to avoid sending the message back to the same topic in a loop. | - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -