in src/main/java/com/aliyun/openservices/paifeaturestore/dao/FeatureViewTableStoreDao.java [343:424]
/**
 * Reads the sequence rows stored for one key/event pair from a Tablestore table and
 * converts each row into a {@link SequenceInfo}.
 *
 * <p>The scan covers every row whose first primary-key column equals
 * {@code key + "_" + event}; the second primary-key column ranges from INF_MIN to INF_MAX,
 * read in forward direction with a single version per column. When the server truncates a
 * response, the scan continues from the returned next-start primary key until the range is
 * exhausted.
 *
 * @param key            entity key; combined with {@code event} to form the first primary-key value
 * @param config         sequence configuration supplying the item-id, event, play-time and
 *                       timestamp column names and the deduplication method number
 * @param selectFields   names of the attribute columns to fetch
 * @param playtimefilter optional map from event name to minimum play time; may be {@code null}.
 *                       When it holds a value for {@code event}, rows whose play-time column is
 *                       missing or below that value are skipped
 * @param pkField        name of the first primary-key column
 * @param skField        name of the second primary-key column
 * @param event          event name to read
 * @param tablename      table to scan
 * @param currentime     end of the time range in milliseconds; only read (and therefore only
 *                       required to be non-null) when {@code useOnlinetable} is {@code true}
 * @param useOnlinetable when {@code true}, restricts the scan to the five days ending at
 *                       {@code currentime}
 * @return the converted rows in scan order; empty when nothing matched, never {@code null}
 */
public List<SequenceInfo> getOtsSeqResult(String key,FeatureViewSeqConfig config, String[] selectFields,HashMap<String, Double> playtimefilter,
        String pkField,String skField, String event, String tablename, Long currentime,boolean useOnlinetable) {
    // Width of the time window applied to online tables: five days in milliseconds.
    final long onlineWindowMillis = 5L * 24 * 60 * 60 * 1000;

    List<SequenceInfo> sequenceInfos = new ArrayList<>();

    // Both range bounds share the same first primary-key value; only the second column spans.
    String partitionValue = key + "_" + event;
    PrimaryKeyBuilder startPK = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    startPK.addPrimaryKeyColumn(pkField, PrimaryKeyValue.fromString(partitionValue));
    startPK.addPrimaryKeyColumn(skField, PrimaryKeyValue.INF_MIN);
    PrimaryKeyBuilder endPK = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    endPK.addPrimaryKeyColumn(pkField, PrimaryKeyValue.fromString(partitionValue));
    endPK.addPrimaryKeyColumn(skField, PrimaryKeyValue.INF_MAX);

    RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria(tablename);
    rangeRowQueryCriteria.setMaxVersions(1);
    rangeRowQueryCriteria.setInclusiveStartPrimaryKey(startPK.build());
    rangeRowQueryCriteria.setExclusiveEndPrimaryKey(endPK.build());
    rangeRowQueryCriteria.setDirection(Direction.FORWARD);
    rangeRowQueryCriteria.addColumnsToGet(selectFields);
    if (useOnlinetable) {
        rangeRowQueryCriteria.setTimeRange(new TimeRange(currentime - onlineWindowMillis, currentime));
    }

    // Minimum play time for this event, or null when no filter applies. Resolved once
    // because it does not change between rows.
    Double minPlayTime = playtimefilter == null ? null : playtimefilter.get(event);

    GetRangeRequest rangeRequest = new GetRangeRequest();
    while (true) {
        rangeRequest.setRangeRowQueryCriteria(rangeRowQueryCriteria);
        GetRangeResponse range = this.syncClient.getRange(rangeRequest);
        if (range.getConsumedCapacity() == null) {
            System.out.println("get range don't have any data");
            break;
        }

        for (Row r : range.getRows()) {
            // Rows without a primary key or without any attribute column carry nothing to convert.
            if (r.getPrimaryKey() == null || r.getColumns().length == 0) {
                continue;
            }
            if (minPlayTime != null) {
                Column playTimeColumn = r.getLatestColumn(config.getPlayTimeField());
                // A row with no play-time column cannot be shown to meet the threshold, so it
                // is dropped together with the rows that are below it.
                if (playTimeColumn == null || playTimeColumn.getValue().asDouble() < minPlayTime) {
                    continue;
                }
            }

            SequenceInfo sequenceInfo = new SequenceInfo();
            if (config.getDeduplicationMethodNum() == 1) {
                // With deduplication method 1 the second primary-key column is parsed as the item id.
                Long itemId = Long.valueOf(String.valueOf(r.getPrimaryKey().getPrimaryKeyColumn(1).getValue()));
                sequenceInfo.setItemIdField(itemId);
            }
            // Attribute columns are applied after the primary key, so an item-id attribute
            // column overrides the value taken from the key.
            for (Column c : r.getColumns()) {
                String name = c.getName();
                String result = String.valueOf(c.getValue());
                if (name.equals(config.getItemIdField())) {
                    sequenceInfo.setItemIdField(Long.valueOf(result));
                } else if (name.equals(config.getEventField())) {
                    sequenceInfo.setEventField(result);
                } else if (name.equals(config.getPlayTimeField())) {
                    sequenceInfo.setPlayTimeField(Double.valueOf(result));
                } else if (name.equals(config.getTimestampField())) {
                    sequenceInfo.setTimestampField(Long.valueOf(result));
                }
            }
            sequenceInfos.add(sequenceInfo);
        }

        // A non-null next-start key means the response was truncated; resume from there.
        if (range.getNextStartPrimaryKey() == null) {
            break;
        }
        rangeRowQueryCriteria.setInclusiveStartPrimaryKey(range.getNextStartPrimaryKey());
    }
    return sequenceInfos;
}