in flink_sink_adbpg_datastream/src/main/java/Adb4PgTableSink.java [211:312]
public void open(Configuration parameters) throws Exception {
super.open(parameters);
synchronized (Adb4PgTableSink.class) {
dataSourceKey = url + userName + password + tableName;
if (dataSourcePool.contains(dataSourceKey)) {
dataSource = dataSourcePool.get(dataSourceKey);
} else {
dataSource = new DruidDataSource();
dataSource.setUrl(url);
dataSource.setUsername(userName);
dataSource.setPassword(password);
dataSource.setDriverClassName(driverClassName);
dataSource.setMaxActive(connectionMaxActive);
dataSource.setInitialSize(connectionInitialSize);
dataSource.setMaxWait(maxWait);
dataSource.setMinIdle(connectionMinIdle);
dataSource.setTimeBetweenEvictionRunsMillis(2000);
dataSource.setMinEvictableIdleTimeMillis(600000);
dataSource.setMaxEvictableIdleTimeMillis(900000);
dataSource.setValidationQuery("select 1");
dataSource.setTestOnBorrow(false);
dataSource.setTestWhileIdle(connectionTestWhileIdle);
dataSource.setRemoveAbandoned(true);
dataSource.setRemoveAbandonedTimeout(removeAbandonedTimeout);
try {
dataSource.init();
} catch (SQLException e) {
LOG.error("Init DataSource Or Get Connection Error!", e);
throw new RuntimeException("cannot get connection for url: " + url +", userName: " + userName +", password: " + password, e);
}
dataSourcePool.put(dataSourceKey, dataSource);
}
if (primaryKeys == null || primaryKeys.isEmpty()) {
existsPrimaryKeys = false;
if (2 == this.writeMode) {
throw new RuntimeException("primary key cannot be empty when setting write mode to 2:upsert.");
}
}
else {
existsPrimaryKeys = true;
Joiner joinerOnComma = Joiner.on(",").useForNull("null");
String[] primaryFieldNamesStr = new String[primaryKeys.size()];
String[] nonPrimaryFieldNamesStr = new String[schema.getLength() - primaryKeys.size()];
String[] primaryFieldNamesStrCaseSensitive = new String[primaryKeys.size()];
String[] nonPrimaryFieldNamesStrCaseSensitive = new String[schema.getLength() - primaryKeys.size()];
String[] excludedNonPrimaryFieldNamesStr = new String[schema.getLength() - primaryKeys.size()];
String[] excludedNonPrimaryFieldNamesStrCaseSensitive = new String[schema.getLength() - primaryKeys.size()];
int primaryIndex = 0;
int excludedIndex = 0;
for (int i = 0; i < schema.getLength(); i++) {
String fileName = schema.getFieldNames().get(i);
if (primaryKeys.contains(fileName)) {
primaryFieldNamesStr[primaryIndex] = fileName;
primaryFieldNamesStrCaseSensitive[primaryIndex++] = "\"" + fileName + "\"";
}
else {
nonPrimaryFieldNamesStr[excludedIndex] = fileName;
nonPrimaryFieldNamesStrCaseSensitive[excludedIndex] = "\"" + fileName + "\"";
excludedNonPrimaryFieldNamesStr[excludedIndex] = "excluded." + fileName;
excludedNonPrimaryFieldNamesStrCaseSensitive[excludedIndex++] = "excluded.\"" + fileName + "\"";
}
}
this.primaryFieldNames = joinerOnComma.join(primaryFieldNamesStr);
this.nonPrimaryFieldNames = joinerOnComma.join(nonPrimaryFieldNamesStr);
this.primaryFieldNamesCaseSensitive = joinerOnComma.join(primaryFieldNamesStrCaseSensitive);
this.nonPrimaryFieldNamesCaseSensitive = joinerOnComma.join(nonPrimaryFieldNamesStrCaseSensitive);
this.excludedNonPrimaryFieldNames = joinerOnComma.join(excludedNonPrimaryFieldNamesStr);
this.excludedNonPrimaryFieldNamesCaseSensitive = joinerOnComma.join(excludedNonPrimaryFieldNamesStrCaseSensitive);
}
executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("adbpg-flusher-%d").daemon(true).build());
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
if(System.currentTimeMillis() - lastWriteTime >= batchWriteTimeout){
sync();
}
} catch (Exception e) {
LOG.error("flush buffer to ADBPG failed", e);
}
}
}, batchWriteTimeout, batchWriteTimeout, TimeUnit.MILLISECONDS);
}
LOG.info("connector created using url=" + url + ", " +
"tableName=" + tableName + ", " +
"userName=" + userName + ", " +
"password=" + password + ", " +
"maxRetries=" + maxRetryTime + ", " +
"batchSize=" + batchSize + ", " +
"connectionMaxActive=" + connectionMaxActive + ", " +
"batchWriteTimeoutMs=" + batchWriteTimeout + ", " +
"conflictMode=" + conflictMode + ", " +
"timeZone=" + timeZone + ", " +
"useCopy=" + useCopy + ", " +
"targetSchema=" + targetSchema + ", " +
"exceptionMode=" + exceptionMode + ", " +
"reserveMs=" + reserveMs + ", " +
"caseSensitive=" + caseSensitive +", " +
"writeMode=" + writeMode);
}