For more details about the global SSL setup, see the SSL/TLS support section.
Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following property:
a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS
Once enabled, clients will verify the server’s fully qualified domain name (FQDN) against one of the following two fields: the Common Name (CN) or the Subject Alternative Name (SAN).
If client-side authentication is also required, then the following additionally needs to be added to the Flume agent configuration, or the global SSL setup can be used (see the SSL/TLS support section). Each Flume agent has to have its own client certificate, which has to be trusted by the Kafka brokers either individually or by its signature chain. A common example is to sign each client certificate by a single Root CA, which in turn is trusted by the Kafka brokers.
# optional, the global keystore can be used alternatively
a1.sources.source1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks
a1.sources.source1.kafka.consumer.ssl.keystore.password=<password to access the keystore>
If the keystore and key use different password protection, then the ssl.key.password property will provide the required additional secret for the consumer keystore:
a1.sources.source1.kafka.consumer.ssl.key.password=<password to access the key>
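Putting these pieces together, a sketch of a consumer configured for mutual TLS might look as follows (all paths and passwords are placeholders):

a1.sources.source1.kafka.consumer.security.protocol = SSL
# optional, the global truststore/keystore can be used alternatively
a1.sources.source1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
a1.sources.source1.kafka.consumer.ssl.keystore.location = /path/to/client.keystore.jks
a1.sources.source1.kafka.consumer.ssl.keystore.password = <password to access the keystore>
a1.sources.source1.kafka.consumer.ssl.key.password = <password to access the key>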
Kerberos and Kafka Source:
To use the Kafka source with a Kafka cluster secured with Kerberos, set the consumer.security.protocol property noted above for the consumer. The Kerberos keytab and principal to be used with the Kafka brokers are specified in a JAAS file’s “KafkaClient” section; the “Client” section describes the Zookeeper connection, if needed. See the Kafka doc for information on the JAAS file contents. The location of this JAAS file and, optionally, the system-wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:
JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
Example secure configuration using SASL_PLAINTEXT:
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
Example secure configuration using SASL_SSL:
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>
Sample JAAS file. For reference on its contents, please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN) in the Kafka documentation of SASL configuration. Since the Kafka source may also connect to Zookeeper for offset migration, the “Client” section was also added to this example. It is not needed unless you require offset migration, or you require this section for other secure components. Also, please make sure that the operating system user of the Flume processes has read privileges on the JAAS and keytab files.
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
A netcat-like source that listens on a given port and turns each line of text into an event. Acts like nc -k -l [host] [port]. In other words, it opens a specified port and listens for data. The expectation is that the supplied data is newline separated text. Each line of text is turned into a Flume event and sent via the connected channel.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be netcat |
bind | – | Host name or IP address to bind to |
port | – | Port # to bind to |
max-line-length | 512 | Max line length per event body (in bytes) |
ack-every-event | true | Respond with an “OK” for every event received |
selector.type | replicating | replicating or multiplexing |
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors |
interceptors.* |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
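To verify the source, a line can be piped into the listening port with nc (assuming the agent above is running locally); with ack-every-event left at its default, the source answers each event with “OK”:

$ echo "hello flume" | nc localhost 6666
OK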
Like the original Netcat (TCP) source, this source listens on a given port and turns each line of text into an event that is sent via the connected channel. Acts like nc -u -k -l [host] [port].
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be netcatudp |
bind | – | Host name or IP address to bind to |
port | – | Port # to bind to |
remoteAddressHeader | – | |
selector.type | replicating | replicating or multiplexing |
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors |
interceptors.* |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
A simple sequence generator that continuously generates events with a counter that starts from 0, increments by 1 and stops at totalEvents. Retries when it can’t send events to the channel. Useful mainly for testing. During retries it keeps the body of the retried messages the same as before so that the number of unique events - after de-duplication at destination - is expected to be equal to the specified totalEvents. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be seq |
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors |
interceptors.* | ||
batchSize | 1 | Number of events to attempt to process per request loop. |
totalEvents | Long.MAX_VALUE | Number of unique events sent by the source. |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1
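To bound a test run, the optional properties from the table above can be added; the values here are purely illustrative:

a1.sources.r1.totalEvents = 10000
a1.sources.r1.batchSize = 10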
Reads syslog data and generates Flume events. The UDP source treats an entire message as a single event. The TCP sources create a new event for each string of characters separated by a newline (‘\n’).
Required properties are in bold.
The original, tried-and-true syslog TCP source.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be syslogtcp |
host | – | Host name or IP address to bind to |
port | – | Port # to bind to |
eventSize | 2500 | Maximum size of a single event line, in bytes |
keepFields | none | Setting this to ‘all’ will preserve the Priority, Timestamp and Hostname in the body of the event. A space-separated list of fields to include is allowed as well. Currently, the following fields can be included: priority, version, timestamp, hostname. The values ‘true’ and ‘false’ have been deprecated in favor of ‘all’ and ‘none’. |
clientIPHeader | – | If specified, the IP address of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the IP address of the client. Do not use the standard Syslog header names here (like _host_) because the event header will be overridden in that case. |
clientHostnameHeader | – | If specified, the host name of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the host name of the client. Retrieving the host name may involve a name service reverse lookup which may affect the performance. Do not use the standard Syslog header names here (like _host_) because the event header will be overridden in that case. |
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors |
interceptors.* | ||
ssl | false | Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section). |
keystore | – | This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error). |
keystore-password | – | The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error). |
keystore-type | JKS | The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS). |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
include-protocols | – | Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If included-protocols is empty, it includes every supported protocol. |
exclude-cipher-suites | – | Space-separated list of cipher suites to exclude. |
include-cipher-suites | – | Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. If included-cipher-suites is empty, it includes every supported cipher suite. |
For example, a syslog TCP source for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
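If the connection should be encrypted, the SSL properties from the table above can be layered onto the same source; the keystore path and password below are placeholders:

a1.sources.r1.ssl = true
a1.sources.r1.keystore = /path/to/keystore.jks
a1.sources.r1.keystore-password = <password to access the keystore>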
This is a newer, faster, multi-port capable version of the Syslog TCP source. Note that the ports configuration setting has replaced port. Multi-port capability means that it can listen on many ports at once in an efficient manner. This source uses the Apache Mina library to do that. Provides support for RFC-3164 and many common RFC-5424 formatted messages. Also provides the capability to configure the character set used on a per-port basis.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be multiport_syslogtcp |
host | – | Host name or IP address to bind to. |
ports | – | Space-separated list (one or more) of ports to bind to. |
eventSize | 2500 | Maximum size of a single event line, in bytes. |
keepFields | none | Setting this to ‘all’ will preserve the Priority, Timestamp and Hostname in the body of the event. A space-separated list of fields to include is allowed as well. Currently, the following fields can be included: priority, version, timestamp, hostname. The values ‘true’ and ‘false’ have been deprecated in favor of ‘all’ and ‘none’. |
portHeader | – | If specified, the port number will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the incoming port. |
clientIPHeader | – | If specified, the IP address of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the IP address of the client. Do not use the standard Syslog header names here (like _host_) because the event header will be overridden in that case. |
clientHostnameHeader | – | If specified, the host name of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the host name of the client. Retrieving the host name may involve a name service reverse lookup which may affect the performance. Do not use the standard Syslog header names here (like _host_) because the event header will be overridden in that case. |
charset.default | UTF-8 | Default character set used while parsing syslog events into strings. |
charset.port.<port> | – | Character set is configurable on a per-port basis. |
batchSize | 100 | Maximum number of events to attempt to process per request loop. Using the default is usually fine. |
readBufferSize | 1024 | Size of the internal Mina read buffer. Provided for performance tuning. Using the default is usually fine. |
numProcessors | (auto-detected) | Number of processors available on the system for use while processing messages. Default is to auto-detect # of CPUs using the Java Runtime API. Mina will spawn 2 request-processing threads per detected CPU, which is often reasonable. |
selector.type | replicating | replicating, multiplexing, or custom |
selector.* | – | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors. |
interceptors.* | ||
ssl | false | Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section). |
keystore | – | This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error). |
keystore-password | – | The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error). |
keystore-type | JKS | The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS). |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
include-protocols | – | Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If included-protocols is empty, it includes every supported protocol. |
exclude-cipher-suites | – | Space-separated list of cipher suites to exclude. |
include-cipher-suites | – | Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. If included-cipher-suites is empty, it includes every supported cipher suite. |
For example, a multiport syslog TCP source for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port
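Per-port character sets can be layered onto the example above; the ISO-8859-1 assignment below is purely illustrative:

a1.sources.r1.charset.default = UTF-8
a1.sources.r1.charset.port.10001 = ISO-8859-1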
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be syslogudp |
host | – | Host name or IP address to bind to |
port | – | Port # to bind to |
keepFields | false | Setting this to true will preserve the Priority, Timestamp and Hostname in the body of the event. |
clientIPHeader | – | If specified, the IP address of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the IP address of the client. Do not use the standard Syslog header names here (like _host_) because the event header will be overridden in that case. |
clientHostnameHeader | – | If specified, the host name of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the host name of the client. Retrieving the host name may involve a name service reverse lookup which may affect the performance. Do not use the standard Syslog header names here (like _host_) because the event header will be overridden in that case. |
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors |
interceptors.* |
For example, a syslog UDP source for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
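On Linux, the util-linux logger utility can send a test datagram to this source (option names may vary by platform and logger version):

$ logger --server localhost --port 5140 --udp "test message"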
A source which accepts Flume Events by HTTP POST and GET. GET should be used for experimentation only. HTTP requests are converted into Flume events by a pluggable “handler” which must implement the HTTPSourceHandler interface. This handler takes an HttpServletRequest and returns a list of Flume events. All events handled from one HTTP request are committed to the channel in one transaction, thus allowing for increased efficiency on channels like the file channel. If the handler throws an exception, this source will return an HTTP status of 400. If the channel is full, or the source is unable to append events to the channel, the source will return an HTTP 503 - Temporarily unavailable status.
All events sent in one post request are considered to be one batch and inserted into the channel in one transaction.
This source is based on Jetty 9.4 and offers the ability to set additional Jetty-specific parameters which will be passed directly to the Jetty components.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be http
port | – | The port the source should bind to. |
bind | 0.0.0.0 | The hostname or IP address to listen on |
handler | org.apache.flume.source.http.JSONHandler | The FQCN of the handler class. |
handler.* | – | Config parameters for the handler |
selector.type | replicating | replicating or multiplexing |
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors |
interceptors.* | ||
ssl | false | Set the property to true to enable SSL. HTTP Source does not support SSLv3. |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
include-protocols | – | Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If included-protocols is empty, it includes every supported protocol. |
exclude-cipher-suites | – | Space-separated list of cipher suites to exclude. |
include-cipher-suites | – | Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. |
keystore | – | Location of the keystore including keystore file name. If SSL is enabled but the keystore is not specified here, then the global keystore will be used (if defined, otherwise configuration error). |
keystore-password | – | Keystore password. If SSL is enabled but the keystore password is not specified here, then the global keystore password will be used (if defined, otherwise configuration error). |
keystore-type | JKS | Keystore type. This can be “JKS” or “PKCS12”. |
QueuedThreadPool.* | – | Jetty specific settings to be set on org.eclipse.jetty.util.thread.QueuedThreadPool. N.B. QueuedThreadPool will only be used if at least one property of this class is set. |
HttpConfiguration.* | – | Jetty specific settings to be set on org.eclipse.jetty.server.HttpConfiguration |
SslContextFactory.* | – | Jetty specific settings to be set on org.eclipse.jetty.util.ssl.SslContextFactory (only applicable when ssl is set to true). |
ServerConnector.* | – | Jetty specific settings to be set on org.eclipse.jetty.server.ServerConnector |
Deprecated Properties
Property Name | Default | Description |
---|---|---|
keystorePassword | – | Use keystore-password. Deprecated value will be overwritten with the new one. |
excludeProtocols | SSLv3 | Use exclude-protocols. Deprecated value will be overwritten with the new one. |
enableSSL | false | Use ssl. Deprecated value will be overwritten with the new one. |
N.B. Jetty-specific settings are set using the setter-methods on the objects listed above. For full details see the Javadoc for these classes (QueuedThreadPool, HttpConfiguration, SslContextFactory and ServerConnector).
When using Jetty-specific settings, the named properties above will take precedence (for example, excludeProtocols will take precedence over SslContextFactory.ExcludeProtocols). All properties should be in initial lower case.
An example http source for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props
a1.sources.r1.HttpConfiguration.sendServerVersion = false
a1.sources.r1.ServerConnector.idleTimeout = 300
A handler is provided out of the box which can handle events represented in JSON format, and supports UTF-8, UTF-16 and UTF-32 character sets. The handler accepts an array of events (even if there is only one event, it has to be sent in an array) and converts them to Flume events based on the encoding specified in the request. If no encoding is specified, UTF-8 is assumed. Events are represented as follows.
[{
"headers" : {
"timestamp" : "434324343",
"host" : "random_host.example.com"
},
"body" : "random_body"
},
{
"headers" : {
"namenode" : "namenode.example.com",
"datanode" : "random_datanode.example.com"
},
"body" : "really_random_body"
}]
To set the charset, the request must have content type specified as application/json; charset=UTF-8 (replace UTF-8 with UTF-16 or UTF-32 as required).
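For instance, a single event in the format above could be posted to the earlier example source with curl:

$ curl -X POST http://localhost:5140 \
    -H 'Content-Type: application/json; charset=UTF-8' \
    -d '[{"headers":{"host":"random_host.example.com"},"body":"random_body"}]'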
One way to create an event in the format expected by this handler is to use JSONEvent provided in the Flume SDK and use Google Gson to create the JSON string using the Gson#toJson(Object, Type) method. The type token to pass as the second argument of this method for a list of events can be created by:
Type type = new TypeToken<List<JSONEvent>>() {}.getType();
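A minimal sketch of this approach (the class and variable names are illustrative), assuming the flume-ng-sdk and Gson libraries are on the classpath:

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import org.apache.flume.event.JSONEvent;

public class JsonPayloadExample {
  public static void main(String[] args) {
    // Build a single event with one header and a UTF-8 body
    JSONEvent event = new JSONEvent();
    event.setHeaders(Collections.singletonMap("host", "random_host.example.com"));
    event.setBody("random_body".getBytes(StandardCharsets.UTF_8));

    // The handler expects an array of events, so serialize a list
    Type type = new TypeToken<List<JSONEvent>>() {}.getType();
    String json = new Gson().toJson(Collections.singletonList(event), type);
    System.out.println(json); // POST this string to the HTTP source
  }
}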
By default HTTPSource splits JSON input into Flume events. As an alternative, BlobHandler is a handler for HTTPSource that returns an event containing the request parameters as well as the Binary Large Object (BLOB) uploaded with the request, for example a PDF or JPG file. Note that this approach is not suitable for very large objects because it buffers the entire BLOB in RAM.
Property Name | Default | Description |
---|---|---|
handler | – | The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobHandler |
handler.maxBlobLength | 100000000 | The maximum number of bytes to read and buffer for a given request |
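A sketch wiring BlobHandler into the HTTP source from the earlier example (the length limit shown is arbitrary):

a1.sources.r1.handler = org.apache.flume.sink.solr.morphline.BlobHandler
a1.sources.r1.handler.maxBlobLength = 200000000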
StressSource is an internal load-generating source implementation which is very useful for stress tests. It allows the user to configure the size of the event payload (with empty headers), the total number of events to be sent, and the maximum number of successful events to be delivered.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be org.apache.flume.source.StressSource |
size | 500 | Payload size of each Event. Unit:byte |
maxTotalEvents | -1 | Maximum number of Events to be sent |
maxSuccessfulEvents | -1 | Maximum number of Events successfully sent |
batchSize | 1 | Number of Events to be sent in one batch |
maxEventsPerSecond | 0 | When set to an integer greater than zero, enforces a rate limiter onto the source. |
Example for agent named a1:
a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1
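To additionally cap throughput during a test, the rate limiter from the table above can be enabled (the value is illustrative):

a1.sources.stresssource-1.maxEventsPerSecond = 5000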
The legacy sources allow a Flume 1.x agent to receive events from Flume 0.9.4 agents. They accept events in the Flume 0.9.4 format, convert them to the Flume 1.0 format, and store them in the connected channel. The 0.9.4 event properties like timestamp, pri, host, nanos, etc. get converted to 1.x event header attributes. The legacy sources support both Avro and Thrift RPC connections. To use this bridge between two Flume versions, you need to start a Flume 1.x agent with the avroLegacy or thriftLegacy source. The 0.9.4 agent should have the agent sink pointing to the host/port of the 1.x agent.
Note
The reliability semantics of Flume 1.x are different from that of Flume 0.9.x. The E2E or DFO mode of a Flume 0.9.x agent will not be supported by the legacy source. The only supported 0.9.x mode is the best effort, though the reliability setting of the 1.x flow will be applicable to the events once they are saved into the Flume 1.x channel by the legacy source.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be org.apache.flume.source.avroLegacy.AvroLegacySource |
host | – | The hostname or IP address to bind to |
port | – | The port # to listen on |
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors |
interceptors.* |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be org.apache.flume.source.thriftLegacy.ThriftLegacySource |
host | – | The hostname or IP address to bind to |
port | – | The port # to listen on |
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors |
interceptors.* |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
A custom source is your own implementation of the Source interface. A custom source’s class and its dependencies must be included in the agent’s classpath when starting the Flume agent. The type of the custom source is its FQCN.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be your FQCN |
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors |
interceptors.* |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1
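A minimal, hypothetical sketch of such a source (the greeting parameter is invented for illustration); a real implementation would need proper lifecycle and error handling:

import java.nio.charset.StandardCharsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

public class MySource extends AbstractSource implements Configurable, EventDrivenSource {
  private String greeting;

  @Override
  public void configure(Context context) {
    // Reads e.g. a1.sources.r1.greeting = hello from the agent configuration
    greeting = context.getString("greeting", "hello");
  }

  @Override
  public synchronized void start() {
    super.start();
    // Deliver a single event to the configured channel(s)
    Event event = EventBuilder.withBody(greeting.getBytes(StandardCharsets.UTF_8));
    getChannelProcessor().processEvent(event);
  }
}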
Scribe is another type of ingest system. To adopt an existing Scribe ingest system, Flume should use the ScribeSource, which is based on Thrift with a compatible transfer protocol. For deployment of Scribe, please follow the guide from Facebook. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be org.apache.flume.source.scribe.ScribeSource |
port | 1499 | Port that Scribe clients should connect to
maxReadBufferBytes | 16384000 | Thrift Default FrameBuffer Size
workerThreads | 5 | Number of handler threads in Thrift
selector.type | ||
selector.* |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 1463
a1.sources.r1.workerThreads = 5
a1.sources.r1.channels = c1
This sink writes events into the Hadoop Distributed File System (HDFS). It currently supports creating text and sequence files. It supports compression in both file types. The files can be rolled (close current file and create a new one) periodically based on the elapsed time or size of data or number of events. It also buckets/partitions data by attributes like timestamp or machine where the event originated. The HDFS directory path may contain formatting escape sequences that will be replaced by the HDFS sink to generate a directory/file name to store the events. Using this sink requires Hadoop to be installed so that Flume can use the Hadoop jars to communicate with the HDFS cluster. Note that a version of Hadoop that supports the sync() call is required.
The following are the escape sequences supported:
Alias | Description |
---|---|
%{host} | Substitute value of event header named “host”. Arbitrary header names are supported. |
%t | Unix time in milliseconds |
%a | locale’s short weekday name (Mon, Tue, ...) |
%A | locale’s full weekday name (Monday, Tuesday, ...) |
%b | locale’s short month name (Jan, Feb, ...) |
%B | locale’s long month name (January, February, ...) |
%c | locale’s date and time (Thu Mar 3 23:05:25 2005) |
%d | day of month (01) |
%e | day of month without padding (1) |
%D | date; same as %m/%d/%y |
%H | hour (00..23) |
%I | hour (01..12) |
%j | day of year (001..366) |
%k | hour ( 0..23) |
%m | month (01..12) |
%n | month without padding (1..12) |
%M | minute (00..59) |
%p | locale’s equivalent of am or pm |
%s | seconds since 1970-01-01 00:00:00 UTC |
%S | second (00..59) |
%y | last two digits of year (00..99) |
%Y | year (2010) |
%z | +hhmm numeric timezone (for example, -0400) |
%[localhost] | Substitute the hostname of the host where the agent is running |
%[IP] | Substitute the IP address of the host where the agent is running |
%[FQDN] | Substitute the canonical hostname of the host where the agent is running |
Note: The escape strings %[localhost], %[IP] and %[FQDN] all rely on Java’s ability to obtain the hostname, which may fail in some networking environments.
The file in use will have the name mangled to include ”.tmp” at the end. Once the file is closed, this extension is removed. This allows excluding partially complete files in the directory. Required properties are in bold.
Note
For all of the time related escape sequences, a header with the key “timestamp” must exist among the headers of the event (unless hdfs.useLocalTimeStamp is set to true). One way to add this automatically is to use the TimestampInterceptor.
Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be hdfs |
hdfs.path | – | HDFS directory path (eg hdfs://namenode/flume/webdata/) |
hdfs.filePrefix | FlumeData | Name prefixed to files created by Flume in hdfs directory |
hdfs.fileSuffix | – | Suffix to append to file (eg .avro - NOTE: period is not automatically added) |
hdfs.inUsePrefix | – | Prefix that is used for temporal files that flume actively writes into |
hdfs.inUseSuffix | .tmp | Suffix that is used for temporal files that flume actively writes into |
hdfs.emptyInUseSuffix | false | If false, an hdfs.inUseSuffix is used while writing the output. After closing the output, the hdfs.inUseSuffix is removed from the output file name. If true, the hdfs.inUseSuffix parameter is ignored and an empty string is used instead. |
hdfs.rollInterval | 30 | Number of seconds to wait before rolling current file (0 = never roll based on time interval) |
hdfs.rollSize | 1024 | File size to trigger roll, in bytes (0: never roll based on file size) |
hdfs.rollCount | 10 | Number of events written to file before it rolled (0 = never roll based on number of events) |
hdfs.idleTimeout | 0 | Timeout after which inactive files get closed (0 = disable automatic closing of idle files) |
hdfs.batchSize | 100 | Number of events written to file before it is flushed to HDFS
hdfs.codeC | – | Compression codec, one of the following: gzip, bzip2, lzo, lzop, snappy
hdfs.fileType | SequenceFile | File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream will not compress the output file; do not set codeC. (2) CompressedStream requires hdfs.codeC to be set with an available codec.
hdfs.maxOpenFiles | 5000 | Allow only this number of open files. If this number is exceeded, the oldest file is closed. |
hdfs.minBlockReplicas | – | Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath. |
hdfs.writeFormat | Writable | Format for sequence file records. One of Text or Writable. Set to Text before creating data files with Flume, otherwise those files cannot be read by either Apache Impala (incubating) or Apache Hive. |
hdfs.threadsPoolSize | 10 | Number of threads per HDFS sink for HDFS IO ops (open, write, etc.) |
hdfs.rollTimerPoolSize | 1 | Number of threads per HDFS sink for scheduling timed file rolling |
hdfs.kerberosPrincipal | – | Kerberos user principal for accessing secure HDFS |
hdfs.kerberosKeytab | – | Kerberos keytab for accessing secure HDFS |
hdfs.proxyUser | ||
hdfs.round | false | Should the timestamp be rounded down (if true, affects all time based escape sequences except %t) |
hdfs.roundValue | 1 | Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time. |
hdfs.roundUnit | second | The unit of the round down value - second, minute or hour. |
hdfs.timeZone | Local Time | Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles. |
hdfs.useLocalTimeStamp | false | Use the local time (instead of the timestamp from the event header) while replacing the escape sequences. |
hdfs.closeTries | 0 | Number of times the sink must try renaming a file, after initiating a close attempt. If set to 1, this sink will not re-try a failed rename (due to, for example, NameNode or DataNode failure), and may leave the file in an open state with a .tmp extension. If set to 0, the sink will try to rename the file until the file is eventually renamed (there is no limit on the number of times it would try). The file may still remain open if the close call fails but the data will be intact and in this case, the file will be closed only after a Flume restart. |
hdfs.retryInterval | 180 | Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode, so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a ”.tmp” extension. |
serializer | TEXT | Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer.Builder interface. |
serializer.* |
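For example, an HDFS sink for agent a1 that buckets events by time, rounding timestamps down to the last ten minutes (the path is illustrative):

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

With this configuration, an event with timestamp 11:54:34 AM, June 12, 2012 would be written under /flume/events/2012-06-12/1150/00, since the timestamp is rounded down to the last ten-minute boundary.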
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - content/releases/content/1.9.0/FlumeUserGuide.html [2108:3611]: - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - For more details about the global SSL setup, see the SSL/TLS support section.Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following properties
a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS
Once enabled, clients will verify the server’s fully qualified domain name (FQDN) against one of the following two fields:
If client side authentication is also required then additionally the following needs to be added to Flume agent configuration or the global SSL setup can be used (see SSL/TLS support section). Each Flume agent has to have its client certificate which has to be trusted by Kafka brokers either individually or by their signature chain. Common example is to sign each client certificate by a single Root CA which in turn is trusted by Kafka brokers.
# optional, the global keystore can be used alternatively
a1.sources.source1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks
a1.sources.source1.kafka.consumer.ssl.keystore.password=<password to access the keystore>
If keystore and key use different password protection then ssl.key.password property will provide the required additional secret for both consumer keystores:
a1.sources.source1.kafka.consumer.ssl.key.password=<password to access the key>
Kerberos and Kafka Source:
To use Kafka source with a Kafka cluster secured with Kerberos, set the consumer.security.protocol properties noted above for consumer. The Kerberos keytab and principal to be used with Kafka brokers is specified in a JAAS file’s “KafkaClient” section. “Client” section describes the Zookeeper connection if needed. See Kafka doc for information on the JAAS file contents. The location of this JAAS file and optionally the system wide kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:
JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
Example secure configuration using SASL_PLAINTEXT:
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
Example secure configuration using SASL_SSL:
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>
Sample JAAS file. For reference of its content please see client config sections of the desired authentication mechanism (GSSAPI/PLAIN) in Kafka documentation of SASL configuration. Since the Kafka Source may also connect to Zookeeper for offset migration, the “Client” section was also added to this example. This won’t be needed unless you require offset migration, or you require this section for other secure components. Also please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
A netcat-like source that listens on a given port and turns each line of text into an event. Acts like nc -k -l [host] [port]. In other words, it opens a specified port and listens for data. The expectation is that the supplied data is newline separated text. Each line of text is turned into a Flume event and sent via the connected channel.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be netcat |
bind | – | Host name or IP address to bind to |
port | – | Port # to bind to |
max-line-length | 512 | Max line length per event body (in bytes) |
ack-every-event | true | Respond with an “OK” for every event received |
selector.type | replicating | replicating or multiplexing |
selector.* | Depends on the selector.type value | |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
As per the original Netcat (TCP) source, this source that listens on a given port and turns each line of text into an event and sent via the connected channel. Acts like nc -u -k -l [host] [port].
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be netcatudp |
bind | – | Host name or IP address to bind to |
port | – | Port # to bind to |
remoteAddressHeader | – | |
selector.type | replicating | replicating or multiplexing |
selector.* | Depends on the selector.type value | |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
A simple sequence generator that continuously generates events with a counter that starts from 0, increments by 1 and stops at totalEvents. Retries when it can’t send events to the channel. Useful mainly for testing. During retries it keeps the body of the retried messages the same as before so that the number of unique events - after de-duplication at destination - is expected to be equal to the specified totalEvents. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be seq |
selector.type | replicating or multiplexing | |
selector.* | replicating | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* | ||
batchSize | 1 | Number of events to attempt to process per request loop. |
totalEvents | Long.MAX_VALUE | Number of unique events sent by the source. |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1
Reads syslog data and generate Flume events. The UDP source treats an entire message as a single event. The TCP sources create a new event for each string of characters separated by a newline (‘n’).
Required properties are in bold.
The original, tried-and-true syslog TCP source.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be syslogtcp |
host | – | Host name or IP address to bind to |
port | – | Port # to bind to |
eventSize | 2500 | Maximum size of a single event line, in bytes |
keepFields | none | Setting this to ‘all’ will preserve the Priority, Timestamp and Hostname in the body of the event. A spaced separated list of fields to include is allowed as well. Currently, the following fields can be included: priority, version, timestamp, hostname. The values ‘true’ and ‘false’ have been deprecated in favor of ‘all’ and ‘none’. |
clientIPHeader | – | If specified, the IP address of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the IP address of the client. Do not use the standard Syslog header names here (like _host_) because the event header will be overridden in that case. |
clientHostnameHeader | – | If specified, the host name of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the host name of the client. Retrieving the host name may involve a name service reverse lookup which may affect the performance. Do not use the standard Syslog header names here (like _host_) because the event header will be overridden in that case. |
selector.type | replicating or multiplexing | |
selector.* | replicating | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* | ||
ssl | false | Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section). |
keystore | – | This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error). |
keystore-password | – | The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error). |
keystore-type | JKS | The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS). |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
include-protocols | – | Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If included-protocols is empty, it includes every supported protocols. |
exclude-cipher-suites | – | Space-separated list of cipher suites to exclude. |
include-cipher-suites | – | Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. If included-cipher-suites is empty, it includes every supported cipher suites. |
For example, a syslog TCP source for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
This is a newer, faster, multi-port capable version of the Syslog TCP source. Note that the ports configuration setting has replaced port. Multi-port capability means that it can listen on many ports at once in an efficient manner. This source uses the Apache Mina library to do that. Provides support for RFC-3164 and many common RFC-5424 formatted messages. Also provides the capability to configure the character set used on a per-port basis.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be multiport_syslogtcp |
host | – | Host name or IP address to bind to. |
ports | – | Space-separated list (one or more) of ports to bind to. |
eventSize | 2500 | Maximum size of a single event line, in bytes. |
keepFields | none | Setting this to ‘all’ will preserve the Priority, Timestamp and Hostname in the body of the event. A spaced separated list of fields to include is allowed as well. Currently, the following fields can be included: priority, version, timestamp, hostname. The values ‘true’ and ‘false’ have been deprecated in favor of ‘all’ and ‘none’. |
portHeader | – | If specified, the port number will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the incoming port. |
clientIPHeader | – | If specified, the IP address of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the IP address of the client. Do not use the standard Syslog header names here (like _host_) because the event header will be overridden in that case. |
clientHostnameHeader | – | If specified, the host name of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the host name of the client. Retrieving the host name may involve a name service reverse lookup which may affect the performance. Do not use the standard Syslog header names here (like _host_) because the event header will be overridden in that case. |
charset.default | UTF-8 | Default character set used while parsing syslog events into strings. |
charset.port.<port> | – | Character set is configurable on a per-port basis. |
batchSize | 100 | Maximum number of events to attempt to process per request loop. Using the default is usually fine. |
readBufferSize | 1024 | Size of the internal Mina read buffer. Provided for performance tuning. Using the default is usually fine. |
numProcessors | (auto-detected) | Number of processors available on the system for use while processing messages. Default is to auto-detect # of CPUs using the Java Runtime API. Mina will spawn 2 request-processing threads per detected CPU, which is often reasonable. |
selector.type | replicating | replicating, multiplexing, or custom |
selector.* | – | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors. |
interceptors.* | ||
ssl | false | Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section). |
keystore | – | This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error). |
keystore-password | – | The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error). |
keystore-type | JKS | The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS). |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
include-protocols | – | Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If included-protocols is empty, it includes every supported protocols. |
exclude-cipher-suites | – | Space-separated list of cipher suites to exclude. |
include-cipher-suites | – | Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. If included-cipher-suites is empty, it includes every supported cipher suites. |
For example, a multiport syslog TCP source for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be syslogudp |
host | – | Host name or IP address to bind to |
port | – | Port # to bind to |
keepFields | false | Setting this to true will preserve the Priority, Timestamp and Hostname in the body of the event. |
clientIPHeader | – | If specified, the IP address of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the IP address of the client. Do not use the standard Syslog header names here (like _host_) because the event header will be overridden in that case. |
clientHostnameHeader | – | If specified, the host name of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the host name of the client. Retrieving the host name may involve a name service reverse lookup which may affect the performance. Do not use the standard Syslog header names here (like _host_) because the event header will be overridden in that case. |
selector.type | replicating or multiplexing | |
selector.* | replicating | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
For example, a syslog UDP source for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
A source which accepts Flume Events by HTTP POST and GET. GET should be used for experimentation only. HTTP requests are converted into flume events by a pluggable “handler” which must implement the HTTPSourceHandler interface. This handler takes a HttpServletRequest and returns a list of flume events. All events handled from one Http request are committed to the channel in one transaction, thus allowing for increased efficiency on channels like the file channel. If the handler throws an exception, this source will return a HTTP status of 400. If the channel is full, or the source is unable to append events to the channel, the source will return a HTTP 503 - Temporarily unavailable status.
All events sent in one post request are considered to be one batch and inserted into the channel in one transaction.
This source is based on Jetty 9.4 and offers the ability to set additional Jetty-specific parameters which will be passed directly to the Jetty components.
Property Name | Default | Description |
---|---|---|
type | The component type name, needs to be http | |
port | – | The port the source should bind to. |
bind | 0.0.0.0 | The hostname or IP address to listen on |
handler | org.apache.flume.source.http.JSONHandler | The FQCN of the handler class. |
handler.* | – | Config parameters for the handler |
selector.type | replicating | replicating or multiplexing |
selector.* | Depends on the selector.type value | |
interceptors | – | Space-separated list of interceptors |
interceptors.* | ||
ssl | false | Set the property true, to enable SSL. HTTP Source does not support SSLv3. |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
include-protocols | – | Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If included-protocols is empty, it includes every supported protocols. |
exclude-cipher-suites | – | Space-separated list of cipher suites to exclude. |
include-cipher-suites | – | Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. |
keystore | Location of the keystore including keystore file name. If SSL is enabled but the keystore is not specified here, then the global keystore will be used (if defined, otherwise configuration error). | |
keystore-password | Keystore password. If SSL is enabled but the keystore password is not specified here, then the global keystore password will be used (if defined, otherwise configuration error). | |
keystore-type | JKS | Keystore type. This can be “JKS” or “PKCS12”. |
QueuedThreadPool.* | Jetty specific settings to be set on org.eclipse.jetty.util.thread.QueuedThreadPool. N.B. QueuedThreadPool will only be used if at least one property of this class is set. | |
HttpConfiguration.* | Jetty specific settings to be set on org.eclipse.jetty.server.HttpConfiguration | |
SslContextFactory.* | Jetty specific settings to be set on org.eclipse.jetty.util.ssl.SslContextFactory (only applicable when ssl is set to true). | |
ServerConnector.* | Jetty specific settings to be set on org.eclipse.jetty.server.ServerConnector |
Deprecated Properties
Property Name | Default | Description |
---|---|---|
keystorePassword | – | Use keystore-password. Deprecated value will be overwritten with the new one. |
excludeProtocols | SSLv3 | Use exclude-protocols. Deprecated value will be overwritten with the new one. |
enableSSL | false | Use ssl. Deprecated value will be overwritten with the new one. |
N.B. Jetty-specific settings are set using the setter-methods on the objects listed above. For full details see the Javadoc for these classes (QueuedThreadPool, HttpConfiguration, SslContextFactory and ServerConnector).
When using Jetty-specific settings, the named properties above take precedence (for example, excludeProtocols takes precedence over SslContextFactory.ExcludeProtocols). All property names should begin with a lower case letter.
An example http source for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props
a1.sources.r1.HttpConfiguration.sendServerVersion = false
a1.sources.r1.ServerConnector.idleTimeout = 300
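For illustration, a minimal sketch of a custom handler such as the org.example.rest.RestHandler referenced above might look like the following. This is a hypothetical class, assuming only the HTTPSourceHandler interface from flume-ng-core; the handler.* sub-properties (e.g. handler.nickname) arrive through the configure method.
package org.example.rest;

import java.io.BufferedReader;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

import javax.servlet.http.HttpServletRequest;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.http.HTTPSourceHandler;

// Hypothetical handler: wraps the raw request body in a single Flume event.
public class RestHandler implements HTTPSourceHandler {

  private String nickname;

  @Override
  public void configure(Context context) {
    // Receives the handler.* sub-properties, e.g. handler.nickname above.
    nickname = context.getString("nickname", "unnamed");
  }

  @Override
  public List<Event> getEvents(HttpServletRequest request) throws Exception {
    // Read the raw request body.
    StringBuilder body = new StringBuilder();
    try (BufferedReader reader = request.getReader()) {
      String line;
      while ((line = reader.readLine()) != null) {
        body.append(line);
      }
    }
    // All events returned here are committed in a single channel transaction.
    Event event = EventBuilder.withBody(body.toString(), StandardCharsets.UTF_8);
    event.getHeaders().put("nickname", nickname);
    return Collections.singletonList(event);
  }
}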
A handler is provided out of the box which can handle events represented in JSON format, supporting the UTF-8, UTF-16 and UTF-32 character sets. The handler accepts an array of events (even a single event must be sent in an array) and converts them to Flume events based on the encoding specified in the request. If no encoding is specified, UTF-8 is assumed. Events are represented as follows.
[{
"headers" : {
"timestamp" : "434324343",
"host" : "random_host.example.com"
},
"body" : "random_body"
},
{
"headers" : {
"namenode" : "namenode.example.com",
"datanode" : "random_datanode.example.com"
},
"body" : "really_random_body"
}]
To set the charset, the request must have content type specified as application/json; charset=UTF-8 (replace UTF-8 with UTF-16 or UTF-32 as required).
One way to create an event in the format expected by this handler is to use JSONEvent provided in the Flume SDK and use Google Gson to create the JSON string using the Gson#toJson(Object, Type) method. The type token to pass as the 2nd argument of this method for a list of events can be created by:
Type type = new TypeToken<List<JSONEvent>>() {}.getType();
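Putting these pieces together, the following is a minimal sketch of producing the JSON body expected by the handler (the class name JsonBodyExample is illustrative; it assumes flume-ng-sdk and Google Gson on the classpath):
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import org.apache.flume.event.JSONEvent;

public class JsonBodyExample {
  public static void main(String[] args) {
    JSONEvent event = new JSONEvent();
    event.setHeaders(Collections.singletonMap("host", "random_host.example.com"));
    event.setBody("random_body".getBytes(StandardCharsets.UTF_8));

    // The handler expects an array, even for a single event.
    List<JSONEvent> events = Collections.singletonList(event);
    Type type = new TypeToken<List<JSONEvent>>() {}.getType();
    String json = new Gson().toJson(events, type);

    // POST this string with content type application/json; charset=UTF-8.
    System.out.println(json);
  }
}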
By default HTTPSource splits JSON input into Flume events. As an alternative, BlobHandler is a handler for HTTPSource that returns an event containing the request parameters as well as the Binary Large Object (BLOB) uploaded with the request, for example a PDF or JPG file. Note that this approach is not suitable for very large objects because it buffers the entire BLOB in RAM.
Property Name | Default | Description |
---|---|---|
handler | – | The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobHandler |
handler.maxBlobLength | 100000000 | The maximum number of bytes to read and buffer for a given request |
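A hypothetical source configuration using the BlobHandler might look like this (the maxBlobLength value is only an example):
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.apache.flume.sink.solr.morphline.BlobHandler
a1.sources.r1.handler.maxBlobLength = 50000000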
StressSource is an internal load-generating source implementation which is very useful for stress tests. It allows the user to configure the size of each event payload (with empty headers), the total number of events to be sent, and the maximum number of successful events to be delivered.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be org.apache.flume.source.StressSource |
size | 500 | Payload size of each Event, in bytes
maxTotalEvents | -1 | Maximum number of Events to be sent |
maxSuccessfulEvents | -1 | Maximum number of Events successfully sent |
batchSize | 1 | Number of Events to be sent in one batch |
maxEventsPerSecond | 0 | When set to an integer greater than zero, enforces a rate limit (events per second) on the source.
Example for agent named a1:
a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1
The legacy sources allow a Flume 1.x agent to receive events from Flume 0.9.4 agents. They accept events in the Flume 0.9.4 format, convert them to the Flume 1.0 format, and store them in the connected channel. The 0.9.4 event properties like timestamp, pri, host, nanos, etc. get converted to 1.x event header attributes. The legacy sources support both Avro and Thrift RPC connections. To use this bridge between two Flume versions, you need to start a Flume 1.x agent with the avroLegacy or thriftLegacy source. The 0.9.4 agent should have the agent Sink pointing to the host/port of the 1.x agent.
Note
The reliability semantics of Flume 1.x are different from that of Flume 0.9.x. The E2E or DFO mode of a Flume 0.9.x agent will not be supported by the legacy source. The only supported 0.9.x mode is best effort, though the reliability setting of the 1.x flow will apply to the events once they are saved into the Flume 1.x channel by the legacy source.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be org.apache.flume.source.avroLegacy.AvroLegacySource |
host | – | The hostname or IP address to bind to |
port | – | The port # to listen on |
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors |
interceptors.* |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be org.apache.flume.source.thriftLegacy.ThriftLegacySource |
host | – | The hostname or IP address to bind to |
port | – | The port # to listen on |
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors |
interceptors.* |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
A custom source is your own implementation of the Source interface. A custom source’s class and its dependencies must be included in the agent’s classpath when starting the Flume agent. The type of the custom source is its FQCN.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be your FQCN |
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors |
interceptors.* |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1
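As a sketch only, a minimal pollable implementation of the hypothetical org.example.MySource above could look like the following (assuming the standard Flume classes AbstractSource, Configurable and PollableSource):
package org.example;

import java.nio.charset.StandardCharsets;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

// Hypothetical custom source: emits one synthetic event per poll.
public class MySource extends AbstractSource implements Configurable, PollableSource {

  private String prefix;

  @Override
  public void configure(Context context) {
    // Reads an optional custom property, e.g. a1.sources.r1.prefix = ...
    prefix = context.getString("prefix", "event");
  }

  @Override
  public Status process() throws EventDeliveryException {
    try {
      // Hand the event to the channel processor, which commits it
      // to the configured channel(s) in a transaction.
      Event event = EventBuilder.withBody(
          (prefix + "-" + System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
      getChannelProcessor().processEvent(event);
      return Status.READY;
    } catch (Exception e) {
      return Status.BACKOFF;
    }
  }

  @Override
  public long getBackOffSleepIncrement() { return 1000; }

  @Override
  public long getMaxBackOffSleepInterval() { return 5000; }
}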
Scribe is another type of ingest system. To integrate with an existing Scribe ingest system, Flume can use the ScribeSource, which is based on Thrift and compatible with the Scribe transfer protocol. For deployment of Scribe, please follow the guide from Facebook. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be org.apache.flume.source.scribe.ScribeSource |
port | 1499 | Port on which the source listens for Scribe connections
maxReadBufferBytes | 16384000 | Maximum Thrift frame buffer size, in bytes (the Thrift default)
workerThreads | 5 | Number of Thrift handler threads
selector.type | ||
selector.* |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 1463
a1.sources.r1.workerThreads = 5
a1.sources.r1.channels = c1
This sink writes events into the Hadoop Distributed File System (HDFS). It currently supports creating text and sequence files. It supports compression in both file types. The files can be rolled (close current file and create a new one) periodically based on the elapsed time or size of data or number of events. It also buckets/partitions data by attributes like timestamp or machine where the event originated. The HDFS directory path may contain formatting escape sequences that will be replaced by the HDFS sink to generate a directory/file name to store the events. Using this sink requires Hadoop to be installed so that Flume can use the Hadoop jars to communicate with the HDFS cluster. Note that a version of Hadoop that supports the sync() call is required.
The following are the escape sequences supported:
Alias | Description |
---|---|
%{host} | Substitute value of event header named “host”. Arbitrary header names are supported. |
%t | Unix time in milliseconds |
%a | locale’s short weekday name (Mon, Tue, ...) |
%A | locale’s full weekday name (Monday, Tuesday, ...) |
%b | locale’s short month name (Jan, Feb, ...) |
%B | locale’s long month name (January, February, ...) |
%c | locale’s date and time (Thu Mar 3 23:05:25 2005) |
%d | day of month (01) |
%e | day of month without padding (1) |
%D | date; same as %m/%d/%y |
%H | hour (00..23) |
%I | hour (01..12) |
%j | day of year (001..366) |
%k | hour ( 0..23) |
%m | month (01..12) |
%n | month without padding (1..12) |
%M | minute (00..59) |
%p | locale’s equivalent of am or pm |
%s | seconds since 1970-01-01 00:00:00 UTC |
%S | second (00..59) |
%y | last two digits of year (00..99) |
%Y | year (2010) |
%z | +hhmm numeric timezone (for example, -0400) |
%[localhost] | Substitute the hostname of the host where the agent is running |
%[IP] | Substitute the IP address of the host where the agent is running |
%[FQDN] | Substitute the canonical hostname of the host where the agent is running |
Note: The escape strings %[localhost], %[IP] and %[FQDN] all rely on Java’s ability to obtain the hostname, which may fail in some networking environments.
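For example, the following hypothetical path setting buckets events into one directory per day and hour, using the time-based escape sequences above:
a1.sinks.k1.hdfs.path = hdfs://namenode/flume/webdata/%Y-%m-%d/%H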
The file in use will have its name mangled to include “.tmp” at the end. Once the file is closed, this extension is removed. This allows excluding partially complete files in the directory. Required properties are in bold.
Note
For all of the time related escape sequences, a header with the key “timestamp” must exist among the headers of the event (unless hdfs.useLocalTimeStamp is set to true). One way to add this automatically is to use the TimestampInterceptor.
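For example, the following illustrative snippet (assuming a source named r1) uses the built-in timestamp interceptor alias to add the header automatically:
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp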
Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be hdfs |
hdfs.path | – | HDFS directory path (eg hdfs://namenode/flume/webdata/) |
hdfs.filePrefix | FlumeData | Name prefixed to files created by Flume in hdfs directory |
hdfs.fileSuffix | – | Suffix to append to file (eg .avro - NOTE: period is not automatically added) |
hdfs.inUsePrefix | – | Prefix that is used for temporal files that flume actively writes into |
hdfs.inUseSuffix | .tmp | Suffix that is used for temporal files that flume actively writes into |
hdfs.emptyInUseSuffix | false | If false, hdfs.inUseSuffix is used while writing the output; after closing the output, hdfs.inUseSuffix is removed from the output file name. If true, the hdfs.inUseSuffix parameter is ignored and an empty string is used instead.
hdfs.rollInterval | 30 | Number of seconds to wait before rolling current file (0 = never roll based on time interval) |
hdfs.rollSize | 1024 | File size to trigger roll, in bytes (0 = never roll based on file size)
hdfs.rollCount | 10 | Number of events written to file before it rolled (0 = never roll based on number of events) |
hdfs.idleTimeout | 0 | Timeout after which inactive files get closed (0 = disable automatic closing of idle files) |
hdfs.batchSize | 100 | Number of events written to file before it is flushed to HDFS
hdfs.codeC | – | Compression codec. One of the following: gzip, bzip2, lzo, lzop, snappy
hdfs.fileType | SequenceFile | File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream will not compress the output file; do not set hdfs.codeC. (2) CompressedStream requires hdfs.codeC to be set to an available codec.
hdfs.maxOpenFiles | 5000 | Allow only this number of open files. If this number is exceeded, the oldest file is closed. |
hdfs.minBlockReplicas | – | Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath. |
hdfs.writeFormat | Writable | Format for sequence file records. One of Text or Writable. Set to Text before creating data files with Flume, otherwise those files cannot be read by either Apache Impala (incubating) or Apache Hive. |
hdfs.threadsPoolSize | 10 | Number of threads per HDFS sink for HDFS IO ops (open, write, etc.) |
hdfs.rollTimerPoolSize | 1 | Number of threads per HDFS sink for scheduling timed file rolling |
hdfs.kerberosPrincipal | – | Kerberos user principal for accessing secure HDFS |
hdfs.kerberosKeytab | – | Kerberos keytab for accessing secure HDFS |
hdfs.proxyUser | ||
hdfs.round | false | Should the timestamp be rounded down (if true, affects all time based escape sequences except %t) |
hdfs.roundValue | 1 | Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time. |
hdfs.roundUnit | second | The unit of the round down value - second, minute or hour. |
hdfs.timeZone | Local Time | Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles. |
hdfs.useLocalTimeStamp | false | Use the local time (instead of the timestamp from the event header) while replacing the escape sequences. |
hdfs.closeTries | 0 | Number of times the sink must try renaming a file, after initiating a close attempt. If set to 1, this sink will not re-try a failed rename (due to, for example, NameNode or DataNode failure), and may leave the file in an open state with a .tmp extension. If set to 0, the sink will try to rename the file until the file is eventually renamed (there is no limit on the number of times it would try). The file may still remain open if the close call fails but the data will be intact and in this case, the file will be closed only after a Flume restart. |
hdfs.retryInterval | 180 | Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode, so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a “.tmp” extension.
serializer | TEXT | Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer.Builder interface. |
serializer.* |
Deprecated Properties