in flume-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java [135:190]
/**
 * Starts this sink.
 *
 * <p>In order, this method: obtains a privileged executor for the configured
 * Kerberos principal/keytab, opens an HBase {@code Connection} through it,
 * obtains a buffered mutator for {@code tableName}, verifies that the table
 * contains {@code columnFamily}, and finally starts the parent sink and the
 * sink counter.
 *
 * <p>If any step after the login fails, everything opened so far is closed and
 * the {@code conn}/{@code table} fields are reset to {@code null}, so a failed
 * start neither leaks a connection nor blocks a later call to {@code start()}.
 *
 * @throws IllegalArgumentException if {@code table} is already set, i.e. the
 *         sink was started before and not stopped
 * @throws FlumeException if login fails, the connection or table cannot be
 *         opened, or the column family cannot be verified
 */
public void start() {
  Preconditions.checkArgument(table == null, "Please call stop " +
      "before calling start on an old instance.");

  try {
    privilegedExecutor =
        FlumeAuthenticationUtil.getAuthenticator(kerberosPrincipal, kerberosKeytab);
  } catch (Exception ex) {
    sinkCounter.incrementConnectionFailedCount();
    throw new FlumeException("Failed to login to HBase using "
        + "provided credentials.", ex);
  }

  try {
    conn = privilegedExecutor.execute(
        (PrivilegedExceptionAction<Connection>) () ->
            ConnectionFactory.createConnection(config));
    // Flush is controlled by us. This ensures that HBase changing
    // their criteria for flushing does not change how we flush.
    table = conn.getBufferedMutator(TableName.valueOf(tableName));
  } catch (Exception e) {
    sinkCounter.incrementConnectionFailedCount();
    logger.error("Could not load table, " + tableName +
        " from HBase", e);
    // The connection may have been opened before the mutator lookup failed.
    closeHBaseResourcesAfterFailedStart();
    throw new FlumeException("Could not load table, " + tableName +
        " from HBase", e);
  }

  try {
    // NOTE(review): Table#getTableDescriptor() appears to be deprecated in
    // HBase 2.x in favour of getDescriptor(); confirm against the HBase
    // version in use before migrating.
    boolean familyExists = privilegedExecutor.execute(
        (PrivilegedExceptionAction<Boolean>) () -> {
          try (Table t = conn.getTable(TableName.valueOf(tableName))) {
            return t.getTableDescriptor().hasFamily(columnFamily);
          }
        });
    if (!familyExists) {
      throw new IOException("Table " + tableName
          + " has no such column family " + Bytes.toString(columnFamily));
    }
  } catch (Exception e) {
    // Covers both the IOException thrown above for a missing column family
    // and any exception raised while fetching the table descriptor.
    sinkCounter.incrementConnectionFailedCount();
    // Without this, the open connection and mutator would leak and the
    // non-null table field would make the next start() fail its precondition.
    closeHBaseResourcesAfterFailedStart();
    throw new FlumeException("Error getting column family from HBase. "
        + "Please verify that the table " + tableName + " and Column Family, "
        + Bytes.toString(columnFamily) + " exists in HBase, and the"
        + " current user has permissions to access that table.", e);
  }

  super.start();
  sinkCounter.incrementConnectionCreatedCount();
  sinkCounter.start();
}

/**
 * Closes the buffered mutator and connection opened by a failed
 * {@link #start()} and resets both fields to {@code null}. Close failures are
 * logged and otherwise ignored so that the original start-up error is the one
 * reported to the caller.
 */
private void closeHBaseResourcesAfterFailedStart() {
  if (table != null) {
    try {
      table.close();
    } catch (Exception closeEx) {
      logger.warn("Failed to close table " + tableName
          + " after unsuccessful start", closeEx);
    }
    table = null;
  }
  if (conn != null) {
    try {
      conn.close();
    } catch (Exception closeEx) {
      logger.warn("Failed to close HBase connection after unsuccessful start", closeEx);
    }
    conn = null;
  }
}