public void start()

in connectors/rocketmq-connect-iotdb/src/main/java/org/apache/rocketmq/connect/iotdb/replicator/source/IotdbQuery.java [49:108]


    /**
     * Queries IoTDB for rows under the configured path whose timestamp is greater than the
     * last recorded offset, and pushes every row onto the replicator queue as a
     * {@link DeviceEntity}.
     *
     * <p>The session is opened on first use. Rows are fetched in pages of 500 using
     * {@code limit}/{@code offset} until a page comes back empty. Failures are logged and
     * not rethrown.
     *
     * @param recordOffset previously committed offsets; the start timestamp is looked up under
     *                     the key {@code IOTDB_PARTITION value + path}. May be {@code null} or
     *                     empty, in which case the query starts from time 0.
     * @param keyValue     task configuration supplying {@code IotdbConstant.IOTDB_PATH} and
     *                     {@code IotdbConstant.IOTDB_PARTITION}
     */
    public void start(RecordOffset recordOffset, KeyValue keyValue) {
        if (session == null) {
            final Session newSession =
                new Session.Builder()
                    .host(config.getIotdbHost())
                    .port(config.getIotdbPort())
                    .build();
            try {
                newSession.open();
            } catch (IoTDBConnectionException e) {
                log.error("iotdb session open failed", e);
                // Keep the field null so the next invocation retries the connection instead
                // of reusing a session that was never opened.
                return;
            }
            session = newSession;
        }

        String path = null;
        try {
            path = keyValue.getString(IotdbConstant.IOTDB_PATH);
            if (path == null || path.trim().isEmpty()) {
                log.warn("the path is empty,please check config");
                return;
            }

            long time = 0;
            if (recordOffset != null && recordOffset.getOffset() != null && recordOffset.getOffset().size() > 0) {
                final Object offsetValue =
                    recordOffset.getOffset().get(keyValue.getString(IotdbConstant.IOTDB_PARTITION) + path);
                if (offsetValue instanceof Number) {
                    // Accept any numeric type: a stored offset may come back as Integer
                    // rather than Long, which a direct (Long) cast would reject.
                    time = ((Number) offsetValue).longValue();
                } else if (offsetValue != null) {
                    // A malformed value raises NumberFormatException, handled by the
                    // generic catch below.
                    time = Long.parseLong(offsetValue.toString().trim());
                }
            }

            final int limit = 500;
            // Start at 0: "time > <offset>" already excludes the last delivered row, so
            // skipping one more row here would silently drop data.
            long offset = 0;
            while (true) {
                final String sql =
                    "select * from " + path + " where time > " + time + " limit " + limit + " offset " + offset;
                final SessionDataSet timeseries = session.executeQueryStatement(sql);
                int fetched = 0;
                try {
                    final List<String> names = timeseries.getColumnNames();
                    final List<String> types = timeseries.getColumnTypes();
                    while (timeseries.hasNext()) {
                        DeviceEntity entity = new DeviceEntity();
                        entity.setColumnNames(names);
                        entity.setColumnTypes(types);
                        entity.setRowRecord(timeseries.next());
                        entity.setPath(path);
                        this.replicator.getQueue().add(entity);
                        fetched++;
                    }
                } finally {
                    // Release the query handle for every page, including the empty last one.
                    // Cleanup errors are logged here so they cannot mask a failure raised
                    // while reading rows.
                    try {
                        timeseries.closeOperationHandle();
                    } catch (Exception closeError) {
                        log.warn("close iotdb query handle for path:[{}] failed", path, closeError);
                    }
                }
                if (fetched == 0) {
                    break;
                }
                offset += limit;
            }

        } catch (StatementExecutionException e) {
            log.error("search data from path:[{}] failed, cause StatementExecutionException", path, e);
        } catch (IoTDBConnectionException e) {
            log.error("search data from path:[{}] failed, cause IoTDBConnectionException", path, e);
        } catch (Exception e) {
            log.error("search data from path:[{}] failed, cause Exception", path, e);
        }

    }