in flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java [127:155]
/**
 * Creates the runtime provider used to perform lookups against this table.
 *
 * <p>Every key path in {@code context.getKeys()} is resolved to a top-level physical column:
 * the column index is recorded and the column name is taken from the physical row type. The
 * resolved keys, together with all physical field names and types, are passed to a
 * {@code DorisRowDataAsyncLookupFunction} when {@code lookupOptions.isAsync()} is true, and
 * to a {@code DorisRowDataJdbcLookupFunction} otherwise.
 *
 * @param context lookup context supplying the key index paths
 * @return a provider wrapping the asynchronous or the synchronous lookup function
 * @throws IllegalArgumentException if a key path does not consist of exactly one index
 */
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
    DataType physicalRowDataType = physicalSchema.toRowDataType();
    // Field names/types do not change per key, so resolve them once up front.
    String[] fieldNames = DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
    DataType[] fieldTypes =
            DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]);

    int[][] keys = context.getKeys();
    String[] keyNames = new String[keys.length];
    int[] keyIndexs = new int[keys.length];
    for (int i = 0; i < keys.length; i++) {
        int[] innerKeyArr = keys[i];
        // Only the first index of a path is used below; reject longer (nested) or empty
        // paths instead of silently looking up the wrong column.
        if (innerKeyArr.length != 1) {
            throw new IllegalArgumentException(
                    "Doris lookup only supports non-nested lookup keys, but key "
                            + i
                            + " has an index path of length "
                            + innerKeyArr.length);
        }
        keyIndexs[i] = innerKeyArr[0];
        keyNames[i] = fieldNames[innerKeyArr[0]];
    }

    if (lookupOptions.isAsync()) {
        return AsyncTableFunctionProvider.of(
                new DorisRowDataAsyncLookupFunction(
                        options, lookupOptions, fieldNames, fieldTypes, keyNames, keyIndexs));
    } else {
        return TableFunctionProvider.of(
                new DorisRowDataJdbcLookupFunction(
                        options, lookupOptions, fieldNames, fieldTypes, keyNames, keyIndexs));
    }
}