in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sourceimpl/AdbpgRowDataLookupFunction.java [199:219]
/**
 * Prepares this lookup function for use.
 *
 * <p>Creates the lookup cache unless the cache strategy is {@code "none"}, initializes the
 * data source when {@code usePool} is {@code 1}, and logs the effective lookup configuration.
 *
 * @param context the function context passed in by the caller (not read by this method)
 * @throws Exception an {@link IOException} wrapping the underlying {@link SQLException} when
 *     the data source cannot be initialized
 */
public void open(FunctionContext context) throws Exception {
    // A strategy of "none" leaves the cache null; otherwise entries expire cacheTTLMs
    // milliseconds after being written and the cache holds at most cacheSize entries.
    this.cache = cacheStrategy.equals("none")
            ? null
            : CacheBuilder.newBuilder()
                    .expireAfterWrite(cacheTTLMs, TimeUnit.MILLISECONDS)
                    .maximumSize(cacheSize)
                    .build();

    if (1 == this.usePool) {
        this.dataSource = AdbpgOptions.buildDataSourceFromOptions(config);
        try {
            this.dataSource.init();
        } catch (SQLException e) {
            LOG.error("Init DataSource Or Get Connection Error!", e);
            // Never put the password in the exception message: it would leak into logs
            // and failure reports. URL and user name are enough to identify the target.
            throw new IOException(
                    "cannot get connection for url: " + this.url + ", userName: " + this.username, e);
        }
    }

    LOG.info("source connector created with "
            + "filedNum=" + fieldNum + ", "
            + "fieldNamesStr=" + Arrays.asList(fieldNamesStr) + ", "
            + "lts=" + Arrays.asList(lts) + ", "
            + "keyNames=" + Arrays.asList(keyNames) + ", "
            + "keyTypes=" + Arrays.asList(keyTypes) + ", "
            + "cacheSize=" + cacheSize + ", "
            + "cacheTTLMs=" + cacheTTLMs);
}