in flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java [98:148]
/**
 * Serves a batch of lookup requests with a single SQL statement.
 *
 * <p>The requests are deduplicated, and one SELECT (with a {@code ?} placeholder for every
 * condition field of the schema) is emitted per remaining request; the SELECTs are joined
 * with {@code union all}. The schema of the first request is used for the whole batch.
 *
 * <p>After the query runs, every request that carries a future is completed with the rows
 * stored under its {@link RecordKey} in the result map (whatever {@code Map.get} returns for
 * that key). If anything throws, each future that is not yet done is completed exceptionally
 * with that exception instead.
 *
 * @param action the batch of lookup requests; an empty batch is ignored
 */
private void handle(GetAction action) {
    List<Get> requests = action.getGetList();
    if (requests.isEmpty()) {
        return;
    }
    LookupSchema schema = requests.get(0).getRecord().getSchema();
    List<Get> uniqueRequests = deduplicateRecords(requests);
    LOG.debug(
            "record size {}, after deduplicate size {}",
            requests.size(),
            uniqueRequests.size());

    StringBuilder query = new StringBuilder();
    query.append("/* ApplicationName=Flink Lookup Query */ ");
    for (int idx = 0; idx < uniqueRequests.size(); idx++) {
        // Every SELECT after the first one is chained onto the previous one.
        if (idx != 0) {
            query.append(" union all ");
        }
        appendSelect(query, schema);
        query.append(" where ( ");
        // Empty before the first predicate, " and " before each following one.
        String separator = "";
        for (String field : schema.getConditionFields()) {
            query.append(separator);
            query.append(quoteIdentifier(field)).append("=?");
            separator = " and ";
        }
        query.append(" ) ");
    }
    String sql = query.toString();

    try {
        Map<RecordKey, List<Record>> rowsByKey = executeQuery(sql, uniqueRequests, schema);
        // Iterate the original (non-deduplicated) list so duplicates get an answer too.
        for (Get request : requests) {
            if (request.getFuture() == null) {
                continue;
            }
            RecordKey lookupKey = new RecordKey(request.getRecord());
            request.getFuture().complete(rowsByKey.get(lookupKey));
        }
    } catch (Exception e) {
        // Propagate the failure to every caller that has not received a result yet.
        for (Get request : requests) {
            if (request.getFuture() == null || request.getFuture().isDone()) {
                continue;
            }
            request.getFuture().completeExceptionally(e);
        }
    }
}