in src/main/java/com/aliyun/openservices/paifeaturestore/domain/Model.java [211:277]
public FeatureResult getOnlineFeaturesWithEntity(Map<String, List<String>> joinIds, String featureEntityName) throws Exception {
FeatureEntity featureEntity = this.featureEntityMap.get(featureEntityName);
if (featureEntity == null) {
throw new RuntimeException(String.format("feature entity name:%s not found", featureEntityName));
}
String entityJoinId = featureEntity.getFeatureEntity().getFeatureEntityJoinid();
if (!joinIds.containsKey(entityJoinId)) {
throw new RuntimeException(String.format("join id:%s not found", entityJoinId));
}
Map<String, IFeatureView> featureViewMap = this.featureEntityJoinIdMap.get(entityJoinId);
String[] joinIdsArray = joinIds.get(entityJoinId).toArray(new String[0]);
FeatureStoreResult featureStoreResult = new FeatureStoreResult();
List<String> featureFields = new CopyOnWriteArrayList<>();
Map<String, FSType> featureFieldTypeMap = new ConcurrentHashMap<>();
Map<String, Map<String, Object>> joinIdFeaturMap = new ConcurrentHashMap<>();
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (IFeatureView featureView : featureViewMap.values()) {
CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
try {
FeatureResult featureResult = featureView.getOnlineFeatures(joinIdsArray,
this.featureNamesMap.get(featureView.getFeatureView().getName()).toArray(new String[0]), this.aliasNamesMap.get(featureView.getFeatureView().getName()));
if (featureResult.getFeatureData()!=null) {
featureFields.addAll(Arrays.asList(featureResult.getFeatureFields()));
if (featureResult.getFeatureFieldTypeMap()!=null) {
featureFieldTypeMap.putAll(featureResult.getFeatureFieldTypeMap());
}
for (Map<String, Object> featureData : featureResult.getFeatureData()) {
if (featureData != null) {
String joinIdValue = String.valueOf(featureData.get(entityJoinId));
// 过滤掉值为 null 的条目
Map<String, Object> filteredData = featureData.entrySet().stream()
.filter(entry -> entry.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
joinIdFeaturMap.computeIfAbsent(joinIdValue, k -> new ConcurrentHashMap<>()).putAll(filteredData);
}
}
}
} catch (Exception e) {
logger.error("get feature view features error", e);
}
}, executorService );
futures.add(future);
}
List<Map<String, Object>> featureDataList = new ArrayList<>();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
for (String joinIdValue : joinIdsArray) {
if (joinIdFeaturMap.containsKey(joinIdValue)) {
featureDataList.add(joinIdFeaturMap.get(joinIdValue));
} else {
Map<String, Object> featureMap = new HashMap<>();
featureMap.put(entityJoinId, joinIdValue);
featureDataList.add(featureMap);
}
}
featureStoreResult.setFeatureFields(featureFields.toArray(new String[0]));
featureStoreResult.setFeatureDataList(featureDataList);
featureStoreResult.setFeatureFieldTypeMap(featureFieldTypeMap);
return featureStoreResult;
}