public void start()

in flume-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java [135:190]


  public void start() {
    Preconditions.checkArgument(table == null, "Please call stop " +
        "before calling start on an old instance.");
    try {
      privilegedExecutor =
          FlumeAuthenticationUtil.getAuthenticator(kerberosPrincipal, kerberosKeytab);
    } catch (Exception ex) {
      sinkCounter.incrementConnectionFailedCount();
      throw new FlumeException("Failed to login to HBase using "
          + "provided credentials.", ex);
    }
    try {
      conn = privilegedExecutor.execute((PrivilegedExceptionAction<Connection>) () -> {
        conn = ConnectionFactory.createConnection(config);
        return conn;
      });
      // Flush is controlled by us. This ensures that HBase changing
      // their criteria for flushing does not change how we flush.
      table = conn.getBufferedMutator(TableName.valueOf(tableName));

    } catch (Exception e) {
      sinkCounter.incrementConnectionFailedCount();
      logger.error("Could not load table, " + tableName +
          " from HBase", e);
      throw new FlumeException("Could not load table, " + tableName +
          " from HBase", e);
    }
    try {
      if (!privilegedExecutor.execute((PrivilegedExceptionAction<Boolean>) () -> {
        Table t = null;
        try {
          t = conn.getTable(TableName.valueOf(tableName));
          return t.getTableDescriptor().hasFamily(columnFamily);
        } finally {
          if (t != null) {
            t.close();
          }
        }
      })) {
        throw new IOException("Table " + tableName
            + " has no such column family " + Bytes.toString(columnFamily));
      }
    } catch (Exception e) {
      //Get getTableDescriptor also throws IOException, so catch the IOException
      //thrown above or by the getTableDescriptor() call.
      sinkCounter.incrementConnectionFailedCount();
      throw new FlumeException("Error getting column family from HBase."
          + "Please verify that the table " + tableName + " and Column Family, "
          + Bytes.toString(columnFamily) + " exists in HBase, and the"
          + " current user has permissions to access that table.", e);
    }

    super.start();
    sinkCounter.incrementConnectionCreatedCount();
    sinkCounter.start();
  }