in flume-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduSink.java [184:231]
// Configures this sink from the supplied Flume context.
//
// Reads the master addresses and table name (both required), then the batch
// size, timeout and duplicate-row handling (each falling back to its DEFAULT_*
// constant). It then builds the privileged executor from the Kerberos
// principal/keytab and proxy-user settings, instantiates and configures the
// operations producer, and finally creates the sink counter.
//
// Failure modes visible in this method:
//   - Preconditions.checkNotNull rejects a missing master-addresses or
//     table-name property.
//   - ClassCastException if the configured producer class is not a
//     KuduOperationsProducer (raised before any instance is created).
//   - RuntimeException wrapping the reflective failure if the producer class
//     cannot be loaded or instantiated.
public synchronized void configure(Context context) {
  // Required connection settings.
  masterAddresses = context.getString(KuduSinkConfigurationConstants.MASTER_ADDRESSES);
  Preconditions.checkNotNull(masterAddresses,
      "Missing master addresses. Please specify property '%s'.",
      KuduSinkConfigurationConstants.MASTER_ADDRESSES);

  tableName = context.getString(KuduSinkConfigurationConstants.TABLE_NAME);
  Preconditions.checkNotNull(tableName,
      "Missing table name. Please specify property '%s'",
      KuduSinkConfigurationConstants.TABLE_NAME);

  // Optional tuning settings, each with a default.
  batchSize = context.getInteger(KuduSinkConfigurationConstants.BATCH_SIZE, DEFAULT_BATCH_SIZE);
  timeoutMillis = context.getLong(KuduSinkConfigurationConstants.TIMEOUT_MILLIS,
      DEFAULT_TIMEOUT_MILLIS);
  ignoreDuplicateRows = context.getBoolean(KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS,
      DEFAULT_IGNORE_DUPLICATE_ROWS);
  String operationProducerType = context.getString(KuduSinkConfigurationConstants.PRODUCER);

  // Authentication settings; any of these may be absent (null) in the context.
  String kerberosPrincipal =
      context.getString(KuduSinkConfigurationConstants.KERBEROS_PRINCIPAL);
  String kerberosKeytab = context.getString(KuduSinkConfigurationConstants.KERBEROS_KEYTAB);
  String proxyUser = context.getString(KuduSinkConfigurationConstants.PROXY_USER);

  privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(
      kerberosPrincipal, kerberosKeytab).proxyAs(proxyUser);

  // Check for operations producer, if null set default operations producer type.
  if (operationProducerType == null || operationProducerType.isEmpty()) {
    operationProducerType = DEFAULT_KUDU_OPERATION_PRODUCER;
    logger.warn("No Kudu operations producer provided, using default");
  }

  // The producer receives only the properties under the producer prefix.
  Context producerContext = new Context();
  producerContext.putAll(context.getSubProperties(
      KuduSinkConfigurationConstants.PRODUCER_PREFIX));

  try {
    // asSubclass verifies the type up front, so a class that is not a
    // KuduOperationsProducer is rejected before its constructor ever runs
    // (an unchecked generic cast would only fail after instantiation).
    Class<? extends KuduOperationsProducer> clazz =
        Class.forName(operationProducerType).asSubclass(KuduOperationsProducer.class);
    operationsProducer = clazz.getDeclaredConstructor().newInstance();
    operationsProducer.configure(producerContext);
  } catch (ClassNotFoundException | NoSuchMethodException |
      InstantiationException | IllegalAccessException | InvocationTargetException e) {
    logger.error("Could not instantiate Kudu operations producer" , e);
    throw new RuntimeException(e);
  }

  sinkCounter = new SinkCounter(this.getName());
}