in src/main/java/com/aliyun/openservices/paifeaturestore/dao/FeatureViewTableStoreDao.java [173:275]
public FeatureResult getSequenceFeatures(String[] keys, String userIdField, FeatureViewSeqConfig config) {
FeatureStoreResult featureStoreResult = new FeatureStoreResult();
String[] selectFields=null;
if (!StringUtils.isEmpty(config.getPlayTimeField())) {
selectFields=new String[]{config.getItemIdField(),config.getEventField(),config.getPlayTimeField(),config.getTimestampField()};
} else {
selectFields=new String[]{config.getItemIdField(),config.getEventField(),config.getTimestampField()};
}
HashMap<String, Double> playtimefilter = new HashMap<>();
if (!StringUtils.isEmpty(config.getPlayTimeFilter())) {
for (String event:Strings.split(config.getPlayTimeFilter(), ';')) {
String[] s = Strings.split(event, ':');
if (s.length==2) {
playtimefilter.put(s[0], Double.valueOf(s[1]));
}
}
}
long currentime=System.currentTimeMillis();
String pkField = String.format("%s_%s", userIdField, config.getEventField());
String skFiled="";
if (config.getDeduplicationMethodNum()==1) {
skFiled=""+config.getItemIdField();//item_id
} else if (config.getDeduplicationMethodNum()==2) {
skFiled=String.format("%s_%s",config.getItemIdField(),config.getTimestampField());//item_id_event_time
}
Set<String> featureFieldList = new HashSet<>();
List<Map<String, Object>> featureDataList = new ArrayList<>();
String[] events=new String[config.getSeqConfigs().length];
for (int i=0;i<events.length;i++) {
events[i]=config.getSeqConfigs()[i].getSeqEvent();
}
for (String key:keys) {
HashMap<String, String> keyEventsDatasOnline = new HashMap<>();
for (String event : events) {
//Traverse to get online data
List<SequenceInfo> seqOnlineDB = getOtsSeqResult(key, config, selectFields, playtimefilter, pkField, skFiled, event, this.onlinetable, currentime, true);
//Traverse to get offline data
List<SequenceInfo> seqOfflineDB = getOtsSeqResult(key, config, selectFields, playtimefilter, pkField, skFiled, event, this.offlinetable, currentime, false);
//merge
seqOnlineDB = MergeOnOfflineSeq(seqOnlineDB, seqOfflineDB, config, event);
Map<String, String> resultData = disposeDB(seqOnlineDB,selectFields,config,event,currentime);
if (seqOnlineDB.size() > 0) {
keyEventsDatasOnline.putAll(resultData);
}
}
if (keyEventsDatasOnline.size() > 0) {
keyEventsDatasOnline.put(this.primaryKeyField, key);
}
if (!keyEventsDatasOnline.isEmpty()) {
featureFieldList.addAll(keyEventsDatasOnline.keySet());
boolean found = false;
for (Map<String, Object> features : featureDataList) {
if (features.containsKey(keyEventsDatasOnline.get(this.primaryKeyField))) {
for (Map.Entry<String,String> entry:keyEventsDatasOnline.entrySet()) {
features.put(entry.getKey(),entry.getValue());
}
found = true;
break;
}
}
if (!found) {
Map<String, Object> featureData = new HashMap<>();
for (Map.Entry<String, String> entry : keyEventsDatasOnline.entrySet()) {
featureData.put(entry.getKey(), entry.getValue());
}
featureDataList.add(featureData);
}
}
}
String[] fields = new String[featureFieldList.size()];
int f=0;
for (String field:featureFieldList) {
fields[f++]=field;
}
Map<String, FSType> featureFieldTypeMap = new HashMap<>();
for (String featureName : featureFieldList) {
featureFieldTypeMap.put(featureName, FSType.FS_STRING);
}
featureStoreResult.setFeatureFieldTypeMap(featureFieldTypeMap);
featureStoreResult.setFeatureFieldTypeMap(this.fieldTypeMap);
featureStoreResult.setFeatureFields(fields);
featureStoreResult.setFeatureDataList(featureDataList);
return featureStoreResult;
}