in connectors/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBLookupFunction.java [96:148]
/**
 * Looks up the IoTDB row whose timestamp equals the given key and emits it downstream.
 *
 * <p>When a cache is configured and already holds a row for the key, that row is emitted and
 * IoTDB is not queried. Otherwise the base query is executed with a {@code WHERE TIME=<key>}
 * filter. If no row comes back, a row holding the timestamp followed by one {@code null} per
 * schema column is emitted and nothing is cached. If a row comes back, its fields are matched to
 * the schema columns by name (columns absent from the result stay {@code null}), the row is
 * stored in the cache when one is configured, and it is emitted.
 *
 * <p>The query's data set is always released before this method returns or throws.
 *
 * @param obj the lookup key; it is read back as a {@code long} timestamp
 * @throws IoTDBConnectionException if executing the query or releasing its data set fails
 * @throws StatementExecutionException if executing the query or releasing its data set fails
 * @throws IllegalSchemaException if a returned column's IoTDB type differs from its Flink type
 */
public void eval(Object obj) throws IoTDBConnectionException, StatementExecutionException {
  RowData lookupKey = GenericRowData.of(obj);
  if (cache != null) {
    RowData cacheRow = cache.getIfPresent(lookupKey);
    if (cacheRow != null) {
      collect(cacheRow);
      return;
    }
  }

  long timestamp = lookupKey.getLong(0);
  // Plain concatenation on purpose: String.format("%d") applies the default locale and may
  // produce locale-specific digits, which would corrupt the statement.
  String sql = this.sql + " WHERE TIME=" + timestamp;

  // Slot 0 carries the timestamp; slots 1..n follow the schema order and stay null unless a
  // matching column value is found below.
  Object[] values = new Object[schema.size() + 1];
  values[0] = timestamp;
  boolean rowFound;

  SessionDataSet dataSet = session.executeQueryStatement(sql);
  try {
    List<String> columnNames = dataSet.getColumnNames();
    // Drop the time column so that name positions line up with the record's field positions.
    columnNames.remove("Time");
    RowRecord rowRecord = dataSet.next();
    rowFound = rowRecord != null;
    if (rowFound) {
      List<Field> fields = rowRecord.getFields();
      int slot = 1;
      for (Tuple2<String, DataType> field : schema) {
        int index = columnNames.indexOf(field.f0);
        if (index >= 0) {
          Field value = fields.get(index);
          TSDataType iotdbType = value.getDataType();
          // NOTE(review): a field without a data type is treated as a null value here instead
          // of being sent through the type check - confirm this matches how IoTDB reports
          // missing values.
          if (iotdbType != null) {
            if (!Utils.isTypeEqual(iotdbType, field.f1)) {
              throw new IllegalSchemaException(
                  String.format(
                      "The data type of column `%s` is different in IoTDB and Flink", field.f0));
            }
            values[slot] = Utils.getValue(value, field.f1);
          }
        }
        slot++;
      }
    }
  } finally {
    // Release the query handle on every path (empty result, schema mismatch, success).
    dataSet.closeOperationHandle();
  }

  GenericRowData rowData = GenericRowData.of(values);
  // Only rows that actually exist in IoTDB are cached; empty results are looked up again.
  if (rowFound && cache != null) {
    cache.put(lookupKey, rowData);
  }
  collect(rowData);
}