public FeatureResult getSequenceFeatures()

in src/main/java/com/aliyun/openservices/paifeaturestore/dao/FeatureViewTableStoreDao.java [173:275]


    public FeatureResult getSequenceFeatures(String[] keys, String userIdField, FeatureViewSeqConfig config) {
        FeatureStoreResult featureStoreResult = new FeatureStoreResult();

        String[] selectFields=null;
        if (!StringUtils.isEmpty(config.getPlayTimeField())) {
            selectFields=new String[]{config.getItemIdField(),config.getEventField(),config.getPlayTimeField(),config.getTimestampField()};
        } else {
            selectFields=new String[]{config.getItemIdField(),config.getEventField(),config.getTimestampField()};
        }

        HashMap<String, Double> playtimefilter = new HashMap<>();
        if (!StringUtils.isEmpty(config.getPlayTimeFilter())) {
            for (String event:Strings.split(config.getPlayTimeFilter(), ';')) {
                String[] s = Strings.split(event, ':');
                if (s.length==2) {
                    playtimefilter.put(s[0], Double.valueOf(s[1]));
                }
            }
        }

        long currentime=System.currentTimeMillis();

        String pkField = String.format("%s_%s", userIdField, config.getEventField());
        String skFiled="";
        if (config.getDeduplicationMethodNum()==1) {
            skFiled=""+config.getItemIdField();//item_id
        } else if (config.getDeduplicationMethodNum()==2) {
            skFiled=String.format("%s_%s",config.getItemIdField(),config.getTimestampField());//item_id_event_time
        }
        Set<String> featureFieldList = new HashSet<>();
        List<Map<String, Object>> featureDataList = new ArrayList<>();


        String[] events=new String[config.getSeqConfigs().length];
        for (int i=0;i<events.length;i++) {
            events[i]=config.getSeqConfigs()[i].getSeqEvent();
        }

        for (String key:keys) {
            HashMap<String, String> keyEventsDatasOnline = new HashMap<>();
            for (String event : events) {
                //Traverse to get online data
                List<SequenceInfo> seqOnlineDB = getOtsSeqResult(key, config, selectFields, playtimefilter, pkField, skFiled, event, this.onlinetable, currentime, true);

                //Traverse to get offline data
                List<SequenceInfo> seqOfflineDB = getOtsSeqResult(key, config, selectFields, playtimefilter, pkField, skFiled, event, this.offlinetable, currentime, false);

                //merge
                seqOnlineDB = MergeOnOfflineSeq(seqOnlineDB, seqOfflineDB, config, event);

                Map<String, String> resultData = disposeDB(seqOnlineDB,selectFields,config,event,currentime);

                if (seqOnlineDB.size() > 0) {
                    keyEventsDatasOnline.putAll(resultData);
                }

            }
            if (keyEventsDatasOnline.size() > 0) {
                keyEventsDatasOnline.put(this.primaryKeyField, key);
            }

            if (!keyEventsDatasOnline.isEmpty()) {
                featureFieldList.addAll(keyEventsDatasOnline.keySet());

                boolean found = false;
                for (Map<String, Object> features : featureDataList) {
                    if (features.containsKey(keyEventsDatasOnline.get(this.primaryKeyField))) {
                        for (Map.Entry<String,String> entry:keyEventsDatasOnline.entrySet()) {
                            features.put(entry.getKey(),entry.getValue());
                        }
                        found = true;
                        break;

                    }
                }

                if (!found) {
                    Map<String, Object> featureData = new HashMap<>();
                    for (Map.Entry<String, String> entry : keyEventsDatasOnline.entrySet()) {
                        featureData.put(entry.getKey(), entry.getValue());
                    }
                    featureDataList.add(featureData);
                }
            }

        }
        String[] fields = new String[featureFieldList.size()];
        int f=0;
        for (String field:featureFieldList) {
            fields[f++]=field;
        }

        Map<String, FSType> featureFieldTypeMap = new HashMap<>();
        for (String featureName : featureFieldList) {
            featureFieldTypeMap.put(featureName, FSType.FS_STRING);
        }
        featureStoreResult.setFeatureFieldTypeMap(featureFieldTypeMap);
        featureStoreResult.setFeatureFieldTypeMap(this.fieldTypeMap);
        featureStoreResult.setFeatureFields(fields);
        featureStoreResult.setFeatureDataList(featureDataList);

        return featureStoreResult;
    }