content/releases/content/1.10.1/FlumeUserGuide.html [616:1623]:

If you need to set up two flows in an agent, one going from an external avro client to external HDFS and another from the output of a tail to an avro sink, then here’s a config to do that:

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2

# flow #1 configuration
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1

# flow #2 configuration
agent_foo.sources.exec-tail-source2.channels = file-channel-2
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

Configuring a multi agent flow

To set up a multi-tier flow, you need to have an avro/thrift sink of the first hop pointing to the avro/thrift source of the next hop. This will result in the first Flume agent forwarding events to the next Flume agent. For example, if you are periodically sending files (1 file per event) using an avro client to a local Flume agent, then this local agent can forward them to another agent that has the storage mounted.

Weblog agent config:

# list sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = avro-forward-sink
agent_foo.channels = file-channel

# define the flow
agent_foo.sources.avro-AppSrv-source.channels = file-channel
agent_foo.sinks.avro-forward-sink.channel = file-channel

# avro sink properties
agent_foo.sinks.avro-forward-sink.type = avro
agent_foo.sinks.avro-forward-sink.hostname = 10.1.1.100
agent_foo.sinks.avro-forward-sink.port = 10000

# configure other pieces
#...

HDFS agent config:

# list sources, sinks and channels in the agent
agent_foo.sources = avro-collection-source
agent_foo.sinks = hdfs-sink
agent_foo.channels = mem-channel

# define the flow
agent_foo.sources.avro-collection-source.channels = mem-channel
agent_foo.sinks.hdfs-sink.channel = mem-channel

# avro source properties
agent_foo.sources.avro-collection-source.type = avro
agent_foo.sources.avro-collection-source.bind = 10.1.1.100
agent_foo.sources.avro-collection-source.port = 10000

# configure other pieces
#...

Here we link the avro-forward-sink from the weblog agent to the avro-collection-source of the hdfs agent. This will result in the events coming from the external appserver source eventually getting stored in HDFS.
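The "configure other pieces" placeholders in the two configs above could be filled in roughly as follows. This is only a sketch under assumptions: the bind address, the port 10001 and the HDFS path are made-up values, and the channel and HDFS sink properties used here are described in their own sections of this guide, not in this one. Both agents are named agent_foo, so each half belongs in its own configuration file.

```properties
# Weblog agent (first file): remaining source and channel settings
# bind/port values are illustrative assumptions
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10001
agent_foo.channels.file-channel.type = file

# HDFS agent (second file): remaining channel and sink settings
# the HDFS path is a hypothetical example
agent_foo.channels.mem-channel.type = memory
agent_foo.sinks.hdfs-sink.type = hdfs
agent_foo.sinks.hdfs-sink.hdfs.path = hdfs://namenode/flume/weblogs
```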

Fan out flow

As discussed in the previous section, Flume supports fanning out the flow from one source to multiple channels. There are two modes of fan out, replicating and multiplexing. In the replicating flow, the event is sent to all the configured channels. In case of multiplexing, the event is sent to only a subset of qualifying channels. To fan out the flow, one needs to specify a list of channels for a source and the policy for fanning it out. This is done by adding a channel “selector” that can be replicating or multiplexing. Then further specify the selection rules if it’s a multiplexer. If you don’t specify a selector, then by default it’s replicating:

# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating
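As a concrete instance of the replicating template above, here is a minimal sketch for a hypothetical agent a1 with one source and three channels. The names a1, r1 and c1 to c3 are placeholders. The selector.optional line is an addition to the template: it marks c3 as optional for the replicating selector, so a failed write to c3 is ignored while a failed write to c1 or c2 fails the transaction.

```properties
a1.sources = r1
a1.channels = c1 c2 c3

# every event from r1 is copied to all three channels
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating

# c3 is optional: failures to write to it are ignored
a1.sources.r1.selector.optional = c3
```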

The multiplexing selector has a further set of properties to bifurcate the flow. This requires specifying a mapping of an event attribute to a set of channels. The selector checks for each configured attribute in the event header. If it matches the specified value, then that event is sent to all the channels mapped to that value. If there’s no match, then the event is sent to the set of channels configured as default:

# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...

<Agent>.sources.<Source1>.selector.default = <Channel2>

The mapping allows overlapping the channels for each value.

The following example has a single flow that is multiplexed to two paths. The agent named agent_foo has a single avro source and two channels linked to two sinks:

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2

# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2

# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

The selector checks for a header called “State”. If the value is “CA” then it’s sent to mem-channel-1, if it’s “AZ” then it goes to file-channel-2, or if it’s “NY” then both. If the “State” header is not set or doesn’t match any of the three, then it goes to mem-channel-1, which is designated as ‘default’.

The selector also supports optional channels. To specify optional channels for a header, the config parameter ‘optional’ is used in the following way:

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

The selector will attempt to write to the required channels first and will fail the transaction if even one of these channels fails to consume the events. The transaction is reattempted on all of the channels. Once all required channels have consumed the events, then the selector will attempt to write to the optional channels. A failure by any of the optional channels to consume the event is simply ignored and not retried.

If there is an overlap between the optional channels and required channels for a specific header, the channel is considered to be required, and a failure in the channel will cause the entire set of required channels to be retried. For instance, in the above example, for the header “CA” mem-channel-1 is considered to be a required channel even though it is marked both as required and optional, and a failure to write to this channel will cause that event to be retried on all channels configured for the selector.

Note that if a header does not have any required channels, then the event will be written to the default channels, and the selector will also attempt to write it to the optional channels for that header. In other words, specifying optional channels does not stop the event from being written to the default channels when no required channels are specified. If no channels are designated as default and there are no required channels, the selector will attempt to write the events to the optional channels only. Any failures are simply ignored in that case.
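The case of a header with optional channels but no required ones can be sketched as follows. The value “TX” is a hypothetical addition to the earlier example. Going by the rules described above, an event with State=TX would be written to the default channel mem-channel-1 (a failure there fails the transaction) and then attempted on file-channel-2 (a failure there is ignored).

```properties
# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1

# TX (hypothetical value) has no required mapping, only an optional channel
agent_foo.sources.avro-AppSrv-source1.selector.optional.TX = file-channel-2

# used for TX as well, because TX has no required channels
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
```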

SSL/TLS support

Several Flume components support the SSL/TLS protocols in order to communicate with other systems securely.

Component                     SSL server or client
Avro Source                   server
Avro Sink                     client
Thrift Source                 server
Thrift Sink                   client
Kafka Source                  client
Kafka Channel                 client
Kafka Sink                    client
HTTP Source                   server
JMS Source                    client
Syslog TCP Source             server
Multiport Syslog TCP Source   server

The SSL compatible components have several configuration parameters to set up SSL, such as a flag to enable SSL, keystore / truststore parameters (location, password, type) and additional SSL parameters (e.g. disabled protocols).

Enabling SSL for a component is always specified at component level in the agent configuration file. So some components may be configured to use SSL while others are not (even with the same component type).

The keystore / truststore setup can be specified at component level or globally.

In case of the component level setup, the keystore / truststore is configured in the agent configuration file through component specific parameters. The advantage of this method is that the components can use different keystores (if needed). The disadvantage is that the keystore parameters must be copied for each component in the agent configuration file. The component level setup is optional, but if it is defined, it has higher precedence than the global parameters.
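A minimal sketch of the component level setup, assuming an agent named a1 with two Avro sources: r1 enables SSL with its own keystore, while r2 (same component type) stays unencrypted. The component names, ports, keystore path and password are placeholders.

```properties
# r1: SSL enabled, keystore configured at component level
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
a1.sources.r1.ssl = true
a1.sources.r1.keystore = /path/to/keystore.jks
a1.sources.r1.keystore-password = password
a1.sources.r1.keystore-type = JKS

# r2: same component type, SSL not enabled
a1.sources.r2.type = avro
a1.sources.r2.channels = c1
a1.sources.r2.bind = 0.0.0.0
a1.sources.r2.port = 4142
```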

With the global setup, it is enough to define the keystore / truststore parameters once and use the same settings for all components, which means less configuration, kept in one central place.

The global setup can be configured either through system properties or through environment variables.

System property                   Environment variable            Description
javax.net.ssl.keyStore            FLUME_SSL_KEYSTORE_PATH         Keystore location
javax.net.ssl.keyStorePassword    FLUME_SSL_KEYSTORE_PASSWORD     Keystore password
javax.net.ssl.keyStoreType        FLUME_SSL_KEYSTORE_TYPE         Keystore type (by default JKS)
javax.net.ssl.trustStore          FLUME_SSL_TRUSTSTORE_PATH       Truststore location
javax.net.ssl.trustStorePassword  FLUME_SSL_TRUSTSTORE_PASSWORD   Truststore password
javax.net.ssl.trustStoreType      FLUME_SSL_TRUSTSTORE_TYPE       Truststore type (by default JKS)
flume.ssl.include.protocols       FLUME_SSL_INCLUDE_PROTOCOLS     Protocols to include when calculating enabled protocols. A comma (,) separated list. Excluded protocols will be excluded from this list if provided.
flume.ssl.exclude.protocols       FLUME_SSL_EXCLUDE_PROTOCOLS     Protocols to exclude when calculating enabled protocols. A comma (,) separated list.
flume.ssl.include.cipherSuites    FLUME_SSL_INCLUDE_CIPHERSUITES  Cipher suites to include when calculating enabled cipher suites. A comma (,) separated list. Excluded cipher suites will be excluded from this list if provided.
flume.ssl.exclude.cipherSuites    FLUME_SSL_EXCLUDE_CIPHERSUITES  Cipher suites to exclude when calculating enabled cipher suites. A comma (,) separated list.

The SSL system properties can either be passed on the command line or by setting the JAVA_OPTS environment variable in conf/flume-env.sh. (Using the command line is inadvisable, however, because commands that include the passwords will be saved to the command history.)

export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStore=/path/to/keystore.jks"
export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStorePassword=password"

Flume uses the system properties defined in JSSE (Java Secure Socket Extension), so this is a standard way of setting up SSL. On the other hand, specifying passwords in system properties means that the passwords can be seen in the process list. For cases where this is not acceptable, it is also possible to define the parameters in environment variables. Flume initializes the JSSE system properties from the corresponding environment variables internally in this case.

The SSL environment variables can either be set in the shell environment before starting Flume or in conf/flume-env.sh. (Setting them on the command line is inadvisable, however, because commands that include the passwords will be saved to the command history.)

export FLUME_SSL_KEYSTORE_PATH=/path/to/keystore.jks
export FLUME_SSL_KEYSTORE_PASSWORD=password

Please note:

Source and sink batch sizes and channel transaction capacities

Sources and sinks can have a batch size parameter that determines the maximum number of events they process in one batch. This happens within a channel transaction that has an upper limit called transaction capacity. Batch size must be smaller than the channel’s transaction capacity. There is an explicit check to prevent incompatible settings. This check happens whenever the configuration is read.
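A compatible combination might look like the sketch below, assuming an agent a1 with an Exec source feeding a memory channel. The capacity and transactionCapacity properties belong to the Memory Channel, which is described in the channels part of this guide. The numbers are illustrative only.

```properties
# channel: at most 1000 events per transaction
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# source: batches of 100 events fit within the transaction capacity
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
a1.sources.r1.batchSize = 100
```

Raising batchSize above 1000 here would be the kind of incompatible setting that the check described above is meant to catch.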

Flume Sources

Avro Source

Listens on Avro port and receives events from external Avro client streams. When paired with the built-in Avro Sink on another (previous hop) Flume agent, it can create tiered collection topologies. Required properties are in bold.

Property Name Default Description
channels  
type The component type name, needs to be avro
bind hostname or IP address to listen on
port Port # to bind to
threads Maximum number of worker threads to spawn
selector.type    
selector.*    
interceptors Space-separated list of interceptors
interceptors.*    
compression-type none This can be “none” or “deflate”. The compression-type must match the compression-type of the matching Avro Sink
ssl false Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section).
keystore This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error).
keystore-password The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error).
keystore-type JKS The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS).
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
include-protocols Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If include-protocols is empty, it includes every supported protocol.
exclude-cipher-suites Space-separated list of cipher suites to exclude.
include-cipher-suites Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. If include-cipher-suites is empty, it includes every supported cipher suite.
ipFilter false Set this to true to enable ipFiltering for netty
ipFilterRules Define N netty ipFilter pattern rules with this config.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

Example of ipFilterRules

ipFilterRules defines N netty ipFilters separated by commas. Each pattern rule must be in this format:

<‘allow’ or ‘deny’>:<‘ip’ or ‘name’ for computer name>:<pattern> or allow/deny:ip/name:pattern

example: ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*

Note that the first rule to match will apply, as the examples below show for a client on localhost:

“allow:name:localhost,deny:ip:*” will allow the client on localhost but deny clients from any other IP.
“deny:name:localhost,allow:ip:*” will deny the client on localhost but allow clients from any other IP.
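Put together with the earlier Avro source example, IP filtering could be configured along these lines. This is a sketch that reuses the rule string from the example above. The agent and component names are the same placeholders as before.

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

# enable netty IP filtering; rules are evaluated in order, first match wins
a1.sources.r1.ipFilter = true
a1.sources.r1.ipFilterRules = allow:ip:127.*,allow:name:localhost,deny:ip:*
```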

Thrift Source

Listens on Thrift port and receives events from external Thrift client streams. When paired with the built-in ThriftSink on another (previous hop) Flume agent, it can create tiered collection topologies. Thrift source can be configured to start in secure mode by enabling kerberos authentication. agent-principal and agent-keytab are the properties used by the Thrift source to authenticate to the kerberos KDC. Required properties are in bold.

Property Name Default Description
channels  
type The component type name, needs to be thrift
bind hostname or IP address to listen on
port Port # to bind to
threads Maximum number of worker threads to spawn
selector.type    
selector.*    
interceptors Space separated list of interceptors
interceptors.*    
ssl false Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section)
keystore This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error).
keystore-password The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error).
keystore-type JKS The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS).
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
include-protocols Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If include-protocols is empty, it includes every supported protocol.
exclude-cipher-suites Space-separated list of cipher suites to exclude.
include-cipher-suites Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites.
kerberos false Set to true to enable kerberos authentication. In kerberos mode, agent-principal and agent-keytab are required for successful authentication. The Thrift source in secure mode, will accept connections only from Thrift clients that have kerberos enabled and are successfully authenticated to the kerberos KDC.
agent-principal The kerberos principal used by the Thrift Source to authenticate to the kerberos KDC.
agent-keytab The keytab location used by the Thrift Source in combination with the agent-principal to authenticate to the kerberos KDC.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
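To start the same Thrift source in secure mode, the kerberos properties from the table above could be added as in this sketch. The principal and keytab path are hypothetical values and would need to match your own KDC setup.

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

# kerberos mode: both agent-principal and agent-keytab are required
a1.sources.r1.kerberos = true
a1.sources.r1.agent-principal = flume/flumehost.example.com@EXAMPLE.COM
a1.sources.r1.agent-keytab = /etc/flume/conf/flume.keytab
```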

Exec Source

Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out (stderr is simply discarded, unless property logStdErr is set to true). If the process exits for any reason, the source also exits and will produce no further data. This means configurations such as cat [named pipe] or tail -F [file] are going to produce the desired results whereas date will probably not - the former two commands produce streams of data whereas the latter produces a single event and exits.

Required properties are in bold.

Property Name Default Description
channels  
type The component type name, needs to be exec
command The command to execute
shell A shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc.
restartThrottle 10000 Amount of time (in millis) to wait before attempting a restart
restart false Whether the executed cmd should be restarted if it dies
logStdErr false Whether the command’s stderr should be logged
batchSize 20 The max number of lines to read and send to the channel at a time
batchTimeout 3000 Amount of time (in milliseconds) to wait, if the buffer size was not reached, before data is pushed downstream
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Warning

The problem with ExecSource and other asynchronous sources is that the source cannot guarantee that the client will know about it if there is a failure to put the event into the Channel. In such cases, the data will be lost. For instance, one of the most commonly requested features is the tail -F [file]-like use case where an application writes to a log file on disk and Flume tails the file, sending each line as an event. While this is possible, there’s an obvious problem: what happens if the channel fills up and Flume can’t send an event? Flume has no way of indicating to the application writing the log file that it needs to retain the log or that the event hasn’t been sent for some reason. If this doesn’t make sense, you need only know this: your application can never guarantee data has been received when using a unidirectional asynchronous interface such as ExecSource! As an extension of this warning, and to be completely clear, there is absolutely zero guarantee of event delivery when using this source. For stronger reliability guarantees, consider the Spooling Directory Source, Taildir Source or direct integration with Flume via the SDK.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

The ‘shell’ config is used to invoke the ‘command’ through a command shell (such as Bash or Powershell). The ‘command’ is passed as an argument to ‘shell’ for execution. This allows the ‘command’ to use features from the shell such as wildcards, back ticks, pipes, loops, conditionals etc. In the absence of the ‘shell’ config, the ‘command’ will be invoked directly. Common values for ‘shell’ : ‘/bin/sh -c’, ‘/bin/ksh -c’, ‘cmd /c’, ‘powershell -Command’, etc.

a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
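Because the source stops producing data once the command exits, the restart-related properties from the table above can be combined as in this sketch. The values shown for restartThrottle, batchSize and batchTimeout are simply the documented defaults written out explicitly. Restarting the command does not change the delivery caveats in the warning above.

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

# restart the command if it dies, waiting 10 seconds between attempts
a1.sources.r1.restart = true
a1.sources.r1.restartThrottle = 10000

# log the command's stderr instead of discarding it
a1.sources.r1.logStdErr = true

# push up to 20 lines at a time, or whatever is buffered after 3 seconds
a1.sources.r1.batchSize = 20
a1.sources.r1.batchTimeout = 3000
```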

JMS Source

JMS Source reads messages from a JMS destination such as a queue or topic. Being a JMS application it should work with any JMS provider but has only been tested with ActiveMQ. The JMS source provides configurable batch size, message selector, user/pass, and message to flume event converter. Note that the vendor provided JMS jars should be included in the Flume classpath using the plugins.d directory (preferred), --classpath on the command line, or via the FLUME_CLASSPATH variable in flume-env.sh.

Required properties are in bold.

Property Name Default Description
channels  
type The component type name, needs to be jms
initialContextFactory Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory
connectionFactory The JNDI name the connection factory should appear as
providerURL The JMS provider URL
destinationName Destination name
destinationType Destination type (queue or topic)
messageSelector Message selector to use when creating the consumer
userName Username for the destination/provider
passwordFile File containing the password for the destination/provider
batchSize 100 Number of messages to consume in one batch
converter.type DEFAULT Class to use to convert messages to flume events. See below.
converter.* Converter properties.
converter.charset UTF-8 Default converter only. Charset to use when converting JMS TextMessages to byte arrays.
createDurableSubscription false Whether to create durable subscription. Durable subscription can only be used with destinationType topic. If true, “clientId” and “durableSubscriptionName” have to be specified.
clientId JMS client identifier set on Connection right after it is created. Required for durable subscriptions.
durableSubscriptionName Name used to identify the durable subscription. Required for durable subscriptions.

JMS message converter

The JMS source allows pluggable converters, though it’s likely the default converter will work for most purposes. The default converter is able to convert Bytes, Text, and Object messages to FlumeEvents. In all cases, the properties in the message are added as headers to the FlumeEvent.

BytesMessage:
Bytes of message are copied to body of the FlumeEvent. Cannot convert more than 2GB of data per message.
TextMessage:
Text of message is converted to a byte array and copied to the body of the FlumeEvent. The default converter uses UTF-8 by default but this is configurable.
ObjectMessage:
Object is written out to a ByteArrayOutputStream wrapped in an ObjectOutputStream and the resulting array is copied to the body of the FlumeEvent.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE
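
For a topic with a durable subscription, the properties createDurableSubscription, clientId and durableSubscriptionName from the table above would be set together, as in this sketch. The topic name, client id, subscription name, user name and password file path are all hypothetical.

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616

# durable subscriptions require destinationType topic
a1.sources.r1.destinationName = BUSINESS_EVENTS
a1.sources.r1.destinationType = TOPIC
a1.sources.r1.createDurableSubscription = true
a1.sources.r1.clientId = flume-agent-a1
a1.sources.r1.durableSubscriptionName = flume-business-events

# credentials: the password is read from a file, not stored in the config
a1.sources.r1.userName = flume
a1.sources.r1.passwordFile = /etc/flume/conf/jms.password
```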

SSL and JMS Source

JMS client implementations typically support configuring SSL/TLS via Java system properties defined by JSSE (Java Secure Socket Extension). By specifying these system properties for Flume’s JVM, JMS Source (or more precisely the JMS client implementation used by the JMS Source) can connect to the JMS server through SSL (of course only when the JMS server has also been set up to use SSL). It should work with any JMS provider and has been tested with ActiveMQ, IBM MQ and Oracle WebLogic.

The following sections describe the SSL configuration steps needed on the Flume side only. You can find more detailed descriptions of the server side setup of the different JMS providers, and also full working configuration examples, on the Flume Wiki.

SSL transport / server authentication:

If the JMS server uses a self-signed certificate or its certificate is signed by a non-trusted CA (e.g. the company’s own CA), then a truststore (containing the right certificate) needs to be set up and passed to Flume. It can be done via the global SSL parameters. For more details about the global SSL setup, see the SSL/TLS support section.

Some JMS providers require SSL specific JNDI Initial Context Factory and/or Provider URL settings when using SSL (e.g. ActiveMQ uses the ssl:// URL prefix instead of tcp://). In this case the source properties (initialContextFactory and/or providerURL) have to be adjusted in the agent config file.
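For ActiveMQ, the adjustment might look like the sketch below, which changes only the providerURL of the earlier JMS example. The host name and the port 61617 are assumptions and depend on how the broker's SSL connector is configured. The truststore itself would still be supplied through the global SSL parameters.

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory

# ssl:// prefix instead of tcp:// (host and port are assumed values)
a1.sources.r1.providerURL = ssl://mqserver:61617
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE
```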

Client certificate authentication (two-way SSL):

JMS Source can authenticate to the JMS server through client certificate authentication instead of the usual user/password login (when SSL is used and the JMS server is configured to accept this kind of authentication).

The keystore containing Flume’s key used for the authentication needs to be configured via the global SSL parameters again. For more details about the global SSL setup, see the SSL/TLS support section.

The keystore should contain only one key (if multiple keys are present, then the first one will be used). The key password must be the same as the keystore password.

In case of client certificate authentication, there is no need to specify the userName / passwordFile properties for the JMS Source in the Flume agent config file.

Please note:

Unlike other components, JMS Source has no component level SSL configuration parameters, and no flag to enable SSL either. SSL setup is controlled by JNDI/Provider URL settings (ultimately the JMS server settings) and by the presence / absence of the truststore / keystore.

Spooling Directory Source

This source lets you ingest data by placing files to be ingested into a “spooling” directory on disk. This source will watch the specified directory for new files, and will parse events out of new files as they appear. The event parsing logic is pluggable. After a given file has been fully read into the channel, completion is by default indicated by renaming the file; alternatively, the file can be deleted, or the trackerDir can be used to keep track of processed files.

Unlike the Exec source, this source is reliable and will not miss data, even if Flume is restarted or killed. In exchange for this reliability, only immutable, uniquely-named files must be dropped into the spooling directory. Flume tries to detect these problem conditions and will fail loudly if they are violated:

  1. If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing.
  2. If a file name is reused at a later time, Flume will print an error to its log file and stop processing.

To avoid the above issues, it may be useful to add a unique identifier (such as a timestamp) to log file names when they are moved into the spooling directory.

Despite the reliability guarantees of this source, there are still cases in which events may be duplicated if certain downstream failures occur. This is consistent with the guarantees offered by other Flume components.
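A minimal sketch of a spooling directory source for an agent named a1 is shown here. The spoolDir path is a made-up example. The trackingPolicy line is optional and illustrates leaving ingested files un-renamed and recording completion in the tracker directory instead.

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1

# directory watched for new, immutable, uniquely named files
a1.sources.r1.spoolDir = /var/log/apache/flumeSpool

# add the absolute path of the originating file as an event header
a1.sources.r1.fileHeader = true

# keep completed files and track them in trackerDir instead of renaming
a1.sources.r1.deletePolicy = never
a1.sources.r1.trackingPolicy = tracker_dir
```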

Property Name Default Description
channels  
type The component type name, needs to be spooldir.
spoolDir The directory from which to read files.
fileSuffix .COMPLETED Suffix to append to completely ingested files
deletePolicy never When to delete completed files: never or immediate
fileHeader false Whether to add a header storing the absolute path filename.
fileHeaderKey file Header key to use when appending absolute path filename to event header.
basenameHeader false Whether to add a header storing the basename of the file.
basenameHeaderKey basename Header Key to use when appending basename of file to event header.
includePattern ^.*$ Regular expression specifying which files to include. It can be used together with ignorePattern. If a file matches both ignorePattern and includePattern regex, the file is ignored.
ignorePattern ^$ Regular expression specifying which files to ignore (skip). It can be used together with includePattern. If a file matches both ignorePattern and includePattern regex, the file is ignored.
trackerDir .flumespool Directory to store metadata related to processing of files. If this path is not an absolute path, then it is interpreted as relative to the spoolDir.
trackingPolicy rename The tracking policy defines how file processing is tracked. It can be “rename” or “tracker_dir”. This parameter is only effective if the deletePolicy is “never”. “rename” - After processing files they get renamed according to the fileSuffix parameter. “tracker_dir” - Files are not renamed but a new empty file is created in the trackerDir. The new tracker file name is derived from the ingested one plus the fileSuffix.
consumeOrder oldest The order in which files in the spooling directory will be consumed: oldest, youngest or random. In case of oldest and youngest, the last modified time of the files will be used to compare the files. In case of a tie, the file with the smallest lexicographical order will be consumed first. In case of random, any file will be picked randomly. When using oldest and youngest, the whole directory will be scanned to pick the oldest/youngest file, which might be slow if there are a large number of files, while using random may cause old files to be consumed very late if new files keep coming into the spooling directory.
pollDelay 500 Delay (in milliseconds) used when polling for new files.
recursiveDirectorySearch false Whether to monitor sub directories for new files to read.
maxBackoff 4000 The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - content/releases/content/1.9.0/FlumeUserGuide.html [539:1546]: - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - you need to setup two flows in an agent, one going from an external avro client to external HDFS and another from output of a tail to avro sink, then here’s a config to do that:

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2

# flow #1 configuration
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1

# flow #2 configuration
agent_foo.sources.exec-tail-source2.channels = file-channel-2
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

Configuring a multi agent flow

To set up a multi-tier flow, the avro/thrift sink of the first hop must point to the avro/thrift source of the next hop. The first Flume agent then forwards events to the next Flume agent. For example, if you are periodically sending files (1 file per event) using the avro client to a local Flume agent, this local agent can forward them to another agent that has the storage mounted.

Weblog agent config:

# list sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = avro-forward-sink
agent_foo.channels = file-channel

# define the flow
agent_foo.sources.avro-AppSrv-source.channels = file-channel
agent_foo.sinks.avro-forward-sink.channel = file-channel

# avro sink properties
agent_foo.sinks.avro-forward-sink.type = avro
agent_foo.sinks.avro-forward-sink.hostname = 10.1.1.100
agent_foo.sinks.avro-forward-sink.port = 10000

# configure other pieces
#...

HDFS agent config:

# list sources, sinks and channels in the agent
agent_foo.sources = avro-collection-source
agent_foo.sinks = hdfs-sink
agent_foo.channels = mem-channel

# define the flow
agent_foo.sources.avro-collection-source.channels = mem-channel
agent_foo.sinks.hdfs-sink.channel = mem-channel

# avro source properties
agent_foo.sources.avro-collection-source.type = avro
agent_foo.sources.avro-collection-source.bind = 10.1.1.100
agent_foo.sources.avro-collection-source.port = 10000

# configure other pieces
#...

Here we link the avro-forward-sink from the weblog agent to the avro-collection-source of the hdfs agent. This will result in the events coming from the external appserver source eventually getting stored in HDFS.
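
The “configure other pieces” placeholders above are left open. As a hedged sketch only, the HDFS agent could be completed along the following lines. The memory channel type, the hdfs sink type and the hdfs.path property are standard Flume settings, but the NameNode address and target directory are placeholder assumptions:

# channel and sink types for the HDFS agent
agent_foo.channels.mem-channel.type = memory
agent_foo.sinks.hdfs-sink.type = hdfs
# placeholder location, replace with your own NameNode and path
agent_foo.sinks.hdfs-sink.hdfs.path = hdfs://namenode/flume/webdata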

Fan out flow

As discussed in the previous section, Flume supports fanning out the flow from one source to multiple channels. There are two modes of fan out, replicating and multiplexing. In the replicating flow, the event is sent to all the configured channels. In the multiplexing flow, the event is sent to only a subset of qualifying channels. To fan out the flow, specify a list of channels for the source and the policy for fanning it out. This is done by adding a channel “selector” that can be replicating or multiplexing and, for a multiplexing selector, further specifying the selection rules. If you don’t specify a selector, it defaults to replicating:

# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating

The multiplexing selector has a further set of properties to bifurcate the flow. This requires specifying a mapping from an event attribute to a set of channels. The selector checks the configured attribute in the event header. If it matches a specified value, the event is sent to all the channels mapped to that value. If there is no match, the event is sent to the set of channels configured as default:

# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...

<Agent>.sources.<Source1>.selector.default = <Channel2>

The mapping allows channels to overlap, so the same channel may be listed for more than one value.

The following example has a single flow that is multiplexed to two paths. The agent named agent_foo has a single avro source and two channels linked to two sinks:

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2

# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2

# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

The selector checks for a header called “State”. If the value is “CA” then the event is sent to mem-channel-1, if it is “AZ” then it goes to file-channel-2, and if it is “NY” then it goes to both. If the “State” header is not set or doesn’t match any of the three, then it goes to mem-channel-1, which is designated as ‘default’.

The selector also supports optional channels. To specify optional channels for a header, the config parameter ‘optional’ is used in the following way:

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

The selector will attempt to write to the required channels first and will fail the transaction if even one of these channels fails to consume the events. The transaction is reattempted on all of the channels. Once all required channels have consumed the events, then the selector will attempt to write to the optional channels. A failure by any of the optional channels to consume the event is simply ignored and not retried.

If there is an overlap between the optional channels and required channels for a specific header, the channel is considered to be required, and a failure in the channel will cause the entire set of required channels to be retried. For instance, in the above example, for the header “CA” mem-channel-1 is considered to be a required channel even though it is marked both as required and optional, and a failure to write to this channel will cause that event to be retried on all channels configured for the selector.

Note that if a header value does not have any required channels, the event is written to the default channels, and a write to the optional channels for that header is also attempted. In other words, specifying optional channels still causes the event to be written to the default channels when no required channels are specified. If no channels are designated as default and there are no required channels, the selector attempts to write the events to the optional channels only. Any failures are simply ignored in that case.
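
As a sketch of the case just described, the fragment below adds a header value “TX” that has optional channels but no required mapping. The value “TX” is hypothetical and used for illustration only. Under the rules above, a “TX” event would be written to the default channel mem-channel-1, followed by a best-effort write to file-channel-2:

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
# "TX" (hypothetical value) has no mapping entry, so it has no required channels
agent_foo.sources.avro-AppSrv-source1.selector.optional.TX = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1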

SSL/TLS support

Several Flume components support the SSL/TLS protocols in order to communicate with other systems securely.

Component | SSL server or client
Avro Source | server
Avro Sink | client
Thrift Source | server
Thrift Sink | client
Kafka Source | client
Kafka Channel | client
Kafka Sink | client
HTTP Source | server
JMS Source | client
Syslog TCP Source | server
Multiport Syslog TCP Source | server

The SSL-compatible components have several configuration parameters for setting up SSL, such as an enable-SSL flag, keystore / truststore parameters (location, password, type) and additional SSL parameters (e.g. disabled protocols).

Enabling SSL for a component is always specified at component level in the agent configuration file. Some components may therefore be configured to use SSL while others are not (even with the same component type).

The keystore / truststore setup can be specified at component level or globally.

With the component level setup, the keystore / truststore is configured in the agent configuration file through component specific parameters. The advantage of this method is that the components can use different keystores (if this is needed). The disadvantage is that the keystore parameters must be copied for each component in the agent configuration file. The component level setup is optional, but if it is defined, it has higher precedence than the global parameters.

With the global setup, it is enough to define the keystore / truststore parameters once and use the same settings for all components, which means less and more centralized configuration.

The global setup can be configured either through system properties or through environment variables.

System property | Environment variable | Description
javax.net.ssl.keyStore | FLUME_SSL_KEYSTORE_PATH | Keystore location
javax.net.ssl.keyStorePassword | FLUME_SSL_KEYSTORE_PASSWORD | Keystore password
javax.net.ssl.keyStoreType | FLUME_SSL_KEYSTORE_TYPE | Keystore type (by default JKS)
javax.net.ssl.trustStore | FLUME_SSL_TRUSTSTORE_PATH | Truststore location
javax.net.ssl.trustStorePassword | FLUME_SSL_TRUSTSTORE_PASSWORD | Truststore password
javax.net.ssl.trustStoreType | FLUME_SSL_TRUSTSTORE_TYPE | Truststore type (by default JKS)
flume.ssl.include.protocols | FLUME_SSL_INCLUDE_PROTOCOLS | Protocols to include when calculating enabled protocols. A comma (,) separated list. Excluded protocols will be excluded from this list if provided.
flume.ssl.exclude.protocols | FLUME_SSL_EXCLUDE_PROTOCOLS | Protocols to exclude when calculating enabled protocols. A comma (,) separated list.
flume.ssl.include.cipherSuites | FLUME_SSL_INCLUDE_CIPHERSUITES | Cipher suites to include when calculating enabled cipher suites. A comma (,) separated list. Excluded cipher suites will be excluded from this list if provided.
flume.ssl.exclude.cipherSuites | FLUME_SSL_EXCLUDE_CIPHERSUITES | Cipher suites to exclude when calculating enabled cipher suites. A comma (,) separated list.

The SSL system properties can be passed either on the command line or by setting the JAVA_OPTS environment variable in conf/flume-env.sh. (Using the command line is inadvisable, however, because commands that include passwords are saved to the command history.)

export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStore=/path/to/keystore.jks"
export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStorePassword=password"

Flume uses the system properties defined in JSSE (Java Secure Socket Extension), so this is a standard way of setting up SSL. On the other hand, specifying passwords in system properties means that the passwords can be seen in the process list. For cases where this is not acceptable, it is also possible to define the parameters in environment variables. In this case Flume initializes the JSSE system properties internally from the corresponding environment variables.

The SSL environment variables can be set either in the shell environment before starting Flume or in conf/flume-env.sh. (Using the command line is inadvisable, however, because commands that include passwords are saved to the command history.)

export FLUME_SSL_KEYSTORE_PATH=/path/to/keystore.jks
export FLUME_SSL_KEYSTORE_PASSWORD=password

Please note:

  • SSL must be enabled at component level. Specifying the global SSL parameters alone will not have any effect.
  • If the global SSL parameters are specified at multiple levels, the priority is the following (from higher to lower):
    • component parameters in agent config
    • system properties
    • environment variables
  • If SSL is enabled for a component, but the SSL parameters are not specified in any of the ways described above, then
    • in case of keystores: configuration error
    • in case of truststores: the default truststore will be used (jssecacerts / cacerts in Oracle JDK)
  • The truststore password is optional in all cases. If not specified, then no integrity check will be performed on the truststore when it is opened by the JDK.
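
To illustrate the notes above, here is a sketch with three Avro sources on agent a1. The source names, ports and keystore path are assumptions made for illustration. r1 enables SSL and supplies its own keystore, which takes precedence over any global setting. r2 enables SSL without keystore parameters, so it falls back to the global keystore (system properties first, then environment variables) and is a configuration error if none is defined. r3 does not enable SSL, so the global parameters have no effect on it:

a1.sources = r1 r2 r3
a1.channels = c1

# r1: SSL enabled, component level keystore overrides the global one
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
a1.sources.r1.ssl = true
a1.sources.r1.keystore = /path/to/r1-keystore.jks
a1.sources.r1.keystore-password = password

# r2: SSL enabled, keystore taken from the global SSL parameters
a1.sources.r2.type = avro
a1.sources.r2.channels = c1
a1.sources.r2.bind = 0.0.0.0
a1.sources.r2.port = 4142
a1.sources.r2.ssl = true

# r3: SSL not enabled, global SSL parameters are ignored
a1.sources.r3.type = avro
a1.sources.r3.channels = c1
a1.sources.r3.bind = 0.0.0.0
a1.sources.r3.port = 4143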

Source and sink batch sizes and channel transaction capacities

Sources and sinks can have a batch size parameter that determines the maximum number of events they process in one batch. This happens within a channel transaction that has an upper limit called transaction capacity. Batch size must be smaller than the channel’s transaction capacity. There is an explicit check to prevent incompatible settings. This check happens whenever the configuration is read.
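
As an illustration, the following sketch pairs an Exec source with a Memory channel so that the source batch size stays below the channel’s transaction capacity. The property names are the ones used by those components. The values are arbitrary examples:

a1.sources = r1
a1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
# upper limit on the number of events in one channel transaction
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
# kept smaller than transactionCapacity of c1
a1.sources.r1.batchSize = 20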

Flume Sources

Avro Source

Listens on Avro port and receives events from external Avro client streams. When paired with the built-in Avro Sink on another (previous hop) Flume agent, it can create tiered collection topologies. Required properties are in bold.

Property Name | Default | Description
channels | - |
type | - | The component type name, needs to be avro
bind | - | Hostname or IP address to listen on
port | - | Port # to bind to
threads | - | Maximum number of worker threads to spawn
selector.type | |
selector.* | |
interceptors | - | Space-separated list of interceptors
interceptors.* | |
compression-type | none | This can be “none” or “deflate”. The compression-type must match the compression-type of the matching Avro Sink
ssl | false | Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section).
keystore | - | This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error).
keystore-password | - | The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error).
keystore-type | JKS | The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS).
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
include-protocols | - | Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If include-protocols is empty, it includes every supported protocol.
exclude-cipher-suites | - | Space-separated list of cipher suites to exclude.
include-cipher-suites | - | Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. If include-cipher-suites is empty, it includes every supported cipher suite.
ipFilter | false | Set this to true to enable ipFiltering for netty
ipFilterRules | - | Define N netty ipFilter pattern rules with this config.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

Example of ipFilterRules

ipFilterRules defines N netty ipFilters separated by commas. Each pattern rule must be in this format:

<‘allow’ or ‘deny’>:<‘ip’ or ‘name’ for computer name>:<pattern>

that is, allow/deny:ip/name:pattern

example: ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*

Note that the first rule to match applies, as the examples below show for a client on localhost.

This will allow the client on localhost but deny clients from any other ip: “allow:name:localhost,deny:ip:*”

This will deny the client on localhost but allow clients from any other ip: “deny:name:localhost,allow:ip:*”
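
In an agent configuration file, the rules are set on the source together with the ipFilter flag. A minimal sketch, reusing the a1 / r1 names from the Avro Source example and the rule set that allows localhost and denies every other address:

a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# enable netty ipFiltering; rules are evaluated in order, first match wins
a1.sources.r1.ipFilter = true
a1.sources.r1.ipFilterRules = allow:name:localhost,deny:ip:*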

Thrift Source

Listens on Thrift port and receives events from external Thrift client streams. When paired with the built-in ThriftSink on another (previous hop) Flume agent, it can create tiered collection topologies. Thrift source can be configured to start in secure mode by enabling kerberos authentication. agent-principal and agent-keytab are the properties used by the Thrift source to authenticate to the kerberos KDC. Required properties are in bold.

Property Name | Default | Description
channels | - |
type | - | The component type name, needs to be thrift
bind | - | Hostname or IP address to listen on
port | - | Port # to bind to
threads | - | Maximum number of worker threads to spawn
selector.type | |
selector.* | |
interceptors | - | Space-separated list of interceptors
interceptors.* | |
ssl | false | Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section).
keystore | - | This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error).
keystore-password | - | The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error).
keystore-type | JKS | The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS).
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
include-protocols | - | Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If include-protocols is empty, it includes every supported protocol.
exclude-cipher-suites | - | Space-separated list of cipher suites to exclude.
include-cipher-suites | - | Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites.
kerberos | false | Set to true to enable kerberos authentication. In kerberos mode, agent-principal and agent-keytab are required for successful authentication. The Thrift source in secure mode will accept connections only from Thrift clients that have kerberos enabled and are successfully authenticated to the kerberos KDC.
agent-principal | - | The kerberos principal used by the Thrift Source to authenticate to the kerberos KDC.
agent-keytab | - | The keytab location used by the Thrift Source in combination with the agent-principal to authenticate to the kerberos KDC.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
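
To start the Thrift source in secure mode, the kerberos properties from the table are added to the same source. This is a sketch only: the principal and keytab path are placeholder assumptions and must match what is provisioned in your KDC:

a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# accept only kerberos-authenticated Thrift clients
a1.sources.r1.kerberos = true
# placeholder principal and keytab location
a1.sources.r1.agent-principal = flume/flumehost.example.com@EXAMPLE.COM
a1.sources.r1.agent-keytab = /path/to/flume.keytab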

Exec Source

Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out (stderr is simply discarded, unless property logStdErr is set to true). If the process exits for any reason, the source also exits and will produce no further data. This means configurations such as cat [named pipe] or tail -F [file] are going to produce the desired results whereas date will probably not: the former two commands produce streams of data whereas the latter produces a single event and exits.

Required properties are in bold.

Property Name | Default | Description
channels | - |
type | - | The component type name, needs to be exec
command | - | The command to execute
shell | - | A shell invocation used to run the command, e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc.
restartThrottle | 10000 | Amount of time (in millis) to wait before attempting a restart
restart | false | Whether the executed cmd should be restarted if it dies
logStdErr | false | Whether the command’s stderr should be logged
batchSize | 20 | The max number of lines to read and send to the channel at a time
batchTimeout | 3000 | Amount of time (in milliseconds) to wait, if the buffer size was not reached, before data is pushed downstream
selector.type | replicating | replicating or multiplexing
selector.* | | Depends on the selector.type value
interceptors | - | Space-separated list of interceptors
interceptors.* | |

Warning

The problem with ExecSource and other asynchronous sources is that the source cannot guarantee that the client knows about a failure to put an event into the Channel. In such cases, the data will be lost. For instance, one of the most commonly requested features is the tail -F [file]-like use case where an application writes to a log file on disk and Flume tails the file, sending each line as an event. While this is possible, there’s an obvious problem: what happens if the channel fills up and Flume can’t send an event? Flume has no way of indicating to the application writing the log file that it needs to retain the log or that the event hasn’t been sent for some reason. If this doesn’t make sense, you need only know this: your application can never guarantee data has been received when using a unidirectional asynchronous interface such as ExecSource! As an extension of this warning, and to be completely clear, there is absolutely zero guarantee of event delivery when using this source. For stronger reliability guarantees, consider the Spooling Directory Source, Taildir Source or direct integration with Flume via the SDK.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

The ‘shell’ config is used to invoke the ‘command’ through a command shell (such as Bash or Powershell). The ‘command’ is passed as an argument to ‘shell’ for execution. This allows the ‘command’ to use features from the shell such as wildcards, back ticks, pipes, loops, conditionals etc. In the absence of the ‘shell’ config, the ‘command’ will be invoked directly. Common values for ‘shell’ : ‘/bin/sh -c’, ‘/bin/ksh -c’, ‘cmd /c’, ‘powershell -Command’, etc.

a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
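
The restart-related properties from the table can be combined with the first example. The following is a sketch with arbitrary example values, which restarts the tail command ten seconds after it dies and logs its stderr. The lack of delivery guarantees described in the warning still applies:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
# restart the command if it exits, waiting 10000 ms before each attempt
a1.sources.r1.restart = true
a1.sources.r1.restartThrottle = 10000
# log the command's stderr instead of discarding it
a1.sources.r1.logStdErr = true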

JMS Source

JMS Source reads messages from a JMS destination such as a queue or topic. Being a JMS application, it should work with any JMS provider but has only been tested with ActiveMQ. The JMS source provides configurable batch size, message selector, user/pass, and message-to-Flume-event converter. Note that the vendor-provided JMS jars should be included in the Flume classpath using the plugins.d directory (preferred), --classpath on the command line, or the FLUME_CLASSPATH variable in flume-env.sh.

Required properties are in bold.

Property Name | Default | Description
channels | - |
type | - | The component type name, needs to be jms
initialContextFactory | - | Initial Context Factory, e.g. org.apache.activemq.jndi.ActiveMQInitialContextFactory
connectionFactory | - | The JNDI name the connection factory should appear as
providerURL | - | The JMS provider URL
destinationName | - | Destination name
destinationType | - | Destination type (queue or topic)
messageSelector | - | Message selector to use when creating the consumer
userName | - | Username for the destination/provider
passwordFile | - | File containing the password for the destination/provider
batchSize | 100 | Number of messages to consume in one batch
converter.type | DEFAULT | Class to use to convert messages to flume events. See below.
converter.* | - | Converter properties.
converter.charset | UTF-8 | Default converter only. Charset to use when converting JMS TextMessages to byte arrays.
createDurableSubscription | false | Whether to create a durable subscription. A durable subscription can only be used with destinationType topic. If true, “clientId” and “durableSubscriptionName” have to be specified.
clientId | - | JMS client identifier set on Connection right after it is created. Required for durable subscriptions.
durableSubscriptionName | - | Name used to identify the durable subscription. Required for durable subscriptions.

JMS message converter

The JMS source allows pluggable converters, though it’s likely the default converter will work for most purposes. The default converter is able to convert Bytes, Text, and Object messages to FlumeEvents. In all cases, the properties in the message are added as headers to the FlumeEvent.

BytesMessage:
Bytes of message are copied to body of the FlumeEvent. Cannot convert more than 2GB of data per message.
TextMessage:
Text of message is converted to a byte array and copied to the body of the FlumeEvent. The default converter uses UTF-8 by default but this is configurable.
ObjectMessage:
Object is written out to a ByteArrayOutputStream wrapped in an ObjectOutputStream and the resulting array is copied to the body of the FlumeEvent.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE
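
If the destination is a topic, the durable subscription properties from the table can be added to the same source. This is a sketch in which the clientId and subscription name are placeholder assumptions, and BUSINESS_DATA is assumed to be a topic rather than a queue:

a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
# durable subscriptions require a topic destination
a1.sources.r1.destinationType = TOPIC
a1.sources.r1.createDurableSubscription = true
# both are required once createDurableSubscription is true (placeholder values)
a1.sources.r1.clientId = flume-agent-a1
a1.sources.r1.durableSubscriptionName = business-data-subscription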

SSL and JMS Source

JMS client implementations typically support configuring SSL/TLS via Java system properties defined by JSSE (Java Secure Socket Extension). When these system properties are specified for Flume’s JVM, JMS Source (or more precisely the JMS client implementation used by the JMS Source) can connect to the JMS server through SSL (of course only when the JMS server has also been set up to use SSL). It should work with any JMS provider and has been tested with ActiveMQ, IBM MQ and Oracle WebLogic.

The following sections describe only the SSL configuration steps needed on the Flume side. More detailed descriptions of the server side setup for the different JMS providers, along with full working configuration examples, can be found on the Flume Wiki.

SSL transport / server authentication:

If the JMS server uses a self-signed certificate or its certificate is signed by a non-trusted CA (e.g. the company’s own CA), then a truststore (containing the right certificate) needs to be set up and passed to Flume. This can be done via the global SSL parameters. For more details about the global SSL setup, see the SSL/TLS support section.

Some JMS providers require SSL specific JNDI Initial Context Factory and/or Provider URL settings when using SSL (eg. ActiveMQ uses ssl:// URL prefix instead of tcp://). In this case the source properties (initialContextFactory and/or providerURL) have to be adjusted in the agent config file.
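
For ActiveMQ, for example, the JMS Source configuration shown earlier would change only in the provider URL scheme. In this sketch the host name and port are placeholder assumptions (61617 is a port often used for ActiveMQ’s SSL transport, but your broker may differ), and the truststore is still supplied through the global SSL parameters:

a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
# ssl:// prefix instead of tcp://; host and port are placeholders
a1.sources.r1.providerURL = ssl://mqserver:61617
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE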

Client certificate authentication (two-way SSL):

JMS Source can authenticate to the JMS server through client certificate authentication instead of the usual user/password login (when SSL is used and the JMS server is configured to accept this kind of authentication).

The keystore containing Flume’s key used for the authentication needs to be configured via the global SSL parameters again. For more details about the global SSL setup, see the SSL/TLS support section.

The keystore should contain only one key (if multiple keys are present, then the first one will be used). The key password must be the same as the keystore password.

With client certificate authentication, there is no need to specify the userName / passwordFile properties for the JMS Source in the Flume agent config file.

Please note:

Unlike other components, JMS Source has no component level SSL configuration parameters, and no enable SSL flag either. SSL setup is controlled by the JNDI/Provider URL settings (ultimately the JMS server settings) and by the presence / absence of the truststore / keystore.

Spooling Directory Source

This source lets you ingest data by placing files to be ingested into a “spooling” directory on disk. This source will watch the specified directory for new files, and will parse events out of new files as they appear. The event parsing logic is pluggable. After a given file has been fully read into the channel, completion is indicated by default by renaming the file. Alternatively, the file can be deleted, or the trackerDir can be used to keep track of processed files.

Unlike the Exec source, this source is reliable and will not miss data, even if Flume is restarted or killed. In exchange for this reliability, only immutable, uniquely named files may be dropped into the spooling directory. Flume tries to detect violations of these conditions and will fail loudly when they occur:

  1. If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing.
  2. If a file name is reused at a later time, Flume will print an error to its log file and stop processing.

To avoid the above issues, it may be useful to add a unique identifier (such as a timestamp) to log file names when they are moved into the spooling directory.

Despite the reliability guarantees of this source, there are still cases in which events may be duplicated if certain downstream failures occur. This is consistent with the guarantees offered by other Flume components.
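
A minimal configuration sketch for this source, using the type, spoolDir and fileHeader properties listed in the table below. The agent, source and channel names and the directory path are placeholder assumptions:

a1.sources = src-1
a1.channels = ch-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
# placeholder directory; files dropped here must be immutable and uniquely named
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
# add a header with the absolute path of the originating file to each event
a1.sources.src-1.fileHeader = true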

Property Name | Default | Description
channels | - |
type | - | The component type name, needs to be spooldir.
spoolDir | - | The directory from which to read files.
fileSuffix | .COMPLETED | Suffix to append to completely ingested files
deletePolicy | never | When to delete completed files: never or immediate
fileHeader | false | Whether to add a header storing the absolute path filename.
fileHeaderKey | file | Header key to use when appending absolute path filename to event header.
basenameHeader | false | Whether to add a header storing the basename of the file.
basenameHeaderKey | basename | Header key to use when appending basename of file to event header.
includePattern | ^.*$ | Regular expression specifying which files to include. It can be used together with ignorePattern. If a file matches both ignorePattern and includePattern regex, the file is ignored.
ignorePattern | ^$ | Regular expression specifying which files to ignore (skip). It can be used together with includePattern. If a file matches both ignorePattern and includePattern regex, the file is ignored.
trackerDir | .flumespool | Directory to store metadata related to processing of files. If this path is not an absolute path, then it is interpreted as relative to the spoolDir.
trackingPolicy | rename | The tracking policy defines how file processing is tracked. It can be “rename” or “tracker_dir”. This parameter is only effective if the deletePolicy is “never”. “rename”: after processing, files are renamed according to the fileSuffix parameter. “tracker_dir”: files are not renamed, but a new empty file is created in the trackerDir. The new tracker file name is derived from the ingested one plus the fileSuffix.
consumeOrder | oldest | In which order files in the spooling directory will be consumed: oldest, youngest or random. For oldest and youngest, the last modified time of the files is used to compare the files. In case of a tie, the file with the smallest lexicographical order is consumed first. For random, any file is picked randomly. When using oldest and youngest, the whole directory is scanned to pick the oldest/youngest file, which might be slow if there are a large number of files, while using random may cause old files to be consumed very late if new files keep coming into the spooling directory.
pollDelay | 500 | Delay (in milliseconds) used when polling for new files.
recursiveDirectorySearch | false | Whether to monitor sub directories for new files to read.
maxBackoff 4000 The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -