public synchronized void configure()

in flume-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduSink.java [184:231]


  /**
   * Configures this sink from the supplied Flume context.
   *
   * <p>Reads the master addresses and table name (both mandatory), the batch size,
   * operation timeout and duplicate-row handling flag (each falling back to its
   * {@code DEFAULT_*} constant), and the optional Kerberos principal, keytab and
   * proxy user. It then builds the privileged executor, instantiates and configures
   * the operations producer, and creates the sink counter.
   *
   * <p>If no producer class is configured (null or empty), the default producer
   * {@code DEFAULT_KUDU_OPERATION_PRODUCER} is used and a warning is logged. The
   * producer receives only the sub-properties found under
   * {@code KuduSinkConfigurationConstants.PRODUCER_PREFIX}.
   *
   * @param context the sink configuration to read properties from
   * @throws NullPointerException as raised by {@code Preconditions.checkNotNull} when
   *     the master addresses or the table name property is absent
   * @throws RuntimeException if the producer class cannot be found, is not a
   *     {@code KuduOperationsProducer}, or cannot be instantiated through a no-argument
   *     constructor; the underlying exception is preserved as the cause
   */
  public synchronized void configure(Context context) {
    masterAddresses = context.getString(KuduSinkConfigurationConstants.MASTER_ADDRESSES);
    Preconditions.checkNotNull(masterAddresses,
        "Missing master addresses. Please specify property '%s'.",
        KuduSinkConfigurationConstants.MASTER_ADDRESSES);

    tableName = context.getString(KuduSinkConfigurationConstants.TABLE_NAME);
    Preconditions.checkNotNull(tableName,
        "Missing table name. Please specify property '%s'",
        KuduSinkConfigurationConstants.TABLE_NAME);

    // Optional tuning knobs: each falls back to the class-level default when unset.
    batchSize = context.getInteger(KuduSinkConfigurationConstants.BATCH_SIZE, DEFAULT_BATCH_SIZE);
    timeoutMillis = context.getLong(KuduSinkConfigurationConstants.TIMEOUT_MILLIS,
                                    DEFAULT_TIMEOUT_MILLIS);
    ignoreDuplicateRows = context.getBoolean(KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS,
                                             DEFAULT_IGNORE_DUPLICATE_ROWS);
    String operationProducerType = context.getString(KuduSinkConfigurationConstants.PRODUCER);
    String kerberosPrincipal =
        context.getString(KuduSinkConfigurationConstants.KERBEROS_PRINCIPAL);
    String kerberosKeytab = context.getString(KuduSinkConfigurationConstants.KERBEROS_KEYTAB);
    String proxyUser = context.getString(KuduSinkConfigurationConstants.PROXY_USER);

    // Security settings are passed through as-is (possibly null) to the authenticator.
    privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(
        kerberosPrincipal, kerberosKeytab).proxyAs(proxyUser);

    // Check for operations producer, if null set default operations producer type.
    if (operationProducerType == null || operationProducerType.isEmpty()) {
      operationProducerType = DEFAULT_KUDU_OPERATION_PRODUCER;
      logger.warn("No Kudu operations producer provided, using default");
    }

    // Hand the producer only its own sub-properties (those under PRODUCER_PREFIX).
    Context producerContext = new Context();
    producerContext.putAll(context.getSubProperties(
        KuduSinkConfigurationConstants.PRODUCER_PREFIX));

    try {
      // asSubclass performs a checked conversion: a configured class that is not a
      // KuduOperationsProducer fails right here with a ClassCastException that is
      // logged and wrapped below, instead of escaping from an unchecked cast later.
      Class<? extends KuduOperationsProducer> clazz =
          Class.forName(operationProducerType).asSubclass(KuduOperationsProducer.class);
      operationsProducer = clazz.getDeclaredConstructor().newInstance();
      operationsProducer.configure(producerContext);
    } catch (ClassNotFoundException | ClassCastException | NoSuchMethodException |
        InstantiationException | IllegalAccessException | InvocationTargetException e) {
      logger.error("Could not instantiate Kudu operations producer" , e);
      throw new RuntimeException(e);
    }
    sinkCounter = new SinkCounter(this.getName());
  }