in connectors/rocketmq-connect-iotdb/src/main/java/org/apache/rocketmq/connect/iotdb/replicator/source/IotdbQuery.java [49:108]
/**
 * Reads rows from the configured IoTDB path and pushes them onto the replicator queue.
 *
 * <p>The session is created and opened on first use. The path is taken from
 * {@code keyValue} under {@link IotdbConstant#IOTDB_PATH}; if it is missing or blank the
 * method logs a warning and returns. When {@code recordOffset} carries a value for the key
 * {@code IOTDB_PARTITION + path}, only rows with a timestamp strictly greater than that value
 * are queried; otherwise the query starts from timestamp 0. Rows are fetched page by page and
 * each row is wrapped in a {@link DeviceEntity} before being added to the queue.
 *
 * <p>All failures are logged and swallowed; the method never throws to its caller.
 *
 * @param recordOffset previously committed offset, may be {@code null} or empty
 * @param keyValue     task configuration holding the IoTDB path and partition name
 */
public void start(RecordOffset recordOffset, KeyValue keyValue) {
    if (session == null) {
        session =
            new Session.Builder()
                .host(config.getIotdbHost())
                .port(config.getIotdbPort())
                .build();
        try {
            session.open();
        } catch (IoTDBConnectionException e) {
            log.error("iotdb session open failed", e);
            // Drop the unusable session so the next call retries the connection
            // instead of reusing a session that was never opened.
            session = null;
            return;
        }
    }
    String path = null;
    try {
        path = keyValue.getString(IotdbConstant.IOTDB_PATH);
        if (path == null || path.trim().isEmpty()) {
            log.warn("the path is empty,please check config");
            return;
        }
        long time = 0;
        if (recordOffset != null && recordOffset.getOffset() != null && recordOffset.getOffset().size() > 0) {
            // Read as Number rather than Long so any boxed integer type is accepted.
            // NOTE(review): presumably offsets restored from serialized storage can come back
            // as Integer for small values - confirm against the offset store.
            final Number offsetValue =
                (Number) recordOffset.getOffset().get(keyValue.getString(IotdbConstant.IOTDB_PARTITION) + path);
            if (offsetValue != null) {
                time = offsetValue.longValue();
            }
        }
        final int limit = 500;
        // OFFSET is the number of rows to skip, so paging must start at 0;
        // starting at 1 would silently drop the first matching row.
        long offset = 0;
        while (true) {
            final String sql =
                "select * from " + path + " where time > " + time + " limit " + limit + " offset " + offset;
            final SessionDataSet timeseries = session.executeQueryStatement(sql);
            boolean pageHasRows = false;
            try {
                final List<String> names = timeseries.getColumnNames();
                final List<String> types = timeseries.getColumnTypes();
                while (timeseries.hasNext()) {
                    pageHasRows = true;
                    DeviceEntity entity = new DeviceEntity();
                    entity.setColumnNames(names);
                    entity.setColumnTypes(types);
                    entity.setRowRecord(timeseries.next());
                    entity.setPath(path);
                    this.replicator.getQueue().add(entity);
                }
            } finally {
                // Release the query handle for this page, including the final empty one.
                timeseries.closeOperationHandle();
            }
            if (!pageHasRows) {
                // An empty page means every matching row has been consumed.
                break;
            }
            offset += limit;
        }
    } catch (StatementExecutionException e) {
        log.error("search data from path:[{}] failed, cause StatementExecutionException", path, e);
    } catch (IoTDBConnectionException e) {
        log.error("search data from path:[{}] failed, cause IoTDBConnectionException", path, e);
    } catch (Exception e) {
        log.error("search data from path:[{}] failed, cause Exception", path, e);
    }
}