in src/main/java/com/aliyun/openservices/paifeaturestore/domain/Model.java [137:209]
/**
 * Fetches online features for every join id in {@code joinIds} and merges them row by row.
 *
 * <p>One asynchronous task per entry of {@code joinIds} is submitted to {@code executorService}.
 * Each task calls {@code getOnlineFeaturesWithEntity} for the feature entity mapped to that join
 * id, then copies each fetched row into the output row whose requested join id value matches.
 * A failure inside a task is logged and leaves the affected rows without those features; it does
 * not fail the whole call.
 *
 * <p>Because tasks append to the field list concurrently, the order of the returned feature
 * fields depends on task completion order.
 *
 * @param joinIds join id name mapped to the requested join id values; the value lists of all
 *     join ids in {@code featureEntityJoinIdList} must have the same length, and position
 *     {@code i} of every list describes output row {@code i}
 * @return a result holding the collected feature fields, the field type map and one data row
 *     per requested position (an empty map for positions with no matching features)
 * @throws RuntimeException if a join id from {@code featureEntityJoinIdList} is missing from
 *     {@code joinIds}, or if the value lists differ in length
 * @throws Exception kept in the signature for compatibility with existing callers
 */
public FeatureResult getOnlineFeatures(Map<String, List<String>> joinIds) throws Exception {
    int size = -1;
    for (String joinId : this.featureEntityJoinIdList) {
        if (!joinIds.containsKey(joinId)) {
            throw new RuntimeException(String.format("join id:%s not found", joinId));
        }
        if (-1 == size) {
            size = joinIds.get(joinId).size();
        } else if (size != joinIds.get(joinId).size()) {
            throw new RuntimeException(String.format("join id:%s length not equal", joinId));
        }
    }
    // size is still -1 when featureEntityJoinIdList is empty; clamp it so that the result list
    // below is not created with a negative capacity (which throws IllegalArgumentException).
    final int rowCount = Math.max(size, 0);

    // Thread-safe collectors shared by all tasks.
    List<String> featureFields = new CopyOnWriteArrayList<>();
    Map<String, FSType> featureFieldTypeMap = new ConcurrentHashMap<>();
    Map<Integer, Map<String, Object>> indexFeatureMap = new ConcurrentHashMap<>();
    List<CompletableFuture<Void>> futures = new ArrayList<>();

    for (Map.Entry<String, List<String>> entry : joinIds.entrySet()) {
        final String joinIdName = entry.getKey();
        final List<String> joinIdValues = entry.getValue();
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                FeatureResult featureResult = new FeatureStoreResult();
                try {
                    // NOTE(review): a join id with no entry in entityJoinIdToFeatureEntityMap
                    // presumably fails here with a NullPointerException; it is logged and the
                    // task continues with the empty result.
                    featureResult = getOnlineFeaturesWithEntity(joinIds, this.entityJoinIdToFeatureEntityMap.get(joinIdName).featureEntity.getFeatureEntityName());
                } catch (Exception e) {
                    logger.error("featureview get online features error", e);
                }

                String[] fields = featureResult.getFeatureFields();
                if (fields != null) {
                    featureFields.addAll(Arrays.asList(fields));
                }
                if (featureResult.getFeatureFieldTypeMap() != null) {
                    featureFieldTypeMap.putAll(featureResult.getFeatureFieldTypeMap());
                }

                if (featureResult.getFeatureData() != null) {
                    // Index the fetched rows by their join id value once instead of rescanning
                    // the whole list for every requested row. putIfAbsent keeps the first row
                    // for a given value, matching the previous first-match-wins scan.
                    Map<String, Map<String, Object>> firstRowByJoinIdValue = new HashMap<>();
                    for (Map<String, Object> featureData : featureResult.getFeatureData()) {
                        if (featureData != null) {
                            firstRowByJoinIdValue.putIfAbsent(String.valueOf(featureData.get(joinIdName)), featureData);
                        }
                    }

                    for (int i = 0; i < rowCount; i++) {
                        Map<String, Object> featureData = firstRowByJoinIdValue.get(joinIdValues.get(i));
                        if (featureData == null) {
                            continue;
                        }
                        // Several tasks may write to the same output row, so the row itself
                        // must be a concurrent map.
                        Map<String, Object> mergedRow = indexFeatureMap.computeIfAbsent(i, k -> new ConcurrentHashMap<>());
                        for (Map.Entry<String, Object> feature : featureData.entrySet()) {
                            // ConcurrentHashMap rejects null keys and values; skip them instead
                            // of letting one null feature abort the rest of this task.
                            if (feature.getKey() != null && feature.getValue() != null) {
                                mergedRow.put(feature.getKey(), feature.getValue());
                            }
                        }
                    }
                }
            } catch (Exception e) {
                logger.error("featureview get online features error", e);
            }
        }, executorService);
        futures.add(future);
    }

    // Wait until every task has finished before assembling the result.
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

    List<Map<String, Object>> featureDataList = new ArrayList<>(rowCount);
    for (int i = 0; i < rowCount; i++) {
        Map<String, Object> row = indexFeatureMap.get(i);
        if (row == null) {
            featureDataList.add(new HashMap<>());
        } else {
            featureDataList.add(row);
        }
    }

    FeatureStoreResult featureStoreResult = new FeatureStoreResult();
    featureStoreResult.setFeatureFields(featureFields.toArray(new String[0]));
    featureStoreResult.setFeatureDataList(featureDataList);
    featureStoreResult.setFeatureFieldTypeMap(featureFieldTypeMap);
    return featureStoreResult;
}