in flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSourceSinkFactory.java [197:222]
public DynamicTableSource createDynamicTableSource(Context context) {
ReadableConfig config = getReadableConfig(context);
String masterAddresses = config.get(KUDU_MASTERS);
int scanRowSize = config.get(KUDU_SCAN_ROW_SIZE);
long kuduCacheMaxRows = config.get(KUDU_LOOKUP_CACHE_MAX_ROWS);
long kuduCacheTtl = config.get(KUDU_LOOKUP_CACHE_TTL);
int kuduMaxReties = config.get(KUDU_LOOKUP_MAX_RETRIES);
// build kudu lookup options
KuduLookupOptions kuduLookupOptions = KuduLookupOptions.Builder.options().withCacheMaxSize(kuduCacheMaxRows)
.withCacheExpireMs(kuduCacheTtl)
.withMaxRetryTimes(kuduMaxReties)
.build();
TableSchema schema = context.getCatalogTable().getSchema();
TableSchema physicalSchema = KuduTableUtils.getSchemaWithSqlTimestamp(schema);
KuduTableInfo tableInfo = KuduTableUtils.createTableInfo(config.get(KUDU_TABLE), schema,
context.getCatalogTable().toProperties());
KuduReaderConfig.Builder configBuilder = KuduReaderConfig.Builder
.setMasters(masterAddresses)
.setRowLimit(scanRowSize);
return new KuduDynamicTableSource(configBuilder, tableInfo, physicalSchema, physicalSchema.getFieldNames(),
kuduLookupOptions);
}