public FeatureResult getOnlineFeaturesWithEntity()

in src/main/java/com/aliyun/openservices/paifeaturestore/domain/Model.java [211:277]


    /**
     * Fetches online features for one feature entity and merges them into one row per
     * requested join id.
     *
     * <p>Every feature view registered under the entity's join id is queried in parallel on
     * {@code executorService}. A view whose lookup throws is logged and skipped, so the result
     * is best-effort: it contains whatever the remaining views returned. The per-view results
     * are merged on the calling thread in the iteration order of the view map, so the order of
     * the returned feature fields is stable and, when two views return the same key for a join
     * id, the later view wins.
     *
     * <p>Entries whose value is {@code null} are dropped from the merged rows. A requested join
     * id for which no view returned data yields a row that contains only the join id itself.
     *
     * @param joinIds           map from join-id name to the list of join-id values to look up;
     *                          must contain an entry for the entity's join id
     * @param featureEntityName name of the feature entity whose feature views are queried
     * @return the merged feature rows (one per requested join id, in request order), the
     *         collected feature field names and the collected field types
     * @throws RuntimeException if the entity name is unknown, if {@code joinIds} has no values
     *                          for the entity's join id, or if no feature views are registered
     *                          for that join id
     * @throws Exception        declared for interface compatibility with callers
     */
    public FeatureResult getOnlineFeaturesWithEntity(Map<String, List<String>> joinIds, String featureEntityName) throws Exception {
        FeatureEntity featureEntity = this.featureEntityMap.get(featureEntityName);
        if (featureEntity == null) {
            throw new RuntimeException(String.format("feature entity name:%s not found", featureEntityName));
        }

        String entityJoinId = featureEntity.getFeatureEntity().getFeatureEntityJoinid();
        // A missing map, a missing key and a null list are all reported the same way instead of
        // surfacing as a bare NullPointerException further down.
        List<String> requestedJoinIds = joinIds == null ? null : joinIds.get(entityJoinId);
        if (requestedJoinIds == null) {
            throw new RuntimeException(String.format("join id:%s not found", entityJoinId));
        }

        Map<String, IFeatureView> featureViewMap = this.featureEntityJoinIdMap.get(entityJoinId);
        if (featureViewMap == null) {
            throw new RuntimeException(String.format("no feature view found for join id:%s", entityJoinId));
        }

        String[] joinIdsArray = requestedJoinIds.toArray(new String[0]);

        // Fan out: one asynchronous lookup per feature view. Workers only fetch; they do not
        // touch any shared state, so no concurrent collections are needed.
        List<CompletableFuture<FeatureResult>> futures = new ArrayList<>(featureViewMap.size());
        for (IFeatureView featureView : featureViewMap.values()) {
            CompletableFuture<FeatureResult> future = CompletableFuture.supplyAsync(() -> {
                try {
                    String viewName = featureView.getFeatureView().getName();
                    return featureView.getOnlineFeatures(joinIdsArray,
                            this.featureNamesMap.get(viewName).toArray(new String[0]),
                            this.aliasNamesMap.get(viewName));
                } catch (Exception e) {
                    // Best effort: a failing view must not fail the whole request.
                    logger.error("get feature view features error", e);
                    return null;
                }
            }, executorService);
            futures.add(future);
        }

        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        // Fan in: merge sequentially, in view order, so the outcome does not depend on which
        // worker happened to finish first.
        List<String> featureFields = new ArrayList<>();
        Map<String, FSType> featureFieldTypeMap = new HashMap<>();
        Map<String, Map<String, Object>> rowsByJoinId = new HashMap<>();
        for (CompletableFuture<FeatureResult> future : futures) {
            FeatureResult featureResult = future.join();
            if (featureResult == null || featureResult.getFeatureData() == null) {
                continue;
            }

            String[] viewFields = featureResult.getFeatureFields();
            if (viewFields != null) {
                featureFields.addAll(Arrays.asList(viewFields));
            }
            if (featureResult.getFeatureFieldTypeMap() != null) {
                featureFieldTypeMap.putAll(featureResult.getFeatureFieldTypeMap());
            }

            for (Map<String, Object> featureData : featureResult.getFeatureData()) {
                if (featureData == null) {
                    continue;
                }
                Object joinIdValue = featureData.get(entityJoinId);
                if (joinIdValue == null) {
                    // A row without a join id cannot be attributed to any requested id.
                    continue;
                }
                Map<String, Object> row =
                        rowsByJoinId.computeIfAbsent(String.valueOf(joinIdValue), k -> new HashMap<>());
                // Drop entries whose value is null.
                for (Map.Entry<String, Object> entry : featureData.entrySet()) {
                    if (entry.getValue() != null) {
                        row.put(entry.getKey(), entry.getValue());
                    }
                }
            }
        }

        // Emit one row per requested join id, preserving the request order.
        List<Map<String, Object>> featureDataList = new ArrayList<>(joinIdsArray.length);
        for (String joinIdValue : joinIdsArray) {
            Map<String, Object> row = rowsByJoinId.get(joinIdValue);
            if (row == null) {
                // Nothing was found for this id: return a placeholder holding just the id.
                row = new HashMap<>();
                row.put(entityJoinId, joinIdValue);
            }
            featureDataList.add(row);
        }

        FeatureStoreResult featureStoreResult = new FeatureStoreResult();
        featureStoreResult.setFeatureFields(featureFields.toArray(new String[0]));
        featureStoreResult.setFeatureDataList(featureDataList);
        featureStoreResult.setFeatureFieldTypeMap(featureFieldTypeMap);

        return featureStoreResult;
    }