in flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java [159:174]
public void open(FunctionContext context) {
try {
super.open(context);
this.kuduReader = new KuduReader<>(this.tableInfo, this.kuduReaderConfig, this.convertor);
// build kudu cache
this.kuduReader.setTableProjections(ArrayUtils.isNotEmpty(projectedFields) ?
Arrays.asList(projectedFields) : null);
this.cache = this.cacheMaxSize == -1 || this.cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
.expireAfterWrite(this.cacheExpireMs, TimeUnit.MILLISECONDS)
.maximumSize(this.cacheMaxSize)
.build();
} catch (Exception ioe) {
LOG.error("Exception while creating connection to Kudu.", ioe);
throw new RuntimeException("Cannot create connection to Kudu.", ioe);
}
}