Metric | SequenceGenerator | SpoolDirectory | SyslogTcp | SyslogUDP | Taildir | Thrift |
---|---|---|---|---|---|---|
AppendAcceptedCount | | | | | | x |
AppendBatchAcceptedCount | x | x | | | | x |
AppendBatchReceivedCount | x | x | | | | x |
AppendReceivedCount | | | | | | x |
ChannelWriteFail | x | x | x | x | x | x |
EventAcceptedCount | x | x | x | x | x | x |
EventReadFail | | x | x | x | x | |
EventReceivedCount | | x | x | x | x | x |
GenericProcessingFail | | x | | | x | |
KafkaCommitTimer | | | | | | |
KafkaEmptyCount | | | | | | |
KafkaEventGetTimer | | | | | | |
OpenConnectionCount | | | | | | |
Metric | HDFSEvent | Hive | Http | Kafka | Morphline | RollingFile |
---|---|---|---|---|---|---|
BatchCompleteCount | x | x | | | x | |
BatchEmptyCount | x | x | | x | x | |
BatchUnderflowCount | x | x | | x | x | |
ChannelReadFail | x | x | x | x | x | x |
ConnectionClosedCount | x | x | | | | x |
ConnectionCreatedCount | x | x | | | | x |
ConnectionFailedCount | x | x | | | | x |
EventDrainAttemptCount | x | x | x | | x | x |
EventDrainSuccessCount | x | x | x | x | x | x |
EventWriteFail | x | x | x | x | x | x |
KafkaEventSendTimer | | | | x | | |
RollbackCount | | | | x | | |
Metric | File | Kafka | Memory | PseudoTxnMemory | SpillableMemory |
---|---|---|---|---|---|
ChannelCapacity | x | | x | | x |
ChannelSize | x | | x | x | x |
CheckpointBackupWriteErrorCount | x | | | | |
CheckpointWriteErrorCount | x | | | | |
EventPutAttemptCount | x | x | x | x | x |
EventPutErrorCount | x | | | | |
EventPutSuccessCount | x | x | x | x | x |
EventTakeAttemptCount | x | x | x | x | x |
EventTakeErrorCount | x | | | | |
EventTakeSuccessCount | x | x | x | x | x |
KafkaCommitTimer | | x | | | |
KafkaEventGetTimer | | x | | | |
KafkaEventSendTimer | | x | | | |
Open | x | | | | |
RollbackCounter | | x | | | |
Unhealthy | x | | | | |
JMX Reporting can be enabled by specifying JMX parameters in the JAVA_OPTS environment variable using flume-env.sh, for example:
export JAVA_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
NOTE: The sample above disables security. To enable security, please refer to http://docs.oracle.com/javase/6/docs/technotes/guides/management/agent.html
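With these options in place, any standard JMX client can attach to the agent's MBean server; for example, using the port configured above:
$ jconsole <hostname>:5445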
Flume can also report these metrics to Ganglia 3 or Ganglia 3.1 metanodes. To report metrics to Ganglia, a Flume agent must be started with this support by passing in the following parameters as system properties prefixed by flume.monitoring.; they can also be specified in flume-env.sh:
Property Name | Default | Description |
---|---|---|
type | – | The component type name, has to be ganglia |
hosts | – | Comma-separated list of hostname:port of Ganglia servers |
pollFrequency | 60 | Time, in seconds, between consecutive reporting to Ganglia server |
isGanglia3 | false | Ganglia server version is 3. By default, Flume sends in Ganglia 3.1 format |
We can start Flume with Ganglia support as follows:
$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455
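Because these are ordinary Java system properties, the same settings can also be placed in flume-env.sh rather than on the command line; a sketch, reusing the placeholder hosts from the example above:
export JAVA_OPTS="$JAVA_OPTS -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455"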
Flume can also report metrics in a JSON format. To enable reporting in JSON format, Flume hosts a Web server on a configurable port. Flume reports metrics in the following JSON format:
{
"typeName1.componentName1" : {"metric1" : "metricValue1", "metric2" : "metricValue2"},
"typeName2.componentName2" : {"metric3" : "metricValue3", "metric4" : "metricValue4"}
}
Here is an example:
{
"CHANNEL.fileChannel":{"EventPutSuccessCount":"468085",
"Type":"CHANNEL",
"StopTime":"0",
"EventPutAttemptCount":"468086",
"ChannelSize":"233428",
"StartTime":"1344882233070",
"EventTakeSuccessCount":"458200",
"ChannelCapacity":"600000",
"EventTakeAttemptCount":"458288"},
"CHANNEL.memChannel":{"EventPutSuccessCount":"22948908",
"Type":"CHANNEL",
"StopTime":"0",
"EventPutAttemptCount":"22948908",
"ChannelSize":"5",
"StartTime":"1344882209413",
"EventTakeSuccessCount":"22948900",
"ChannelCapacity":"100",
"EventTakeAttemptCount":"22948908"}
}
Property Name | Default | Description |
---|---|---|
type | – | The component type name, has to be http |
port | 41414 | The port to start the server on. |
We can start Flume with JSON Reporting support as follows:
$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
Metrics will then be available at http://<hostname>:<port>/metrics. Custom components can report metrics as mentioned in the Ganglia section above.
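The endpoint can then be queried with any HTTP client; for example, using the port chosen above:
$ curl http://<hostname>:34545/metrics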
It is possible to report metrics to other systems by writing servers that do the reporting. Any reporting class has to implement the interface org.apache.flume.instrumentation.MonitorService. Such a class can be used the same way the GangliaServer is used for reporting; it can poll the platform MBean server for the metrics exposed by the registered MBeans. For example, an HTTP monitoring service called HTTPReporting could be used as follows:
$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332
Property Name | Default | Description |
---|---|---|
type | – | The component type name, has to be the fully qualified class name (FQCN) of the reporting class |
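A minimal sketch of such a reporter is shown below. The class name ConsoleReportingService and the fixed 60-second schedule are illustrative choices, not part of Flume; JMXPollUtil is the helper class Flume's bundled reporters use to read component metrics from the platform MBean server. Such a class would be started with -Dflume.monitoring.type set to its fully qualified class name.

import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flume.Context;
import org.apache.flume.instrumentation.MonitorService;
import org.apache.flume.instrumentation.util.JMXPollUtil;

public class ConsoleReportingService implements MonitorService {

  private ScheduledExecutorService service;

  @Override
  public void configure(Context context) {
    // Any flume.monitoring.* properties passed on the command line
    // are made available to the reporter here.
  }

  @Override
  public void start() {
    service = Executors.newSingleThreadScheduledExecutor();
    service.scheduleWithFixedDelay(() -> {
      // JMXPollUtil reads every component's metrics from the platform
      // MBean server, keyed by "TYPE.componentName".
      Map<String, Map<String, String>> metrics = JMXPollUtil.getAllMBeans();
      for (Map.Entry<String, Map<String, String>> e : metrics.entrySet()) {
        System.out.println(e.getKey() + " : " + e.getValue());
      }
    }, 0, 60, TimeUnit.SECONDS);
  }

  @Override
  public void stop() {
    if (service != null) {
      service.shutdown();
    }
  }
}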
Any custom Flume component should inherit from the org.apache.flume.instrumentation.MonitoredCounterGroup class. The class should then provide getter methods for each of the metrics it exposes. See the code below. The MonitoredCounterGroup expects a list of attribute names whose metrics are exposed by this class. As of now, this class only supports exposing metrics as long values.
public class SinkCounter extends MonitoredCounterGroup implements SinkCounterMBean {

  private static final String COUNTER_CONNECTION_CREATED =
      "sink.connection.creation.count";

  private static final String COUNTER_CONNECTION_CLOSED =
      "sink.connection.closed.count";

  private static final String COUNTER_CONNECTION_FAILED =
      "sink.connection.failed.count";

  private static final String COUNTER_BATCH_EMPTY =
      "sink.batch.empty";

  private static final String COUNTER_BATCH_UNDERFLOW =
      "sink.batch.underflow";

  private static final String COUNTER_BATCH_COMPLETE =
      "sink.batch.complete";

  private static final String COUNTER_EVENT_DRAIN_ATTEMPT =
      "sink.event.drain.attempt";

  private static final String COUNTER_EVENT_DRAIN_SUCCESS =
      "sink.event.drain.sucess";

  private static final String[] ATTRIBUTES = {
      COUNTER_CONNECTION_CREATED, COUNTER_CONNECTION_CLOSED,
      COUNTER_CONNECTION_FAILED, COUNTER_BATCH_EMPTY,
      COUNTER_BATCH_UNDERFLOW, COUNTER_BATCH_COMPLETE,
      COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS
  };

  public SinkCounter(String name) {
    super(MonitoredCounterGroup.Type.SINK, name, ATTRIBUTES);
  }

  @Override
  public long getConnectionCreatedCount() {
    return get(COUNTER_CONNECTION_CREATED);
  }

  public long incrementConnectionCreatedCount() {
    return increment(COUNTER_CONNECTION_CREATED);
  }
}
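As a sketch of how such a counter is typically wired into a component's lifecycle (MySink is an illustrative class; the pattern follows that of Flume's bundled sinks):

import org.apache.flume.Context;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;

public class MySink extends AbstractSink implements Configurable {

  private SinkCounter sinkCounter;

  @Override
  public void configure(Context context) {
    if (sinkCounter == null) {
      // The name ties the counter's MBean to this component instance.
      sinkCounter = new SinkCounter(getName());
    }
  }

  @Override
  public synchronized void start() {
    sinkCounter.start();  // registers the counters with the MBean server
    super.start();
  }

  @Override
  public Status process() {
    // Real event-handling logic would update the counters here, e.g.
    // sinkCounter.incrementEventDrainAttemptCount();
    return Status.BACKOFF;
  }

  @Override
  public synchronized void stop() {
    sinkCounter.stop();   // unregisters the counters
    super.stop();
  }
}

The start() and stop() calls register and unregister the counter's MBean, which is what makes the metrics visible to JMX and to the monitoring servers described above.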
The File Channel Integrity tool verifies the integrity of individual events in the File channel and removes corrupted events.
The tool can be run as follows:
$ bin/flume-ng tool --conf ./conf FCINTEGRITYTOOL -l ./datadir
where datadir is the comma-separated list of data directories to be verified.
The following options are available:
Option Name | Description |
---|---|
h/help | Displays help |
l/dataDirs | Comma-separated list of data directories which the tool must verify |
The Event Validator tool can be used to validate File Channel events in an application-specific way. The tool applies the user-provided validation logic to each event and drops any event that does not conform to that logic.
The tool can be run as follows:
$ bin/flume-ng tool --conf ./conf FCINTEGRITYTOOL -l ./datadir -e org.apache.flume.MyEventValidator -DmaxSize 2000
where datadir is the comma-separated list of data directories to be verified.
The following options are available:
Option Name | Description |
---|---|
h/help | Displays help |
l/dataDirs | Comma-separated list of data directories which the tool must verify |
e/eventValidator | Fully Qualified Name of Event Validator Implementation. The jar must be on Flume classpath |
The Event Validator implementation must implement the EventValidator interface. It is recommended not to throw any exception from the implementation, as events for which an exception is thrown are treated as invalid. Additional parameters can be passed to the EventValidator implementation via -D options.
Let's look at an example of a simple size-based Event Validator, which rejects any event whose body is larger than the specified maximum size.
public static class MyEventValidator implements EventValidator {

  private int maxSize = 0;

  private MyEventValidator(int maxSize) {
    this.maxSize = maxSize;
  }

  @Override
  public boolean validateEvent(Event event) {
    // An event is valid only if its body fits within the configured maximum.
    return event.getBody().length <= maxSize;
  }

  public static class Builder implements EventValidator.Builder {

    private int maxSize = 0;

    @Override
    public EventValidator build() {
      return new MyEventValidator(maxSize);
    }

    @Override
    public void configure(Context context) {
      // Reads the limit passed on the command line via -DmaxSize.
      maxSize = context.getInteger("maxSize");
    }
  }
}
Flume is very flexible and allows a large range of possible deployment scenarios. If you plan to use Flume in a large, production deployment, it is prudent to spend some time thinking about how to express your problem in terms of a Flume topology. This section covers a few considerations.
If you need to ingest textual log data into Hadoop/HDFS then Flume is the right fit for your problem, full stop. For other use cases, here are some guidelines:
Flume is designed to transport and ingest regularly-generated event data over relatively stable, potentially complex topologies. The notion of "event data" is very broadly defined. To Flume, an event is just a generic blob of bytes. There are some limitations on how large an event can be - for instance, it cannot be larger than what you can store in memory or on disk on a single machine - but in practice, Flume events can be everything from textual log entries to image files. The key property of events is that they are generated in a continuous, streaming fashion. If your data is not regularly generated (e.g., you are trying to do a single bulk load of data into a Hadoop cluster) then Flume will still work, but it is probably overkill for your situation. Flume likes relatively stable topologies. Your topologies do not need to be immutable, because Flume can deal with changes in topology without losing data and can also tolerate periodic reconfiguration due to fail-over or provisioning. It probably won't work well if you plan to change topologies every day, because reconfiguration takes some thought and overhead.
The reliability of a Flume flow depends on several factors. By adjusting these factors, you can achieve a wide array of reliability options with Flume.
What type of channel you use. Flume has both durable channels (those which will persist data to disk) and non-durable channels (those which will lose data if a machine fails). Durable channels use disk-based storage, and data stored in such channels will persist across machine restarts or non-disk-related failures.
Whether your channels are sufficiently provisioned for the workload. Channels in Flume act as buffers at various hops. These buffers have a fixed capacity, and once that capacity is full you will create back pressure on earlier points in the flow. If this pressure propagates to the source of the flow, Flume will become unavailable and may lose data.
Whether you use redundant topologies. Flume lets you replicate flows across redundant topologies. This can provide a very easy source of fault tolerance, one which overcomes both disk and machine failures.
The best way to think about reliability in a Flume topology is to consider various failure scenarios and their outcomes. What happens if a disk fails? What happens if a machine fails? What happens if your terminal sink (e.g. HDFS) goes down for some time and you have back pressure? The space of possible designs is huge, but the underlying questions you need to ask are just a handful.
The first step in designing a Flume topology is to enumerate all sources and destinations (terminal sinks) for your data. These will define the edge points of your topology. The next consideration is whether to introduce intermediate aggregation tiers or event routing. If you are collecting data from a large number of sources, it can be helpful to aggregate the data in order to simplify ingestion at the terminal sink. An aggregation tier can also smooth out burstiness from sources or unavailability at sinks, by acting as a buffer. If you are routing data between different locations, you may also want to split flows at various points: this creates sub-topologies which may themselves include aggregation points.
Once you have an idea of what your topology will look like, the next question is how much hardware and networking capacity is needed. This starts by quantifying how much data you generate. That is not always a simple task! Most data streams are bursty (for instance, due to diurnal patterns) and potentially unpredictable. A good starting point is to think about the maximum throughput you'll have in each tier of the topology, both in terms of events per second and bytes per second. Once you know the required throughput of a given tier, you can calculate a lower bound on how many nodes you require for that tier. To determine attainable throughput, it's best to experiment with Flume on your hardware, using synthetic or sampled event data. In general, disk-based channels should get tens of MB/s and memory-based channels should get hundreds of MB/s or more. Performance will vary widely, however, depending on hardware and operating environment.
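To make the arithmetic concrete with purely hypothetical numbers: if a tier must absorb a peak of 20,000 events per second at roughly 1 KB per event, that is about 20 MB/s of aggregate throughput; if disk-based channels on your hardware measure at about 10 MB/s per node, that tier needs at least two nodes before accounting for any redundancy.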
Sizing aggregate throughput gives you a lower bound on the number of nodes you will need in each tier. There are several reasons to have additional nodes, such as increased redundancy and better ability to absorb bursts in load.
If a Flume agent goes down, then all the flows hosted on that agent are aborted. Once the agent is restarted, the flows will resume. A flow using the file channel or another stable channel will resume processing events where it left off. If the agent can't be restarted on the same hardware, there is an option to migrate the database to other hardware and set up a new Flume agent that can resume processing the events saved in the db. Database HA features can be leveraged to move the Flume agent to another host.
Currently Flume supports HDFS 0.20.2 and 0.23.
TBD
TBD
TBD
TBD
Component Interface | Type Alias | Implementation Class |
---|---|---|
org.apache.flume.Channel | memory | org.apache.flume.channel.MemoryChannel |
org.apache.flume.Channel | jdbc | org.apache.flume.channel.jdbc.JdbcChannel |
org.apache.flume.Channel | file | org.apache.flume.channel.file.FileChannel |
org.apache.flume.Channel | – | org.apache.flume.channel.PseudoTxnMemoryChannel |
org.apache.flume.Channel | – | org.example.MyChannel |
org.apache.flume.Source | avro | org.apache.flume.source.AvroSource |
org.apache.flume.Source | netcat | org.apache.flume.source.NetcatSource |
org.apache.flume.Source | seq | org.apache.flume.source.SequenceGeneratorSource |
org.apache.flume.Source | exec | org.apache.flume.source.ExecSource |
org.apache.flume.Source | syslogtcp | org.apache.flume.source.SyslogTcpSource |
org.apache.flume.Source | multiport_syslogtcp | org.apache.flume.source.MultiportSyslogTCPSource |
org.apache.flume.Source | syslogudp | org.apache.flume.source.SyslogUDPSource |
org.apache.flume.Source | spooldir | org.apache.flume.source.SpoolDirectorySource |
org.apache.flume.Source | http | org.apache.flume.source.http.HTTPSource |
org.apache.flume.Source | thrift | org.apache.flume.source.ThriftSource |
org.apache.flume.Source | jms | org.apache.flume.source.jms.JMSSource |
org.apache.flume.Source | – | org.apache.flume.source.avroLegacy.AvroLegacySource |
org.apache.flume.Source | – | org.apache.flume.source.thriftLegacy.ThriftLegacySource |
org.apache.flume.Source | – | org.example.MySource |
org.apache.flume.Sink | null | org.apache.flume.sink.NullSink |
org.apache.flume.Sink | logger | org.apache.flume.sink.LoggerSink |
org.apache.flume.Sink | avro | org.apache.flume.sink.AvroSink |
org.apache.flume.Sink | hdfs | org.apache.flume.sink.hdfs.HDFSEventSink |
org.apache.flume.Sink | hbase | org.apache.flume.sink.hbase.HBaseSink |
org.apache.flume.Sink | hbase2 | org.apache.flume.sink.hbase2.HBase2Sink |
org.apache.flume.Sink | asynchbase | org.apache.flume.sink.hbase.AsyncHBaseSink |
org.apache.flume.Sink | file_roll | org.apache.flume.sink.RollingFileSink |
org.apache.flume.Sink | irc | org.apache.flume.sink.irc.IRCSink |
org.apache.flume.Sink | thrift | org.apache.flume.sink.ThriftSink |
org.apache.flume.Sink | – | org.example.MySink |
org.apache.flume.ChannelSelector | replicating | org.apache.flume.channel.ReplicatingChannelSelector |
org.apache.flume.ChannelSelector | multiplexing | org.apache.flume.channel.MultiplexingChannelSelector |
org.apache.flume.ChannelSelector | – | org.example.MyChannelSelector |
org.apache.flume.SinkProcessor | default | org.apache.flume.sink.DefaultSinkProcessor |
org.apache.flume.SinkProcessor | failover | org.apache.flume.sink.FailoverSinkProcessor |
org.apache.flume.SinkProcessor | load_balance | org.apache.flume.sink.LoadBalancingSinkProcessor |
org.apache.flume.SinkProcessor | – | |
org.apache.flume.interceptor.Interceptor | timestamp | org.apache.flume.interceptor.TimestampInterceptor$Builder |
org.apache.flume.interceptor.Interceptor | host | org.apache.flume.interceptor.HostInterceptor$Builder |
org.apache.flume.interceptor.Interceptor | static | org.apache.flume.interceptor.StaticInterceptor$Builder |
org.apache.flume.interceptor.Interceptor | regex_filter | org.apache.flume.interceptor.RegexFilteringInterceptor$Builder |
org.apache.flume.interceptor.Interceptor | regex_extractor | org.apache.flume.interceptor.RegexExtractorInterceptor$Builder |
org.apache.flume.channel.file.encryption.KeyProvider$Builder | jceksfile | org.apache.flume.channel.file.encryption.JCEFileKeyProvider |
org.apache.flume.channel.file.encryption.KeyProvider$Builder | – | org.example.MyKeyProvider |
org.apache.flume.channel.file.encryption.CipherProvider | aesctrnopadding | org.apache.flume.channel.file.encryption.AESCTRNoPaddingProvider |
org.apache.flume.channel.file.encryption.CipherProvider | – | org.example.MyCipherProvider |
org.apache.flume.serialization.EventSerializer$Builder | text | org.apache.flume.serialization.BodyTextEventSerializer$Builder |
org.apache.flume.serialization.EventSerializer$Builder | avro_event | org.apache.flume.serialization.FlumeEventAvroEventSerializer$Builder |
org.apache.flume.serialization.EventSerializer$Builder | – | org.example.MyEventSerializer$Builder |
These conventions for alias names are used in the component-specific examples above, to keep the names short and consistent across all examples.
Alias Name | Alias Type |
---|---|
a | agent |
c | channel |
r | source |
k | sink |
g | sink group |
i | interceptor |
y | key |
h | host |
s | serializer |