in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sourceimpl/AdbpgRowDataLookupFunction.java [147:184]
public void eval(Object... keys) {
if (1 == verbose) {
StringBuffer sb = new StringBuffer();
for (Object key : keys) {
sb.append(key);
sb.append(",");
}
LOG.info("start to loop up from adbpg, keys:" + sb.toString());
}
RowData keyRow = GenericRowData.of(keys);
if (cache != null) {
List<RowData> cachedRows = cache.getIfPresent(keyRow);
if (cachedRows != null) {
for (RowData cachedRow : cachedRows) {
collect(cachedRow);
}
if (1 == verbose) {
LOG.info("fetched from cache");
}
return;
}
}
try {
ArrayList<RowData> rows = new ArrayList<>();
retryExecuteQuery(keyRow, rows);
for (RowData row : rows) {
collect(row);
}
if (cache != null) {
cache.put(keyRow, rows);
}
} catch (Exception e) {
e.printStackTrace();
LOG.info("error fetch from adbpg", e);
throw new RuntimeException("cannot fetch from adbpg source", e);
}
}