in src/main/java/com/aliyun/openservices/paifeaturestore/dao/FeatureViewIgraphDao.java [142:229]
/**
 * Reads sequence features for every key and every configured sequence event and
 * packs them into a {@link FeatureStoreResult}.
 *
 * <p>For each key, each event from {@code config.getSeqConfigs()} is fetched with
 * {@code getSeqDB} and converted to a field-name/value map with {@code disposeDB}.
 * Keys for which no event returned any record contribute no row. Every produced row
 * additionally carries the key itself under {@code this.primaryKeyField}. If the same
 * key occurs more than once in {@code keys}, its data is merged into a single row.
 *
 * @param keys        primary key values to look up
 * @param userIdField not used by this implementation
 * @param config      sequence configuration (item/event/play-time/timestamp field names,
 *                    play-time filter string and the list of sequence events)
 * @return a result whose feature fields are the union of all row field names, all typed
 *         as {@code FSType.FS_STRING}, with one data row per distinct key that had data
 */
public FeatureResult getSequenceFeatures(String[] keys, String userIdField, FeatureViewSeqConfig config) {
    FeatureStoreResult featureStoreResult = new FeatureStoreResult();

    // The play-time column is selected only when the config names one.
    String[] selectFields;
    if (!StringUtils.isEmpty(config.getPlayTimeField())) {
        selectFields = new String[]{config.getItemIdField(), config.getEventField(), config.getPlayTimeField(), config.getTimestampField()};
    } else {
        selectFields = new String[]{config.getItemIdField(), config.getEventField(), config.getTimestampField()};
    }

    // Current time in whole seconds since the epoch.
    long currentime = System.currentTimeMillis() / 1000;

    // The play-time filter is a ';'-separated list of "name:number" pairs.
    HashMap<String, Double> playtimefilter = new HashMap<>();
    if (!StringUtils.isEmpty(config.getPlayTimeFilter())) {
        for (String event : Strings.split(config.getPlayTimeFilter(), ';')) {
            String[] s = Strings.split(event, ':');
            if (s.length == 2) { // the key has a value; other shapes are skipped
                playtimefilter.put(s[0], Double.valueOf(s[1]));
            }
        }
    }

    String[] events = new String[config.getSeqConfigs().length];
    for (int i = 0; i < events.length; i++) {
        events[i] = config.getSeqConfigs()[i].getSeqEvent();
    }

    Set<String> featureFieldList = new HashSet<>();
    List<Map<String, Object>> featureDataList = new ArrayList<>();
    // Index of already-emitted rows by primary key value, so a repeated key is merged
    // into its existing row instead of producing a duplicate row.
    Map<String, Map<String, Object>> rowsByKey = new HashMap<>();

    // Traverse to get online data
    for (String key : keys) {
        Map<String, String> keyEventsDatasOnline = new HashMap<>();
        for (String event : events) {
            List<SequenceInfo> igraphData = getSeqDB(key, selectFields, playtimefilter, event, config, currentime);
            Map<String, String> resultDB = disposeDB(igraphData, selectFields, config, event, currentime);
            if (igraphData.size() > 0) {
                keyEventsDatasOnline.putAll(resultDB);
            }
        }
        if (keyEventsDatasOnline.isEmpty()) {
            // No event produced data for this key: it gets no row.
            continue;
        }
        keyEventsDatasOnline.put(this.primaryKeyField, key);
        featureFieldList.addAll(keyEventsDatasOnline.keySet());

        Map<String, Object> row = rowsByKey.get(key);
        if (row == null) {
            row = new HashMap<String, Object>();
            rowsByKey.put(key, row);
            featureDataList.add(row);
        }
        // Copy field values (not field names) into the row.
        row.putAll(keyEventsDatasOnline);
    }

    String[] fields = featureFieldList.toArray(new String[0]);

    // Every value stored in the rows above is a String, so each returned field is typed
    // FS_STRING. This map must not be replaced afterwards by the view-level type map,
    // otherwise the types would no longer describe the fields returned here.
    Map<String, FSType> featureFieldTypeMap = new HashMap<>();
    for (String featureName : featureFieldList) {
        featureFieldTypeMap.put(featureName, FSType.FS_STRING);
    }

    featureStoreResult.setFeatureFields(fields);
    featureStoreResult.setFeatureFieldTypeMap(featureFieldTypeMap);
    featureStoreResult.setFeatureDataList(featureDataList);
    return featureStoreResult;
}