public synchronized void start()

in flume-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduSink.java [117:152]


  public synchronized void start() {
    Preconditions.checkState(table == null && session == null,
        "Please call stop before calling start on an old instance.");

    // Client is not null only inside tests.
    if (client == null) {
      // Creating client with FlumeAuthenticator.
      client = privilegedExecutor.execute(
        new PrivilegedAction<KuduClient>() {
          @Override
          public KuduClient run() {
            return new KuduClient.KuduClientBuilder(masterAddresses).build();
          }
        }
      );
    }
    session = client.newSession();
    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
    session.setTimeoutMillis(timeoutMillis);
    session.setIgnoreAllDuplicateRows(ignoreDuplicateRows);
    session.setMutationBufferSpace(batchSize);

    try {
      table = client.openTable(tableName);
    } catch (Exception ex) {
      sinkCounter.incrementConnectionFailedCount();
      String msg = String.format("Could not open Kudu table '%s'", tableName);
      logger.error(msg, ex);
      throw new FlumeException(msg, ex);
    }
    operationsProducer.initialize(table);

    super.start();
    sinkCounter.incrementConnectionCreatedCount();
    sinkCounter.start();
  }