public void open()

in flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java [108:156]


    public void open(Configuration parameters) throws Exception {
        // Prepares the sink for writing: opens the mutation converter, obtains an HBase
        // connection and a buffered mutator for the target table, and (optionally) starts a
        // background task that flushes buffered mutations at a fixed delay.
        // TableNotFoundException and IOException raised during setup are logged and rethrown
        // wrapped in a RuntimeException.
        LOG.info("start open ...");
        final org.apache.hadoop.conf.Configuration hbaseConf = prepareRuntimeConfiguration();
        try {
            this.mutationConverter.open();
            this.numPendingRequests = new AtomicLong(0);

            // Create a connection only if none is present yet.
            if (connection == null) {
                this.connection = ConnectionFactory.createConnection(hbaseConf);
            }

            // The mutator is bound to the target table and reports to this instance, which is
            // registered as its listener.
            final TableName targetTable = TableName.valueOf(hTableName);
            final BufferedMutatorParams mutatorParams =
                    new BufferedMutatorParams(targetTable).listener(this);
            // A non-positive size leaves the write buffer size unset on the params.
            if (bufferFlushMaxSizeInBytes > 0) {
                mutatorParams.writeBufferSize(bufferFlushMaxSizeInBytes);
            }
            this.mutator = connection.getBufferedMutator(mutatorParams);

            // NOTE(review): a max-mutations setting of 1 presumably means every record is
            // flushed as it arrives, making a timer redundant - confirm against the write path.
            final boolean periodicFlushEnabled =
                    bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1;
            if (periodicFlushEnabled) {
                final Runnable periodicFlush =
                        () -> {
                            // Do nothing once the sink has been marked closed.
                            if (!closed) {
                                try {
                                    flush();
                                } catch (Exception flushError) {
                                    // Record only the first failure; the exception is not
                                    // rethrown from the flusher thread.
                                    failureThrowable.compareAndSet(null, flushError);
                                }
                            }
                        };
                this.executor =
                        Executors.newScheduledThreadPool(
                                1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
                // The interval is used both as the initial delay and as the delay between runs.
                this.scheduledFuture =
                        this.executor.scheduleWithFixedDelay(
                                periodicFlush,
                                bufferFlushIntervalMillis,
                                bufferFlushIntervalMillis,
                                TimeUnit.MILLISECONDS);
            }
        } catch (TableNotFoundException tableMissing) {
            // Handled before the broader IOException so the missing-table case gets its own
            // message.
            LOG.error("The table " + hTableName + " not found ", tableMissing);
            throw new RuntimeException(
                    "HBase table '" + hTableName + "' not found.", tableMissing);
        } catch (IOException connectFailure) {
            LOG.error("Exception while creating connection to HBase.", connectFailure);
            throw new RuntimeException("Cannot create connection to HBase.", connectFailure);
        }
        LOG.info("end open.");
    }