in src/main/java/com/aliyun/openservices/paifeaturestore/dao/FeatureViewHologresDao.java [147:239]
/**
 * Reads sequence features for each of the given keys and packs them into a
 * {@link FeatureStoreResult}.
 *
 * <p>For every key and every sequence event listed in {@code config.getSeqConfigs()}, the
 * online and offline sequences are loaded through {@code getSeqDB}, combined with
 * {@code MergeOnOfflineSeq}, and converted into a feature-name to value map by
 * {@code disposeDB}. Events whose merged sequence is empty contribute nothing, and keys for
 * which no event contributes anything are omitted from the result. Each returned row also
 * carries the key itself under {@code this.primaryKeyField}. When the same key occurs more than
 * once in {@code keys}, its features are merged into a single row.
 *
 * @param keys         keys to look up; one result row is produced per distinct key that has data
 * @param userIdFields passed through unchanged to {@code getSeqDB}
 * @param config       sequence configuration supplying the selected field names, the optional
 *                     play-time filter (formatted like {@code "click:10;praise:5"}) and the
 *                     sequence events to read
 * @return a result whose feature fields are the union of all produced feature names (all typed
 *         {@code FSType.FS_STRING}) and whose data list holds one map per key with data
 */
public FeatureResult getSequenceFeatures(String[] keys, String userIdFields, FeatureViewSeqConfig config) {
    FeatureStoreResult featureStoreResult = new FeatureStoreResult();

    // The play-time column is only selected when the config names one.
    String[] selectFields;
    if (!StringUtils.isEmpty(config.getPlayTimeField())) {
        selectFields = new String[]{config.getItemIdField(), config.getEventField(), config.getPlayTimeField(), config.getTimestampField()};
    } else {
        selectFields = new String[]{config.getItemIdField(), config.getEventField(), config.getTimestampField()};
    }
    long currentime = System.currentTimeMillis() / 1000; // unit: second

    // Parse the play-time filter, e.g. "click:10;praise:5" -> {click=10.0, praise=5.0}.
    // Entries that do not split into exactly two parts are ignored.
    HashMap<String, Double> seqPlayFilter = new HashMap<>();
    if (!StringUtils.isEmpty(config.getPlayTimeFilter())) {
        for (String event : Strings.split(config.getPlayTimeFilter(), ';')) {
            String[] et = Strings.split(event, ':');
            if (et.length == 2) {
                seqPlayFilter.put(et[0], Double.valueOf(et[1]));
            }
        }
    }

    // Collect the event names of all configured sequences.
    String[] events = new String[config.getSeqConfigs().length];
    for (int i = 0; i < events.length; i++) {
        events[i] = config.getSeqConfigs()[i].getSeqEvent();
    }

    Set<String> featureFieldList = new HashSet<>();
    List<Map<String, Object>> featureDataList = new ArrayList<>();
    // Index of already-emitted rows by key so a repeated key is merged into its existing row.
    Map<String, Map<String, Object>> rowsByKey = new HashMap<>();

    for (String key : keys) {
        Map<String, String> keyFeatures = new HashMap<>();
        for (String event : events) {
            List<SequenceInfo> seqOnlineDB = getSeqDB(key, selectFields, seqPlayFilter, config, event, userIdFields, currentime, true);
            List<SequenceInfo> seqOfflineDB = getSeqDB(key, selectFields, seqPlayFilter, config, event, userIdFields, currentime, false);
            // Merge the online and offline sequences for this event.
            List<SequenceInfo> mergedSeq = MergeOnOfflineSeq(seqOnlineDB, seqOfflineDB, config, event);
            Map<String, String> resultData = disposeDB(mergedSeq, selectFields, config, event, currentime);
            if (!mergedSeq.isEmpty()) {
                keyFeatures.putAll(resultData);
            }
        }
        if (keyFeatures.isEmpty()) {
            // No event produced data for this key: leave it out of the result.
            continue;
        }
        keyFeatures.put(this.primaryKeyField, key);
        featureFieldList.addAll(keyFeatures.keySet());

        Map<String, Object> row = rowsByKey.get(key);
        if (row == null) {
            row = new HashMap<>();
            rowsByKey.put(key, row);
            featureDataList.add(row);
        }
        // Store each feature under its name with its actual value.
        row.putAll(keyFeatures);
    }

    // Every sequence feature is returned as a string.
    Map<String, FSType> featureFieldTypeMap = new HashMap<>();
    for (String featureName : featureFieldList) {
        featureFieldTypeMap.put(featureName, FSType.FS_STRING);
    }

    featureStoreResult.setFeatureFields(featureFieldList.toArray(new String[0]));
    featureStoreResult.setFeatureFieldTypeMap(featureFieldTypeMap);
    featureStoreResult.setFeatureDataList(featureDataList);
    return featureStoreResult;
}