in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [331:365]
/**
 * Opens this output format: initializes the data source, prepares the COPY manager,
 * starts the periodic flush task and registers the sink metrics.
 *
 * @param taskNumber index of this parallel task (not used by this method)
 * @param numTasks total number of parallel tasks (not used by this method)
 * @throws IOException if initializing the data source or running the setup statement
 *     fails with a {@link SQLException}; the original exception is attached as the cause
 */
public void open(int taskNumber, int numTasks) throws IOException {
    dataSource = AdbpgOptions.buildDataSourceFromOptions(config);
    try {
        dataSource.init();
        executeSql("set optimizer to off");
        // NOTE(review): `connection` is not assigned in this method; presumably
        // executeSql(...) establishes it as a side effect - confirm against that helper.
        rawConn = (DruidPooledConnection) connection;
        // CopyManager is built on the connection wrapped by the pooled connection.
        baseConn = (BaseConnection) (rawConn.getConnection());
        copyManager = new CopyManager(baseConn);
    } catch (SQLException e) {
        LOG.error("Init DataSource Or Get Connection Error!", e);
        // Never include the password here: this message ends up in task failure logs.
        throw new IOException(
                "cannot get connection for url: " + url + ", userName: " + userName, e);
    }

    executorService = new ScheduledThreadPoolExecutor(1,
            new BasicThreadFactory.Builder().namingPattern("adbpg-flusher-%d").daemon(true).build());
    // Every batchWriteTimeout milliseconds, flush when no write happened within that window.
    executorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                if (System.currentTimeMillis() - lastWriteTime >= batchWriteTimeout) {
                    sync();
                }
            } catch (Exception e) {
                // Log only: an exception escaping run() would cancel all later executions.
                LOG.error("flush buffer to ADBPG failed", e);
            }
        }
    }, batchWriteTimeout, batchWriteTimeout, TimeUnit.MILLISECONDS);

    outRps = MetricUtils.registerNumRecordsOutRate(getRuntimeContext());
    outBps = MetricUtils.registerNumBytesOutRate(getRuntimeContext(), CONNECTOR_TYPE);
    latencyGauge = MetricUtils.registerCurrentSendTime(getRuntimeContext());
    sinkSkipCounter = MetricUtils.registerNumRecordsOutErrors(getRuntimeContext());
    deleteCounter = MetricUtils.registerSinkDeleteCounter(getRuntimeContext());
}