
When using Jetty-specific settings, the named properties above will take precedence (for example, excludeProtocols will take precedence over SslContextFactory.ExcludeProtocols). All properties should be specified with an initial lower-case letter.

An example http source for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props
a1.sources.r1.HttpConfiguration.sendServerVersion = false
a1.sources.r1.ServerConnector.idleTimeout = 300
JSONHandler

A handler is provided out of the box which can handle events represented in JSON format, and supports UTF-8, UTF-16 and UTF-32 character sets. The handler accepts an array of events (even if there is only one event, it must be sent in an array) and converts them to Flume events based on the encoding specified in the request. If no encoding is specified, UTF-8 is assumed. Events are represented as follows.

[{
  "headers" : {
             "timestamp" : "434324343",
             "host" : "random_host.example.com"
             },
  "body" : "random_body"
  },
  {
  "headers" : {
             "namenode" : "namenode.example.com",
             "datanode" : "random_datanode.example.com"
             },
  "body" : "really_random_body"
  }]

To set the charset, the request must have content type specified as application/json; charset=UTF-8 (replace UTF-8 with UTF-16 or UTF-32 as required).

One way to create an event in the format expected by this handler is to use the JSONEvent class provided in the Flume SDK and use Google Gson to create the JSON string using the Gson#toJson(Object, Type) method. The type token to pass as the 2nd argument of this method for a list of events can be created by:

Type type = new TypeToken<List<JSONEvent>>() {}.getType();
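
Putting these pieces together, a minimal client-side sketch might look like the following (assuming the flume-ng-sdk and Gson libraries are on the classpath and that JSONEvent lives in org.apache.flume.event; the header and body values are arbitrary):

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.flume.event.JSONEvent;

import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

public class JsonPayloadExample {
    public static void main(String[] args) {
        // Build one event; the handler still expects it wrapped in an array.
        JSONEvent event = new JSONEvent();
        event.setHeaders(Collections.singletonMap("host", "random_host.example.com"));
        event.setBody("random_body".getBytes(StandardCharsets.UTF_8));

        List<JSONEvent> events = Collections.singletonList(event);
        Type type = new TypeToken<List<JSONEvent>>() {}.getType();

        // POST this string to the HTTP source with content type
        // application/json; charset=UTF-8
        String json = new Gson().toJson(events, type);
        System.out.println(json);
    }
}
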
BlobHandler

By default HTTPSource splits JSON input into Flume events. As an alternative, BlobHandler is a handler for HTTPSource that returns an event containing the request parameters as well as the Binary Large Object (BLOB) uploaded with the request, for example a PDF or JPG file. Note that this approach is not suitable for very large objects because it buffers the entire BLOB in RAM.

Property Name Default Description
handler The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobHandler
handler.maxBlobLength 100000000 The maximum number of bytes to read and buffer for a given request
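
For illustration, a BlobHandler could be wired into an HTTP source as follows (a minimal sketch using the properties above; the source/channel names and port are arbitrary):

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.apache.flume.sink.solr.morphline.BlobHandler
a1.sources.r1.handler.maxBlobLength = 100000000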

Stress Source

StressSource is an internal load-generating source implementation which is very useful for stress tests. It allows the user to configure the size of each Event payload (with empty headers), the total number of Events to be sent, and the maximum number of successful Events to be delivered.

Required properties are in bold.

Property Name Default Description
type The component type name, needs to be org.apache.flume.source.StressSource
size 500 Payload size of each Event. Unit:byte
maxTotalEvents -1 Maximum number of Events to be sent
maxSuccessfulEvents -1 Maximum number of Events successfully sent
batchSize 1 Number of Events to be sent in one batch
maxEventsPerSecond 0 When set to an integer greater than zero, enforces a rate limiter onto the source.

Example for agent named a1:

a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1

Legacy Sources

The legacy sources allow a Flume 1.x agent to receive events from Flume 0.9.4 agents. A legacy source accepts events in the Flume 0.9.4 format, converts them to the Flume 1.0 format, and stores them in the connected channel. The 0.9.4 event properties like timestamp, pri, host, nanos, etc. get converted to 1.x event header attributes. The legacy source supports both Avro and Thrift RPC connections. To use this bridge between two Flume versions, you need to start a Flume 1.x agent with the avroLegacy or thriftLegacy source. The 0.9.4 agent should have the agent Sink pointing to the host/port of the 1.x agent.

Note

The reliability semantics of Flume 1.x are different from those of Flume 0.9.x. The E2E or DFO mode of a Flume 0.9.x agent is not supported by the legacy source. The only supported 0.9.x mode is best effort, though the reliability settings of the 1.x flow will apply to the events once they are saved into the Flume 1.x channel by the legacy source.

Required properties are in bold.

Avro Legacy Source
Property Name Default Description
channels  
type The component type name, needs to be org.apache.flume.source.avroLegacy.AvroLegacySource
host The hostname or IP address to bind to
port The port # to listen on
selector.type   replicating or multiplexing
selector.* replicating Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
Thrift Legacy Source
Property Name Default Description
channels  
type The component type name, needs to be org.apache.flume.source.thriftLegacy.ThriftLegacySource
host The hostname or IP address to bind to
port The port # to listen on
selector.type   replicating or multiplexing
selector.* replicating Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

Custom Source

A custom source is your own implementation of the Source interface. A custom source’s class and its dependencies must be included in the agent’s classpath when starting the Flume agent. The type of the custom source is its FQCN.

Property Name Default Description
channels  
type The component type name, needs to be your FQCN
selector.type   replicating or multiplexing
selector.* replicating Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1

Scribe Source

Scribe is another type of ingest system. To integrate with an existing Scribe ingest system, Flume should use the ScribeSource, which is based on Thrift with a compatible transfer protocol. For deployment of Scribe please follow the guide from Facebook. Required properties are in bold.

Property Name Default Description
type The component type name, needs to be org.apache.flume.source.scribe.ScribeSource
port 1499 Port that Scribe should connect to
maxReadBufferBytes 16384000 Thrift Default FrameBuffer Size
workerThreads 5 Number of Thrift handler threads
selector.type    
selector.*    

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 1463
a1.sources.r1.workerThreads = 5
a1.sources.r1.channels = c1

Flume Sinks

HDFS Sink

This sink writes events into the Hadoop Distributed File System (HDFS). It currently supports creating text and sequence files. It supports compression in both file types. The files can be rolled (close current file and create a new one) periodically based on the elapsed time, size of data, or number of events. It also buckets/partitions data by attributes like timestamp or the machine where the event originated. The HDFS directory path may contain formatting escape sequences that will be replaced by the HDFS sink to generate a directory/file name to store the events. Using this sink requires Hadoop to be installed so that Flume can use the Hadoop jars to communicate with the HDFS cluster. Note that a version of Hadoop that supports the sync() call is required.

The following are the escape sequences supported:

Alias Description
%{host} Substitute value of event header named “host”. Arbitrary header names are supported.
%t Unix time in milliseconds
%a locale’s short weekday name (Mon, Tue, ...)
%A locale’s full weekday name (Monday, Tuesday, ...)
%b locale’s short month name (Jan, Feb, ...)
%B locale’s long month name (January, February, ...)
%c locale’s date and time (Thu Mar 3 23:05:25 2005)
%d day of month (01)
%e day of month without padding (1)
%D date; same as %m/%d/%y
%H hour (00..23)
%I hour (01..12)
%j day of year (001..366)
%k hour ( 0..23)
%m month (01..12)
%n month without padding (1..12)
%M minute (00..59)
%p locale’s equivalent of am or pm
%s seconds since 1970-01-01 00:00:00 UTC
%S second (00..59)
%y last two digits of year (00..99)
%Y year (2010)
%z +hhmm numeric timezone (for example, -0400)
%[localhost] Substitute the hostname of the host where the agent is running
%[IP] Substitute the IP address of the host where the agent is running
%[FQDN] Substitute the canonical hostname of the host where the agent is running

Note: The escape strings %[localhost], %[IP] and %[FQDN] all rely on Java’s ability to obtain the hostname, which may fail in some networking environments.

The file in use will have the name mangled to include ”.tmp” at the end. Once the file is closed, this extension is removed. This allows excluding partially complete files in the directory. Required properties are in bold.

Note

For all of the time related escape sequences, a header with the key “timestamp” must exist among the headers of the event (unless hdfs.useLocalTimeStamp is set to true). One way to add this automatically is to use the TimestampInterceptor.
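
For example, a timestamp interceptor can be attached to the source feeding this sink (a minimal sketch; the source name r1 is an assumption):

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp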

Name Default Description
channel  
type The component type name, needs to be hdfs
hdfs.path HDFS directory path (eg hdfs://namenode/flume/webdata/)
hdfs.filePrefix FlumeData Name prefixed to files created by Flume in hdfs directory
hdfs.fileSuffix Suffix to append to file (eg .avro - NOTE: period is not automatically added)
hdfs.inUsePrefix Prefix that is used for temporary files that flume actively writes into
hdfs.inUseSuffix .tmp Suffix that is used for temporary files that flume actively writes into
hdfs.emptyInUseSuffix false If false an hdfs.inUseSuffix is used while writing the output. After closing the output hdfs.inUseSuffix is removed from the output file name. If true the hdfs.inUseSuffix parameter is ignored and an empty string is used instead.
hdfs.rollInterval 30 Number of seconds to wait before rolling current file (0 = never roll based on time interval)
hdfs.rollSize 1024 File size to trigger roll, in bytes (0: never roll based on file size)
hdfs.rollCount 10 Number of events written to file before it is rolled (0 = never roll based on number of events)
hdfs.idleTimeout 0 Timeout after which inactive files get closed (0 = disable automatic closing of idle files)
hdfs.batchSize 100 Number of events written to file before it is flushed to HDFS
hdfs.codeC Compression codec. One of the following: gzip, bzip2, lzo, lzop, snappy
hdfs.fileType SequenceFile File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream will not compress the output file; do not set codeC. (2) CompressedStream requires hdfs.codeC to be set to an available codec.
hdfs.maxOpenFiles 5000 Allow only this number of open files. If this number is exceeded, the oldest file is closed.
hdfs.minBlockReplicas Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.
hdfs.writeFormat Writable Format for sequence file records. One of Text or Writable. Set to Text before creating data files with Flume, otherwise those files cannot be read by either Apache Impala (incubating) or Apache Hive.
hdfs.threadsPoolSize 10 Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
hdfs.rollTimerPoolSize 1 Number of threads per HDFS sink for scheduling timed file rolling
hdfs.kerberosPrincipal Kerberos user principal for accessing secure HDFS
hdfs.kerberosKeytab Kerberos keytab for accessing secure HDFS
hdfs.proxyUser    
hdfs.round false Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
hdfs.roundValue 1 Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time.
hdfs.roundUnit second The unit of the round down value - second, minute or hour.
hdfs.timeZone Local Time Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
hdfs.useLocalTimeStamp false Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.
hdfs.closeTries 0 Number of times the sink must try renaming a file, after initiating a close attempt. If set to 1, this sink will not re-try a failed rename (due to, for example, NameNode or DataNode failure), and may leave the file in an open state with a .tmp extension. If set to 0, the sink will try to rename the file until the file is eventually renamed (there is no limit on the number of times it would try). The file may still remain open if the close call fails but the data will be intact and in this case, the file will be closed only after a Flume restart.
hdfs.retryInterval 180 Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode, so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a ”.tmp” extension.
serializer TEXT Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer.Builder interface.
serializer.*    

Deprecated Properties

Name Default Description
hdfs.callTimeout 30000 Number of milliseconds allowed for HDFS operations, such as open, write, flush, close. This number should be increased if many HDFS timeout operations are occurring.

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

The above configuration will round down the timestamp to the last 10th minute. For example, an event with timestamp 11:54:34 AM, June 12, 2012 will cause the hdfs path to become /flume/events/2012-06-12/1150/00.

Hive Sink

This sink streams events containing delimited text or JSON data directly into a Hive table or partition. Events are written using Hive transactions. As soon as a set of events is committed to Hive, they become immediately visible to Hive queries. Partitions to which Flume will stream can either be pre-created or, optionally, Flume can create them if they are missing. Fields from incoming event data are mapped to corresponding columns in the Hive table.

Name Default Description
channel  
type The component type name, needs to be hive
hive.metastore Hive metastore URI (eg thrift://a.b.com:9083 )
hive.database Hive database name
hive.table Hive table name
hive.partition Comma separated list of partition values identifying the partition to write to. May contain escape sequences. E.g: If the table is partitioned by (continent: string, country: string, time: string) then ‘Asia,India,2014-02-26-01-21’ will indicate continent=Asia,country=India,time=2014-02-26-01-21
hive.txnsPerBatchAsk 100 Hive grants a batch of transactions instead of single transactions to streaming clients like Flume. This setting configures the number of desired transactions per Transaction Batch. Data from all transactions in a single batch end up in a single file. Flume will write a maximum of batchSize events in each transaction in the batch. This setting in conjunction with batchSize provides control over the size of each file. Note that eventually Hive will transparently compact these files into larger files.
heartBeatInterval 240 (In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring. Set this value to 0 to disable heartbeats.
autoCreatePartitions true Flume will automatically create the necessary Hive partitions to stream to
batchSize 15000 Max number of events written to Hive in a single Hive transaction
maxOpenConnections 500 Allow only this number of open connections. If this number is exceeded, the least recently used connection is closed.
callTimeout 10000 (In milliseconds) Timeout for Hive & HDFS I/O operations, such as openTxn, write, commit, abort.
serializer   Serializer is responsible for parsing out fields from the event and mapping them to columns in the Hive table. Choice of serializer depends upon the format of the data in the event. Supported serializers: DELIMITED and JSON
roundUnit minute The unit of the round down value - second, minute or hour.
roundValue 1 Rounded down to the highest multiple of this (in the unit configured using hive.roundUnit), less than current time
timeZone Local Time Name of the timezone that should be used for resolving the escape sequences in partition, e.g. America/Los_Angeles.
useLocalTimeStamp false Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.

Following serializers are provided for Hive sink:

JSON: Handles UTF-8 encoded JSON (strict syntax) events and requires no configuration. Object names in the JSON are mapped directly to columns with the same name in the Hive table. Internally uses org.apache.hive.hcatalog.data.JsonSerDe but is independent of the Serde of the Hive table. This serializer requires HCatalog to be installed.

DELIMITED: Handles simple delimited textual events. Internally uses LazySimpleSerde but is independent of the Serde of the Hive table.

Name Default Description
serializer.delimiter , (Type: string) The field delimiter in the incoming data. To use special characters, surround them with double quotes like “\t”
serializer.fieldnames The mapping from input fields to columns in the Hive table. Specified as a comma separated list (no spaces) of Hive table column names, identifying the input fields in order of their occurrence. To skip fields leave the column name unspecified. E.g. ‘time,,ip,message’ indicates that the 1st, 3rd and 4th fields in the input map to the time, ip and message columns in the Hive table.
serializer.serdeSeparator Ctrl-A (Type: character) Customizes the separator used by underlying serde. There can be a gain in efficiency if the fields in serializer.fieldnames are in same order as table columns, the serializer.delimiter is same as the serializer.serdeSeparator and number of fields in serializer.fieldnames is less than or equal to number of table columns, as the fields in incoming event body do not need to be reordered to match order of table columns. Use single quotes for special characters like ‘\t’. Ensure input fields do not contain this character. NOTE: If serializer.delimiter is a single character, preferably set this to the same character

The following are the escape sequences supported:

Alias Description
%{host} Substitute value of event header named “host”. Arbitrary header names are supported.
%t Unix time in milliseconds
%a locale’s short weekday name (Mon, Tue, ...)
%A locale’s full weekday name (Monday, Tuesday, ...)
%b locale’s short month name (Jan, Feb, ...)
%B locale’s long month name (January, February, ...)
%c locale’s date and time (Thu Mar 3 23:05:25 2005)
%d day of month (01)
%D date; same as %m/%d/%y
%H hour (00..23)
%I hour (01..12)
%j day of year (001..366)
%k hour ( 0..23)
%m month (01..12)
%M minute (00..59)
%p locale’s equivalent of am or pm
%s seconds since 1970-01-01 00:00:00 UTC
%S second (00..59)
%y last two digits of year (00..99)
%Y year (2010)
%z +hhmm numeric timezone (for example, -0400)

Note

For all of the time related escape sequences, a header with the key “timestamp” must exist among the headers of the event (unless useLocalTimeStamp is set to true). One way to add this automatically is to use the TimestampInterceptor.

Example Hive table :

create table weblogs ( id int , msg string )
    partitioned by (continent string, country string, time string)
    clustered by (id) into 5 buckets
    stored as orc;

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%Y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,,msg

The above configuration will round down the timestamp to the last 10th minute. For example, an event with timestamp header set to 11:54:34 AM, June 12, 2012 and ‘country’ header set to ‘india’ will evaluate to the partition (continent=’asia’, country=’india’, time=‘2012-06-12-11-50’). The serializer is configured to accept tab separated input containing three fields and to skip the second field.

Logger Sink

Logs events at INFO level. Typically useful for testing/debugging purposes. Required properties are in bold. This sink is the only exception which doesn’t require the extra configuration explained in the Logging raw data section.

Property Name Default Description
channel  
type The component type name, needs to be logger
maxBytesToLog 16 Maximum number of bytes of the Event body to log

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

Avro Sink

This sink forms one half of Flume’s tiered collection support. Flume events sent to this sink are turned into Avro events and sent to the configured hostname / port pair. The events are taken from the configured Channel in batches of the configured batch size. Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be avro.
hostname The hostname or IP address to connect to.
port The port # to connect to.
batch-size 100 Number of events to batch together per send.
connect-timeout 20000 Amount of time (ms) to allow for the first (handshake) request.
request-timeout 20000 Amount of time (ms) to allow for requests after the first.
reset-connection-interval none Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when new hosts are added without having to restart the agent.
compression-type none This can be “none” or “deflate”. The compression-type must match the compression-type of matching AvroSource
compression-level 6 The level of compression to compress events. 0 = no compression, 1-9 = compression; the higher the number, the more compression.
ssl false Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a “truststore”, “truststore-password”, “truststore-type”, and specify whether to “trust-all-certs”.
trust-all-certs false If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and “listen in” on the encrypted connection.
truststore The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source’s SSL authentication credentials should be trusted. If not specified, then the global keystore will be used. If the global keystore is not specified either, then the default Java JSSE certificate authority files (typically “jssecacerts” or “cacerts” in the Oracle JRE) will be used.
truststore-password The password for the truststore. If not specified, then the global keystore password will be used (if defined).
truststore-type JKS The type of the Java truststore. This can be “JKS” or another supported Java truststore type. If not specified, then the global keystore type will be used (if defined, otherwise the default is JKS).
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
maxIoWorkers 2 * the number of available processors in the machine The maximum number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory.

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
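
To enable SSL as described in the table above, the same sink could additionally carry the truststore settings (a sketch; the truststore path and password are placeholders):

a1.sinks.k1.ssl = true
a1.sinks.k1.truststore = /path/to/truststore.jks
a1.sinks.k1.truststore-password = changeit
a1.sinks.k1.truststore-type = JKS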

Thrift Sink

This sink forms one half of Flume’s tiered collection support. Flume events sent to this sink are turned into Thrift events and sent to the configured hostname / port pair. The events are taken from the configured Channel in batches of the configured batch size.

Thrift sink can be configured to start in secure mode by enabling kerberos authentication. To communicate with a Thrift source started in secure mode, the Thrift sink should also operate in secure mode. client-principal and client-keytab are the properties used by the Thrift sink to authenticate to the kerberos KDC. The server-principal represents the principal of the Thrift source this sink is configured to connect to in secure mode. Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be thrift.
hostname The hostname or IP address to connect to.
port The port # to connect to.
batch-size 100 Number of events to batch together per send.
connect-timeout 20000 Amount of time (ms) to allow for the first (handshake) request.
request-timeout 20000 Amount of time (ms) to allow for requests after the first.
connection-reset-interval none Amount of time (s) before the connection to the next hop is reset. This will force the Thrift Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when new hosts are added without having to restart the agent.
ssl false Set to true to enable SSL for this ThriftSink. When configuring SSL, you can optionally set a “truststore”, “truststore-password” and “truststore-type”
truststore The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Thrift Source’s SSL authentication credentials should be trusted. If not specified, then the global keystore will be used. If the global keystore is not specified either, then the default Java JSSE certificate authority files (typically “jssecacerts” or “cacerts” in the Oracle JRE) will be used.
truststore-password The password for the truststore. If not specified, then the global keystore password will be used (if defined).
truststore-type JKS The type of the Java truststore. This can be “JKS” or another supported Java truststore type. If not specified, then the global keystore type will be used (if defined, otherwise the default is JKS).
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude
kerberos false Set to true to enable kerberos authentication. In kerberos mode, client-principal, client-keytab and server-principal are required for successful authentication and communication to a kerberos enabled Thrift Source.
client-principal —- The kerberos principal used by the Thrift Sink to authenticate to the kerberos KDC.
client-keytab —- The keytab location used by the Thrift Sink in combination with the client-principal to authenticate to the kerberos KDC.
server-principal The kerberos principal of the Thrift Source to which the Thrift Sink is configured to connect.

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
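
With kerberos enabled as described above, the same sink would additionally carry the principal and keytab settings (a sketch; the principals and keytab path are placeholders):

a1.sinks.k1.kerberos = true
a1.sinks.k1.client-principal = flume/client.example.org@EXAMPLE.ORG
a1.sinks.k1.client-keytab = /etc/flume/conf/flume.keytab
a1.sinks.k1.server-principal = flume/server.example.org@EXAMPLE.ORG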

IRC Sink

The IRC sink takes messages from the attached channel and relays those to configured IRC destinations. Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be irc
hostname The hostname or IP address to connect to
port 6667 The port number of the remote host to connect to
nick Nick name
user User name
password User password
chan channel
name    
splitlines (boolean)
splitchars n line separator (if you were to enter the default value into the config file, then you would need to escape the backslash, like this: “\n”)

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = irc
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = irc.yourdomain.com
a1.sinks.k1.nick = flume
a1.sinks.k1.chan = #flume

File Roll Sink

Stores events on the local filesystem. Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be file_roll.
sink.directory The directory where files will be stored
sink.pathManager DEFAULT The PathManager implementation to use.
sink.pathManager.extension The file extension if the default PathManager is used.
sink.pathManager.prefix A character string to add to the beginning of the file name if the default PathManager is used
sink.rollInterval 30 Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file.
sink.serializer TEXT Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface.
sink.batchSize 100  

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

Null Sink

Discards all events it receives from the channel. Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be null.
batchSize 100  

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = null
a1.sinks.k1.channel = c1

HBaseSinks

HBaseSink

This sink writes data to HBase. The Hbase configuration is picked up from the first hbase-site.xml encountered in the classpath. A class implementing HbaseEventSerializer which is specified by the configuration is used to convert the events into HBase puts and/or increments. These puts and increments are then written to HBase. This sink provides the same consistency guarantees as HBase, which is currently row-wise atomicity. In the event of Hbase failing to write certain events, the sink will replay all events in that transaction.

The HBaseSink supports writing data to secure HBase. To write to secure HBase, the user the agent is running as must have write permissions to the table the sink is configured to write to. The principal and keytab to use to authenticate against the KDC can be specified in the configuration. The hbase-site.xml in the Flume agent’s classpath must have authentication set to kerberos (For details on how to do this, please refer to HBase documentation).

For convenience, two serializers are provided with Flume. The SimpleHbaseEventSerializer (org.apache.flume.sink.hbase.SimpleHbaseEventSerializer) writes the event body as-is to HBase, and optionally increments a column in Hbase. This is primarily an example implementation. The RegexHbaseEventSerializer (org.apache.flume.sink.hbase.RegexHbaseEventSerializer) breaks the event body based on the given regex and writes each part into different columns.

The type is the FQCN: org.apache.flume.sink.hbase.HBaseSink.

Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be hbase
table The name of the table in Hbase to write to.
columnFamily The column family in Hbase to write to.
zookeeperQuorum The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml
znodeParent /hbase The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml
batchSize 100 Number of events to be written per txn.
coalesceIncrements false Should the sink coalesce multiple increments to a cell per batch. This might give better performance if there are multiple increments to a limited number of cells.
serializer org.apache.flume.sink.hbase.SimpleHbaseEventSerializer Default increment column = “iCol”, payload column = “pCol”.
serializer.* Properties to be passed to the serializer.
kerberosPrincipal Kerberos user principal for accessing secure HBase
kerberosKeytab Kerberos keytab for accessing secure HBase

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
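
The serializer.* properties in the table above are passed through to the configured serializer. As a sketch, the RegexHbaseEventSerializer used in this example can be given a pattern and column names via serializer.regex and serializer.colNames (the pattern and column names below are purely illustrative assumptions):

a1.sinks.k1.serializer.regex = ([^ ]*) ([^ ]*) ([^ ]*)
a1.sinks.k1.serializer.colNames = ip,user,status
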
HBase2Sink

HBase2Sink is the equivalent of HBaseSink for HBase version 2. The provided functionality and the configuration parameters are the same as in case of HBaseSink (except the hbase2 tag in the sink type and the package/class names).

The type is the FQCN: org.apache.flume.sink.hbase2.HBase2Sink.

Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be hbase2
table The name of the table in HBase to write to.
columnFamily The column family in HBase to write to.
zookeeperQuorum The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml
znodeParent /hbase The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml
batchSize 100 Number of events to be written per txn.
coalesceIncrements false Should the sink coalesce multiple increments to a cell per batch. This might give better performance if there are multiple increments to a limited number of cells.
serializer org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer Default increment column = “iCol”, payload column = “pCol”.
serializer.* Properties to be passed to the serializer.
kerberosPrincipal Kerberos user principal for accessing secure HBase
kerberosKeytab Kerberos keytab for accessing secure HBase

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase2
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
a1.sinks.k1.channel = c1
AsyncHBaseSink

This sink writes data to HBase using an asynchronous model. A class implementing AsyncHbaseEventSerializer which is specified by the configuration is used to convert the events into HBase puts and/or increments. These puts and increments are then written to HBase. This sink uses the Asynchbase API to write to HBase. This sink provides the same consistency guarantees as HBase, which is currently row-wise atomicity. In the event of Hbase failing to write certain events, the sink will replay all events in that transaction. AsyncHBaseSink can only be used with HBase 1.x. The async client library used by AsyncHBaseSink is not available for HBase 2. The type is the FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink. Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be asynchbase
table The name of the table in Hbase to write to.
zookeeperQuorum The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml
znodeParent /hbase The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml
columnFamily The column family in Hbase to write to.
batchSize 100 Number of events to be written per txn.
coalesceIncrements false Should the sink coalesce multiple increments to a cell per batch. This might give better performance if there are multiple increments to a limited number of cells.
timeout 60000 The length of time (in milliseconds) the sink waits for acks from hbase for all events in a transaction.
serializer org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer  
serializer.* Properties to be passed to the serializer.
async.* Properties to be passed to the asyncHbase library. These properties have precedence over the old zookeeperQuorum and znodeParent values. The list of the available properties can be found in the asynchbase documentation.

When using Jetty-specific setings, named properites above will take precedence (for example excludeProtocols will take precedence over SslContextFactory.ExcludeProtocols). All properties will be inital lower case.

An example http source for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props
a1.sources.r1.HttpConfiguration.sendServerVersion = false
a1.sources.r1.ServerConnector.idleTimeout = 300
JSONHandler

A handler is provided out of the box which can handle events represented in JSON format, and supports UTF-8, UTF-16 and UTF-32 character sets. The handler accepts an array of events (even if there is only one event, the event has to be sent in an array) and converts them to a Flume event based on the encoding specified in the request. If no encoding is specified, UTF-8 is assumed. The JSON handler supports UTF-8, UTF-16 and UTF-32. Events are represented as follows.

[{
  "headers" : {
             "timestamp" : "434324343",
             "host" : "random_host.example.com"
             },
  "body" : "random_body"
  },
  {
  "headers" : {
             "namenode" : "namenode.example.com",
             "datanode" : "random_datanode.example.com"
             },
  "body" : "really_random_body"
  }]

To set the charset, the request must have content type specified as application/json; charset=UTF-8 (replace UTF-8 with UTF-16 or UTF-32 as required).

One way to create an event in the format expected by this handler is to use JSONEvent provided in the Flume SDK and use Google Gson to create the JSON string using the Gson#fromJson(Object, Type) method. The type token to pass as the 2nd argument of this method for list of events can be created by:

Type type = new TypeToken<List<JSONEvent>>() {}.getType();
BlobHandler

By default HTTPSource splits JSON input into Flume events. As an alternative, BlobHandler is a handler for HTTPSource that returns an event that contains the request parameters as well as the Binary Large Object (BLOB) uploaded with this request. For example a PDF or JPG file. Note that this approach is not suitable for very large objects because it buffers up the entire BLOB in RAM.

Property Name Default Description
handler The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobHandler
handler.maxBlobLength 100000000 The maximum number of bytes to read and buffer for a given request

Stress Source

StressSource is an internal load-generating source implementation which is very useful for stress tests. It allows User to configure the size of Event payload, with empty headers. User can configure total number of events to be sent as well maximum number of Successful Event to be delivered.

Required properties are in bold.

Property Name Default Description
type The component type name, needs to be org.apache.flume.source.StressSource
size 500 Payload size of each Event. Unit:byte
maxTotalEvents -1 Maximum number of Events to be sent
maxSuccessfulEvents -1 Maximum number of Events successfully sent
batchSize 1 Number of Events to be sent in one batch
maxEventsPerSecond 0 When set to an integer greater than zero, enforces a rate limiter onto the source.

Example for agent named a1:

a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1

Legacy Sources

The legacy sources allow a Flume 1.x agent to receive events from Flume 0.9.4 agents. It accepts events in the Flume 0.9.4 format, converts them to the Flume 1.0 format, and stores them in the connected channel. The 0.9.4 event properties like timestamp, pri, host, nanos, etc get converted to 1.x event header attributes. The legacy source supports both Avro and Thrift RPC connections. To use this bridge between two Flume versions, you need to start a Flume 1.x agent with the avroLegacy or thriftLegacy source. The 0.9.4 agent should have the agent Sink pointing to the host/port of the 1.x agent.

Note

The reliability semantics of Flume 1.x are different from that of Flume 0.9.x. The E2E or DFO mode of a Flume 0.9.x agent will not be supported by the legacy source. The only supported 0.9.x mode is the best effort, though the reliability setting of the 1.x flow will be applicable to the events once they are saved into the Flume 1.x channel by the legacy source.

Required properties are in bold.

Avro Legacy Source
Property Name Default Description
channels  
type The component type name, needs to be org.apache.flume.source.avroLegacy.AvroLegacySource
host The hostname or IP address to bind to
port The port # to listen on
selector.type   replicating or multiplexing
selector.* replicating Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.bind = 6666
a1.sources.r1.channels = c1
Thrift Legacy Source
Property Name Default Description
channels  
type The component type name, needs to be org.apache.flume.source.thriftLegacy.ThriftLegacySource
host The hostname or IP address to bind to
port The port # to listen on
selector.type   replicating or multiplexing
selector.* replicating Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.bind = 6666
a1.sources.r1.channels = c1

Custom Source

A custom source is your own implementation of the Source interface. A custom source’s class and its dependencies must be included in the agent’s classpath when starting the Flume agent. The type of the custom source is its FQCN.

Property Name Default Description
channels  
type The component type name, needs to be your FQCN
selector.type   replicating or multiplexing
selector.* replicating Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1

Scribe Source

Scribe is another type of ingest system. To adopt existing Scribe ingest system, Flume should use ScribeSource based on Thrift with compatible transfering protocol. For deployment of Scribe please follow the guide from Facebook. Required properties are in bold.

Property Name Default Description
type The component type name, needs to be org.apache.flume.source.scribe.ScribeSource
port 1499 Port that Scribe should be connected
maxReadBufferBytes 16384000 Thrift Default FrameBuffer Size
workerThreads 5 Handing threads number in Thrift
selector.type    
selector.*    

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 1463
a1.sources.r1.workerThreads = 5
a1.sources.r1.channels = c1

Flume Sinks

HDFS Sink

This sink writes events into the Hadoop Distributed File System (HDFS). It currently supports creating text and sequence files. It supports compression in both file types. The files can be rolled (close current file and create a new one) periodically based on the elapsed time or size of data or number of events. It also buckets/partitions data by attributes like timestamp or machine where the event originated. The HDFS directory path may contain formatting escape sequences that will replaced by the HDFS sink to generate a directory/file name to store the events. Using this sink requires hadoop to be installed so that Flume can use the Hadoop jars to communicate with the HDFS cluster. Note that a version of Hadoop that supports the sync() call is required.

The following are the escape sequences supported:

Alias Description
%{host} Substitute value of event header named “host”. Arbitrary header names are supported.
%t Unix time in milliseconds
%a locale’s short weekday name (Mon, Tue, ...)
%A locale’s full weekday name (Monday, Tuesday, ...)
%b locale’s short month name (Jan, Feb, ...)
%B locale’s long month name (January, February, ...)
%c locale’s date and time (Thu Mar 3 23:05:25 2005)
%d day of month (01)
%e day of month without padding (1)
%D date; same as %m/%d/%y
%H hour (00..23)
%I hour (01..12)
%j day of year (001..366)
%k hour ( 0..23)
%m month (01..12)
%n month without padding (1..12)
%M minute (00..59)
%p locale’s equivalent of am or pm
%s seconds since 1970-01-01 00:00:00 UTC
%S second (00..59)
%y last two digits of year (00..99)
%Y year (2010)
%z +hhmm numeric timezone (for example, -0400)
%[localhost] Substitute the hostname of the host where the agent is running
%[IP] Substitute the IP address of the host where the agent is running
%[FQDN] Substitute the canonical hostname of the host where the agent is running

Note: The escape strings %[localhost], %[IP] and %[FQDN] all rely on Java’s ability to obtain the hostname, which may fail in some networking environments.

The file in use will have the name mangled to include ”.tmp” at the end. Once the file is closed, this extension is removed. This allows excluding partially complete files in the directory. Required properties are in bold.

Note

For all of the time related escape sequences, a header with the key “timestamp” must exist among the headers of the event (unless hdfs.useLocalTimeStamp is set to true). One way to add this automatically is to use the TimestampInterceptor.

Name Default Description
channel  
type The component type name, needs to be hdfs
hdfs.path HDFS directory path (eg hdfs://namenode/flume/webdata/)
hdfs.filePrefix FlumeData Name prefixed to files created by Flume in hdfs directory
hdfs.fileSuffix Suffix to append to file (eg .avro - NOTE: period is not automatically added)
hdfs.inUsePrefix Prefix that is used for temporal files that flume actively writes into
hdfs.inUseSuffix .tmp Suffix that is used for temporal files that flume actively writes into
hdfs.emptyInUseSuffix false If false an hdfs.inUseSuffix is used while writing the output. After closing the output hdfs.inUseSuffix is removed from the output file name. If true the hdfs.inUseSuffix parameter is ignored an empty string is used instead.
hdfs.rollInterval 30 Number of seconds to wait before rolling current file (0 = never roll based on time interval)
hdfs.rollSize 1024 File size to trigger roll, in bytes (0: never roll based on file size)
hdfs.rollCount 10 Number of events written to file before it rolled (0 = never roll based on number of events)
hdfs.idleTimeout 0 Timeout after which inactive files get closed (0 = disable automatic closing of idle files)
hdfs.batchSize 100 number of events written to file before it is flushed to HDFS
hdfs.codeC Compression codec. one of following : gzip, bzip2, lzo, lzop, snappy
hdfs.fileType SequenceFile File format: currently SequenceFile, DataStream or CompressedStream (1)DataStream will not compress output file and please don’t set codeC (2)CompressedStream requires set hdfs.codeC with an available codeC
hdfs.maxOpenFiles 5000 Allow only this number of open files. If this number is exceeded, the oldest file is closed.
hdfs.minBlockReplicas Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.
hdfs.writeFormat Writable Format for sequence file records. One of Text or Writable. Set to Text before creating data files with Flume, otherwise those files cannot be read by either Apache Impala (incubating) or Apache Hive.
hdfs.threadsPoolSize 10 Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
hdfs.rollTimerPoolSize 1 Number of threads per HDFS sink for scheduling timed file rolling
hdfs.kerberosPrincipal Kerberos user principal for accessing secure HDFS
hdfs.kerberosKeytab Kerberos keytab for accessing secure HDFS
hdfs.proxyUser    
hdfs.round false Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
hdfs.roundValue 1 Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time.
hdfs.roundUnit second The unit of the round down value - second, minute or hour.
hdfs.timeZone Local Time Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
hdfs.useLocalTimeStamp false Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.
hdfs.closeTries 0 Number of times the sink must try renaming a file, after initiating a close attempt. If set to 1, this sink will not re-try a failed rename (due to, for example, NameNode or DataNode failure), and may leave the file in an open state with a .tmp extension. If set to 0, the sink will try to rename the file until the file is eventually renamed (there is no limit on the number of times it would try). The file may still remain open if the close call fails but the data will be intact and in this case, the file will be closed only after a Flume restart.
hdfs.retryInterval 180 Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode, so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a ”.tmp” extension.
serializer TEXT Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer.Builder interface.
serializer.*    

Deprecated Properties

Name Default Description
hdfs.callTimeout 30000 Number of milliseconds allowed for HDFS operations, such as open, write, flush, close. This number should be increased if many HDFS timeout operations are occurring.

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

The above configuration will round down the timestamp to the last 10th minute. For example, an event with timestamp 11:54:34 AM, June 12, 2012 will cause the hdfs path to become /flume/events/2012-06-12/1150/00.

Hive Sink

This sink streams events containing delimited text or JSON data directly into a Hive table or partition. Events are written using Hive transactions. As soon as a set of events are committed to Hive, they become immediately visible to Hive queries. Partitions to which flume will stream to can either be pre-created or, optionally, Flume can create them if they are missing. Fields from incoming event data are mapped to corresponding columns in the Hive table.

Name Default Description
channel  
type The component type name, needs to be hive
hive.metastore Hive metastore URI (eg thrift://a.b.com:9083 )
hive.database Hive database name
hive.table Hive table name
hive.partition Comma separate list of partition values identifying the partition to write to. May contain escape sequences. E.g: If the table is partitioned by (continent: string, country :string, time : string) then ‘Asia,India,2014-02-26-01-21’ will indicate continent=Asia,country=India,time=2014-02-26-01-21
hive.txnsPerBatchAsk 100 Hive grants a batch of transactions instead of single transactions to streaming clients like Flume. This setting configures the number of desired transactions per Transaction Batch. Data from all transactions in a single batch end up in a single file. Flume will write a maximum of batchSize events in each transaction in the batch. This setting in conjunction with batchSize provides control over the size of each file. Note that eventually Hive will transparently compact these files into larger files.
heartBeatInterval 240 (In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring. Set this value to 0 to disable heartbeats.
autoCreatePartitions true Flume will automatically create the necessary Hive partitions to stream to
batchSize 15000 Max number of events written to Hive in a single Hive transaction
maxOpenConnections 500 Allow only this number of open connections. If this number is exceeded, the least recently used connection is closed.
callTimeout 10000 (In milliseconds) Timeout for Hive & HDFS I/O operations, such as openTxn, write, commit, abort.
serializer   Serializer is responsible for parsing out field from the event and mapping them to columns in the hive table. Choice of serializer depends upon the format of the data in the event. Supported serializers: DELIMITED and JSON
roundUnit minute The unit of the round down value - second, minute or hour.
roundValue 1 Rounded down to the highest multiple of this (in the unit configured using hive.roundUnit), less than current time
timeZone Local Time Name of the timezone that should be used for resolving the escape sequences in partition, e.g. America/Los_Angeles.
useLocalTimeStamp false Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.

Following serializers are provided for Hive sink:

JSON: Handles UTF8 encoded Json (strict syntax) events and requires no configration. Object names in the JSON are mapped directly to columns with the same name in the Hive table. Internally uses org.apache.hive.hcatalog.data.JsonSerDe but is independent of the Serde of the Hive table. This serializer requires HCatalog to be installed.

DELIMITED: Handles simple delimited textual events. Internally uses LazySimpleSerde but is independent of the Serde of the Hive table.

Name Default Description
serializer.delimiter , (Type: string) The field delimiter in the incoming data. To use special characters, surround them with double quotes like “\t”
serializer.fieldnames The mapping from input fields to columns in hive table. Specified as a comma separated list (no spaces) of hive table columns names, identifying the input fields in order of their occurrence. To skip fields leave the column name unspecified. Eg. ‘time,,ip,message’ indicates the 1st, 3rd and 4th fields in input map to time, ip and message columns in the hive table.
serializer.serdeSeparator Ctrl-A (Type: character) Customizes the separator used by underlying serde. There can be a gain in efficiency if the fields in serializer.fieldnames are in same order as table columns, the serializer.delimiter is same as the serializer.serdeSeparator and number of fields in serializer.fieldnames is less than or equal to number of table columns, as the fields in incoming event body do not need to be reordered to match order of table columns. Use single quotes for special characters like ‘\t’. Ensure input fields do not contain this character. NOTE: If serializer.delimiter is a single character, preferably set this to the same character

The following are the escape sequences supported:

Alias Description
%{host} Substitute value of event header named “host”. Arbitrary header names are supported.
%t Unix time in milliseconds
%a locale’s short weekday name (Mon, Tue, ...)
%A locale’s full weekday name (Monday, Tuesday, ...)
%b locale’s short month name (Jan, Feb, ...)
%B locale’s long month name (January, February, ...)
%c locale’s date and time (Thu Mar 3 23:05:25 2005)
%d day of month (01)
%D date; same as %m/%d/%y
%H hour (00..23)
%I hour (01..12)
%j day of year (001..366)
%k hour ( 0..23)
%m month (01..12)
%M minute (00..59)
%p locale’s equivalent of am or pm
%s seconds since 1970-01-01 00:00:00 UTC
%S second (00..59)
%y last two digits of year (00..99)
%Y year (2010)
%z +hhmm numeric timezone (for example, -0400)

Note

For all of the time related escape sequences, a header with the key “timestamp” must exist among the headers of the event (unless useLocalTimeStamp is set to true). One way to add this automatically is to use the TimestampInterceptor.
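
As a minimal sketch (assuming a source named r1 on the same agent), the timestamp interceptor can be attached to the upstream source so that every event carries the required “timestamp” header:

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp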

Example Hive table :

create table weblogs ( id int , msg string )
    partitioned by (continent string, country string, time string)
    clustered by (id) into 5 buckets
    stored as orc;
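
Note that Hive streaming ingest also expects the target table to be transactional; depending on your Hive version this may need to be declared explicitly. An illustrative variant of the table above (not part of the original example):

create table weblogs ( id int , msg string )
    partitioned by (continent string, country string, time string)
    clustered by (id) into 5 buckets
    stored as orc tblproperties ("transactional" = "true");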

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%Y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,,msg

The above configuration will round down the timestamp to the last 10th minute. For example, an event with timestamp header set to 11:54:34 AM, June 12, 2012 and ‘country’ header set to ‘india’ will evaluate to the partition (continent=’asia’, country=’india’, time=’2012-06-12-11-50’). The serializer is configured to accept tab separated input containing three fields and to skip the second field.
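
For illustration, a hypothetical tab-separated event body of the form 1 <TAB> ignored <TAB> user logged in would be written with id = 1 and msg = ‘user logged in’; the second input field is dropped because the corresponding column name is left empty in serializer.fieldnames.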

Logger Sink

Logs events at INFO level. Typically useful for testing/debugging purposes. Required properties are in bold. This sink is the only exception which doesn’t require the extra configuration explained in the Logging raw data section.

Property Name Default Description
channel  
type The component type name, needs to be logger
maxBytesToLog 16 Maximum number of bytes of the Event body to log

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
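
To actually see the logged events on the console while testing, the agent is typically started with the console appender enabled, for example (paths and file names below are placeholders):

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console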

Avro Sink

This sink forms one half of Flume’s tiered collection support. Flume events sent to this sink are turned into Avro events and sent to the configured hostname / port pair. The events are taken from the configured Channel in batches of the configured batch size. Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be avro.
hostname The hostname or IP address of the remote Avro Source to connect to.
port The port # of the remote Avro Source to connect to.
batch-size 100 Number of events to batch together per send.
connect-timeout 20000 Amount of time (ms) to allow for the first (handshake) request.
request-timeout 20000 Amount of time (ms) to allow for requests after the first.
reset-connection-interval none Amount of time (s) before the connection to the next hop is reset. This forces the Avro Sink to reconnect to the next hop, allowing the sink to connect to hosts behind a hardware load-balancer when new hosts are added without having to restart the agent.
compression-type none This can be “none” or “deflate”. The compression-type must match the compression-type of the matching AvroSource
compression-level 6 The level of compression to apply to events. 0 = no compression, 1-9 = compression; the higher the number, the more compression
ssl false Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a “truststore”, “truststore-password”, “truststore-type”, and specify whether to “trust-all-certs”.
trust-all-certs false If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and “listen in” on the encrypted connection.
truststore The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source’s SSL authentication credentials should be trusted. If not specified, then the global keystore will be used. If the global keystore not specified either, then the default Java JSSE certificate authority files (typically “jssecacerts” or “cacerts” in the Oracle JRE) will be used.
truststore-password The password for the truststore. If not specified, then the global keystore password will be used (if defined).
truststore-type JKS The type of the Java truststore. This can be “JKS” or another supported Java truststore type. If not specified, then the global keystore type will be used (if defined, otherwise the default is JKS).
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
maxIoWorkers 2 * the number of available processors in the machine The maximum number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory.

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
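
A sketch of the same sink with SSL and deflate compression enabled, using the properties from the table above (the truststore path and password are placeholders; remember that compression-type must match the receiving AvroSource):

a1.sinks.k1.ssl = true
a1.sinks.k1.trust-all-certs = false
a1.sinks.k1.truststore = /path/to/truststore.jks
a1.sinks.k1.truststore-password = password
a1.sinks.k1.truststore-type = JKS
a1.sinks.k1.compression-type = deflate
a1.sinks.k1.compression-level = 6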

Thrift Sink

This sink forms one half of Flume’s tiered collection support. Flume events sent to this sink are turned into Thrift events and sent to the configured hostname / port pair. The events are taken from the configured Channel in batches of the configured batch size.

Thrift sink can be configured to start in secure mode by enabling kerberos authentication. To communicate with a Thrift source started in secure mode, the Thrift sink should also operate in secure mode. client-principal and client-keytab are the properties used by the Thrift sink to authenticate to the kerberos KDC. The server-principal represents the principal of the Thrift source this sink is configured to connect to in secure mode. Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be thrift.
hostname The hostname or IP address of the remote Thrift Source to connect to.
port The port # of the remote Thrift Source to connect to.
batch-size 100 Number of events to batch together per send.
connect-timeout 20000 Amount of time (ms) to allow for the first (handshake) request.
request-timeout 20000 Amount of time (ms) to allow for requests after the first.
connection-reset-interval none Amount of time (s) before the connection to the next hop is reset. This forces the Thrift Sink to reconnect to the next hop, allowing the sink to connect to hosts behind a hardware load-balancer when new hosts are added without having to restart the agent.
ssl false Set to true to enable SSL for this ThriftSink. When configuring SSL, you can optionally set a “truststore”, “truststore-password” and “truststore-type”
truststore The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Thrift Source’s SSL authentication credentials should be trusted. If not specified, then the global keystore will be used. If the global keystore not specified either, then the default Java JSSE certificate authority files (typically “jssecacerts” or “cacerts” in the Oracle JRE) will be used.
truststore-password The password for the truststore. If not specified, then the global keystore password will be used (if defined).
truststore-type JKS The type of the Java truststore. This can be “JKS” or another supported Java truststore type. If not specified, then the global keystore type will be used (if defined, otherwise the default is JKS).
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude
kerberos false Set to true to enable kerberos authentication. In kerberos mode, client-principal, client-keytab and server-principal are required for successful authentication and communication to a kerberos enabled Thrift Source.
client-principal —- The kerberos principal used by the Thrift Sink to authenticate to the kerberos KDC.
client-keytab —- The keytab location used by the Thrift Sink in combination with the client-principal to authenticate to the kerberos KDC.
server-principal The kerberos principal of the Thrift Source to which the Thrift Sink is configured to connect.

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
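
A sketch of the same sink running in secure mode, using the kerberos properties from the table above (the principals and keytab path are placeholders):

a1.sinks.k1.kerberos = true
a1.sinks.k1.client-principal = flumeclient/client.example.org@EXAMPLE.ORG
a1.sinks.k1.client-keytab = /etc/security/keytabs/flume.keytab
a1.sinks.k1.server-principal = thrift/server.example.org@EXAMPLE.ORG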

IRC Sink

The IRC sink takes messages from the attached channel and relays them to the configured IRC destinations. Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be irc
hostname The hostname or IP address to connect to
port 6667 The port number of the remote host to connect to
nick Nick name
user User name
password User password
chan channel
name    
splitlines (boolean) Whether to split the message body into lines (using splitchars as the separator) before relaying
splitchars n line separator (if you were to enter the default value into the config file, then you would need to escape the backslash, like this: “\n”)

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = irc
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = irc.yourdomain.com
a1.sinks.k1.nick = flume
a1.sinks.k1.chan = #flume

File Roll Sink

Stores events on the local filesystem. Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be file_roll.
sink.directory The directory where files will be stored
sink.pathManager DEFAULT The PathManager implementation to use.
sink.pathManager.extension The file extension if the default PathManager is used.
sink.pathManager.prefix A character string to add to the beginning of the file name if the default PathManager is used
sink.rollInterval 30 Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file.
sink.serializer TEXT Other possible options include avro_event or the FQCN of an implementation of the EventSerializer.Builder interface.
sink.batchSize 100  

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
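
A variant sketch that disables time-based rolling and switches to the Avro event serializer (values are illustrative only):

a1.sinks.k1.sink.rollInterval = 0
a1.sinks.k1.sink.serializer = avro_event
a1.sinks.k1.sink.batchSize = 100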

Null Sink

Discards all events it receives from the channel. Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be null.
batchSize 100  

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = null
a1.sinks.k1.channel = c1

HBaseSinks

HBaseSink

This sink writes data to HBase. The HBase configuration is picked up from the first hbase-site.xml encountered in the classpath. A class implementing HbaseEventSerializer, which is specified by the configuration, is used to convert the events into HBase puts and/or increments. These puts and increments are then written to HBase. This sink provides the same consistency guarantees as HBase, which is currently row-wise atomicity. In the event of HBase failing to write certain events, the sink will replay all events in that transaction.

The HBaseSink supports writing data to secure HBase. To write to secure HBase, the user the agent is running as must have write permissions to the table the sink is configured to write to. The principal and keytab to use to authenticate against the KDC can be specified in the configuration. The hbase-site.xml in the Flume agent’s classpath must have authentication set to kerberos (For details on how to do this, please refer to HBase documentation).

For convenience, two serializers are provided with Flume. The SimpleHbaseEventSerializer (org.apache.flume.sink.hbase.SimpleHbaseEventSerializer) writes the event body as-is to HBase, and optionally increments a column in HBase. This is primarily an example implementation. The RegexHbaseEventSerializer (org.apache.flume.sink.hbase.RegexHbaseEventSerializer) breaks the event body based on the given regex and writes each part into different columns.

The type is the FQCN: org.apache.flume.sink.hbase.HBaseSink.

Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be hbase
table The name of the table in Hbase to write to.
columnFamily The column family in Hbase to write to.
zookeeperQuorum The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml
znodeParent /hbase The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml
batchSize 100 Number of events to be written per txn.
coalesceIncrements false Should the sink coalesce multiple increments to a cell per batch. This might give better performance if there are multiple increments to a limited number of cells.
serializer org.apache.flume.sink.hbase.SimpleHbaseEventSerializer Default increment column = “iCol”, payload column = “pCol”.
serializer.* Properties to be passed to the serializer.
kerberosPrincipal Kerberos user principal for accessing secure HBase
kerberosKeytab Kerberos keytab for accessing secure HBase

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
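
Since serializer.* properties are passed through to the serializer, the RegexHbaseEventSerializer can be tuned from the agent configuration. A hedged sketch (the regex and column names here, and the regex/colNames property names, are illustrative assumptions about this serializer):

a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.serializer.regex = ([^ ]*) (.*)
a1.sinks.k1.serializer.colNames = id,msg
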
HBase2Sink

HBase2Sink is the equivalent of HBaseSink for HBase version 2. The provided functionality and the configuration parameters are the same as in the case of HBaseSink (except for the hbase2 tag in the sink type and the package/class names).

The type is the FQCN: org.apache.flume.sink.hbase2.HBase2Sink.

Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be hbase2
table The name of the table in HBase to write to.
columnFamily The column family in HBase to write to.
zookeeperQuorum The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml
znodeParent /hbase The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml
batchSize 100 Number of events to be written per txn.
coalesceIncrements false Should the sink coalesce multiple increments to a cell per batch. This might give better performance if there are multiple increments to a limited number of cells.
serializer org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer Default increment column = “iCol”, payload column = “pCol”.
serializer.* Properties to be passed to the serializer.
kerberosPrincipal Kerberos user principal for accessing secure HBase
kerberosKeytab Kerberos keytab for accessing secure HBase

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase2
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
a1.sinks.k1.channel = c1
AsyncHBaseSink

This sink writes data to HBase using an asynchronous model. A class implementing AsyncHbaseEventSerializer, which is specified by the configuration, is used to convert the events into HBase puts and/or increments. These puts and increments are then written to HBase. This sink uses the Asynchbase API to write to HBase. This sink provides the same consistency guarantees as HBase, which is currently row-wise atomicity. In the event of HBase failing to write certain events, the sink will replay all events in that transaction. AsyncHBaseSink can only be used with HBase 1.x. The async client library used by AsyncHBaseSink is not available for HBase 2. The type is the FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink. Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be asynchbase
table The name of the table in Hbase to write to.
zookeeperQuorum The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml
znodeParent /hbase The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml
columnFamily The column family in Hbase to write to.
batchSize 100 Number of events to be written per txn.
coalesceIncrements false Should the sink coalesce multiple increments to a cell per batch. This might give better performance if there are multiple increments to a limited number of cells.
timeout 60000 The length of time (in milliseconds) the sink waits for acks from hbase for all events in a transaction.
serializer org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer  
serializer.* Properties to be passed to the serializer.
async.* Properties to be passed to the asynchbase library. These properties take precedence over the old zookeeperQuorum and znodeParent values. The list of available properties can be found in the AsyncHBase documentation.