public FeatureResult getOnlineFeatures()

in src/main/java/com/aliyun/openservices/paifeaturestore/domain/Model.java [137:209]


    /**
     * Fetches online features for a batch of join id values and merges them into one row per
     * input index.
     *
     * <p>Every join id in {@code featureEntityJoinIdList} must be present in {@code joinIds}, and
     * all of those value lists must have the same length; that length is the number of rows in
     * the result. For each key of {@code joinIds} that has an entry in
     * {@code entityJoinIdToFeatureEntityMap}, {@code getOnlineFeaturesWithEntity} is invoked on
     * {@code executorService}. A returned row is attached to input index {@code i} when the
     * string form of its join id column equals the i-th value supplied for that join id; when
     * several returned rows match, the first one wins. Rows from different lookups that land on
     * the same index are merged into a single map.
     *
     * <p>A lookup that fails is logged and simply contributes nothing; it does not fail the whole
     * call. Indexes for which nothing matched get an empty map. Feature fields are appended in the
     * order the lookups finish.
     *
     * @param joinIds join id name to the list of values to look up, one value per requested row
     * @return a result holding the collected feature fields, the field type map, and one feature
     *         map per requested row
     * @throws RuntimeException if a required join id is missing from {@code joinIds} (or mapped to
     *         {@code null}), or if the required value lists differ in length
     */
    public FeatureResult getOnlineFeatures(Map<String, List<String>> joinIds) throws Exception {
        int size = -1;
        for (String joinId : this.featureEntityJoinIdList) {
            List<String> requiredValues = joinIds.get(joinId);
            if (requiredValues == null) {
                throw new RuntimeException(String.format("join id:%s not found", joinId));
            }
            if (-1 == size) {
                size = requiredValues.size();
            } else if (size != requiredValues.size()) {
                throw new RuntimeException(String.format("join id:%s length not equal", joinId));
            }
        }
        // With no required join ids the -1 sentinel survives; clamp it so the result is simply
        // empty instead of new ArrayList<>(-1) throwing IllegalArgumentException.
        final int rowCount = Math.max(size, 0);

        // Shared between the lookup tasks, hence the concurrent collections.
        List<String> featureFields = new CopyOnWriteArrayList<>();
        Map<String, FSType> featureFieldTypeMap = new ConcurrentHashMap<>();
        Map<Integer, Map<String, Object>> indexFeatureMap = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : joinIds.entrySet()) {
            final String joinIdName = entry.getKey();
            final List<String> joinIdValues = entry.getValue();
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                try {
                    // Callers may pass join ids this model has no feature entity for; skip them
                    // explicitly rather than tripping over a null lookup.
                    if (joinIdValues == null || this.entityJoinIdToFeatureEntityMap.get(joinIdName) == null) {
                        return;
                    }
                    FeatureResult featureResult = getOnlineFeaturesWithEntity(joinIds,
                            this.entityJoinIdToFeatureEntityMap.get(joinIdName).featureEntity.getFeatureEntityName());
                    if (featureResult == null) {
                        return;
                    }

                    if (featureResult.getFeatureFields() != null) {
                        featureFields.addAll(Arrays.asList(featureResult.getFeatureFields()));
                    }
                    if (featureResult.getFeatureFieldTypeMap() != null) {
                        featureFieldTypeMap.putAll(featureResult.getFeatureFieldTypeMap());
                    }
                    if (featureResult.getFeatureData() == null) {
                        return;
                    }

                    // Index the returned rows by the string form of their join id column once,
                    // keeping the first row per value, instead of rescanning them for every index.
                    Map<String, Map<String, Object>> rowsByJoinValue = new HashMap<>();
                    for (Map<String, Object> featureData : featureResult.getFeatureData()) {
                        if (featureData != null) {
                            rowsByJoinValue.putIfAbsent(String.valueOf(featureData.get(joinIdName)), featureData);
                        }
                    }

                    // Only the required join ids were length-checked above, so bound by this
                    // list's own size as well.
                    int limit = Math.min(rowCount, joinIdValues.size());
                    for (int i = 0; i < limit; i++) {
                        Map<String, Object> matched = rowsByJoinValue.get(joinIdValues.get(i));
                        if (matched == null) {
                            continue;
                        }
                        // A plain HashMap guarded by its own monitor is used for the merged row
                        // because ConcurrentHashMap rejects null values, and a null feature value
                        // would otherwise abort the merge for this lookup.
                        Map<String, Object> merged = indexFeatureMap.computeIfAbsent(i, k -> new HashMap<>());
                        synchronized (merged) {
                            merged.putAll(matched);
                        }
                    }
                } catch (Exception e) {
                    // Best effort: one failing lookup must not fail the whole request.
                    logger.error("featureview get online features error", e);
                }
            }, executorService);
            futures.add(future);
        }

        // Wait for every lookup to finish before reading the merged rows.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        List<Map<String, Object>> featureDataList = new ArrayList<>(rowCount);
        for (int i = 0; i < rowCount; i++) {
            Map<String, Object> merged = indexFeatureMap.get(i);
            if (merged == null) {
                featureDataList.add(new HashMap<>());
            } else {
                featureDataList.add(merged);
            }
        }

        FeatureStoreResult featureStoreResult = new FeatureStoreResult();
        featureStoreResult.setFeatureFields(featureFields.toArray(new String[0]));
        featureStoreResult.setFeatureDataList(featureDataList);
        featureStoreResult.setFeatureFieldTypeMap(featureFieldTypeMap);
        return featureStoreResult;
    }