in flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java [113:168]
public void open(Configuration parameters) throws Exception {
    // Initializes the sink: opens the mutation converter, creates the HBase connection
    // if none exists yet, verifies that the target table exists, builds the buffered
    // mutator and, when configured, starts the periodic background flusher.
    //
    // Throws RuntimeException when the table is missing or when any other IOException
    // occurs during setup; the original exception is kept as the cause.
    LOG.info("start open ...");
    org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
    try {
        this.mutationConverter.open();
        this.numPendingRequests = new AtomicLong(0);
        if (null == connection) {
            this.connection = ConnectionFactory.createConnection(config);
        }
        TableName tableName = TableName.valueOf(hTableName);
        // The Admin handle returned by getAdmin() must be closed by the caller, so it is
        // scoped to this existence check instead of being left open for the sink's lifetime.
        try (org.apache.hadoop.hbase.client.Admin admin = connection.getAdmin()) {
            if (!admin.tableExists(tableName)) {
                throw new TableNotFoundException(tableName);
            }
        }
        // create a parameter instance, set the table name and custom listener reference.
        BufferedMutatorParams params = new BufferedMutatorParams(tableName).listener(this);
        if (bufferFlushMaxSizeInBytes > 0) {
            // Only override the write buffer size when one was explicitly configured.
            params.writeBufferSize(bufferFlushMaxSizeInBytes);
        }
        this.mutator =
                new DeduplicatedMutator(
                        (int) bufferFlushMaxMutations, connection.getBufferedMutator(params));
        // No periodic flusher is scheduled when the interval is disabled or when
        // bufferFlushMaxMutations == 1 (presumably every mutation is then flushed
        // immediately -- confirm against the write path).
        if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) {
            this.executor =
                    Executors.newScheduledThreadPool(
                            1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
            this.scheduledFuture =
                    this.executor.scheduleWithFixedDelay(
                            () -> {
                                if (closed) {
                                    return;
                                }
                                try {
                                    flush();
                                } catch (Exception e) {
                                    // fail the sink and skip the rest of the items
                                    // if the failure handler decides to throw an exception.
                                    // Only the first failure is recorded.
                                    failureThrowable.compareAndSet(null, e);
                                }
                            },
                            bufferFlushIntervalMillis,
                            bufferFlushIntervalMillis,
                            TimeUnit.MILLISECONDS);
        }
    } catch (TableNotFoundException tnfe) {
        // Must be caught before IOException so the missing-table case keeps its own message.
        LOG.error("The table " + hTableName + " not found ", tnfe);
        throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe);
    } catch (IOException ioe) {
        LOG.error("Exception while creating connection to HBase.", ioe);
        throw new RuntimeException("Cannot create connection to HBase.", ioe);
    }
    LOG.info("end open.");
}