Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
The above configuration will round down the timestamp to the last 10th minute. For example, an event with timestamp 11:54:34 AM, June 12, 2012 will cause the hdfs path to become /flume/events/2012-06-12/1150/00.
This sink streams events containing delimited text or JSON data directly into a Hive table or partition. Events are written using Hive transactions. As soon as a set of events is committed to Hive, it becomes immediately visible to Hive queries. Partitions to which Flume will stream can either be pre-created or, optionally, Flume can create them if they are missing. Fields from incoming event data are mapped to corresponding columns in the Hive table.
Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be hive |
hive.metastore | – | Hive metastore URI (eg thrift://a.b.com:9083 ) |
hive.database | – | Hive database name |
hive.table | – | Hive table name |
hive.partition | – | Comma separated list of partition values identifying the partition to write to. May contain escape sequences. E.g: If the table is partitioned by (continent: string, country: string, time: string) then ‘Asia,India,2014-02-26-01-21’ will indicate continent=Asia,country=India,time=2014-02-26-01-21 |
hive.txnsPerBatchAsk | 100 | Hive grants a batch of transactions instead of single transactions to streaming clients like Flume. This setting configures the number of desired transactions per Transaction Batch. Data from all transactions in a single batch end up in a single file. Flume will write a maximum of batchSize events in each transaction in the batch. This setting in conjunction with batchSize provides control over the size of each file. Note that eventually Hive will transparently compact these files into larger files. |
heartBeatInterval | 240 | (In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring. Set this value to 0 to disable heartbeats. |
autoCreatePartitions | true | Flume will automatically create the necessary Hive partitions to stream to |
batchSize | 15000 | Max number of events written to Hive in a single Hive transaction |
maxOpenConnections | 500 | Allow only this number of open connections. If this number is exceeded, the least recently used connection is closed. |
callTimeout | 10000 | (In milliseconds) Timeout for Hive & HDFS I/O operations, such as openTxn, write, commit, abort. |
serializer | – | The serializer is responsible for parsing out fields from the event and mapping them to columns in the hive table. The choice of serializer depends upon the format of the data in the event. Supported serializers: DELIMITED and JSON |
roundUnit | minute | The unit of the round down value - second, minute or hour. |
roundValue | 1 | Rounded down to the highest multiple of this (in the unit configured using hive.roundUnit) that is less than the current time |
timeZone | Local Time | Name of the timezone that should be used for resolving the escape sequences in partition, e.g. America/Los_Angeles. |
useLocalTimeStamp | false | Use the local time (instead of the timestamp from the event header) while replacing the escape sequences. |
The following serializers are provided for the Hive sink:
JSON: Handles UTF-8 encoded JSON (strict syntax) events and requires no configuration. Object names in the JSON are mapped directly to columns with the same name in the Hive table. Internally uses org.apache.hive.hcatalog.data.JsonSerDe but is independent of the Serde of the Hive table. This serializer requires HCatalog to be installed.
DELIMITED: Handles simple delimited textual events. Internally uses LazySimpleSerde but is independent of the Serde of the Hive table.
Name | Default | Description |
---|---|---|
serializer.delimiter | , | (Type: string) The field delimiter in the incoming data. To use special characters, surround them with double quotes like “\t” |
serializer.fieldnames | – | The mapping from input fields to columns in the hive table. Specified as a comma separated list (no spaces) of hive table column names, identifying the input fields in order of their occurrence. To skip fields leave the column name unspecified. Eg. ‘time,,ip,message’ indicates that the 1st, 3rd and 4th fields in the input map to the time, ip and message columns in the hive table. |
serializer.serdeSeparator | Ctrl-A | (Type: character) Customizes the separator used by the underlying serde. There can be a gain in efficiency if the fields in serializer.fieldnames are in the same order as the table columns, serializer.delimiter is the same as serializer.serdeSeparator, and the number of fields in serializer.fieldnames is less than or equal to the number of table columns, as the fields in the incoming event body then do not need to be reordered to match the order of table columns. Use single quotes for special characters like ‘\t’. Ensure input fields do not contain this character. NOTE: If serializer.delimiter is a single character, preferably set this to the same character |
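As a minimal sketch, events carrying JSON bodies need no serializer sub-properties at all; in the full Hive sink example later in this section, the three serializer.* lines would collapse to a single setting:
a1.sinks.k1.serializer = JSON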
The following are the escape sequences supported:
Alias | Description |
---|---|
%{host} | Substitute value of event header named “host”. Arbitrary header names are supported. |
%t | Unix time in milliseconds |
%a | locale’s short weekday name (Mon, Tue, ...) |
%A | locale’s full weekday name (Monday, Tuesday, ...) |
%b | locale’s short month name (Jan, Feb, ...) |
%B | locale’s long month name (January, February, ...) |
%c | locale’s date and time (Thu Mar 3 23:05:25 2005) |
%d | day of month (01) |
%D | date; same as %m/%d/%y |
%H | hour (00..23) |
%I | hour (01..12) |
%j | day of year (001..366) |
%k | hour ( 0..23) |
%m | month (01..12) |
%M | minute (00..59) |
%p | locale’s equivalent of am or pm |
%s | seconds since 1970-01-01 00:00:00 UTC |
%S | second (00..59) |
%y | last two digits of year (00..99) |
%Y | year (2010) |
%z | +hhmm numeric timezone (for example, -0400) |
Note
For all of the time related escape sequences, a header with the key “timestamp” must exist among the headers of the event (unless useLocalTimeStamp is set to true). One way to add this automatically is to use the TimestampInterceptor.
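For example, a minimal sketch of such an interceptor configuration, assuming a source named r1 on the same agent (the source name is hypothetical), would be:
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp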
Example Hive table:
create table weblogs ( id int , msg string )
partitioned by (continent string, country string, time string)
clustered by (id) into 5 buckets
stored as orc;
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,,msg
The above configuration will round down the timestamp to the last 10th minute. For example, an event with the timestamp header set to 11:54:34 AM, June 12, 2012 and the ‘country’ header set to ‘india’ will evaluate to the partition (continent=’asia’, country=’india’, time=‘2012-06-12-11-50’). The serializer is configured to accept tab-separated input containing three fields and to skip the second field.
Logs events at INFO level. Typically useful for testing/debugging purposes. Required properties are in bold. This sink is the only exception which doesn’t require the extra configuration explained in the Logging raw data section.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be logger |
maxBytesToLog | 16 | Maximum number of bytes of the Event body to log |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
This sink forms one half of Flume’s tiered collection support. Flume events sent to this sink are turned into Avro events and sent to the configured hostname / port pair. The events are taken from the configured Channel in batches of the configured batch size. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be avro. |
hostname | – | The hostname or IP address to connect to. |
port | – | The port # to connect to. |
batch-size | 100 | Number of events to batch together for sending. |
connect-timeout | 20000 | Amount of time (ms) to allow for the first (handshake) request. |
request-timeout | 20000 | Amount of time (ms) to allow for requests after the first. |
reset-connection-interval | none | Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop. This allows the sink to connect to hosts behind a hardware load-balancer when new hosts are added without having to restart the agent. |
compression-type | none | This can be “none” or “deflate”. The compression-type must match the compression-type of the matching AvroSource |
compression-level | 6 | The compression level for compressing events. 0 = no compression; 1-9 = compression, with higher numbers giving more compression |
ssl | false | Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a “truststore”, “truststore-password”, “truststore-type”, and specify whether to “trust-all-certs”. |
trust-all-certs | false | If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and “listen in” on the encrypted connection. |
truststore | – | The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source’s SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically “jssecacerts” or “cacerts” in the Oracle JRE) will be used. |
truststore-password | – | The password for the specified truststore. |
truststore-type | JKS | The type of the Java truststore. This can be “JKS” or other supported Java truststore type. |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
maxIoWorkers | 2 * the number of available processors in the machine | The maximum number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory. |
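As an illustrative sketch of the SSL-related properties, the basic example below could be extended with lines such as these (the truststore path and password are placeholders):
a1.sinks.k1.ssl = true
a1.sinks.k1.truststore = /path/to/truststore.jks
a1.sinks.k1.truststore-password = changeit
a1.sinks.k1.truststore-type = JKS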
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
This sink forms one half of Flume’s tiered collection support. Flume events sent to this sink are turned into Thrift events and sent to the configured hostname / port pair. The events are taken from the configured Channel in batches of the configured batch size.
Thrift sink can be configured to start in secure mode by enabling kerberos authentication. To communicate with a Thrift source started in secure mode, the Thrift sink should also operate in secure mode. client-principal and client-keytab are the properties used by the Thrift sink to authenticate to the kerberos KDC. The server-principal represents the principal of the Thrift source this sink is configured to connect to in secure mode. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be thrift. |
hostname | – | The hostname or IP address to connect to. |
port | – | The port # to connect to. |
batch-size | 100 | Number of events to batch together for sending. |
connect-timeout | 20000 | Amount of time (ms) to allow for the first (handshake) request. |
request-timeout | 20000 | Amount of time (ms) to allow for requests after the first. |
connection-reset-interval | none | Amount of time (s) before the connection to the next hop is reset. This will force the Thrift Sink to reconnect to the next hop. This allows the sink to connect to hosts behind a hardware load-balancer when new hosts are added without having to restart the agent. |
ssl | false | Set to true to enable SSL for this ThriftSink. When configuring SSL, you can optionally set a “truststore”, “truststore-password” and “truststore-type” |
truststore | – | The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Thrift Source’s SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically “jssecacerts” or “cacerts” in the Oracle JRE) will be used. |
truststore-password | – | The password for the specified truststore. |
truststore-type | JKS | The type of the Java truststore. This can be “JKS” or other supported Java truststore type. |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude |
kerberos | false | Set to true to enable kerberos authentication. In kerberos mode, client-principal, client-keytab and server-principal are required for successful authentication and communication to a kerberos enabled Thrift Source. |
client-principal | – | The kerberos principal used by the Thrift Sink to authenticate to the kerberos KDC. |
client-keytab | – | The keytab location used by the Thrift Sink in combination with the client-principal to authenticate to the kerberos KDC. |
server-principal | – | The kerberos principal of the Thrift Source to which the Thrift Sink is configured to connect. |
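As a sketch, enabling Kerberos adds properties along the following lines to the example below (the principals and keytab path are placeholders):
a1.sinks.k1.kerberos = true
a1.sinks.k1.client-principal = flume/client.example.org@EXAMPLE.ORG
a1.sinks.k1.client-keytab = /etc/security/keytabs/flume.keytab
a1.sinks.k1.server-principal = thrift/server.example.org@EXAMPLE.ORG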
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
The IRC sink takes messages from the attached channel and relays those to the configured IRC destinations. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be irc |
hostname | – | The hostname or IP address to connect to |
port | 6667 | The port number of remote host to connect |
nick | – | Nick name |
user | – | User name |
password | – | User password |
chan | – | channel |
name | – | |
splitlines | – | (boolean) |
splitchars | \n | line separator (if you were to enter the default value into the config file, then you would need to escape the backslash, like this: “\\n”) |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = irc
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = irc.yourdomain.com
a1.sinks.k1.nick = flume
a1.sinks.k1.chan = #flume
Stores events on the local filesystem. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be file_roll. |
sink.directory | – | The directory where files will be stored |
sink.pathManager | DEFAULT | The PathManager implementation to use. |
sink.pathManager.extension | – | The file extension if the default PathManager is used. |
sink.pathManager.prefix | – | A character string to add to the beginning of the file name if the default PathManager is used |
sink.rollInterval | 30 | Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file. |
sink.serializer | TEXT | Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface. |
batchSize | 100 | |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
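To illustrate the optional properties, a sketch that rolls a new file every hour and writes Avro container files instead of text (the values are illustrative):
a1.sinks.k1.sink.rollInterval = 3600
a1.sinks.k1.sink.serializer = avro_event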
Discards all events it receives from the channel. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be null. |
batchSize | 100 | |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = null
a1.sinks.k1.channel = c1
This sink writes data to HBase. The Hbase configuration is picked up from the first hbase-site.xml encountered in the classpath. A class implementing HbaseEventSerializer which is specified by the configuration is used to convert the events into HBase puts and/or increments. These puts and increments are then written to HBase. This sink provides the same consistency guarantees as HBase, which is currently row-wise atomicity. In the event of Hbase failing to write certain events, the sink will replay all events in that transaction.
The HBaseSink supports writing data to secure HBase. To write to secure HBase, the user the agent is running as must have write permissions to the table the sink is configured to write to. The principal and keytab to use to authenticate against the KDC can be specified in the configuration. The hbase-site.xml in the Flume agent’s classpath must have authentication set to kerberos (For details on how to do this, please refer to HBase documentation).
For convenience, two serializers are provided with Flume. The SimpleHbaseEventSerializer (org.apache.flume.sink.hbase.SimpleHbaseEventSerializer) writes the event body as-is to HBase, and optionally increments a column in Hbase. This is primarily an example implementation. The RegexHbaseEventSerializer (org.apache.flume.sink.hbase.RegexHbaseEventSerializer) breaks the event body based on the given regex and writes each part into different columns.
The type is the FQCN: org.apache.flume.sink.hbase.HBaseSink.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be hbase |
table | – | The name of the table in Hbase to write to. |
columnFamily | – | The column family in Hbase to write to. |
zookeeperQuorum | – | The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml |
znodeParent | /hbase | The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml |
batchSize | 100 | Number of events to be written per txn. |
coalesceIncrements | false | Should the sink coalesce multiple increments to a cell per batch. This might give better performance if there are multiple increments to a limited number of cells. |
serializer | org.apache.flume.sink.hbase.SimpleHbaseEventSerializer | Default increment column = “iCol”, payload column = “pCol”. |
serializer.* | – | Properties to be passed to the serializer. |
kerberosPrincipal | – | Kerberos user principal for accessing secure HBase |
kerberosKeytab | – | Kerberos keytab for accessing secure HBase |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
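As a sketch of passing serializer.* properties through to the RegexHbaseEventSerializer used above, a regex and matching column names could be supplied as follows (the pattern and column names are illustrative; backslashes are doubled because of properties-file escaping):
a1.sinks.k1.serializer.regex = (\\d+):(\\w+)
a1.sinks.k1.serializer.colNames = id,msg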
This sink writes data to HBase using an asynchronous model. A class implementing AsyncHbaseEventSerializer which is specified by the configuration is used to convert the events into HBase puts and/or increments. These puts and increments are then written to HBase. This sink uses the Asynchbase API to write to HBase. This sink provides the same consistency guarantees as HBase, which is currently row-wise atomicity. In the event of Hbase failing to write certain events, the sink will replay all events in that transaction. The type is the FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be asynchbase |
table | – | The name of the table in Hbase to write to. |
zookeeperQuorum | – | The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml |
znodeParent | /hbase | The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml |
columnFamily | – | The column family in Hbase to write to. |
batchSize | 100 | Number of events to be written per txn. |
coalesceIncrements | false | Should the sink coalesce multiple increments to a cell per batch. This might give better performance if there are multiple increments to a limited number of cells. |
timeout | 60000 | The length of time (in milliseconds) the sink waits for acks from hbase for all events in a transaction. |
serializer | org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer | |
serializer.* | – | Properties to be passed to the serializer. |
Note that this sink takes the ZooKeeper quorum and parent znode information from its configuration. If these are not provided in the configuration, the sink will read them from the first hbase-site.xml file in the classpath.
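For example, to specify these explicitly rather than relying on hbase-site.xml, the example below could add lines such as (the host names are placeholders):
a1.sinks.k1.zookeeperQuorum = zk1.example.org:2181,zk2.example.org:2181,zk3.example.org:2181
a1.sinks.k1.znodeParent = /hbase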
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = asynchbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.k1.channel = c1
This sink extracts data from Flume events, transforms it, and loads it in near-real-time into Apache Solr servers, which in turn serve queries to end users or search applications.
This sink is well suited for use cases that stream raw data into HDFS (via the HdfsSink) and simultaneously extract, transform and load the same data into Solr (via MorphlineSolrSink). In particular, this sink can process arbitrary heterogeneous raw data from disparate data sources and turn it into a data model that is useful to Search applications.
The ETL functionality is customizable using a morphline configuration file that defines a chain of transformation commands that pipe event records from one command to another.
Morphlines can be seen as an evolution of Unix pipelines where the data model is generalized to work with streams of generic records, including arbitrary binary payloads. A morphline command is a bit like a Flume Interceptor. Morphlines can be embedded into Hadoop components such as Flume.
Commands to parse and transform a set of standard data formats such as log files, Avro, CSV, Text, HTML, XML, PDF, Word, Excel, etc. are provided out of the box, and additional custom commands and parsers for additional data formats can be added as morphline plugins. Any kind of data format can be indexed and any Solr documents for any kind of Solr schema can be generated, and any custom ETL logic can be registered and executed.
Morphlines manipulate continuous streams of records. The data model can be described as follows: A record is a set of named fields where each field has an ordered list of one or more values. A value can be any Java Object. That is, a record is essentially a hash table where each hash table entry contains a String key and a list of Java Objects as values. (The implementation uses Guava’s ArrayListMultimap, which is a ListMultimap). Note that a field can have multiple values and any two records need not use common field names.
This sink fills the body of the Flume event into the _attachment_body field of the morphline record, as well as copies the headers of the Flume event into record fields of the same name. The commands can then act on this data.
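As a non-authoritative sketch, a minimal morphline configuration file that reads each event body as a line of text and loads it into Solr might look like the following; the collection name and ZooKeeper address are placeholders, and the command names come from the Kite Morphlines library:
morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
    commands : [
      { readLine { charset : UTF-8 } }
      { loadSolr { solrLocator : { collection : collection1, zkHost : "127.0.0.1:2181/solr" } } }
    ]
  }
]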
Routing to a SolrCloud cluster is supported to improve scalability. Indexing load can be spread across a large number of MorphlineSolrSinks for improved scalability. Indexing load can be replicated across multiple MorphlineSolrSinks for high availability, for example using Flume features such as Load balancing Sink Processor. MorphlineInterceptor can also help to implement dynamic routing to multiple Solr collections (e.g. for multi-tenancy).
The morphline and solr jars required for your environment must be placed in the lib directory of the Apache Flume installation.
The type is the FQCN: org.apache.flume.sink.solr.morphline.MorphlineSolrSink
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be org.apache.flume.sink.solr.morphline.MorphlineSolrSink |
morphlineFile | – | The relative or absolute path on the local file system to the morphline configuration file. Example: /etc/flume-ng/conf/morphline.conf |
morphlineId | null | Optional name used to identify a morphline if there are multiple morphlines in a morphline config file |
batchSize | 1000 | The maximum number of events to take per flume transaction. |
batchDurationMillis | 1000 | The maximum duration per flume transaction (ms). The transaction commits after this duration or when batchSize is exceeded, whichever comes first. |
handlerClass | org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl | The FQCN of a class implementing org.apache.flume.sink.solr.morphline.MorphlineHandler |
isProductionMode | false | This flag should be enabled for mission critical, large-scale online production systems that need to make progress without downtime when unrecoverable exceptions occur. Corrupt or malformed parser input data, parser bugs, and errors related to unknown Solr schema fields produce unrecoverable exceptions. |
recoverableExceptionClasses | org.apache.solr.client.solrj.SolrServerException | Comma separated list of recoverable exceptions that tend to be transient, in which case the corresponding task can be retried. Examples include network connection errors, timeouts, etc. When the production mode flag is set to true, the recoverable exceptions configured using this parameter will not be ignored and hence will lead to retries. |
isIgnoringRecoverableExceptions | false | This flag should be enabled, if an unrecoverable exception is accidentally misclassified as recoverable. This enables the sink to make progress and avoid retrying an event forever. |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.channel = c1
a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf
# a1.sinks.k1.morphlineId = morphline1
# a1.sinks.k1.batchSize = 1000
# a1.sinks.k1.batchDurationMillis = 1000
This sink writes data to an elasticsearch cluster. By default, events will be written so that the Kibana graphical interface can display them - just as if logstash wrote them.
The elasticsearch and lucene-core jars required for your environment must be placed in the lib directory of the Apache Flume installation. Elasticsearch requires that the major version of the client JAR match that of the server and that both are running the same minor version of the JVM. SerializationExceptions will appear if this is incorrect. To select the required version first determine the version of elasticsearch and the JVM version the target cluster is running. Then select an elasticsearch client library which matches the major version. A 0.19.x client can talk to a 0.19.x cluster; 0.20.x can talk to 0.20.x and 0.90.x can talk to 0.90.x. Once the elasticsearch version has been determined then read the pom.xml file to determine the correct lucene-core JAR version to use. The Flume agent which is running the ElasticSearchSink should also match the JVM the target cluster is running down to the minor version.
Events will be written to a new index every day. The name will be <indexName>-yyyy-MM-dd where <indexName> is the indexName parameter. The sink will start writing to a new index at midnight UTC.
Events are serialized for elasticsearch by the ElasticSearchLogStashEventSerializer by default. This behaviour can be overridden with the serializer parameter. This parameter accepts implementations of org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer or org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory. Implementing ElasticSearchEventSerializer is deprecated in favour of the more powerful ElasticSearchIndexRequestBuilderFactory.
The type is the FQCN: org.apache.flume.sink.elasticsearch.ElasticSearchSink
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be org.apache.flume.sink.elasticsearch.ElasticSearchSink |
hostNames | – | Comma separated list of hostname:port, if the port is not present the default port ‘9300’ will be used |
indexName | flume | The name of the index to which the date will be appended. Example: ‘flume’ -> ‘flume-yyyy-MM-dd’. Arbitrary header substitution is supported, eg. %{header} replaces with value of named event header |
indexType | logs | The type to index the document to, defaults to ‘logs’. Arbitrary header substitution is supported, eg. %{header} replaces with value of named event header |
clusterName | elasticsearch | Name of the ElasticSearch cluster to connect to |
batchSize | 100 | Number of events to be written per txn. |
ttl | – | TTL in days, when set will cause the expired documents to be deleted automatically, if not set documents will never be automatically deleted. TTL is accepted both in the earlier form of integer only e.g. a1.sinks.k1.ttl = 5 and also with a qualifier ms (millisecond), s (second), m (minute), h (hour), d (day) and w (week). Example a1.sinks.k1.ttl = 5d will set TTL to 5 days. Follow http://www.elasticsearch.org/guide/reference/mapping/ttl-field/ for more information. |
serializer | org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer | The ElasticSearchIndexRequestBuilderFactory or ElasticSearchEventSerializer to use. Implementations of either class are accepted but ElasticSearchIndexRequestBuilderFactory is preferred. |
serializer.* | – | Properties to be passed to the serializer. |
Note
Header substitution is a handy way to use the value of an event header to dynamically decide the indexName and indexType to use when storing the event. Caution should be used with this feature as the event submitter now has control of the indexName and indexType. Furthermore, if the elasticsearch REST client is used then the event submitter has control of the URL path used.
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1
Experimental sink that writes events to a Kite Dataset. This sink will deserialize the body of each incoming event and store the resulting record in a Kite Dataset. It determines the target Dataset by loading the dataset by its URI.
The only supported serialization is avro, and the record schema must be passed in the event headers, using either flume.avro.schema.literal with the JSON schema representation or flume.avro.schema.url with a URL where the schema may be found (hdfs:/... URIs are supported). This is compatible with the Log4jAppender flume client and the spooling directory source’s Avro deserializer using deserializer.schemaType = LITERAL.
Note 1: The flume.avro.schema.hash header is not supported. Note 2: In some cases, file rolling may occur slightly after the roll interval has been exceeded. However, this delay will not exceed 5 seconds. In most cases, the delay is negligible.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | Must be org.apache.flume.sink.kite.DatasetSink |
kite.dataset.uri | – | URI of the dataset to open |
kite.repo.uri | – | URI of the repository to open (deprecated; use kite.dataset.uri instead) |
kite.dataset.namespace | – | Namespace of the Dataset where records will be written (deprecated; use kite.dataset.uri instead) |
kite.dataset.name | – | Name of the Dataset where records will be written (deprecated; use kite.dataset.uri instead) |
kite.batchSize | 100 | Number of records to process in each batch |
kite.rollInterval | 30 | Maximum wait time (seconds) before data files are released |
kite.flushable.commitOnBatch | true | If true, the Flume transaction will be committed and the writer will be flushed on each batch of kite.batchSize records. This setting only applies to flushable datasets. When true, it’s possible for temp files with committed data to be left in the dataset directory. These files need to be recovered by hand for the data to be visible to DatasetReaders. |
kite.syncable.syncOnBatch | true | Controls whether the sink will also sync data when committing the transaction. This setting only applies to syncable datasets. Syncing guarantees that data will be written on stable storage on the remote system while flushing only guarantees that data has left Flume’s client buffers. When the kite.flushable.commitOnBatch property is set to false, this property must also be set to false. |
kite.entityParser | avro | Parser that turns Flume Events into Kite entities. Valid values are avro and the fully-qualified class name of an implementation of the EntityParser.Builder interface. |
kite.failurePolicy | retry | Policy that handles non-recoverable errors such as a missing Schema in the Event header. The default value, retry, will fail the current batch and try again which matches the old behavior. Other valid values are save, which will write the raw Event to the kite.error.dataset.uri dataset, and the fully-qualified class name of an implementation of the FailurePolicy.Builder interface. |
kite.error.dataset.uri | – | URI of the dataset where failed events are saved when kite.failurePolicy is set to save. Required when the kite.failurePolicy is set to save. |
auth.kerberosPrincipal | – | Kerberos user principal for secure authentication to HDFS |
auth.kerberosKeytab | – | Kerberos keytab location (local FS) for the principal |
auth.proxyUser | – | The effective user for HDFS actions, if different from the kerberos principal |
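A minimal configuration sketch for an agent named a1 (the dataset URI is a placeholder):
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.kite.DatasetSink
a1.sinks.k1.channel = c1
a1.sinks.k1.kite.dataset.uri = dataset:hdfs://namenode.example.org/apps/flume/events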
This is a Flume Sink implementation that can publish data to a Kafka topic. One of the objectives is to integrate Flume with Kafka so that pull-based processing systems can process the data coming through various Flume sources. This currently supports the Kafka 0.9.x series of releases.
This version of Flume no longer supports older versions (0.8.x) of Kafka.
Required properties are marked in bold font.
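A minimal configuration sketch for an agent named a1 (the broker address and topic name are placeholders):
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel = c1
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.producer.acks = 1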
hdfs.callTimeout | 10000 | Number of milliseconds allowed for HDFS operations, such as open, write, flush, close. This number should be increased if many HDFS timeout operations are occurring. |
hdfs.threadsPoolSize | 10 | Number of threads per HDFS sink for HDFS IO ops (open, write, etc.) |
hdfs.rollTimerPoolSize | 1 | Number of threads per HDFS sink for scheduling timed file rolling |
hdfs.kerberosPrincipal | – | Kerberos user principal for accessing secure HDFS |
hdfs.kerberosKeytab | – | Kerberos keytab for accessing secure HDFS |
hdfs.proxyUser | ||
hdfs.round | false | Should the timestamp be rounded down (if true, affects all time based escape sequences except %t) |
hdfs.roundValue | 1 | Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time. |
hdfs.roundUnit | second | The unit of the round down value - second, minute or hour. |
hdfs.timeZone | Local Time | Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles. |
hdfs.useLocalTimeStamp | false | Use the local time (instead of the timestamp from the event header) while replacing the escape sequences. |
hdfs.closeTries | 0 | Number of times the sink must try renaming a file, after initiating a close attempt. If set to 1, this sink will not re-try a failed rename (due to, for example, NameNode or DataNode failure), and may leave the file in an open state with a .tmp extension. If set to 0, the sink will try to rename the file until the file is eventually renamed (there is no limit on the number of times it would try). The file may still remain open if the close call fails but the data will be intact and in this case, the file will be closed only after a Flume restart. |
hdfs.retryInterval | 180 | Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode, so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a ”.tmp” extension. |
serializer | TEXT | Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer.Builder interface. |
serializer.* |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
The above configuration will round down the timestamp to the last 10th minute. For example, an event with timestamp 11:54:34 AM, June 12, 2012 will cause the hdfs path to become /flume/events/2012-06-12/1150/00.
This sink streams events containing delimited text or JSON data directly into a Hive table or partition. Events are written using Hive transactions. As soon as a set of events are committed to Hive, they become immediately visible to Hive queries. Partitions to which flume will stream to can either be pre-created or, optionally, Flume can create them if they are missing. Fields from incoming event data are mapped to corresponding columns in the Hive table.
Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be hive |
hive.metastore | – | Hive metastore URI (eg thrift://a.b.com:9083 ) |
hive.database | – | Hive database name |
hive.table | – | Hive table name |
hive.partition | – | Comma separate list of partition values identifying the partition to write to. May contain escape sequences. E.g: If the table is partitioned by (continent: string, country :string, time : string) then ‘Asia,India,2014-02-26-01-21’ will indicate continent=Asia,country=India,time=2014-02-26-01-21 |
hive.txnsPerBatchAsk | 100 | Hive grants a batch of transactions instead of single transactions to streaming clients like Flume. This setting configures the number of desired transactions per Transaction Batch. Data from all transactions in a single batch end up in a single file. Flume will write a maximum of batchSize events in each transaction in the batch. This setting in conjunction with batchSize provides control over the size of each file. Note that eventually Hive will transparently compact these files into larger files. |
heartBeatInterval | 240 | (In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring. Set this value to 0 to disable heartbeats. |
autoCreatePartitions | true | Flume will automatically create the necessary Hive partitions to stream to |
batchSize | 15000 | Max number of events written to Hive in a single Hive transaction |
maxOpenConnections | 500 | Allow only this number of open connections. If this number is exceeded, the least recently used connection is closed. |
callTimeout | 10000 | (In milliseconds) Timeout for Hive & HDFS I/O operations, such as openTxn, write, commit, abort. |
serializer | Serializer is responsible for parsing out field from the event and mapping them to columns in the hive table. Choice of serializer depends upon the format of the data in the event. Supported serializers: DELIMITED and JSON | |
roundUnit | minute | The unit of the round down value - second, minute or hour. |
roundValue | 1 | Rounded down to the highest multiple of this (in the unit configured using hive.roundUnit), less than current time |
timeZone | Local Time | Name of the timezone that should be used for resolving the escape sequences in partition, e.g. America/Los_Angeles. |
useLocalTimeStamp | false | Use the local time (instead of the timestamp from the event header) while replacing the escape sequences. |
Following serializers are provided for Hive sink:
JSON: Handles UTF8 encoded Json (strict syntax) events and requires no configration. Object names in the JSON are mapped directly to columns with the same name in the Hive table. Internally uses org.apache.hive.hcatalog.data.JsonSerDe but is independent of the Serde of the Hive table. This serializer requires HCatalog to be installed.
DELIMITED: Handles simple delimited textual events. Internally uses LazySimpleSerde but is independent of the Serde of the Hive table.
Name | Default | Description |
---|---|---|
serializer.delimiter | , | (Type: string) The field delimiter in the incoming data. To use special characters, surround them with double quotes like “\t” |
serializer.fieldnames | – | The mapping from input fields to columns in hive table. Specified as a comma separated list (no spaces) of hive table columns names, identifying the input fields in order of their occurrence. To skip fields leave the column name unspecified. Eg. ‘time,,ip,message’ indicates the 1st, 3rd and 4th fields in input map to time, ip and message columns in the hive table. |
serializer.serdeSeparator | Ctrl-A | (Type: character) Customizes the separator used by underlying serde. There can be a gain in efficiency if the fields in serializer.fieldnames are in same order as table columns, the serializer.delimiter is same as the serializer.serdeSeparator and number of fields in serializer.fieldnames is less than or equal to number of table columns, as the fields in incoming event body do not need to be reordered to match order of table columns. Use single quotes for special characters like ‘\t’. Ensure input fields do not contain this character. NOTE: If serializer.delimiter is a single character, preferably set this to the same character |
The following are the escape sequences supported:
Alias | Description |
---|---|
%{host} | Substitute value of event header named “host”. Arbitrary header names are supported. |
%t | Unix time in milliseconds |
%a | locale’s short weekday name (Mon, Tue, ...) |
%A | locale’s full weekday name (Monday, Tuesday, ...) |
%b | locale’s short month name (Jan, Feb, ...) |
%B | locale’s long month name (January, February, ...) |
%c | locale’s date and time (Thu Mar 3 23:05:25 2005) |
%d | day of month (01) |
%D | date; same as %m/%d/%y |
%H | hour (00..23) |
%I | hour (01..12) |
%j | day of year (001..366) |
%k | hour ( 0..23) |
%m | month (01..12) |
%M | minute (00..59) |
%p | locale’s equivalent of am or pm |
%s | seconds since 1970-01-01 00:00:00 UTC |
%S | second (00..59) |
%y | last two digits of year (00..99) |
%Y | year (2010) |
%z | +hhmm numeric timezone (for example, -0400) |
Note
For all of the time related escape sequences, a header with the key “timestamp” must exist among the headers of the event (unless useLocalTimeStamp is set to true). One way to add this automatically is to use the TimestampInterceptor.
Example Hive table :
create table weblogs ( id int , msg string )
partitioned by (continent string, country string, time string)
clustered by (id) into 5 buckets
stored as orc;
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,,msg
The above configuration will round down the timestamp to the last 10th minute. For example, an event with timestamp header set to 11:54:34 AM, June 12, 2012 and ‘country’ header set to ‘india’ will evaluate to the partition (continent=’asia’,country=’india’,time=‘2012-06-12-11-50’. The serializer is configured to accept tab separated input containing three fields and to skip the second field.
Logs event at INFO level. Typically useful for testing/debugging purpose. Required properties are in bold. This sink is the only exception which doesn’t require the extra configuration explained in the Logging raw data section.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be logger |
maxBytesToLog | 16 | Maximum number of bytes of the Event body to log |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
This sink forms one half of Flume’s tiered collection support. Flume events sent to this sink are turned into Avro events and sent to the configured hostname / port pair. The events are taken from the configured Channel in batches of the configured batch size. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be avro. |
hostname | – | The hostname or IP address to bind to. |
port | – | The port # to listen on. |
batch-size | 100 | number of event to batch together for send. |
connect-timeout | 20000 | Amount of time (ms) to allow for the first (handshake) request. |
request-timeout | 20000 | Amount of time (ms) to allow for requests after the first. |
reset-connection-interval | none | Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when news hosts are added without having to restart the agent. |
compression-type | none | This can be “none” or “deflate”. The compression-type must match the compression-type of matching AvroSource |
compression-level | 6 | The level of compression to compress event. 0 = no compression and 1-9 is compression. The higher the number the more compression |
ssl | false | Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a “truststore”, “truststore-password”, “truststore-type”, and specify whether to “trust-all-certs”. |
trust-all-certs | false | If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and “listen in” on the encrypted connection. |
truststore | – | The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source’s SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically “jssecacerts” or “cacerts” in the Oracle JRE) will be used. |
truststore-password | – | The password for the specified truststore. |
truststore-type | JKS | The type of the Java truststore. This can be “JKS” or other supported Java truststore type. |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
maxIoWorkers | 2 * the number of available processors in the machine | The maximum number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory. |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
This sink forms one half of Flume’s tiered collection support. Flume events sent to this sink are turned into Thrift events and sent to the configured hostname / port pair. The events are taken from the configured Channel in batches of the configured batch size.
Thrift sink can be configured to start in secure mode by enabling kerberos authentication. To communicate with a Thrift source started in secure mode, the Thrift sink should also operate in secure mode. client-principal and client-keytab are the properties used by the Thrift sink to authenticate to the kerberos KDC. The server-principal represents the principal of the Thrift source this sink is configured to connect to in secure mode. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be thrift. |
hostname | – | The hostname or IP address to bind to. |
port | – | The port # to listen on. |
batch-size | 100 | number of event to batch together for send. |
connect-timeout | 20000 | Amount of time (ms) to allow for the first (handshake) request. |
request-timeout | 20000 | Amount of time (ms) to allow for requests after the first. |
connection-reset-interval | none | Amount of time (s) before the connection to the next hop is reset. This will force the Thrift Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when news hosts are added without having to restart the agent. |
ssl | false | Set to true to enable SSL for this ThriftSink. When configuring SSL, you can optionally set a “truststore”, “truststore-password” and “truststore-type” |
truststore | – | The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Thrift Source’s SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically “jssecacerts” or “cacerts” in the Oracle JRE) will be used. |
truststore-password | – | The password for the specified truststore. |
truststore-type | JKS | The type of the Java truststore. This can be “JKS” or other supported Java truststore type. |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude |
kerberos | false | Set to true to enable kerberos authentication. In kerberos mode, client-principal, client-keytab and server-principal are required for successful authentication and communication to a kerberos enabled Thrift Source. |
client-principal | —- | The kerberos principal used by the Thrift Sink to authenticate to the kerberos KDC. |
client-keytab | —- | The keytab location used by the Thrift Sink in combination with the client-principal to authenticate to the kerberos KDC. |
server-principal | – | The kerberos principal of the Thrift Source to which the Thrift Sink is configured to connect to. |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
The IRC sink takes messages from attached channel and relays those to configured IRC destinations. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be irc |
hostname | – | The hostname or IP address to connect to |
port | 6667 | The port number of remote host to connect |
nick | – | Nick name |
user | – | User name |
password | – | User password |
chan | – | channel |
name | ||
splitlines | – | (boolean) |
splitchars | n | line separator (if you were to enter the default value into the config file, then you would need to escape the backslash, like this: “\n”) |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = irc
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = irc.yourdomain.com
a1.sinks.k1.nick = flume
a1.sinks.k1.chan = #flume
Stores events on the local filesystem. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be file_roll. |
sink.directory | – | The directory where files will be stored |
sink.pathManager | DEFAULT | The PathManager implementation to use. |
sink.pathManager.extension | – | The file extension if the default PathManager is used. |
sink.pathManager.prefix | – | A character string to add to the beginning of the file name if the default PathManager is used |
sink.rollInterval | 30 | Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file. |
sink.serializer | TEXT | Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface. |
batchSize | 100 |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
Discards all events it receives from the channel. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be null. |
batchSize | 100 |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = null
a1.sinks.k1.channel = c1
This sink writes data to HBase. The Hbase configuration is picked up from the first hbase-site.xml encountered in the classpath. A class implementing HbaseEventSerializer which is specified by the configuration is used to convert the events into HBase puts and/or increments. These puts and increments are then written to HBase. This sink provides the same consistency guarantees as HBase, which is currently row-wise atomicity. In the event of Hbase failing to write certain events, the sink will replay all events in that transaction.
The HBaseSink supports writing data to secure HBase. To write to secure HBase, the user the agent is running as must have write permissions to the table the sink is configured to write to. The principal and keytab to use to authenticate against the KDC can be specified in the configuration. The hbase-site.xml in the Flume agent’s classpath must have authentication set to kerberos (For details on how to do this, please refer to HBase documentation).
For convenience, two serializers are provided with Flume. The SimpleHbaseEventSerializer (org.apache.flume.sink.hbase.SimpleHbaseEventSerializer) writes the event body as-is to HBase, and optionally increments a column in Hbase. This is primarily an example implementation. The RegexHbaseEventSerializer (org.apache.flume.sink.hbase.RegexHbaseEventSerializer) breaks the event body based on the given regex and writes each part into different columns.
The type is the FQCN: org.apache.flume.sink.hbase.HBaseSink.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be hbase |
table | – | The name of the table in Hbase to write to. |
columnFamily | – | The column family in Hbase to write to. |
zookeeperQuorum | – | The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml |
znodeParent | /hbase | The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml |
batchSize | 100 | Number of events to be written per txn. |
coalesceIncrements | false | Should the sink coalesce multiple increments to a cell per batch. This might give better performance if there are multiple increments to a limited number of cells. |
serializer | org.apache.flume.sink.hbase.SimpleHbaseEventSerializer | Default increment column = “iCol”, payload column = “pCol”. |
serializer.* | – | Properties to be passed to the serializer. |
kerberosPrincipal | – | Kerberos user principal for accessing secure HBase |
kerberosKeytab | – | Kerberos keytab for accessing secure HBase |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
This sink writes data to HBase using an asynchronous model. A class implementing AsyncHbaseEventSerializer which is specified by the configuration is used to convert the events into HBase puts and/or increments. These puts and increments are then written to HBase. This sink uses the Asynchbase API to write to HBase. This sink provides the same consistency guarantees as HBase, which is currently row-wise atomicity. In the event of Hbase failing to write certain events, the sink will replay all events in that transaction. The type is the FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be asynchbase |
table | – | The name of the table in HBase to write to. |
zookeeperQuorum | – | The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml |
znodeParent | /hbase | The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml |
columnFamily | – | The column family in HBase to write to. |
batchSize | 100 | Number of events to be written per txn. |
coalesceIncrements | false | Should the sink coalesce multiple increments to a cell per batch. This might give better performance if there are multiple increments to a limited number of cells. |
timeout | 60000 | The length of time (in milliseconds) the sink waits for acks from HBase for all events in a transaction. |
serializer | org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer | |
serializer.* | – | Properties to be passed to the serializer. |
The ZooKeeper quorum and parent znode may be specified directly in the Flume configuration via the zookeeperQuorum and znodeParent properties above. If they are not provided, the sink reads them from the first hbase-site.xml file found in the classpath.
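For instance, a minimal sketch that pins both values in the Flume configuration rather than relying on hbase-site.xml; the ZooKeeper hostnames are hypothetical placeholders:
# hypothetical quorum; should match hbase.zookeeper.quorum of the target cluster
a1.sinks.k1.zookeeperQuorum = zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
a1.sinks.k1.znodeParent = /hbase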
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = asynchbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.k1.channel = c1
This sink extracts data from Flume events, transforms it, and loads it in near-real-time into Apache Solr servers, which in turn serve queries to end users or search applications.
This sink is well suited for use cases that stream raw data into HDFS (via the HdfsSink) and simultaneously extract, transform and load the same data into Solr (via MorphlineSolrSink). In particular, this sink can process arbitrary heterogeneous raw data from disparate data sources and turn it into a data model that is useful to Search applications.
The ETL functionality is customizable using a morphline configuration file that defines a chain of transformation commands that pipe event records from one command to another.
Morphlines can be seen as an evolution of Unix pipelines where the data model is generalized to work with streams of generic records, including arbitrary binary payloads. A morphline command is a bit like a Flume Interceptor. Morphlines can be embedded into Hadoop components such as Flume.
Commands to parse and transform a set of standard data formats such as log files, Avro, CSV, Text, HTML, XML, PDF, Word, Excel, etc. are provided out of the box, and additional custom commands and parsers for other data formats can be added as morphline plugins. Any kind of data format can be indexed, Solr documents can be generated for any kind of Solr schema, and any custom ETL logic can be registered and executed.
Morphlines manipulate continuous streams of records. The data model can be described as follows: A record is a set of named fields where each field has an ordered list of one or more values. A value can be any Java Object. That is, a record is essentially a hash table where each hash table entry contains a String key and a list of Java Objects as values. (The implementation uses Guava’s ArrayListMultimap, which is a ListMultimap). Note that a field can have multiple values and any two records need not use common field names.
This sink fills the body of the Flume event into the _attachment_body field of the morphline record, and also copies the headers of the Flume event into record fields of the same name. The commands can then act on this data.
Routing to a SolrCloud cluster is supported to improve scalability. Indexing load can be spread across a large number of MorphlineSolrSinks for improved scalability. Indexing load can be replicated across multiple MorphlineSolrSinks for high availability, for example using Flume features such as Load balancing Sink Processor. MorphlineInterceptor can also help to implement dynamic routing to multiple Solr collections (e.g. for multi-tenancy).
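As a minimal sketch of the load-balancing setup described above, two MorphlineSolrSinks can drain the same channel behind a Load balancing Sink Processor; the morphline file path is a hypothetical placeholder:
a1.channels = c1
a1.sinks = k1 k2
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = round_robin
# both sinks run the same morphline so either can index any event
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.channel = c1
a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf
a1.sinks.k2.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k2.channel = c1
a1.sinks.k2.morphlineFile = /etc/flume-ng/conf/morphline.conf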
The morphline and solr jars required for your environment must be placed in the lib directory of the Apache Flume installation.
The type is the FQCN: org.apache.flume.sink.solr.morphline.MorphlineSolrSink
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be org.apache.flume.sink.solr.morphline.MorphlineSolrSink |
morphlineFile | – | The relative or absolute path on the local file system to the morphline configuration file. Example: /etc/flume-ng/conf/morphline.conf |
morphlineId | null | Optional name used to identify a morphline if there are multiple morphlines in a morphline config file |
batchSize | 1000 | The maximum number of events to take per flume transaction. |
batchDurationMillis | 1000 | The maximum duration per flume transaction (ms). The transaction commits after this duration or when batchSize is exceeded, whichever comes first. |
handlerClass | org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl | The FQCN of a class implementing org.apache.flume.sink.solr.morphline.MorphlineHandler |
isProductionMode | false | This flag should be enabled for mission critical, large-scale online production systems that need to make progress without downtime when unrecoverable exceptions occur. Corrupt or malformed parser input data, parser bugs, and errors related to unknown Solr schema fields produce unrecoverable exceptions. |
recoverableExceptionClasses | org.apache.solr.client.solrj.SolrServerException | Comma separated list of recoverable exceptions that tend to be transient, in which case the corresponding task can be retried. Examples include network connection errors, timeouts, etc. When the production mode flag is set to true, the recoverable exceptions configured using this parameter will not be ignored and hence will lead to retries. |
isIgnoringRecoverableExceptions | false | This flag should be enabled if an unrecoverable exception is accidentally misclassified as recoverable. This enables the sink to make progress and avoid retrying an event forever. |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.channel = c1
a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf
# a1.sinks.k1.morphlineId = morphline1
# a1.sinks.k1.batchSize = 1000
# a1.sinks.k1.batchDurationMillis = 1000
This sink writes data to an elasticsearch cluster. By default, events will be written so that the Kibana graphical interface can display them, just as if logstash had written them.
The elasticsearch and lucene-core jars required for your environment must be placed in the lib directory of the Apache Flume installation. Elasticsearch requires that the major version of the client JAR match that of the server, and that both run the same minor version of the JVM; SerializationExceptions will appear if this is not the case. To select the required version, first determine the version of elasticsearch and the JVM version the target cluster is running, then select an elasticsearch client library that matches the major version. A 0.19.x client can talk to a 0.19.x cluster, 0.20.x to 0.20.x, and 0.90.x to 0.90.x. Once the elasticsearch version has been determined, read the pom.xml file to determine the correct lucene-core JAR version to use. The JVM of the Flume agent running the ElasticSearchSink should also match that of the target cluster down to the minor version.
Events will be written to a new index every day. The name will be <indexName>-yyyy-MM-dd where <indexName> is the indexName parameter. The sink will start writing to a new index at midnight UTC.
Events are serialized for elasticsearch by the ElasticSearchLogStashEventSerializer by default. This behaviour can be overridden with the serializer parameter. This parameter accepts implementations of org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer or org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory. Implementing ElasticSearchEventSerializer is deprecated in favour of the more powerful ElasticSearchIndexRequestBuilderFactory.
The type is the FQCN: org.apache.flume.sink.elasticsearch.ElasticSearchSink
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be org.apache.flume.sink.elasticsearch.ElasticSearchSink |
hostNames | – | Comma separated list of hostname:port entries; if a port is not present, the default port 9300 will be used |
indexName | flume | The name of the index to which the date will be appended. Example: 'flume' -> 'flume-yyyy-MM-dd'. Arbitrary header substitution is supported, e.g. %{header} is replaced with the value of the named event header |
indexType | logs | The type to index the document to, defaults to 'logs'. Arbitrary header substitution is supported, e.g. %{header} is replaced with the value of the named event header |
clusterName | elasticsearch | Name of the ElasticSearch cluster to connect to |
batchSize | 100 | Number of events to be written per txn. |
ttl | – | TTL in days; when set, expired documents will be deleted automatically. If not set, documents will never be automatically deleted. TTL is accepted both in the earlier integer-only form, e.g. a1.sinks.k1.ttl = 5, and with a qualifier ms (millisecond), s (second), m (minute), h (hour), d (day) or w (week). Example: a1.sinks.k1.ttl = 5d will set TTL to 5 days. See http://www.elasticsearch.org/guide/reference/mapping/ttl-field/ for more information. |
serializer | org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer | The ElasticSearchIndexRequestBuilderFactory or ElasticSearchEventSerializer to use. Implementations of either class are accepted but ElasticSearchIndexRequestBuilderFactory is preferred. |
serializer.* | – | Properties to be passed to the serializer. |
Note
Header substitution is a handy way to use the value of an event header to dynamically decide the indexName and indexType to use when storing the event. Caution should be exercised when using this feature, as the event submitter now has control of the indexName and indexType. Furthermore, if the elasticsearch REST client is used, the event submitter also has control of the URL path used.
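For instance, a minimal sketch that derives both values from a hypothetical event header named logType:
# logType is a hypothetical header set upstream, e.g. by an interceptor
a1.sinks.k1.indexName = flume-%{logType}
a1.sinks.k1.indexType = %{logType}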
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1
Experimental sink that writes events to a Kite Dataset. This sink will deserialize the body of each incoming event and store the resulting record in a Kite Dataset. It determines the target Dataset by loading it from the configured dataset URI.
The only supported serialization is avro, and the record schema must be passed in the event headers, using either flume.avro.schema.literal with the JSON schema representation or flume.avro.schema.url with a URL where the schema may be found (hdfs:/... URIs are supported). This is compatible with the Log4jAppender flume client and the spooling directory source’s Avro deserializer using deserializer.schemaType = LITERAL.
Note 1: The flume.avro.schema.hash header is not supported. Note 2: In some cases, file rolling may occur slightly after the roll interval has been exceeded. However, this delay will not exceed 5 seconds. In most cases, the delay is negligible.
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | Must be org.apache.flume.sink.kite.DatasetSink |
kite.dataset.uri | – | URI of the dataset to open |
kite.repo.uri | – | URI of the repository to open (deprecated; use kite.dataset.uri instead) |
kite.dataset.namespace | – | Namespace of the Dataset where records will be written (deprecated; use kite.dataset.uri instead) |
kite.dataset.name | – | Name of the Dataset where records will be written (deprecated; use kite.dataset.uri instead) |
kite.batchSize | 100 | Number of records to process in each batch |
kite.rollInterval | 30 | Maximum wait time (seconds) before data files are released |
kite.flushable.commitOnBatch | true | If true, the Flume transaction will be committed and the writer will be flushed on each batch of kite.batchSize records. This setting only applies to flushable datasets. When true, it's possible for temp files with committed data to be left in the dataset directory. These files need to be recovered by hand for the data to be visible to DatasetReaders. |
kite.syncable.syncOnBatch | true | Controls whether the sink will also sync data when committing the transaction. This setting only applies to syncable datasets. Syncing guarantees that data will be written on stable storage on the remote system, while flushing only guarantees that data has left Flume's client buffers. When the kite.flushable.commitOnBatch property is set to false, this property must also be set to false. |
kite.entityParser | avro | Parser that turns Flume Events into Kite entities. Valid values are avro and the fully-qualified class name of an implementation of the EntityParser.Builder interface. |
kite.failurePolicy | retry | Policy that handles non-recoverable errors such as a missing Schema in the Event header. The default value, retry, will fail the current batch and try again which matches the old behavior. Other valid values are save, which will write the raw Event to the kite.error.dataset.uri dataset, and the fully-qualified class name of an implementation of the FailurePolicy.Builder interface. |
kite.error.dataset.uri | – | URI of the dataset where failed events are saved when kite.failurePolicy is set to save. Required when the kite.failurePolicy is set to save. |
auth.kerberosPrincipal | – | Kerberos user principal for secure authentication to HDFS |
auth.kerberosKeytab | – | Kerberos keytab location (local FS) for the principal |
auth.proxyUser | – | The effective user for HDFS actions, if different from the kerberos principal |
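Example for agent named a1, as a minimal sketch pairing the spooling directory source's Avro deserializer (described above) with this sink; the spool directory and the Kite dataset URI are hypothetical placeholders:
a1.channels = c1
a1.sources = r1
a1.sinks = k1
# hypothetical directory watched for Avro data files; the schema travels in event headers
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /var/flume/spool
a1.sources.r1.deserializer = AVRO
a1.sources.r1.deserializer.schemaType = LITERAL
a1.sinks.k1.type = org.apache.flume.sink.kite.DatasetSink
a1.sinks.k1.channel = c1
# hypothetical dataset URI; point this at a real Kite dataset
a1.sinks.k1.kite.dataset.uri = dataset:hdfs://namenode.example.com:8020/datasets/default/events
a1.sinks.k1.kite.batchSize = 100
a1.sinks.k1.kite.rollInterval = 30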
This is a Flume Sink implementation that can publish data to a Kafka topic. One of the objectives is to integrate Flume with Kafka so that pull-based processing systems can process the data coming through various Flume sources. This sink currently supports the 0.9.x series of Kafka releases.
This version of Flume no longer supports older versions (0.8.x) of Kafka.
Required properties are marked in bold font.