in flume-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduSink.java [117:152]
public synchronized void start() {
Preconditions.checkState(table == null && session == null,
"Please call stop before calling start on an old instance.");
// Client is not null only inside tests.
if (client == null) {
// Creating client with FlumeAuthenticator.
client = privilegedExecutor.execute(
new PrivilegedAction<KuduClient>() {
@Override
public KuduClient run() {
return new KuduClient.KuduClientBuilder(masterAddresses).build();
}
}
);
}
session = client.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
session.setTimeoutMillis(timeoutMillis);
session.setIgnoreAllDuplicateRows(ignoreDuplicateRows);
session.setMutationBufferSpace(batchSize);
try {
table = client.openTable(tableName);
} catch (Exception ex) {
sinkCounter.incrementConnectionFailedCount();
String msg = String.format("Could not open Kudu table '%s'", tableName);
logger.error(msg, ex);
throw new FlumeException(msg, ex);
}
operationsProducer.initialize(table);
super.start();
sinkCounter.incrementConnectionCreatedCount();
sinkCounter.start();
}