in src/main/java/com/aliyun/openservices/paifeaturestore/dao/FeatureViewHologresDao.java [306:394]
public List<SequenceInfo> getSeqDB(String key, String[] selectFields, HashMap<String, Double> seqPlayFilter,
FeatureViewSeqConfig config, String event, String userIdFields, Long currentime, boolean useOnlineTable) {
List<SequenceInfo> sequenceInfos = new ArrayList<>();
ArrayList<Field> fields = new ArrayList<>();
for (String f:selectFields) {
fields.add(field(String.format("\"%s\"",f)));
}
String primaryKeyField = String.format("\"%s\"",this.primaryKeyField);
DSLContext dsl=DSL.using(SQLDialect.POSTGRES);
FeatureViewSeqConfig fsc = new FeatureViewSeqConfig();
String table="";
Query query=null;
long nt=(currentime-86400*5);
if (useOnlineTable) {
table=onlinetable;
query=dsl.select(fields)
.from(table)
.where(field(primaryKeyField).eq(key)).and(field(config.getEventField()).eq(event)).and(field(config.getTimestampField()).greaterThan(nt))
.orderBy(field(config.getTimestampField()).desc())
.limit(config.getSeqLenOnline());
} else {
table=offlinetable;
query=dsl.select(fields)
.from(offlinetable)
.where(field(primaryKeyField).eq(key)).and(field(config.getEventField()).eq(event))
.orderBy(field(config.getTimestampField()).desc())
.limit(config.getSeqLenOnline());
}
String sql=query.getSQL();
try (Connection connection=this.datasource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
int index = 1;
if (this.fieldTypeMap.get(this.primaryKeyField) == FSType.FS_STRING) {
preparedStatement.setString(index++, key);
}else if (this.fieldTypeMap.get(this.primaryKeyField) == FSType.FS_INT32) {
preparedStatement.setInt(index++, Integer.parseInt(key));
}else if (this.fieldTypeMap.get(this.primaryKeyField) == FSType.FS_INT64){
preparedStatement.setLong(index++, Long.parseLong(key));
}
preparedStatement.setString(index++, event);
if (useOnlineTable) {
preparedStatement.setLong(index++, nt);
preparedStatement.setInt(index++,config.getSeqLenOnline());
} else {
preparedStatement.setInt(index++,config.getSeqLenOnline());
}
try (ResultSet rs = preparedStatement.executeQuery()) {
String qz="";
for (SeqConfig s:config.getSeqConfigs()) {
if (s.getSeqEvent().equals(event)) {
qz=s.getOnlineSeqName();
break;
}
}
while (rs.next()) {
if (!StringUtils.isEmpty(seqPlayFilter) && !StringUtils.isEmpty(seqPlayFilter.get(event))) {
if (rs.getDouble(config.getPlayTimeField()) < seqPlayFilter.get(event)) {
continue;
}
}
SequenceInfo sequenceInfo = new SequenceInfo();
for (String name:selectFields) {
if (name.equals(config.getItemIdField())) {
sequenceInfo.setItemIdField(Long.valueOf(rs.getString(name)));
} else if (name.equals(config.getEventField())) {
sequenceInfo.setEventField(rs.getString(name));
} else if (name.equals(config.getPlayTimeField())) {
sequenceInfo.setPlayTimeField(Double.valueOf(rs.getString(name)));
} else if (name.equals(config.getTimestampField())) {
sequenceInfo.setTimestampField(Long.valueOf(rs.getString(name)));
}
}
sequenceInfos.add(sequenceInfo);
}
}
} catch (Exception e) {
log.error("getSequenceFeatures from hologres error", e);
throw new RuntimeException(e);
}
return sequenceInfos;
}