in src/main/java/com/aliyun/openservices/paifeaturestore/dao/FeatureViewFeatureDBDao.java [763:820]
/**
 * Fetches behavior-sequence records for one user from FeatureDB and converts them to
 * {@link SequenceInfo} objects.
 *
 * <p>{@code event} is split on {@code '|'}; each piece is combined with {@code key} as
 * {@code key + U+001D + event} to form one primary key of the KKV request. Every returned record
 * is then decoded; records that cannot be interpreted are skipped rather than failing the call:
 * <ul>
 *   <li>primary key missing, or not exactly two parts when split on U+001D;</li>
 *   <li>sort key missing, or (deduplication method 2) not exactly two parts when split on U+001D;</li>
 *   <li>deduplication method other than 1 or 2;</li>
 *   <li>empty item id, item id equal to 0, or empty event name;</li>
 *   <li>play time less than or equal to the threshold registered for the record's event.</li>
 * </ul>
 *
 * @param key            user-side key placed before the separator in every primary key
 * @param playtimefilter per-event play-time thresholds; a record is dropped when its play time is
 *                       {@code <=} the threshold of its event. Events without an entry (or a
 *                       {@code null} map / {@code null} threshold) are not filtered
 * @param config         sequence configuration; supplies the requested sequence length
 *                       ({@code getSeqLenOnline()}) and the deduplication method
 * @param event          one or more event names separated by {@code '|'}; must not be {@code null}
 * @param userIdFields   not used by this method
 * @param currentime     not used by this method
 * @param useOnlineTable not used by this method
 * @return the accepted records, in the order they appear in the FeatureDB response; empty when the
 *         response is {@code null} or no record qualifies
 * @throws NullPointerException if {@code event} is {@code null}
 * @throws RuntimeException     wrapping any exception raised while requesting or decoding the
 *                              data (including a non-empty item id that is not a valid long)
 */
public List<SequenceInfo> fetchData(String key, HashMap<String, Double> playtimefilter,
        FeatureViewSeqConfig config, String event, String userIdFields, Long currentime, boolean useOnlineTable) {
    List<SequenceInfo> sequenceInfos = new ArrayList<>();

    // One primary key per requested event: "<key>\u001D<event>".
    List<String> pks = new ArrayList<>();
    for (String e : event.split("\\|")) {
        pks.add(String.format("%s\u001D%s", key, e));
    }

    try {
        byte[] content = this.featureDBClient.kkvRequestFeatureDB(pks, this.database, this.schema, this.table, config.getSeqLenOnline());
        if (content == null) {
            return sequenceInfos;
        }

        KKVRecordBlock kkvRecordBlock = KKVRecordBlock.getRootAsKKVRecordBlock(ByteBuffer.wrap(content));
        for (int i = 0; i < kkvRecordBlock.valuesLength(); i++) {
            KKVData kkvData = new KKVData();
            kkvRecordBlock.values(kkvData, i);

            // A record without a primary key cannot be attributed to an event: skip it instead of
            // letting a NullPointerException abort the whole response.
            String pk = kkvData.pk();
            if (pk == null) {
                continue;
            }
            String[] userIdEvent = pk.split("\u001D");
            if (userIdEvent.length != 2) {
                continue;
            }

            // Same reasoning for the sort key, which carries the item id.
            String sk = kkvData.sk();
            if (sk == null) {
                continue;
            }

            String itemId;
            if (config.getDeduplicationMethodNum() == 1) {
                // Method 1: the sort key is the item id itself.
                itemId = sk;
            } else if (config.getDeduplicationMethodNum() == 2) {
                // Method 2: the sort key is "<itemId>\u001D<second part>"; only the first part is used.
                String[] itemIdTimestamp = sk.split("\u001D");
                if (itemIdTimestamp.length != 2) {
                    continue;
                }
                itemId = itemIdTimestamp[0];
            } else {
                continue;
            }

            // Must be checked BEFORE parsing: Long.valueOf("") throws NumberFormatException, which
            // would fail the entire request instead of skipping this one record.
            if (itemId.isEmpty()) {
                continue;
            }

            SequenceInfo sequenceInfo = new SequenceInfo();
            sequenceInfo.setEventField(userIdEvent[1]);
            sequenceInfo.setItemIdField(Long.valueOf(itemId));
            sequenceInfo.setPlayTimeField(kkvData.playTime());
            sequenceInfo.setTimestampField(kkvData.eventTimestamp());
            if (Objects.equals(sequenceInfo.getEventField(), "") || sequenceInfo.getItemIdField() == 0) {
                continue;
            }

            // Single lookup; a missing (or null) threshold means "no filter for this event".
            Double minPlayTime = playtimefilter == null ? null : playtimefilter.get(sequenceInfo.getEventField());
            if (minPlayTime != null && sequenceInfo.getPlayTimeField() <= minPlayTime) {
                continue;
            }

            sequenceInfos.add(sequenceInfo);
        }
    } catch (Exception e) {
        log.error(String.format("request featuredb error:%s", e.getMessage()));
        throw new RuntimeException(e);
    }
    return sequenceInfos;
}