<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>Flume 1.11.0 Developer Guide — Apache Flume</title>
<link rel="stylesheet" href="_static/flume.css" type="text/css" />
<link rel="stylesheet" href="_static/pygments.css" type="text/css" />
<script type="text/javascript">
var DOCUMENTATION_OPTIONS = {
URL_ROOT: '',
VERSION: '',
COLLAPSE_INDEX: false,
FILE_SUFFIX: '.html',
HAS_SOURCE: true
};
</script>
<script type="text/javascript" src="_static/jquery.js"></script>
<script type="text/javascript" src="_static/underscore.js"></script>
<script type="text/javascript" src="_static/doctools.js"></script>
<link rel="top" title="Apache Flume" href="index.html" />
<link rel="up" title="Documentation" href="documentation.html" />
<link rel="next" title="Releases" href="releases/index.html" />
<link rel="prev" title="Flume 1.11.0 User Guide" href="FlumeUserGuide.html" />
</head>
<body>
<div class="header">
<table width="100%" border="0">
<tr>
<td width="10%">
<div class="logo">
<a href="index.html">
<img class="logo" src="_static/flume-logo.png" alt="Logo"/>
</a>
</div>
</td>
<td width="2%">
<span class="trademark">™</span>
</td>
<td width="68%" align="center" class="pageTitle">Apache Flume<sup><span class="trademark">™</span></sup>
</td>
<td width="20%">
<a href="https://www.apache.org">
<img src="_static/feather-small.png" alt="Apache Software Foundation" height="70"/>
</a>
</td>
</tr>
</table>
</div>
<div class="document">
<div class="documentwrapper">
<div class="bodywrapper">
<div class="body">
<div class="section" id="flume-1-11-0-developer-guide">
<h1>Flume 1.11.0 Developer Guide<a class="headerlink" href="#flume-1-11-0-developer-guide" title="Permalink to this headline">¶</a></h1>
<div class="section" id="introduction">
<h2>Introduction<a class="headerlink" href="#introduction" title="Permalink to this headline">¶</a></h2>
<div class="section" id="overview">
<h3>Overview<a class="headerlink" href="#overview" title="Permalink to this headline">¶</a></h3>
<p>Apache Flume is a distributed, reliable, and available system for
efficiently collecting, aggregating and moving large amounts of log
data from many different sources to a centralized data store.</p>
<p>Apache Flume is a top-level project at the Apache Software Foundation.
There are currently two release code lines available, versions 0.9.x and 1.x.
This documentation applies to the 1.x codeline.
For the 0.9.x codeline, please see the <a class="reference external" href="https://archive.cloudera.com/cdh/3/flume/DeveloperGuide/">Flume 0.9.x Developer Guide</a>.</p>
</div>
<div class="section" id="architecture">
<h3>Architecture<a class="headerlink" href="#architecture" title="Permalink to this headline">¶</a></h3>
<div class="section" id="data-flow-model">
<h4>Data flow model<a class="headerlink" href="#data-flow-model" title="Permalink to this headline">¶</a></h4>
<p>An <tt class="docutils literal"><span class="pre">Event</span></tt> is a unit of data that flows through a Flume agent. The <tt class="docutils literal"><span class="pre">Event</span></tt>
flows from <tt class="docutils literal"><span class="pre">Source</span></tt> to <tt class="docutils literal"><span class="pre">Channel</span></tt> to <tt class="docutils literal"><span class="pre">Sink</span></tt>, and is represented by an
implementation of the <tt class="docutils literal"><span class="pre">Event</span></tt> interface. An <tt class="docutils literal"><span class="pre">Event</span></tt> carries a payload (byte
array) that is accompanied by an optional set of headers (string attributes).
A Flume agent is a process (JVM) that hosts the components that allow
<tt class="docutils literal"><span class="pre">Event</span></tt>s to flow from an external source to an external destination.</p>
<div class="figure align-center">
<img alt="Agent component diagram" src="_images/DevGuide_image00.png" />
</div>
<p>A <tt class="docutils literal"><span class="pre">Source</span></tt> consumes <tt class="docutils literal"><span class="pre">Event</span></tt>s having a specific format, and those
<tt class="docutils literal"><span class="pre">Event</span></tt>s are delivered to the <tt class="docutils literal"><span class="pre">Source</span></tt> by an external source like a web
server. For example, an <tt class="docutils literal"><span class="pre">AvroSource</span></tt> can be used to receive Avro <tt class="docutils literal"><span class="pre">Event</span></tt>s
from clients or from other Flume agents in the flow. When a <tt class="docutils literal"><span class="pre">Source</span></tt> receives
an <tt class="docutils literal"><span class="pre">Event</span></tt>, it stores it into one or more <tt class="docutils literal"><span class="pre">Channel</span></tt>s. The <tt class="docutils literal"><span class="pre">Channel</span></tt> is
a passive store that holds the <tt class="docutils literal"><span class="pre">Event</span></tt> until that <tt class="docutils literal"><span class="pre">Event</span></tt> is consumed by a
<tt class="docutils literal"><span class="pre">Sink</span></tt>. One type of <tt class="docutils literal"><span class="pre">Channel</span></tt> available in Flume is the <tt class="docutils literal"><span class="pre">FileChannel</span></tt>
which uses the local filesystem as its backing store. A <tt class="docutils literal"><span class="pre">Sink</span></tt> is responsible
for removing an <tt class="docutils literal"><span class="pre">Event</span></tt> from the <tt class="docutils literal"><span class="pre">Channel</span></tt> and putting it into an external
repository like HDFS (in the case of an <tt class="docutils literal"><span class="pre">HDFSEventSink</span></tt>) or forwarding it to
the <tt class="docutils literal"><span class="pre">Source</span></tt> at the next hop of the flow. The <tt class="docutils literal"><span class="pre">Source</span></tt> and <tt class="docutils literal"><span class="pre">Sink</span></tt> within
the given agent run asynchronously with the <tt class="docutils literal"><span class="pre">Event</span></tt>s staged in the
<tt class="docutils literal"><span class="pre">Channel</span></tt>.</p>
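<p>The flow described above can be sketched in plain Java without any Flume
dependency. This is a minimal illustration, not Flume code:
<tt class="docutils literal"><span class="pre">MiniEvent</span></tt>, <tt class="docutils literal"><span class="pre">MiniChannel</span></tt>, <tt class="docutils literal"><span class="pre">ingest</span></tt> and <tt class="docutils literal"><span class="pre">drain</span></tt> are hypothetical
names invented for the sketch, and the bounded array buffer only stands in for
a real <tt class="docutils literal"><span class="pre">Channel</span></tt> implementation.</p>
<div class="highlight-java"><div class="highlight"><pre>import java.nio.charset.StandardCharsets;

public class DataFlowSketch {

  // Hypothetical stand-in for a Flume Event: a byte-array payload plus a
  // single string attribute standing in for the optional headers.
  static final class MiniEvent {
    final String origin;
    final byte[] body;

    MiniEvent(String origin, byte[] body) {
      this.origin = origin;
      this.body = body;
    }
  }

  // Hypothetical stand-in for a Channel: a passive, bounded FIFO buffer.
  static final class MiniChannel {
    private final MiniEvent[] slots;
    private int head = 0;
    private int size = 0;

    MiniChannel(int capacity) {
      this.slots = new MiniEvent[capacity];
    }

    // Called by the source side; returns false when the channel is full.
    synchronized boolean put(MiniEvent event) {
      if (size == slots.length) {
        return false;
      }
      slots[(head + size) % slots.length] = event;
      size++;
      return true;
    }

    // Called by the sink side; returns null when the channel is empty.
    synchronized MiniEvent take() {
      if (size == 0) {
        return null;
      }
      MiniEvent event = slots[head];
      slots[head] = null;
      head = (head + 1) % slots.length;
      size--;
      return event;
    }
  }

  // Source side: wraps each external record in an event and stages it.
  // Returns how many records the channel accepted.
  static int ingest(String[] records, MiniChannel channel) {
    int accepted = 0;
    for (String record : records) {
      byte[] body = record.getBytes(StandardCharsets.UTF_8);
      if (channel.put(new MiniEvent("web-server", body))) {
        accepted++;
      }
    }
    return accepted;
  }

  // Sink side: drains the channel into an "external repository",
  // represented here by a string with one line per event.
  static String drain(MiniChannel channel) {
    StringBuilder repository = new StringBuilder();
    MiniEvent event = channel.take();
    while (event != null) {
      repository.append(new String(event.body, StandardCharsets.UTF_8)).append('\n');
      event = channel.take();
    }
    return repository.toString();
  }

  public static void main(String[] args) {
    MiniChannel channel = new MiniChannel(2);
    int accepted = ingest(new String[] {"a", "b", "c"}, channel);
    System.out.println("accepted=" + accepted);
    System.out.print(drain(channel));
  }
}
</pre></div>
</div>
<p>Because the hypothetical channel has a capacity of two, <tt class="docutils literal"><span class="pre">main</span></tt> stages two of
the three records and then drains them in arrival order. The source and sink
sides only share the channel, which is what lets them run independently of each
other.</p>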
</div>
<div class="section" id="reliability">
<h4>Reliability<a class="headerlink" href="#reliability" title="Permalink to this headline">¶</a></h4>
<p>An <tt class="docutils literal"><span class="pre">Event</span></tt> is staged in a Flume agent’s <tt class="docutils literal"><span class="pre">Channel</span></tt>. Then it’s the
<tt class="docutils literal"><span class="pre">Sink</span></tt>’s responsibility to deliver the <tt class="docutils literal"><span class="pre">Event</span></tt> to the next agent or
terminal repository (like HDFS) in the flow. The <tt class="docutils literal"><span class="pre">Sink</span></tt> removes an <tt class="docutils literal"><span class="pre">Event</span></tt>
from the <tt class="docutils literal"><span class="pre">Channel</span></tt> only after the <tt class="docutils literal"><span class="pre">Event</span></tt> is stored into the <tt class="docutils literal"><span class="pre">Channel</span></tt> of
the next agent or stored in the terminal repository. This is how the single-hop
message delivery semantics in Flume provide end-to-end reliability of the flow.
Flume uses a transactional approach to guarantee the reliable delivery of the
<tt class="docutils literal"><span class="pre">Event</span></tt>s. The <tt class="docutils literal"><span class="pre">Source</span></tt>s and <tt class="docutils literal"><span class="pre">Sink</span></tt>s encapsulate the
storage/retrieval of the <tt class="docutils literal"><span class="pre">Event</span></tt>s in a <tt class="docutils literal"><span class="pre">Transaction</span></tt> provided by the
<tt class="docutils literal"><span class="pre">Channel</span></tt>. This ensures that the set of <tt class="docutils literal"><span class="pre">Event</span></tt>s is reliably passed from
point to point in the flow. In the case of a multi-hop flow, the <tt class="docutils literal"><span class="pre">Sink</span></tt> from
the previous hop and the <tt class="docutils literal"><span class="pre">Source</span></tt> of the next hop both have their
<tt class="docutils literal"><span class="pre">Transaction</span></tt>s open to ensure that the <tt class="docutils literal"><span class="pre">Event</span></tt> data is safely stored in
the <tt class="docutils literal"><span class="pre">Channel</span></tt> of the next hop.</p>
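<p>The take-then-commit behaviour described above can be modelled in a few lines
of plain Java. This is a sketch under simplifying assumptions, not Flume's
<tt class="docutils literal"><span class="pre">Channel</span></tt> or <tt class="docutils literal"><span class="pre">Transaction</span></tt> classes: <tt class="docutils literal"><span class="pre">TxChannel</span></tt>, <tt class="docutils literal"><span class="pre">NextHop</span></tt>,
<tt class="docutils literal"><span class="pre">FlakyHop</span></tt> and <tt class="docutils literal"><span class="pre">deliverOne</span></tt> are hypothetical names, and each transaction
handles a single event.</p>
<div class="highlight-java"><div class="highlight"><pre>public class TransactionSketch {

  // Hypothetical channel whose events are removed only on commit.
  static final class TxChannel {
    private final String[] events;
    private int committed = 0;      // number of events permanently removed
    private boolean open = false;   // is a transaction in progress?
    private boolean taken = false;  // was an event handed out in this transaction?

    TxChannel(String... events) {
      this.events = events;
    }

    void begin() {
      if (open) {
        throw new IllegalStateException("transaction already open");
      }
      open = true;
      taken = false;
    }

    // Hands out the next event without removing it; null if none remain.
    String take() {
      if (!open) {
        throw new IllegalStateException("no open transaction");
      }
      if (committed == events.length) {
        return null;
      }
      taken = true;
      return events[committed];
    }

    // Removal becomes permanent only here.
    void commit() {
      if (taken) {
        committed++;
      }
      open = false;
    }

    // The event stays staged and will be offered again.
    void rollback() {
      open = false;
    }

    int remaining() {
      return events.length - committed;
    }
  }

  // Hypothetical next hop: another agent's channel or a terminal repository.
  interface NextHop {
    void store(String event) throws Exception;
  }

  // Next hop that fails a fixed number of times before accepting events.
  static final class FlakyHop implements NextHop {
    private int failuresLeft;
    final StringBuilder stored = new StringBuilder();

    FlakyHop(int failures) {
      this.failuresLeft = failures;
    }

    public void store(String event) throws Exception {
      if (failuresLeft != 0) {
        failuresLeft--;
        throw new Exception("next hop unavailable");
      }
      stored.append(event).append(';');
    }
  }

  // One sink cycle: commit only after the next hop has stored the event.
  static boolean deliverOne(TxChannel channel, NextHop nextHop) {
    channel.begin();
    try {
      String event = channel.take();
      if (event != null) {
        nextHop.store(event);
      }
      channel.commit();
      return event != null;
    } catch (Exception e) {
      channel.rollback();
      return false;
    }
  }

  public static void main(String[] args) {
    TxChannel channel = new TxChannel("e1", "e2");
    FlakyHop hop = new FlakyHop(1);
    System.out.println(deliverOne(channel, hop)); // false: rolled back
    System.out.println(channel.remaining());      // 2: e1 is still staged
    System.out.println(deliverOne(channel, hop)); // true: stored, then committed
    System.out.println(hop.stored);               // e1;
  }
}
</pre></div>
</div>
<p>The first delivery attempt fails and is rolled back, so the event is still in
the channel for the second attempt. No event is dropped because the commit
happens strictly after the next hop has accepted it.</p>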
</div>
</div>
<div class="section" id="building-flume">
<h3>Building Flume<a class="headerlink" href="#building-flume" title="Permalink to this headline">¶</a></h3>
<div class="section" id="getting-the-source">
<h4>Getting the source<a class="headerlink" href="#getting-the-source" title="Permalink to this headline">¶</a></h4>
<p>Check out the code using Git, either from
<a class="reference external" href="https://git-wip-us.apache.org/repos/asf/flume.git">the Git repository root</a>
or from <a class="reference external" href="https://github.com/apache/flume.git">GitHub</a>.</p>
<p>Flume 1.x development happens on the “trunk” branch, so either of the
following commands can be used:</p>
<blockquote>
<div>git clone <a class="reference external" href="https://git-wip-us.apache.org/repos/asf/flume.git">https://git-wip-us.apache.org/repos/asf/flume.git</a></div></blockquote>
<p>or</p>
<blockquote>
<div>git clone <a class="reference external" href="https://github.com/apache/flume.git">https://github.com/apache/flume.git</a></div></blockquote>
</div>
<div class="section" id="compile-test-flume">
<h4>Compile/test Flume<a class="headerlink" href="#compile-test-flume" title="Permalink to this headline">¶</a></h4>
<p>The Flume build is mavenized. You can compile Flume using the standard Maven
commands:</p>
<ol class="arabic simple">
<li>Compile only: <tt class="docutils literal"><span class="pre">mvn</span> <span class="pre">clean</span> <span class="pre">compile</span></tt></li>
<li>Compile and run unit tests: <tt class="docutils literal"><span class="pre">mvn</span> <span class="pre">clean</span> <span class="pre">test</span></tt></li>
<li>Run individual test(s): <tt class="docutils literal"><span class="pre">mvn</span> <span class="pre">clean</span> <span class="pre">test</span> <span class="pre">-Dtest=&lt;Test1&gt;,&lt;Test2&gt;,...</span> <span class="pre">-DfailIfNoTests=false</span></tt></li>
<li>Create tarball package: <tt class="docutils literal"><span class="pre">mvn</span> <span class="pre">clean</span> <span class="pre">install</span></tt></li>
<li>Create tarball package (skip unit tests): <tt class="docutils literal"><span class="pre">mvn</span> <span class="pre">clean</span> <span class="pre">install</span> <span class="pre">-DskipTests</span></tt></li>
</ol>
</div>
<div class="section" id="updating-protocol-buffer-version">
<h4>Updating Protocol Buffer Version<a class="headerlink" href="#updating-protocol-buffer-version" title="Permalink to this headline">¶</a></h4>
<p>File channel has a dependency on Protocol Buffer. When updating the version of Protocol Buffer
used by Flume, it is necessary to regenerate the data access classes using the protoc compiler
that is part of Protocol Buffer as follows.</p>
<ol class="arabic simple">
<li>Update version of Protocol Buffer in pom.xml</li>
<li>Add Apache license header to any of the generated files that are missing it</li>
<li>Rebuild and test Flume: <tt class="docutils literal"><span class="pre">cd</span> <span class="pre">../..;</span> <span class="pre">mvn</span> <span class="pre">clean</span> <span class="pre">install</span></tt></li>
</ol>
</div>
</div>
<div class="section" id="developing-custom-components">
<h3>Developing custom components<a class="headerlink" href="#developing-custom-components" title="Permalink to this headline">¶</a></h3>
<div class="section" id="client">
<h4>Client<a class="headerlink" href="#client" title="Permalink to this headline">¶</a></h4>
<p>The client operates at the point of origin of events and delivers them to a
Flume agent. Clients typically operate in the process space of the application
they are consuming data from. Flume currently supports Avro, log4j, syslog,
and HTTP POST (with a JSON body) as ways to transfer data from an external
source. Additionally, there’s an <tt class="docutils literal"><span class="pre">ExecSource</span></tt> that can consume the output of a
local process as input to Flume.</p>
<p>It’s quite possible to have a use case where these existing options are not
sufficient. In this case you can build a custom mechanism to send data to
Flume. There are two ways of achieving this. The first option is to create a
custom client that communicates with one of Flume’s existing <tt class="docutils literal"><span class="pre">Source</span></tt>s like
<tt class="docutils literal"><span class="pre">AvroSource</span></tt> or <tt class="docutils literal"><span class="pre">SyslogTcpSource</span></tt>. Here the client should convert its data
into messages understood by these Flume <tt class="docutils literal"><span class="pre">Source</span></tt>s. The other option is to
write a custom Flume <tt class="docutils literal"><span class="pre">Source</span></tt> that directly talks with your existing client
application using some IPC or RPC protocol, and then converts the client data
into Flume <tt class="docutils literal"><span class="pre">Event</span></tt>s to be sent downstream. Note that all events stored
within the <tt class="docutils literal"><span class="pre">Channel</span></tt> of a Flume agent must exist as Flume <tt class="docutils literal"><span class="pre">Event</span></tt>s.</p>
<div class="section" id="client-sdk">
<h5>Client SDK<a class="headerlink" href="#client-sdk" title="Permalink to this headline">¶</a></h5>
<p>Though Flume contains a number of built-in mechanisms (i.e. <tt class="docutils literal"><span class="pre">Source</span></tt>s) to
ingest data, often one wants the ability to communicate with Flume directly from
a custom application. The Flume Client SDK is a library that enables
applications to connect to Flume and send data into Flume’s data flow over RPC.</p>
</div>
<div class="section" id="rpc-client-interface">
<h5>RPC client interface<a class="headerlink" href="#rpc-client-interface" title="Permalink to this headline">¶</a></h5>
<p>An implementation of Flume’s <tt class="docutils literal"><span class="pre">RpcClient</span></tt> interface encapsulates the RPC mechanism
supported by Flume. The user’s application can simply call the Flume Client
SDK’s <tt class="docutils literal"><span class="pre">append(Event)</span></tt> or <tt class="docutils literal"><span class="pre">appendBatch(List&lt;Event&gt;)</span></tt> to send data and not
worry about the underlying message exchange details. The user can provide the
required <tt class="docutils literal"><span class="pre">Event</span></tt> argument by directly implementing the <tt class="docutils literal"><span class="pre">Event</span></tt> interface,
by using a convenience implementation such as the <tt class="docutils literal"><span class="pre">SimpleEvent</span></tt> class, or by using
<tt class="docutils literal"><span class="pre">EventBuilder</span></tt>’s overloaded <tt class="docutils literal"><span class="pre">withBody()</span></tt> static helper methods.</p>
</div>
<div class="section" id="rpc-clients-avro-and-thrift">
<h5>RPC clients - Avro and Thrift<a class="headerlink" href="#rpc-clients-avro-and-thrift" title="Permalink to this headline">¶</a></h5>
<p>As of Flume 1.4.0, Avro is the default RPC protocol. The
<tt class="docutils literal"><span class="pre">NettyAvroRpcClient</span></tt> and <tt class="docutils literal"><span class="pre">ThriftRpcClient</span></tt> implement the <tt class="docutils literal"><span class="pre">RpcClient</span></tt>
interface. The application creates one of these clients with the host and port of
the target Flume agent, and can then use the <tt class="docutils literal"><span class="pre">RpcClient</span></tt> to send data into
the agent. The following example shows how to use the Flume Client SDK API
within a user’s data-generating application:</p>
<div class="highlight-java"><div class="highlight"><pre><span class="kn">import</span> <span class="nn">org.apache.flume.Event</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.flume.EventDeliveryException</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.flume.api.RpcClient</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.flume.api.RpcClientFactory</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.flume.event.EventBuilder</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.nio.charset.Charset</span><span class="o">;</span>
<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyApp</span> <span class="o">{</span>
  <span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="o">{</span>
    <span class="n">MyRpcClientFacade</span> <span class="n">client</span> <span class="o">=</span> <span class="k">new</span> <span class="n">MyRpcClientFacade</span><span class="o">();</span>
    <span class="c1">// Initialize client with the remote Flume agent's host and port</span>
    <span class="n">client</span><span class="o">.</span><span class="na">init</span><span class="o">(</span><span class="s">"host.example.org"</span><span class="o">,</span> <span class="mi">41414</span><span class="o">);</span>
    <span class="c1">// Send 10 events to the remote Flume agent. That agent should be</span>
    <span class="c1">// configured to listen with an AvroSource.</span>
    <span class="n">String</span> <span class="n">sampleData</span> <span class="o">=</span> <span class="s">"Hello Flume!"</span><span class="o">;</span>
    <span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span class="o">&lt;</span> <span class="mi">10</span><span class="o">;</span> <span class="n">i</span><span class="o">++)</span> <span class="o">{</span>
      <span class="n">client</span><span class="o">.</span><span class="na">sendDataToFlume</span><span class="o">(</span><span class="n">sampleData</span><span class="o">);</span>
    <span class="o">}</span>
    <span class="n">client</span><span class="o">.</span><span class="na">cleanUp</span><span class="o">();</span>
  <span class="o">}</span>
<span class="o">}</span>

<span class="kd">class</span> <span class="nc">MyRpcClientFacade</span> <span class="o">{</span>
  <span class="kd">private</span> <span class="n">RpcClient</span> <span class="n">client</span><span class="o">;</span>
  <span class="kd">private</span> <span class="n">String</span> <span class="n">hostname</span><span class="o">;</span>
  <span class="kd">private</span> <span class="kt">int</span> <span class="n">port</span><span class="o">;</span>

  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="n">String</span> <span class="n">hostname</span><span class="o">,</span> <span class="kt">int</span> <span class="n">port</span><span class="o">)</span> <span class="o">{</span>
    <span class="c1">// Setup the RPC connection</span>
    <span class="k">this</span><span class="o">.</span><span class="na">hostname</span> <span class="o">=</span> <span class="n">hostname</span><span class="o">;</span>
    <span class="k">this</span><span class="o">.</span><span class="na">port</span> <span class="o">=</span> <span class="n">port</span><span class="o">;</span>
    <span class="k">this</span><span class="o">.</span><span class="na">client</span> <span class="o">=</span> <span class="n">RpcClientFactory</span><span class="o">.</span><span class="na">getDefaultInstance</span><span class="o">(</span><span class="n">hostname</span><span class="o">,</span> <span class="n">port</span><span class="o">);</span>
    <span class="c1">// Use the following method to create a thrift client (instead of the above line):</span>
    <span class="c1">// this.client = RpcClientFactory.getThriftInstance(hostname, port);</span>
  <span class="o">}</span>

  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">sendDataToFlume</span><span class="o">(</span><span class="n">String</span> <span class="n">data</span><span class="o">)</span> <span class="o">{</span>
    <span class="c1">// Create a Flume Event object that encapsulates the sample data</span>
    <span class="n">Event</span> <span class="n">event</span> <span class="o">=</span> <span class="n">EventBuilder</span><span class="o">.</span><span class="na">withBody</span><span class="o">(</span><span class="n">data</span><span class="o">,</span> <span class="n">Charset</span><span class="o">.</span><span class="na">forName</span><span class="o">(</span><span class="s">"UTF-8"</span><span class="o">));</span>
    <span class="c1">// Send the event</span>
    <span class="k">try</span> <span class="o">{</span>
      <span class="n">client</span><span class="o">.</span><span class="na">append</span><span class="o">(</span><span class="n">event</span><span class="o">);</span>
    <span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">EventDeliveryException</span> <span class="n">e</span><span class="o">)</span> <span class="o">{</span>
      <span class="c1">// clean up and recreate the client</span>
      <span class="n">client</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
      <span class="n">client</span> <span class="o">=</span> <span class="n">RpcClientFactory</span><span class="o">.</span><span class="na">getDefaultInstance</span><span class="o">(</span><span class="n">hostname</span><span class="o">,</span> <span class="n">port</span><span class="o">);</span>
      <span class="c1">// Use the following method to create a thrift client (instead of the above line):</span>
      <span class="c1">// this.client = RpcClientFactory.getThriftInstance(hostname, port);</span>
    <span class="o">}</span>
  <span class="o">}</span>

  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">cleanUp</span><span class="o">()</span> <span class="o">{</span>
    <span class="c1">// Close the RPC connection</span>
    <span class="n">client</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
  <span class="o">}</span>
<span class="o">}</span>
</pre></div>
</div>
<p>The remote Flume agent needs to have an <tt class="docutils literal"><span class="pre">AvroSource</span></tt> (or a
<tt class="docutils literal"><span class="pre">ThriftSource</span></tt> if you are using a Thrift client) listening on some port.
Below is an example Flume agent configuration that’s waiting for a connection
from MyApp:</p>
<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels</span> <span class="o">=</span> <span class="s">c1</span>
<span class="na">a1.sources</span> <span class="o">=</span> <span class="s">r1</span>
<span class="na">a1.sinks</span> <span class="o">=</span> <span class="s">k1</span>
<span class="na">a1.channels.c1.type</span> <span class="o">=</span> <span class="s">memory</span>
<span class="na">a1.sources.r1.channels</span> <span class="o">=</span> <span class="s">c1</span>
<span class="na">a1.sources.r1.type</span> <span class="o">=</span> <span class="s">avro</span>
<span class="c"># For using a thrift source set the following instead of the above line.</span>
<span class="c"># a1.sources.r1.type = thrift</span>
<span class="na">a1.sources.r1.bind</span> <span class="o">=</span> <span class="s">0.0.0.0</span>
<span class="na">a1.sources.r1.port</span> <span class="o">=</span> <span class="s">41414</span>
<span class="na">a1.sinks.k1.channel</span> <span class="o">=</span> <span class="s">c1</span>
<span class="na">a1.sinks.k1.type</span> <span class="o">=</span> <span class="s">logger</span>
</pre></div>
</div>
<p>For more flexibility, the default Flume client implementations
(<tt class="docutils literal"><span class="pre">NettyAvroRpcClient</span></tt> and <tt class="docutils literal"><span class="pre">ThriftRpcClient</span></tt>) can be configured with these
properties:</p>
<div class="highlight-properties"><div class="highlight"><pre><span class="na">client.type</span> <span class="o">=</span> <span class="s">default (for avro) or thrift (for thrift)</span>
<span class="na">hosts</span> <span class="o">=</span> <span class="s">h1 # default client accepts only 1 host</span>
<span class="c"># (additional hosts will be ignored)</span>
<span class="na">hosts.h1</span> <span class="o">=</span> <span class="s">host1.example.org:41414 # host and port must both be specified</span>
<span class="c"># (neither has a default)</span>
<span class="na">batch-size</span> <span class="o">=</span> <span class="s">100 # Must be >=1 (default: 100)</span>
<span class="na">connect-timeout</span> <span class="o">=</span> <span class="s">20000 # Must be >=1000 (default: 20000)</span>
<span class="na">request-timeout</span> <span class="o">=</span> <span class="s">20000 # Must be >=1000 (default: 20000)</span>
</pre></div>
</div>
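<p>The property set above can also be assembled programmatically. The sketch
below uses only <tt class="docutils literal"><span class="pre">java.util.Properties</span></tt>; the class <tt class="docutils literal"><span class="pre">ClientPropsSketch</span></tt> and its
<tt class="docutils literal"><span class="pre">build</span></tt> and <tt class="docutils literal"><span class="pre">require</span></tt> helpers are hypothetical, and the range checks simply
restate the constraints noted in the comments above rather than reproduce
Flume’s own validation.</p>
<div class="highlight-java"><div class="highlight"><pre>import java.util.Properties;

public class ClientPropsSketch {

  // Throws if a documented constraint is violated.
  static void require(boolean ok, String message) {
    if (!ok) {
      throw new IllegalArgumentException(message);
    }
  }

  // Builds the client property set for a single host.
  static Properties build(String clientType, String host, int port,
                          int batchSize, long connectTimeout, long requestTimeout) {
    require(clientType.equals("default") || clientType.equals("thrift"),
        "client.type must be default or thrift");
    require(batchSize >= 1, "batch-size must be >= 1");
    require(connectTimeout >= 1000, "connect-timeout must be >= 1000");
    require(requestTimeout >= 1000, "request-timeout must be >= 1000");

    Properties props = new Properties();
    props.setProperty("client.type", clientType);
    // The default client accepts only one host, so a single alias is declared.
    props.setProperty("hosts", "h1");
    // Host and port must both be given; neither has a default.
    props.setProperty("hosts.h1", host + ":" + port);
    props.setProperty("batch-size", String.valueOf(batchSize));
    props.setProperty("connect-timeout", String.valueOf(connectTimeout));
    props.setProperty("request-timeout", String.valueOf(requestTimeout));
    return props;
  }

  public static void main(String[] args) {
    Properties props = build("default", "host1.example.org", 41414, 100, 20000, 20000);
    System.out.println(props.getProperty("hosts.h1"));
  }
}
</pre></div>
</div>
<p>With the Flume SDK on the classpath, the resulting object would typically be
passed to <tt class="docutils literal"><span class="pre">RpcClientFactory.getInstance(props)</span></tt> to obtain a client.</p>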
</div>
<div class="section" id="secure-rpc-client-thrift">
<h5>Secure RPC client - Thrift<a class="headerlink" href="#secure-rpc-client-thrift" title="Permalink to this headline">¶</a></h5>
<p>As of Flume 1.6.0, the Thrift source and sink support Kerberos-based authentication.
The client needs to use the <tt class="docutils literal"><span class="pre">getThriftInstance</span></tt> method of <tt class="docutils literal"><span class="pre">SecureRpcClientFactory</span></tt>
to obtain a <tt class="docutils literal"><span class="pre">SecureThriftRpcClient</span></tt>. <tt class="docutils literal"><span class="pre">SecureThriftRpcClient</span></tt> extends
<tt class="docutils literal"><span class="pre">ThriftRpcClient</span></tt>, which implements the <tt class="docutils literal"><span class="pre">RpcClient</span></tt> interface. The Kerberos
authentication code resides in the flume-ng-auth module, which must be
on the classpath when using the <tt class="docutils literal"><span class="pre">SecureRpcClientFactory</span></tt>. Both the client
principal and the client keytab should be passed in as parameters through the
properties; they are the credentials the client uses to authenticate
against the Kerberos KDC. In addition, the server principal of the destination
Thrift source to which this client connects should also be provided.
The following example shows how to use the <tt class="docutils literal"><span class="pre">SecureRpcClientFactory</span></tt>
within a user’s data-generating application:</p>
<div class="highlight-java"><div class="highlight"><pre><span class="kn">import</span> <span class="nn">org.apache.flume.Event</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.flume.EventDeliveryException</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.flume.event.EventBuilder</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.flume.api.SecureRpcClientFactory</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.flume.api.RpcClientConfigurationConstants</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.flume.api.RpcClient</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.nio.charset.Charset</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.util.Properties</span><span class="o">;</span>
<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyApp</span> <span class="o">{</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="o">{</span>
<span class="n">MySecureRpcClientFacade</span> <span class="n">client</span> <span class="o">=</span> <span class="k">new</span> <span class="n">MySecureRpcClientFacade</span><span class="o">();</span>
<span class="c1">// Initialize client with the remote Flume agent's host, port</span>
<span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
<span class="n">props</span><span class="o">.</span><span class="na">setProperty</span><span class="o">(</span><span class="n">RpcClientConfigurationConstants</span><span class="o">.</span><span class="na">CONFIG_CLIENT_TYPE</span><span class="o">,</span> <span class="s">"thrift"</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">setProperty</span><span class="o">(</span><span class="s">"hosts"</span><span class="o">,</span> <span class="s">"h1"</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">setProperty</span><span class="o">(</span><span class="s">"hosts.h1"</span><span class="o">,</span> <span class="s">"server.example.org"</span><span class="o">+</span><span class="s">":"</span><span class="o">+</span> <span class="n">String</span><span class="o">.</span><span class="na">valueOf</span><span class="o">(</span><span class="mi">41414</span><span class="o">));</span>
<span class="c1">// Initialize client with the kerberos authentication related properties</span>
<span class="n">props</span><span class="o">.</span><span class="na">setProperty</span><span class="o">(</span><span class="s">"kerberos"</span><span class="o">,</span> <span class="s">"true"</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">setProperty</span><span class="o">(</span><span class="s">"client-principal"</span><span class="o">,</span> <span class="s">"flumeclient/client.example.org@EXAMPLE.ORG"</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">setProperty</span><span class="o">(</span><span class="s">"client-keytab"</span><span class="o">,</span> <span class="s">"/tmp/flumeclient.keytab"</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">setProperty</span><span class="o">(</span><span class="s">"server-principal"</span><span class="o">,</span> <span class="s">"flume/server.example.org@EXAMPLE.ORG"</span><span class="o">);</span>
<span class="n">client</span><span class="o">.</span><span class="na">init</span><span class="o">(</span><span class="n">props</span><span class="o">);</span>
<span class="c1">// Send 10 events to the remote Flume agent. That agent should be</span>
<span class="c1">// configured to listen with a ThriftSource.</span>
<span class="n">String</span> <span class="n">sampleData</span> <span class="o">=</span> <span class="s">"Hello Flume!"</span><span class="o">;</span>
<span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span class="o">&lt;</span> <span class="mi">10</span><span class="o">;</span> <span class="n">i</span><span class="o">++)</span> <span class="o">{</span>
<span class="n">client</span><span class="o">.</span><span class="na">sendDataToFlume</span><span class="o">(</span><span class="n">sampleData</span><span class="o">);</span>
<span class="o">}</span>
<span class="n">client</span><span class="o">.</span><span class="na">cleanUp</span><span class="o">();</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="kd">class</span> <span class="nc">MySecureRpcClientFacade</span> <span class="o">{</span>
<span class="kd">private</span> <span class="n">RpcClient</span> <span class="n">client</span><span class="o">;</span>
<span class="kd">private</span> <span class="n">Properties</span> <span class="n">properties</span><span class="o">;</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="n">Properties</span> <span class="n">properties</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// Setup the RPC connection</span>
<span class="k">this</span><span class="o">.</span><span class="na">properties</span> <span class="o">=</span> <span class="n">properties</span><span class="o">;</span>
<span class="c1">// Create the ThriftSecureRpcClient instance by using SecureRpcClientFactory</span>
<span class="k">this</span><span class="o">.</span><span class="na">client</span> <span class="o">=</span> <span class="n">SecureRpcClientFactory</span><span class="o">.</span><span class="na">getThriftInstance</span><span class="o">(</span><span class="n">properties</span><span class="o">);</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">sendDataToFlume</span><span class="o">(</span><span class="n">String</span> <span class="n">data</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// Create a Flume Event object that encapsulates the sample data</span>
<span class="n">Event</span> <span class="n">event</span> <span class="o">=</span> <span class="n">EventBuilder</span><span class="o">.</span><span class="na">withBody</span><span class="o">(</span><span class="n">data</span><span class="o">,</span> <span class="n">Charset</span><span class="o">.</span><span class="na">forName</span><span class="o">(</span><span class="s">"UTF-8"</span><span class="o">));</span>
<span class="c1">// Send the event</span>
<span class="k">try</span> <span class="o">{</span>
<span class="n">client</span><span class="o">.</span><span class="na">append</span><span class="o">(</span><span class="n">event</span><span class="o">);</span>
<span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">EventDeliveryException</span> <span class="n">e</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// clean up and recreate the client</span>
<span class="n">client</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
<span class="n">client</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span>
<span class="n">client</span> <span class="o">=</span> <span class="n">SecureRpcClientFactory</span><span class="o">.</span><span class="na">getThriftInstance</span><span class="o">(</span><span class="n">properties</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">cleanUp</span><span class="o">()</span> <span class="o">{</span>
<span class="c1">// Close the RPC connection</span>
<span class="n">client</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
<span class="o">}</span>
<span class="o">}</span>
</pre></div>
</div>
<p>The remote <tt class="docutils literal"><span class="pre">ThriftSource</span></tt> should be started in kerberos mode.
Below is an example Flume agent configuration that’s waiting for a connection
from MyApp:</p>
<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels</span> <span class="o">=</span> <span class="s">c1</span>
<span class="na">a1.sources</span> <span class="o">=</span> <span class="s">r1</span>
<span class="na">a1.sinks</span> <span class="o">=</span> <span class="s">k1</span>
<span class="na">a1.channels.c1.type</span> <span class="o">=</span> <span class="s">memory</span>
<span class="na">a1.sources.r1.channels</span> <span class="o">=</span> <span class="s">c1</span>
<span class="na">a1.sources.r1.type</span> <span class="o">=</span> <span class="s">thrift</span>
<span class="na">a1.sources.r1.bind</span> <span class="o">=</span> <span class="s">0.0.0.0</span>
<span class="na">a1.sources.r1.port</span> <span class="o">=</span> <span class="s">41414</span>
<span class="na">a1.sources.r1.kerberos</span> <span class="o">=</span> <span class="s">true</span>
<span class="na">a1.sources.r1.agent-principal</span> <span class="o">=</span> <span class="s">flume/server.example.org@EXAMPLE.ORG</span>
<span class="na">a1.sources.r1.agent-keytab</span> <span class="o">=</span> <span class="s">/tmp/flume.keytab</span>
<span class="na">a1.sinks.k1.channel</span> <span class="o">=</span> <span class="s">c1</span>
<span class="na">a1.sinks.k1.type</span> <span class="o">=</span> <span class="s">logger</span>
</pre></div>
</div>
</div>
<div class="section" id="failover-client">
<h5>Failover Client<a class="headerlink" href="#failover-client" title="Permalink to this headline">¶</a></h5>
<p>This class wraps the default Avro RPC client to provide failover handling
capability to clients. It takes a whitespace-separated list of &lt;host&gt;:&lt;port&gt;
pairs representing the Flume agents that make up a failover group. The Failover
RPC Client currently does not support Thrift. If there’s a communication error
with the currently selected host (i.e. agent), then the failover client
automatically fails over to the next host in the list.
For example:</p>
<div class="highlight-java"><div class="highlight"><pre><span class="c1">// Setup properties for the failover</span>
<span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"client.type"</span><span class="o">,</span> <span class="s">"default_failover"</span><span class="o">);</span>
<span class="c1">// List of hosts (space-separated list of user-chosen host aliases)</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts"</span><span class="o">,</span> <span class="s">"h1 h2 h3"</span><span class="o">);</span>
<span class="c1">// host/port pair for each host alias</span>
<span class="n">String</span> <span class="n">host1</span> <span class="o">=</span> <span class="s">"host1.example.org:41414"</span><span class="o">;</span>
<span class="n">String</span> <span class="n">host2</span> <span class="o">=</span> <span class="s">"host2.example.org:41414"</span><span class="o">;</span>
<span class="n">String</span> <span class="n">host3</span> <span class="o">=</span> <span class="s">"host3.example.org:41414"</span><span class="o">;</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts.h1"</span><span class="o">,</span> <span class="n">host1</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts.h2"</span><span class="o">,</span> <span class="n">host2</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts.h3"</span><span class="o">,</span> <span class="n">host3</span><span class="o">);</span>
<span class="c1">// create the client with failover properties</span>
<span class="n">RpcClient</span> <span class="n">client</span> <span class="o">=</span> <span class="n">RpcClientFactory</span><span class="o">.</span><span class="na">getInstance</span><span class="o">(</span><span class="n">props</span><span class="o">);</span>
</pre></div>
</div>
<p>For more flexibility, the failover Flume client implementation
(<tt class="docutils literal"><span class="pre">FailoverRpcClient</span></tt>) can be configured with these properties:</p>
<div class="highlight-properties"><div class="highlight"><pre><span class="na">client.type</span> <span class="o">=</span> <span class="s">default_failover</span>
<span class="na">hosts</span> <span class="o">=</span> <span class="s">h1 h2 h3 # at least one is required, but 2 or</span>
<span class="c"># more makes better sense</span>
<span class="na">hosts.h1</span> <span class="o">=</span> <span class="s">host1.example.org:41414</span>
<span class="na">hosts.h2</span> <span class="o">=</span> <span class="s">host2.example.org:41414</span>
<span class="na">hosts.h3</span> <span class="o">=</span> <span class="s">host3.example.org:41414</span>
<span class="na">max-attempts</span> <span class="o">=</span> <span class="s">3 # Must be >=0 (default: number of hosts</span>
<span class="c"># specified, 3 in this case). A '0'</span>
<span class="c"># value doesn't make much sense because</span>
<span class="c"># it will just cause an append call to</span>
<span class="c"># immediately fail. A '1' value means</span>
<span class="c"># that the failover client will try only</span>
<span class="c"># once to send the Event, and if it</span>
<span class="c"># fails then there will be no failover</span>
<span class="c"># to a second client, so this value</span>
<span class="c"># causes the failover client to</span>
<span class="c"># degenerate into just a default client.</span>
<span class="c"># It makes sense to set this value to at</span>
<span class="c"># least the number of hosts that you</span>
<span class="c"># specified.</span>
<span class="na">batch-size</span> <span class="o">=</span> <span class="s">100 # Must be >=1 (default: 100)</span>
<span class="na">connect-timeout</span> <span class="o">=</span> <span class="s">20000 # Must be >=1000 (default: 20000)</span>
<span class="na">request-timeout</span> <span class="o">=</span> <span class="s">20000 # Must be >=1000 (default: 20000)</span>
</pre></div>
</div>
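<p>The effect of <tt class="docutils literal"><span class="pre">max-attempts</span></tt> on failover can be pictured with a small
self-contained model. This is only a sketch and not the <tt class="docutils literal"><span class="pre">FailoverRpcClient</span></tt>
source: the <tt class="docutils literal"><span class="pre">FailoverSketch</span></tt> class, its <tt class="docutils literal"><span class="pre">send</span></tt> method and the predicate that
stands in for a network append are hypothetical names, and the sketch assumes
hosts are simply tried in the configured order.</p>

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Toy model of failover across a host list; not Flume's actual implementation.
class FailoverSketch {

  /**
   * Tries hosts in list order (wrapping around) until one accepts the event
   * or maxAttempts tries have been used, and returns the host that accepted.
   * "deliver" is a hypothetical stand-in for a network append; the host list
   * is assumed to be non-empty.
   */
  static String send(List<String> hosts, int maxAttempts, Predicate<String> deliver) {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      String host = hosts.get(attempt % hosts.size());
      if (deliver.test(host)) {
        return host;
      }
      // Communication error: fall through and try the next host.
    }
    throw new IllegalStateException(
        "No host accepted the event after " + maxAttempts + " attempt(s)");
  }

  public static void main(String[] args) {
    List<String> hosts = Arrays.asList(
        "host1.example.org:41414",
        "host2.example.org:41414",
        "host3.example.org:41414");
    // Pretend host1 is down; every other host accepts the event.
    String used = send(hosts, 3, h -> !h.startsWith("host1"));
    System.out.println("Delivered via " + used);
  }
}
```

<p>In this model a value of 0 fails immediately and a value of 1 never reaches a
second host, which mirrors the property description above.</p>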
</div>
<div class="section" id="loadbalancing-rpc-client">
<h5>LoadBalancing RPC client<a class="headerlink" href="#loadbalancing-rpc-client" title="Permalink to this headline">¶</a></h5>
<p>The Flume Client SDK also supports an RpcClient which load-balances among
multiple hosts. This type of client takes a whitespace-separated list of
&lt;host&gt;:&lt;port&gt; pairs representing the Flume agents that make up a load-balancing group.
This client can be configured with a load balancing strategy that either
randomly selects one of the configured hosts, or selects a host in a round-robin
fashion. You can also specify your own custom class that implements the
<tt class="docutils literal"><span class="pre">LoadBalancingRpcClient$HostSelector</span></tt> interface so that a custom selection
order is used. In that case, the FQCN of the custom class needs to be specified
as the value of the <tt class="docutils literal"><span class="pre">host-selector</span></tt> property. The LoadBalancing RPC Client
currently does not support Thrift.</p>
<p>If <tt class="docutils literal"><span class="pre">backoff</span></tt> is enabled then the client will temporarily blacklist
hosts that fail, so that they are not selected as a failover host until a given
timeout elapses. If the host is still unresponsive when the timeout elapses,
this is considered a sequential failure, and the timeout is
increased exponentially to avoid potentially getting stuck in long waits on
unresponsive hosts.</p>
<p>The maximum backoff time can be configured by setting <tt class="docutils literal"><span class="pre">maxBackoff</span></tt> (in
milliseconds). The maxBackoff default is 30 seconds (specified in the
<tt class="docutils literal"><span class="pre">OrderSelector</span></tt> class that’s the superclass of both load balancing
strategies). The backoff timeout will increase exponentially with each
sequential failure up to the maximum possible backoff timeout.
The maximum possible backoff is limited to 65536 seconds (about 18.2 hours).
For example:</p>
<div class="highlight-java"><div class="highlight"><pre><span class="c1">// Setup properties for the load balancing</span>
<span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"client.type"</span><span class="o">,</span> <span class="s">"default_loadbalance"</span><span class="o">);</span>
<span class="c1">// List of hosts (space-separated list of user-chosen host aliases)</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts"</span><span class="o">,</span> <span class="s">"h1 h2 h3"</span><span class="o">);</span>
<span class="c1">// host/port pair for each host alias</span>
<span class="n">String</span> <span class="n">host1</span> <span class="o">=</span> <span class="s">"host1.example.org:41414"</span><span class="o">;</span>
<span class="n">String</span> <span class="n">host2</span> <span class="o">=</span> <span class="s">"host2.example.org:41414"</span><span class="o">;</span>
<span class="n">String</span> <span class="n">host3</span> <span class="o">=</span> <span class="s">"host3.example.org:41414"</span><span class="o">;</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts.h1"</span><span class="o">,</span> <span class="n">host1</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts.h2"</span><span class="o">,</span> <span class="n">host2</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts.h3"</span><span class="o">,</span> <span class="n">host3</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"host-selector"</span><span class="o">,</span> <span class="s">"random"</span><span class="o">);</span> <span class="c1">// For random host selection</span>
<span class="c1">// props.put("host-selector", "round_robin"); // For round-robin host</span>
<span class="c1">// // selection</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"backoff"</span><span class="o">,</span> <span class="s">"true"</span><span class="o">);</span> <span class="c1">// Disabled by default.</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"maxBackoff"</span><span class="o">,</span> <span class="s">"10000"</span><span class="o">);</span> <span class="c1">// Defaults to 0, which effectively</span>
<span class="c1">// becomes 30000 ms</span>
<span class="c1">// Create the client with load balancing properties</span>
<span class="n">RpcClient</span> <span class="n">client</span> <span class="o">=</span> <span class="n">RpcClientFactory</span><span class="o">.</span><span class="na">getInstance</span><span class="o">(</span><span class="n">props</span><span class="o">);</span>
</pre></div>
</div>
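<p>The capped exponential growth of the backoff timeout can be illustrated with a
few lines of plain Java. This is a sketch under stated assumptions rather than
the <tt class="docutils literal"><span class="pre">OrderSelector</span></tt> code: the <tt class="docutils literal"><span class="pre">BackoffSketch</span></tt> class is hypothetical, and the
one-second base that doubles with each sequential failure is an assumption
chosen so that 16 doublings give the 65536-second limit mentioned above.</p>

```java
// Toy model of capped exponential backoff; not Flume's OrderSelector source.
class BackoffSketch {

  // Assumption: the base step is one second and doubles with each failure.
  static final long BASE_MILLIS = 1000L;
  // 2^16 seconds = 65536 seconds, the hard upper bound quoted in the text.
  static final int MAX_DOUBLINGS = 16;

  /** Backoff in milliseconds after the given number of sequential failures. */
  static long backoffMillis(int sequentialFailures, long maxBackoffMillis) {
    // A maxBackoff of 0 means "not set", which the guide says becomes 30000 ms.
    long cap = maxBackoffMillis > 0 ? maxBackoffMillis : 30000L;
    int doublings = Math.min(Math.max(sequentialFailures, 0), MAX_DOUBLINGS);
    return Math.min(cap, BASE_MILLIS << doublings);
  }

  public static void main(String[] args) {
    // With maxBackoff = 10000 the timeout grows and then stays at the cap.
    for (int failures = 1; failures <= 6; failures++) {
      System.out.println(failures + " failure(s): "
          + backoffMillis(failures, 10000L) + " ms");
    }
  }
}
```

<p>Under these assumptions the timeout doubles until it reaches the configured
<tt class="docutils literal"><span class="pre">maxBackoff</span></tt> and never exceeds it.</p>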
<p>For more flexibility, the load-balancing Flume client implementation
(<tt class="docutils literal"><span class="pre">LoadBalancingRpcClient</span></tt>) can be configured with these properties:</p>
<div class="highlight-properties"><div class="highlight"><pre><span class="na">client.type</span> <span class="o">=</span> <span class="s">default_loadbalance</span>
<span class="na">hosts</span> <span class="o">=</span> <span class="s">h1 h2 h3 # At least 2 hosts are required</span>
<span class="na">hosts.h1</span> <span class="o">=</span> <span class="s">host1.example.org:41414</span>
<span class="na">hosts.h2</span> <span class="o">=</span> <span class="s">host2.example.org:41414</span>
<span class="na">hosts.h3</span> <span class="o">=</span> <span class="s">host3.example.org:41414</span>
<span class="na">backoff</span> <span class="o">=</span> <span class="s">false # Specifies whether the client should</span>
<span class="c"># back-off from (i.e. temporarily</span>
<span class="c"># blacklist) a failed host</span>
<span class="c"># (default: false).</span>
<span class="na">maxBackoff</span> <span class="o">=</span> <span class="s">0 # Max timeout in millis that a host will</span>
<span class="c"># remain inactive due to a previous</span>
<span class="c"># failure with that host (default: 0,</span>
<span class="c"># which effectively becomes 30000)</span>
<span class="na">host-selector</span> <span class="o">=</span> <span class="s">round_robin # The host selection strategy used</span>
<span class="c"># when load-balancing among hosts</span>
<span class="c"># (default: round_robin).</span>
<span class="c"># Other values include "random"</span>
<span class="c"># or the FQCN of a custom class</span>
<span class="c"># that implements</span>
<span class="c"># LoadBalancingRpcClient$HostSelector</span>
<span class="na">batch-size</span> <span class="o">=</span> <span class="s">100 # Must be >=1 (default: 100)</span>
<span class="na">connect-timeout</span> <span class="o">=</span> <span class="s">20000 # Must be >=1000 (default: 20000)</span>
<span class="na">request-timeout</span> <span class="o">=</span> <span class="s">20000 # Must be >=1000 (default: 20000)</span>
</pre></div>
</div>
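<p>The difference between the <tt class="docutils literal"><span class="pre">round_robin</span></tt> and <tt class="docutils literal"><span class="pre">random</span></tt> strategies can be
sketched as two ways of ordering the same host list. The <tt class="docutils literal"><span class="pre">HostOrderSketch</span></tt>
class below is hypothetical and works on plain host strings. It does not
implement the <tt class="docutils literal"><span class="pre">LoadBalancingRpcClient$HostSelector</span></tt> interface and is not
taken from the Flume source.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Toy model of the two built-in selection strategies; not Flume's selector code.
class HostOrderSketch {
  private final List<String> hosts;
  private int next = 0; // index of the host that leads the next round-robin order

  HostOrderSketch(List<String> hosts) {
    this.hosts = new ArrayList<String>(hosts);
  }

  /** Round robin: each call starts one host later than the previous call. */
  List<String> roundRobinOrder() {
    List<String> order = new ArrayList<String>(hosts);
    Collections.rotate(order, -next); // moves the host at index "next" to the front
    next = (next + 1) % hosts.size();
    return order;
  }

  /** Random: a fresh shuffle of all hosts for every call. */
  List<String> randomOrder(Random rng) {
    List<String> order = new ArrayList<String>(hosts);
    Collections.shuffle(order, rng);
    return order;
  }

  public static void main(String[] args) {
    HostOrderSketch sketch = new HostOrderSketch(Arrays.asList("h1", "h2", "h3"));
    System.out.println("round robin: " + sketch.roundRobinOrder());
    System.out.println("round robin: " + sketch.roundRobinOrder());
    System.out.println("random:      " + sketch.randomOrder(new Random()));
  }
}
```

<p>A client following either order would try the first host and move on to the
following ones only if the earlier ones fail.</p>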
</div>
</div>
<div class="section" id="embedded-agent">
<h4>Embedded agent<a class="headerlink" href="#embedded-agent" title="Permalink to this headline">¶</a></h4>
<p>Flume has an embedded agent API which allows users to embed an agent in their
application. This agent is meant to be lightweight, and as such not all
sources, sinks, and channels are allowed. Specifically, the source used
is a special embedded source, and events should be sent to the source
via the put and putAll methods on the EmbeddedAgent object. Only File Channel
and Memory Channel are allowed as channels, while Avro Sink is the only
supported sink. Interceptors are also supported by the embedded agent.</p>
<p>Note: The embedded agent has a dependency on hadoop-core.jar.</p>
<p>Configuration of an Embedded Agent is similar to configuration of a
full Agent. The following is an exhaustive list of configuration options:</p>
<p>Required properties are in <strong>bold</strong>.</p>
<table border="1" class="docutils">
<colgroup>
<col width="20%" />
<col width="15%" />
<col width="65%" />
</colgroup>
<thead valign="bottom">
<tr class="row-odd"><th class="head">Property Name</th>
<th class="head">Default</th>
<th class="head">Description</th>
</tr>
</thead>
<tbody valign="top">
<tr class="row-even"><td>source.type</td>
<td>embedded</td>
<td>The only available source is the embedded source.</td>
</tr>
<tr class="row-odd"><td><strong>channel.type</strong></td>
<td>–</td>
<td>Either <tt class="docutils literal"><span class="pre">memory</span></tt> or <tt class="docutils literal"><span class="pre">file</span></tt> which correspond
to MemoryChannel and FileChannel respectively.</td>
</tr>
<tr class="row-even"><td>channel.*</td>
<td>–</td>
<td>Configuration options for the channel type requested,
see MemoryChannel or FileChannel user guide for an exhaustive list.</td>
</tr>
<tr class="row-odd"><td><strong>sinks</strong></td>
<td>–</td>
<td>List of sink names</td>
</tr>
<tr class="row-even"><td><strong>sink.type</strong></td>
<td>–</td>
<td>Property name must match a name in the list of sinks.
Value must be <tt class="docutils literal"><span class="pre">avro</span></tt></td>
</tr>
<tr class="row-odd"><td>sink.*</td>
<td>–</td>
<td>Configuration options for the sink.
See AvroSink user guide for an exhaustive list,
however note AvroSink requires at least hostname and port.</td>
</tr>
<tr class="row-even"><td><strong>processor.type</strong></td>
<td>–</td>
<td>Either <tt class="docutils literal"><span class="pre">failover</span></tt> or <tt class="docutils literal"><span class="pre">load_balance</span></tt> which correspond
to FailoverSinkProcessor and LoadBalancingSinkProcessor respectively.</td>
</tr>
<tr class="row-odd"><td>processor.*</td>
<td>–</td>
<td>Configuration options for the sink processor selected.
See FailoverSinkProcessor and LoadBalancingSinkProcessor
user guide for an exhaustive list.</td>
</tr>
<tr class="row-even"><td>source.interceptors</td>
<td>–</td>
<td>Space-separated list of interceptors</td>
</tr>
<tr class="row-odd"><td>source.interceptors.*</td>
<td>–</td>
<td>Configuration options for individual interceptors
specified in the source.interceptors property</td>
</tr>
</tbody>
</table>
<p>Below is an example of how to use the agent:</p>
<div class="highlight-java"><div class="highlight"><pre><span class="n">Map</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">properties</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashMap</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>();</span>
<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"channel.type"</span><span class="o">,</span> <span class="s">"memory"</span><span class="o">);</span>
<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"channel.capacity"</span><span class="o">,</span> <span class="s">"200"</span><span class="o">);</span>
<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"sinks"</span><span class="o">,</span> <span class="s">"sink1 sink2"</span><span class="o">);</span>
<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"sink1.type"</span><span class="o">,</span> <span class="s">"avro"</span><span class="o">);</span>
<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"sink2.type"</span><span class="o">,</span> <span class="s">"avro"</span><span class="o">);</span>
<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"sink1.hostname"</span><span class="o">,</span> <span class="s">"collector1.apache.org"</span><span class="o">);</span>
<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"sink1.port"</span><span class="o">,</span> <span class="s">"5564"</span><span class="o">);</span>
<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"sink2.hostname"</span><span class="o">,</span> <span class="s">"collector2.apache.org"</span><span class="o">);</span>
<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"sink2.port"</span><span class="o">,</span> <span class="s">"5565"</span><span class="o">);</span>
<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"processor.type"</span><span class="o">,</span> <span class="s">"load_balance"</span><span class="o">);</span>
<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"source.interceptors"</span><span class="o">,</span> <span class="s">"i1"</span><span class="o">);</span>
<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"source.interceptors.i1.type"</span><span class="o">,</span> <span class="s">"static"</span><span class="o">);</span>
<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"source.interceptors.i1.key"</span><span class="o">,</span> <span class="s">"key1"</span><span class="o">);</span>
<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"source.interceptors.i1.value"</span><span class="o">,</span> <span class="s">"value1"</span><span class="o">);</span>
<span class="n">EmbeddedAgent</span> <span class="n">agent</span> <span class="o">=</span> <span class="k">new</span> <span class="n">EmbeddedAgent</span><span class="o">(</span><span class="s">"myagent"</span><span class="o">);</span>
<span class="n">agent</span><span class="o">.</span><span class="na">configure</span><span class="o">(</span><span class="n">properties</span><span class="o">);</span>
<span class="n">agent</span><span class="o">.</span><span class="na">start</span><span class="o">();</span>
<span class="c1">// Build a sample event to send through the embedded agent</span>
<span class="n">Event</span> <span class="n">event</span> <span class="o">=</span> <span class="n">EventBuilder</span><span class="o">.</span><span class="na">withBody</span><span class="o">(</span><span class="s">"Hello Flume!"</span><span class="o">,</span> <span class="n">Charset</span><span class="o">.</span><span class="na">forName</span><span class="o">(</span><span class="s">"UTF-8"</span><span class="o">));</span>
<span class="n">List</span><span class="o"><</span><span class="n">Event</span><span class="o">></span> <span class="n">events</span> <span class="o">=</span> <span class="n">Lists</span><span class="o">.</span><span class="na">newArrayList</span><span class="o">();</span>
<span class="n">events</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">event</span><span class="o">);</span>
<span class="n">events</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">event</span><span class="o">);</span>
<span class="n">events</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">event</span><span class="o">);</span>
<span class="n">events</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">event</span><span class="o">);</span>
<span class="n">agent</span><span class="o">.</span><span class="na">putAll</span><span class="o">(</span><span class="n">events</span><span class="o">);</span>
<span class="o">...</span>
<span class="n">agent</span><span class="o">.</span><span class="na">stop</span><span class="o">();</span>
</pre></div>
</div>
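<p>Because the embedded agent accepts only a narrow set of components, it can help
to check a property map against the table above before passing it to
<tt class="docutils literal"><span class="pre">configure()</span></tt>. The following is a hypothetical pre-flight check and not part
of Flume: the <tt class="docutils literal"><span class="pre">EmbeddedAgentConfigCheck</span></tt> class and its messages are invented
for illustration, and it covers only the rules stated in this section.</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check based on the property table; not part of Flume.
class EmbeddedAgentConfigCheck {

  /** Returns a list of problems; an empty list means the map passes these checks. */
  static List<String> problems(Map<String, String> props) {
    List<String> problems = new ArrayList<String>();
    String channel = props.get("channel.type");
    if (!"memory".equals(channel) && !"file".equals(channel)) {
      problems.add("channel.type must be memory or file");
    }
    String processor = props.get("processor.type");
    if (!"failover".equals(processor) && !"load_balance".equals(processor)) {
      problems.add("processor.type must be failover or load_balance");
    }
    String sinks = props.get("sinks");
    if (sinks == null || sinks.trim().isEmpty()) {
      problems.add("sinks must list at least one sink name");
      return problems;
    }
    for (String sink : sinks.trim().split("\\s+")) {
      // Avro Sink is the only supported sink and needs hostname and port.
      if (!"avro".equals(props.get(sink + ".type"))) {
        problems.add(sink + ".type must be avro");
      }
      if (props.get(sink + ".hostname") == null || props.get(sink + ".port") == null) {
        problems.add(sink + " needs hostname and port");
      }
    }
    return problems;
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<String, String>();
    props.put("channel.type", "memory");
    props.put("sinks", "sink1");
    props.put("sink1.type", "avro");
    props.put("sink1.hostname", "collector1.apache.org");
    props.put("sink1.port", "5564");
    props.put("processor.type", "failover");
    System.out.println("Problems found: " + problems(props));
  }
}
```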
</div>
<div class="section" id="transaction-interface">
<h4>Transaction interface<a class="headerlink" href="#transaction-interface" title="Permalink to this headline">¶</a></h4>
<p>The <tt class="docutils literal"><span class="pre">Transaction</span></tt> interface is the basis of reliability for Flume. All the
major components (i.e. <tt class="docutils literal"><span class="pre">Source</span></tt>s, <tt class="docutils literal"><span class="pre">Sink</span></tt>s and <tt class="docutils literal"><span class="pre">Channel</span></tt>s) must use a
Flume <tt class="docutils literal"><span class="pre">Transaction</span></tt>.</p>
<div class="figure align-center">
<img alt="Transaction sequence diagram" src="_images/DevGuide_image01.png" />
</div>
<p>A <tt class="docutils literal"><span class="pre">Transaction</span></tt> is implemented within a <tt class="docutils literal"><span class="pre">Channel</span></tt> implementation. Each
<tt class="docutils literal"><span class="pre">Source</span></tt> and <tt class="docutils literal"><span class="pre">Sink</span></tt> that is connected to a <tt class="docutils literal"><span class="pre">Channel</span></tt> must obtain a
<tt class="docutils literal"><span class="pre">Transaction</span></tt> object. The <tt class="docutils literal"><span class="pre">Source</span></tt>s use a <tt class="docutils literal"><span class="pre">ChannelProcessor</span></tt>
to manage the <tt class="docutils literal"><span class="pre">Transaction</span></tt>s, while the <tt class="docutils literal"><span class="pre">Sink</span></tt>s manage them explicitly via
their configured <tt class="docutils literal"><span class="pre">Channel</span></tt>. The operation to stage an
<tt class="docutils literal"><span class="pre">Event</span></tt> (put it into a <tt class="docutils literal"><span class="pre">Channel</span></tt>) or extract an <tt class="docutils literal"><span class="pre">Event</span></tt> (take it out of a
<tt class="docutils literal"><span class="pre">Channel</span></tt>) is done inside an active <tt class="docutils literal"><span class="pre">Transaction</span></tt>. For example:</p>
<div class="highlight-java"><div class="highlight"><pre><span class="n">Channel</span> <span class="n">ch</span> <span class="o">=</span> <span class="k">new</span> <span class="n">MemoryChannel</span><span class="o">();</span>
<span class="n">Transaction</span> <span class="n">txn</span> <span class="o">=</span> <span class="n">ch</span><span class="o">.</span><span class="na">getTransaction</span><span class="o">();</span>
<span class="n">txn</span><span class="o">.</span><span class="na">begin</span><span class="o">();</span>
<span class="k">try</span> <span class="o">{</span>
<span class="c1">// This try clause includes whatever Channel operations you want to do</span>
<span class="n">Event</span> <span class="n">eventToStage</span> <span class="o">=</span> <span class="n">EventBuilder</span><span class="o">.</span><span class="na">withBody</span><span class="o">(</span><span class="s">"Hello Flume!"</span><span class="o">,</span>
<span class="n">Charset</span><span class="o">.</span><span class="na">forName</span><span class="o">(</span><span class="s">"UTF-8"</span><span class="o">));</span>
<span class="n">ch</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">eventToStage</span><span class="o">);</span>
<span class="c1">// Event takenEvent = ch.take();</span>
<span class="c1">// ...</span>
<span class="n">txn</span><span class="o">.</span><span class="na">commit</span><span class="o">();</span>
<span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">Throwable</span> <span class="n">t</span><span class="o">)</span> <span class="o">{</span>
<span class="n">txn</span><span class="o">.</span><span class="na">rollback</span><span class="o">();</span>
<span class="c1">// Log exception, handle individual exceptions as needed</span>
<span class="c1">// re-throw all Errors</span>
<span class="k">if</span> <span class="o">(</span><span class="n">t</span> <span class="k">instanceof</span> <span class="n">Error</span><span class="o">)</span> <span class="o">{</span>
<span class="k">throw</span> <span class="o">(</span><span class="n">Error</span><span class="o">)</span><span class="n">t</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span> <span class="k">finally</span> <span class="o">{</span>
<span class="n">txn</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
<span class="o">}</span>
</pre></div>
</div>
<p>Here we get hold of a <tt class="docutils literal"><span class="pre">Transaction</span></tt> from a <tt class="docutils literal"><span class="pre">Channel</span></tt>. After <tt class="docutils literal"><span class="pre">begin()</span></tt>
returns, the <tt class="docutils literal"><span class="pre">Transaction</span></tt> is now active/open and the <tt class="docutils literal"><span class="pre">Event</span></tt> is then put
into the <tt class="docutils literal"><span class="pre">Channel</span></tt>. If the put is successful, then the <tt class="docutils literal"><span class="pre">Transaction</span></tt> is
committed and closed.</p>
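<p>To make the commit and rollback outcomes concrete, the following toy model
keeps staged events separate from committed ones. It is a minimal sketch for
illustration only: <tt class="docutils literal"><span class="pre">ToyChannel</span></tt> is a hypothetical class that uses plain
strings as events and folds the transaction into the channel, so it does not
reflect how Flume's <tt class="docutils literal"><span class="pre">Channel</span></tt> and <tt class="docutils literal"><span class="pre">Transaction</span></tt> implementations are written.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of put/commit/rollback; not Flume's Channel or Transaction.
class ToyChannel {
  private final List<String> committed = new ArrayList<String>();
  private List<String> staged; // events put in the open transaction; null when none is open

  private void requireOpen() {
    if (staged == null) {
      throw new IllegalStateException("no open transaction");
    }
  }

  void begin() {
    if (staged != null) {
      throw new IllegalStateException("transaction already open");
    }
    staged = new ArrayList<String>();
  }

  void put(String event) {
    requireOpen();
    staged.add(event); // staged only; not visible as committed yet
  }

  void commit() {
    requireOpen();
    committed.addAll(staged);
    staged.clear();
  }

  void rollback() {
    requireOpen();
    staged.clear(); // discard everything staged in this transaction
  }

  void close() {
    staged = null;
  }

  int size() {
    return committed.size();
  }

  public static void main(String[] args) {
    ToyChannel ch = new ToyChannel();
    ch.begin();
    try {
      ch.put("Hello Flume!");
      ch.commit();
    } catch (RuntimeException e) {
      ch.rollback();
    } finally {
      ch.close();
    }
    System.out.println("Committed events: " + ch.size());
  }
}
```

<p>In this model a put becomes visible only after <tt class="docutils literal"><span class="pre">commit()</span></tt>, and a rollback
leaves the committed events unchanged.</p>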
</div>
<div class="section" id="sink">
<h4>Sink<a class="headerlink" href="#sink" title="Permalink to this headline">¶</a></h4>
<p>The purpose of a <tt class="docutils literal"><span class="pre">Sink</span></tt> is to extract <tt class="docutils literal"><span class="pre">Event</span></tt>s from the <tt class="docutils literal"><span class="pre">Channel</span></tt> and
forward them to the next Flume Agent in the flow or store them in an external
repository. A <tt class="docutils literal"><span class="pre">Sink</span></tt> is associated with exactly one <tt class="docutils literal"><span class="pre">Channel</span></tt>, as
configured in the Flume properties file. There’s one <tt class="docutils literal"><span class="pre">SinkRunner</span></tt> instance
associated with every configured <tt class="docutils literal"><span class="pre">Sink</span></tt>, and when the Flume framework calls
<tt class="docutils literal"><span class="pre">SinkRunner.start()</span></tt>, a new thread is created to drive the <tt class="docutils literal"><span class="pre">Sink</span></tt> (using
<tt class="docutils literal"><span class="pre">SinkRunner.PollingRunner</span></tt> as the thread’s <tt class="docutils literal"><span class="pre">Runnable</span></tt>). This thread manages
the <tt class="docutils literal"><span class="pre">Sink</span></tt>’s lifecycle. The <tt class="docutils literal"><span class="pre">Sink</span></tt> needs to implement the <tt class="docutils literal"><span class="pre">start()</span></tt> and
<tt class="docutils literal"><span class="pre">stop()</span></tt> methods that are part of the <tt class="docutils literal"><span class="pre">LifecycleAware</span></tt> interface. The
<tt class="docutils literal"><span class="pre">Sink.start()</span></tt> method should initialize the <tt class="docutils literal"><span class="pre">Sink</span></tt> and bring it to a state
where it can forward the <tt class="docutils literal"><span class="pre">Event</span></tt>s to its next destination. The
<tt class="docutils literal"><span class="pre">Sink.process()</span></tt> method should do the core processing of extracting the
<tt class="docutils literal"><span class="pre">Event</span></tt> from the <tt class="docutils literal"><span class="pre">Channel</span></tt> and forwarding it. The <tt class="docutils literal"><span class="pre">Sink.stop()</span></tt> method
should do the necessary cleanup (e.g. releasing resources). The <tt class="docutils literal"><span class="pre">Sink</span></tt>
implementation also needs to implement the <tt class="docutils literal"><span class="pre">Configurable</span></tt> interface for
processing its own configuration settings. For example:</p>
<div class="highlight-java"><div class="highlight"><pre><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MySink</span> <span class="kd">extends</span> <span class="n">AbstractSink</span> <span class="kd">implements</span> <span class="n">Configurable</span> <span class="o">{</span>
<span class="kd">private</span> <span class="n">String</span> <span class="n">myProp</span><span class="o">;</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">configure</span><span class="o">(</span><span class="n">Context</span> <span class="n">context</span><span class="o">)</span> <span class="o">{</span>
<span class="n">String</span> <span class="n">myProp</span> <span class="o">=</span> <span class="n">context</span><span class="o">.</span><span class="na">getString</span><span class="o">(</span><span class="s">"myProp"</span><span class="o">,</span> <span class="s">"defaultValue"</span><span class="o">);</span>
<span class="c1">// Process the myProp value (e.g. validation)</span>
<span class="c1">// Store myProp for later retrieval by process() method</span>
<span class="k">this</span><span class="o">.</span><span class="na">myProp</span> <span class="o">=</span> <span class="n">myProp</span><span class="o">;</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">start</span><span class="o">()</span> <span class="o">{</span>
<span class="c1">// Initialize the connection to the external repository (e.g. HDFS) that</span>
<span class="c1">// this Sink will forward Events to ..</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">stop</span> <span class="o">()</span> <span class="o">{</span>
<span class="c1">// Disconnect from the external repository and do any</span>
<span class="c1">// additional cleanup (e.g. releasing resources or nulling-out</span>
<span class="c1">// field values) ..</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Status</span> <span class="nf">process</span><span class="o">()</span> <span class="kd">throws</span> <span class="n">EventDeliveryException</span> <span class="o">{</span>
<span class="n">Status</span> <span class="n">status</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span>
<span class="c1">// Start transaction</span>
<span class="n">Channel</span> <span class="n">ch</span> <span class="o">=</span> <span class="n">getChannel</span><span class="o">();</span>
<span class="n">Transaction</span> <span class="n">txn</span> <span class="o">=</span> <span class="n">ch</span><span class="o">.</span><span class="na">getTransaction</span><span class="o">();</span>
<span class="n">txn</span><span class="o">.</span><span class="na">begin</span><span class="o">();</span>
<span class="k">try</span> <span class="o">{</span>
<span class="c1">// This try clause includes whatever Channel operations you want to do</span>
<span class="n">Event</span> <span class="n">event</span> <span class="o">=</span> <span class="n">ch</span><span class="o">.</span><span class="na">take</span><span class="o">();</span>
<span class="c1">// Send the Event to the external repository.</span>
<span class="c1">// storeSomeData(event);</span>
<span class="n">txn</span><span class="o">.</span><span class="na">commit</span><span class="o">();</span>
<span class="n">status</span> <span class="o">=</span> <span class="n">Status</span><span class="o">.</span><span class="na">READY</span><span class="o">;</span>
<span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">Throwable</span> <span class="n">t</span><span class="o">)</span> <span class="o">{</span>
<span class="n">txn</span><span class="o">.</span><span class="na">rollback</span><span class="o">();</span>
<span class="c1">// Log exception, handle individual exceptions as needed</span>
<span class="n">status</span> <span class="o">=</span> <span class="n">Status</span><span class="o">.</span><span class="na">BACKOFF</span><span class="o">;</span>
<span class="c1">// re-throw all Errors</span>
<span class="k">if</span> <span class="o">(</span><span class="n">t</span> <span class="k">instanceof</span> <span class="n">Error</span><span class="o">)</span> <span class="o">{</span>
<span class="k">throw</span> <span class="o">(</span><span class="n">Error</span><span class="o">)</span><span class="n">t</span><span class="o">;</span>
<span class="o">}</span>
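<span class="o">}</span> <span class="k">finally</span> <span class="o">{</span>
<span class="n">txn</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
<span class="o">}</span>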
<span class="k">return</span> <span class="n">status</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span>
</pre></div>
</div>
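<p>The text does not spell out how the runner thread reacts to the <tt class="docutils literal"><span class="pre">Status</span></tt> returned by <tt class="docutils literal"><span class="pre">process()</span></tt>. One plausible policy, sketched below purely as an assumption, is to poll again immediately on <tt class="docutils literal"><span class="pre">READY</span></tt> and to sleep for a linearly growing, capped interval on consecutive <tt class="docutils literal"><span class="pre">BACKOFF</span></tt> results. <tt class="docutils literal"><span class="pre">BackoffSketch</span></tt>, its parameters and the values used with it are hypothetical and are not taken from Flume's implementation.</p>

```java
// Hypothetical model of a polling runner's back-off policy.
// The class name, parameters and policy are illustrative assumptions.
class BackoffSketch {
    // Local stand-in for the two statuses returned by process().
    enum Status { READY, BACKOFF }

    private final long incrementMillis;
    private final long maxMillis;
    private int consecutiveBackoffs = 0;

    BackoffSketch(long incrementMillis, long maxMillis) {
        this.incrementMillis = incrementMillis;
        this.maxMillis = maxMillis;
    }

    // Returns how long the runner thread would sleep after seeing this status.
    long nextSleepMillis(Status status) {
        if (status == Status.READY) {
            // Work was done: reset the counter and poll again right away.
            consecutiveBackoffs = 0;
            return 0L;
        }
        // Nothing to do: wait a little longer each time, up to the cap.
        consecutiveBackoffs++;
        return Math.min(consecutiveBackoffs * incrementMillis, maxMillis);
    }
}
```

<p>A policy of this kind keeps an idle <tt class="docutils literal"><span class="pre">Sink</span></tt> from spinning on an empty <tt class="docutils literal"><span class="pre">Channel</span></tt> while still reacting quickly once events arrive again.</p>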
</div>
<div class="section" id="source">
<h4>Source<a class="headerlink" href="#source" title="Permalink to this headline">¶</a></h4>
<p>The purpose of a <tt class="docutils literal"><span class="pre">Source</span></tt> is to receive data from an external client and store
it into the configured <tt class="docutils literal"><span class="pre">Channel</span></tt>s. A <tt class="docutils literal"><span class="pre">Source</span></tt> can get an instance of its own
<tt class="docutils literal"><span class="pre">ChannelProcessor</span></tt> to process an <tt class="docutils literal"><span class="pre">Event</span></tt>, committed within a <tt class="docutils literal"><span class="pre">Channel</span></tt>
local transaction, in serial. In the case of an exception, required
<tt class="docutils literal"><span class="pre">Channel</span></tt>s will propagate the exception, all <tt class="docutils literal"><span class="pre">Channel</span></tt>s will roll back their
transaction, but events processed previously on other <tt class="docutils literal"><span class="pre">Channel</span></tt>s will remain
committed.</p>
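<p>The serial, per-channel commit behaviour described above can be illustrated with a small dependency-free model. <tt class="docutils literal"><span class="pre">ListChannel</span></tt> and <tt class="docutils literal"><span class="pre">SerialProcessorSketch</span></tt> are hypothetical stand-ins, not Flume classes. The handling of optional channels (failures are swallowed) is an assumption inferred from the statement that only required channels propagate the exception.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Channel with its own local transaction.
class ListChannel {
    final String name;
    final boolean failOnPut;
    final List<String> committed = new ArrayList<>();

    ListChannel(String name, boolean failOnPut) {
        this.name = name;
        this.failOnPut = failOnPut;
    }

    // Performs one put in a channel-local transaction: stage, then commit or roll back.
    void putInOwnTransaction(String event) {
        List<String> staged = new ArrayList<>();
        staged.add(event);
        if (failOnPut) {
            staged.clear(); // roll back this channel's transaction only
            throw new IllegalStateException("put failed on channel " + name);
        }
        committed.addAll(staged); // commit
    }
}

class SerialProcessorSketch {
    // Delivers the event to each channel in turn, one transaction per channel.
    static void processEvent(String event,
                             List<ListChannel> required,
                             List<ListChannel> optional) {
        for (ListChannel ch : required) {
            // A failure here propagates to the caller; channels that were
            // already processed keep their committed copy of the event.
            ch.putInOwnTransaction(event);
        }
        for (ListChannel ch : optional) {
            try {
                ch.putInOwnTransaction(event);
            } catch (RuntimeException e) {
                // Assumption: an optional channel's failure is not propagated.
            }
        }
    }
}
```

<p>With required channels <tt class="docutils literal"><span class="pre">a</span></tt>, <tt class="docutils literal"><span class="pre">b</span></tt> (failing) and <tt class="docutils literal"><span class="pre">c</span></tt>, the call throws, <tt class="docutils literal"><span class="pre">a</span></tt> keeps the event and <tt class="docutils literal"><span class="pre">c</span></tt> never sees it. A caller that retries may therefore deliver a duplicate to <tt class="docutils literal"><span class="pre">a</span></tt>.</p>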
<p>Similar to the <tt class="docutils literal"><span class="pre">SinkRunner.PollingRunner</span></tt> <tt class="docutils literal"><span class="pre">Runnable</span></tt>, there’s
a <tt class="docutils literal"><span class="pre">PollingRunner</span></tt> <tt class="docutils literal"><span class="pre">Runnable</span></tt> that executes on a thread created when the
Flume framework calls <tt class="docutils literal"><span class="pre">PollableSourceRunner.start()</span></tt>. Each configured
<tt class="docutils literal"><span class="pre">PollableSource</span></tt> is associated with its own thread that runs a
<tt class="docutils literal"><span class="pre">PollingRunner</span></tt>. This thread manages the <tt class="docutils literal"><span class="pre">PollableSource</span></tt>’s lifecycle,
such as starting and stopping. A <tt class="docutils literal"><span class="pre">PollableSource</span></tt> implementation must
implement the <tt class="docutils literal"><span class="pre">start()</span></tt> and <tt class="docutils literal"><span class="pre">stop()</span></tt> methods that are declared in the
<tt class="docutils literal"><span class="pre">LifecycleAware</span></tt> interface. The runner of a <tt class="docutils literal"><span class="pre">PollableSource</span></tt> invokes that
<tt class="docutils literal"><span class="pre">Source</span></tt>’s <tt class="docutils literal"><span class="pre">process()</span></tt> method. The <tt class="docutils literal"><span class="pre">process()</span></tt> method should check for
new data and store it into the <tt class="docutils literal"><span class="pre">Channel</span></tt> as Flume <tt class="docutils literal"><span class="pre">Event</span></tt>s.</p>
<p>Note that there are actually two types of <tt class="docutils literal"><span class="pre">Source</span></tt>s. The <tt class="docutils literal"><span class="pre">PollableSource</span></tt>
was already mentioned. The other is the <tt class="docutils literal"><span class="pre">EventDrivenSource</span></tt>. The
<tt class="docutils literal"><span class="pre">EventDrivenSource</span></tt>, unlike the <tt class="docutils literal"><span class="pre">PollableSource</span></tt>, must have its own callback
mechanism that captures the new data and stores it into the <tt class="docutils literal"><span class="pre">Channel</span></tt>. The
<tt class="docutils literal"><span class="pre">EventDrivenSource</span></tt>s are not each driven by their own thread like the
<tt class="docutils literal"><span class="pre">PollableSource</span></tt>s are. Below is an example of a custom <tt class="docutils literal"><span class="pre">PollableSource</span></tt>:</p>
<div class="highlight-java"><div class="highlight"><pre><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MySource</span> <span class="kd">extends</span> <span class="n">AbstractSource</span> <span class="kd">implements</span> <span class="n">Configurable</span><span class="o">,</span> <span class="n">PollableSource</span> <span class="o">{</span>
<span class="kd">private</span> <span class="n">String</span> <span class="n">myProp</span><span class="o">;</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">configure</span><span class="o">(</span><span class="n">Context</span> <span class="n">context</span><span class="o">)</span> <span class="o">{</span>
<span class="n">String</span> <span class="n">myProp</span> <span class="o">=</span> <span class="n">context</span><span class="o">.</span><span class="na">getString</span><span class="o">(</span><span class="s">"myProp"</span><span class="o">,</span> <span class="s">"defaultValue"</span><span class="o">);</span>
<span class="c1">// Process the myProp value (e.g. validation, convert to another type, ...)</span>
<span class="c1">// Store myProp for later retrieval by process() method</span>
<span class="k">this</span><span class="o">.</span><span class="na">myProp</span> <span class="o">=</span> <span class="n">myProp</span><span class="o">;</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">start</span><span class="o">()</span> <span class="o">{</span>
<span class="c1">// Initialize the connection to the external client</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">stop</span> <span class="o">()</span> <span class="o">{</span>
<span class="c1">// Disconnect from external client and do any additional cleanup</span>
<span class="c1">// (e.g. releasing resources or nulling-out field values) ..</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Status</span> <span class="nf">process</span><span class="o">()</span> <span class="kd">throws</span> <span class="n">EventDeliveryException</span> <span class="o">{</span>
<span class="n">Status</span> <span class="n">status</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span>
<span class="k">try</span> <span class="o">{</span>
<span class="c1">// This try clause includes whatever Channel/Event operations you want to do</span>
<span class="c1">// Receive new data</span>
<span class="n">Event</span> <span class="n">e</span> <span class="o">=</span> <span class="n">getSomeData</span><span class="o">();</span>
<span class="c1">// Store the Event into this Source's associated Channel(s)</span>
<span class="n">getChannelProcessor</span><span class="o">().</span><span class="na">processEvent</span><span class="o">(</span><span class="n">e</span><span class="o">);</span>
<span class="n">status</span> <span class="o">=</span> <span class="n">Status</span><span class="o">.</span><span class="na">READY</span><span class="o">;</span>
<span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">Throwable</span> <span class="n">t</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// Log exception, handle individual exceptions as needed</span>
<span class="n">status</span> <span class="o">=</span> <span class="n">Status</span><span class="o">.</span><span class="na">BACKOFF</span><span class="o">;</span>
<span class="c1">// re-throw all Errors</span>
<span class="k">if</span> <span class="o">(</span><span class="n">t</span> <span class="k">instanceof</span> <span class="n">Error</span><span class="o">)</span> <span class="o">{</span>
<span class="k">throw</span> <span class="o">(</span><span class="n">Error</span><span class="o">)</span><span class="n">t</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="k">return</span> <span class="n">status</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span>
</pre></div>
</div>
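<p>For contrast with the polling example above, the callback style of an <tt class="docutils literal"><span class="pre">EventDrivenSource</span></tt> can be sketched as follows. This is a dependency-free model, not Flume code: <tt class="docutils literal"><span class="pre">PushClient</span></tt> and <tt class="docutils literal"><span class="pre">CallbackSourceSketch</span></tt> are hypothetical names, and a plain list stands in for the <tt class="docutils literal"><span class="pre">Channel</span></tt>. A real implementation would hand each <tt class="docutils literal"><span class="pre">Event</span></tt> to <tt class="docutils literal"><span class="pre">getChannelProcessor().processEvent(...)</span></tt> inside the callback.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical push-style client: it invokes a registered listener when data arrives.
class PushClient {
    private Consumer<String> listener;

    void setListener(Consumer<String> listener) {
        this.listener = listener;
    }

    // Simulates data arriving from outside; it is dropped if nobody is listening.
    void emit(String data) {
        if (listener != null) {
            listener.accept(data);
        }
    }
}

// Hypothetical event-driven source: no polling thread, only a callback.
class CallbackSourceSketch {
    private final PushClient client;

    // Stand-in for the Channel(s) this source writes to.
    final List<String> channel = new ArrayList<>();

    CallbackSourceSketch(PushClient client) {
        this.client = client;
    }

    // start() registers the callback that stores incoming data.
    void start() {
        client.setListener(channel::add);
    }

    // stop() unregisters the callback so no further data is stored.
    void stop() {
        client.setListener(null);
    }
}
```

<p>Here nothing calls a <tt class="docutils literal"><span class="pre">process()</span></tt> method: data reaches the channel only while the callback is registered, that is, between <tt class="docutils literal"><span class="pre">start()</span></tt> and <tt class="docutils literal"><span class="pre">stop()</span></tt>.</p>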
</div>
<div class="section" id="channel">
<h4>Channel<a class="headerlink" href="#channel" title="Permalink to this headline">¶</a></h4>
<p>TBD</p>
</div>
<div class="section" id="initializable">
<h4>Initializable<a class="headerlink" href="#initializable" title="Permalink to this headline">¶</a></h4>
<p>As of Flume 1.10.0, Sources, Sinks, and Channels may implement the Initializable interface. Doing so
allows the component to access the materialized configuration before any of the components have been
started. While this ability is quite useful when using the standard configuration, it is less useful when
configuring using Spring Boot as Spring’s autowiring generally can be used to accomplish the same thing.</p>
<p>This example shows a Sink being configured with the name of a Source. While initializing, it will
retrieve the Source from the configuration and save it. During event processing, a new event will be
sent to the Source, presumably after the event has been modified in some way.</p>
<div class="highlight-java"><div class="highlight"><pre><span class="kd">public</span> <span class="kd">class</span> <span class="nc">NullInitSink</span> <span class="kd">extends</span> <span class="n">NullSink</span> <span class="kd">implements</span> <span class="n">Initializable</span> <span class="o">{</span>
<span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">Logger</span> <span class="n">logger</span> <span class="o">=</span> <span class="n">LoggerFactory</span><span class="o">.</span><span class="na">getLogger</span><span class="o">(</span><span class="n">NullInitSink</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="kd">private</span> <span class="n">String</span> <span class="n">sourceName</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span>
<span class="kd">private</span> <span class="n">EventProcessor</span> <span class="n">eventProcessor</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span>
<span class="kd">private</span> <span class="kt">long</span> <span class="n">total</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
<span class="kd">public</span> <span class="nf">NullInitSink</span><span class="o">()</span> <span class="o">{</span>
<span class="kd">super</span><span class="o">();</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">configure</span><span class="o">(</span><span class="n">Context</span> <span class="n">context</span><span class="o">)</span> <span class="o">{</span>
<span class="n">sourceName</span> <span class="o">=</span> <span class="n">context</span><span class="o">.</span><span class="na">getString</span><span class="o">(</span><span class="s">"targetSource"</span><span class="o">);</span>
<span class="kd">super</span><span class="o">.</span><span class="na">configure</span><span class="o">(</span><span class="n">context</span><span class="o">);</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">initialize</span><span class="o">(</span><span class="n">MaterializedConfiguration</span> <span class="n">configuration</span><span class="o">)</span> <span class="o">{</span>
<span class="n">logger</span><span class="o">.</span><span class="na">debug</span><span class="o">(</span><span class="s">"Locating source for event publishing"</span><span class="o">);</span>
<span class="k">for</span> <span class="o">(</span><span class="n">Map</span><span class="o">.</span><span class="na">Entry</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">SourceRunner</span><span class="o">></span> <span class="n">entry</span> <span class="o">:</span> <span class="n">configuration</span><span class="o">.</span><span class="na">getSourceRunners</span><span class="o">().</span><span class="na">entrySet</span><span class="o">())</span> <span class="o">{</span>
<span class="k">if</span> <span class="o">(</span><span class="n">entry</span><span class="o">.</span><span class="na">getKey</span><span class="o">().</span><span class="na">equals</span><span class="o">(</span><span class="n">sourceName</span><span class="o">))</span> <span class="o">{</span>
<span class="n">Source</span> <span class="n">source</span> <span class="o">=</span> <span class="n">entry</span><span class="o">.</span><span class="na">getValue</span><span class="o">().</span><span class="na">getSource</span><span class="o">();</span>
<span class="k">if</span> <span class="o">(</span><span class="n">source</span> <span class="k">instanceof</span> <span class="n">EventProcessor</span><span class="o">)</span> <span class="o">{</span>
<span class="n">eventProcessor</span> <span class="o">=</span> <span class="o">(</span><span class="n">EventProcessor</span><span class="o">)</span> <span class="n">source</span><span class="o">;</span>
<span class="n">logger</span><span class="o">.</span><span class="na">debug</span><span class="o">(</span><span class="s">"Found event processor {}"</span><span class="o">,</span> <span class="n">source</span><span class="o">.</span><span class="na">getName</span><span class="o">());</span>
<span class="k">return</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="n">logger</span><span class="o">.</span><span class="na">warn</span><span class="o">(</span><span class="s">"No Source named {} found for republishing events."</span><span class="o">,</span> <span class="n">sourceName</span><span class="o">);</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Status</span> <span class="nf">process</span><span class="o">()</span> <span class="kd">throws</span> <span class="n">EventDeliveryException</span> <span class="o">{</span>
<span class="n">Status</span> <span class="n">status</span> <span class="o">=</span> <span class="n">Status</span><span class="o">.</span><span class="na">READY</span><span class="o">;</span>
<span class="n">Channel</span> <span class="n">channel</span> <span class="o">=</span> <span class="n">getChannel</span><span class="o">();</span>
<span class="n">Transaction</span> <span class="n">transaction</span> <span class="o">=</span> <span class="n">channel</span><span class="o">.</span><span class="na">getTransaction</span><span class="o">();</span>
<span class="n">Event</span> <span class="n">event</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span>
<span class="n">CounterGroup</span> <span class="n">counterGroup</span> <span class="o">=</span> <span class="n">getCounterGroup</span><span class="o">();</span>
<span class="kt">long</span> <span class="n">batchSize</span> <span class="o">=</span> <span class="n">getBatchSize</span><span class="o">();</span>
<span class="kt">long</span> <span class="n">eventCounter</span> <span class="o">=</span> <span class="n">counterGroup</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">"events.success"</span><span class="o">);</span>
<span class="k">try</span> <span class="o">{</span>
<span class="n">transaction</span><span class="o">.</span><span class="na">begin</span><span class="o">();</span>
<span class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
<span class="k">for</span> <span class="o">(</span><span class="n">i</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span class="o"><</span> <span class="n">batchSize</span><span class="o">;</span> <span class="n">i</span><span class="o">++)</span> <span class="o">{</span>
<span class="n">event</span> <span class="o">=</span> <span class="n">channel</span><span class="o">.</span><span class="na">take</span><span class="o">();</span>
<span class="k">if</span> <span class="o">(</span><span class="n">event</span> <span class="o">!=</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span>
<span class="kt">long</span> <span class="n">id</span> <span class="o">=</span> <span class="n">Long</span><span class="o">.</span><span class="na">parseLong</span><span class="o">(</span><span class="k">new</span> <span class="n">String</span><span class="o">(</span><span class="n">event</span><span class="o">.</span><span class="na">getBody</span><span class="o">()));</span>
<span class="n">total</span> <span class="o">+=</span> <span class="n">id</span><span class="o">;</span>
<span class="n">event</span><span class="o">.</span><span class="na">getHeaders</span><span class="o">().</span><span class="na">put</span><span class="o">(</span><span class="s">"Total"</span><span class="o">,</span> <span class="n">Long</span><span class="o">.</span><span class="na">toString</span><span class="o">(</span><span class="n">total</span><span class="o">));</span>
<span class="n">eventProcessor</span><span class="o">.</span><span class="na">processEvent</span><span class="o">(</span><span class="n">event</span><span class="o">);</span>
<span class="n">logger</span><span class="o">.</span><span class="na">info</span><span class="o">(</span><span class="s">"Null sink {} successfully processed event {}"</span><span class="o">,</span> <span class="n">getName</span><span class="o">(),</span> <span class="n">id</span><span class="o">);</span>
<span class="o">}</span> <span class="k">else</span> <span class="o">{</span>
<span class="n">status</span> <span class="o">=</span> <span class="n">Status</span><span class="o">.</span><span class="na">BACKOFF</span><span class="o">;</span>
<span class="k">break</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="n">transaction</span><span class="o">.</span><span class="na">commit</span><span class="o">();</span>
<span class="n">counterGroup</span><span class="o">.</span><span class="na">addAndGet</span><span class="o">(</span><span class="s">"events.success"</span><span class="o">,</span> <span class="o">(</span><span class="kt">long</span><span class="o">)</span> <span class="n">Math</span><span class="o">.</span><span class="na">min</span><span class="o">(</span><span class="n">batchSize</span><span class="o">,</span> <span class="n">i</span><span class="o">));</span>
<span class="n">counterGroup</span><span class="o">.</span><span class="na">incrementAndGet</span><span class="o">(</span><span class="s">"transaction.success"</span><span class="o">);</span>
<span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">Exception</span> <span class="n">ex</span><span class="o">)</span> <span class="o">{</span>
<span class="n">transaction</span><span class="o">.</span><span class="na">rollback</span><span class="o">();</span>
<span class="n">counterGroup</span><span class="o">.</span><span class="na">incrementAndGet</span><span class="o">(</span><span class="s">"transaction.failed"</span><span class="o">);</span>
<span class="n">logger</span><span class="o">.</span><span class="na">error</span><span class="o">(</span><span class="s">"Failed to deliver event. Exception follows."</span><span class="o">,</span> <span class="n">ex</span><span class="o">);</span>
<span class="k">throw</span> <span class="k">new</span> <span class="nf">EventDeliveryException</span><span class="o">(</span><span class="s">"Failed to deliver event: "</span> <span class="o">+</span> <span class="n">event</span><span class="o">,</span> <span class="n">ex</span><span class="o">);</span>
<span class="o">}</span> <span class="k">finally</span> <span class="o">{</span>
<span class="n">transaction</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
<span class="o">}</span>
<span class="k">return</span> <span class="n">status</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span>
</pre></div>
</div>
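<p>A properties-file fragment for wiring such a Sink to its target Source might look like the sketch below. The agent name <tt class="docutils literal"><span class="pre">a1</span></tt>, the component names and the package <tt class="docutils literal"><span class="pre">org.example</span></tt> are placeholders chosen for illustration. Only the <tt class="docutils literal"><span class="pre">targetSource</span></tt> key comes from the example, where <tt class="docutils literal"><span class="pre">configure()</span></tt> reads it. The definitions of the Sources and the Channel are omitted.</p>

```properties
# Hypothetical agent "a1"; all names and the org.example package are placeholders.
a1.sources = r1 r2
a1.channels = c1
a1.sinks = k1

# Custom sink, referenced by its fully qualified class name.
a1.sinks.k1.type = org.example.NullInitSink
a1.sinks.k1.channel = c1

# Read in configure() via context.getString("targetSource").
# The Source named here must implement EventProcessor to be picked up in initialize().
a1.sinks.k1.targetSource = r2
```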
</div>
</div>
</div>
</div>
</div>
</div>
</div>
<div class="sphinxsidebar">
<div class="sphinxsidebarwrapper"><h3><a href="index.html">Apache Flume</a></h3>
<ul>
<li class="toctree-l1"><a class="reference internal" href="getinvolved.html">How to Get Involved</a></li>
<li class="toctree-l1"><a class="reference internal" href="download.html">Download</a></li>
<li class="toctree-l1"><a class="reference internal" href="security.html">Apache Flume Security Vulnerabilities</a></li>
<li class="toctree-l1"><a class="reference internal" href="documentation.html">Documentation</a></li>
<li class="toctree-l1"><a class="reference internal" href="releases/index.html">Releases</a></li>
<li class="toctree-l1"><a class="reference internal" href="mailinglists.html">Mailing lists</a></li>
<li class="toctree-l1"><a class="reference internal" href="team.html">Team</a></li>
<li class="toctree-l1"><a class="reference internal" href="source.html">Source Repository</a></li>
<li class="toctree-l1"><a class="reference internal" href="testing.html">Testing</a></li>
<li class="toctree-l1"><a class="reference internal" href="license.html">Apache License</a></li>
<li class="toctree-l1"><a class="reference internal" href="subprojects.html">Sub Projects</a></li>
</ul>
<h3>Resources</h3>
<ul class="this-page-menu">
<li><a href="https://issues.apache.org/jira/browse/FLUME">Flume Issue Tracking (Jira)</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/FLUME">Flume Wiki</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/FLUME/Getting+Started">Getting Started Guide</a></li>
</ul>
<h3>Apache</h3>
<ul class="this-page-menu">
<li><a href="https://www.apache.org">Home</a></li>
<li><a href="https://www.apache.org/foundation/sponsorship.html">Sponsorship</a></li>
<li><a href="https://www.apache.org/licenses">Licenses</a> </li>
<li><a href="https://www.apache.org/foundation/thanks.html">Thanks</a></li>
<li><a href="https://www.apachecon.com">Conferences</a></li>
<li><a href="https://www.apache.org/security/">Security</a></li>
<li><a href="https://privacy.apache.org/policies/privacy-policy-public.html">Data Privacy</a></li>
</ul>
<h3><a href="index.html">This Page</a></h3>
<ul>
<li><a class="reference internal" href="#">Flume 1.11.0 Developer Guide</a><ul>
<li><a class="reference internal" href="#introduction">Introduction</a><ul>
<li><a class="reference internal" href="#overview">Overview</a></li>
<li><a class="reference internal" href="#architecture">Architecture</a><ul>
<li><a class="reference internal" href="#data-flow-model">Data flow model</a></li>
<li><a class="reference internal" href="#reliability">Reliability</a></li>
</ul>
</li>
<li><a class="reference internal" href="#building-flume">Building Flume</a><ul>
<li><a class="reference internal" href="#getting-the-source">Getting the source</a></li>
<li><a class="reference internal" href="#compile-test-flume">Compile/test Flume</a></li>
<li><a class="reference internal" href="#updating-protocol-buffer-version">Updating Protocol Buffer Version</a></li>
</ul>
</li>
<li><a class="reference internal" href="#developing-custom-components">Developing custom components</a><ul>
<li><a class="reference internal" href="#client">Client</a><ul>
<li><a class="reference internal" href="#client-sdk">Client SDK</a></li>
<li><a class="reference internal" href="#rpc-client-interface">RPC client interface</a></li>
<li><a class="reference internal" href="#rpc-clients-avro-and-thrift">RPC clients - Avro and Thrift</a></li>
<li><a class="reference internal" href="#secure-rpc-client-thrift">Secure RPC client - Thrift</a></li>
<li><a class="reference internal" href="#failover-client">Failover Client</a></li>
<li><a class="reference internal" href="#loadbalancing-rpc-client">LoadBalancing RPC client</a></li>
</ul>
</li>
<li><a class="reference internal" href="#embedded-agent">Embedded agent</a></li>
<li><a class="reference internal" href="#transaction-interface">Transaction interface</a></li>
<li><a class="reference internal" href="#sink">Sink</a></li>
<li><a class="reference internal" href="#source">Source</a></li>
<li><a class="reference internal" href="#channel">Channel</a></li>
<li><a class="reference internal" href="#initializable">Initializable</a></li>
</ul>
</li>
</ul>
</li>
</ul>
</li>
</ul>
</div>
</div>
<div class="clearer"></div>
</div>
<div class="footer">
© Copyright 2009-2023 The Apache Software Foundation. Apache Flume, Flume, Apache, the Apache feather logo, and the Apache Flume project logo are trademarks of The Apache Software Foundation.
</div>
</body>
</html>