public void poll()

in connectors/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/source/Querier.java [81:148]


    public void poll()  {
        try {
            LinkedList<Table> tableLinkedList = new LinkedList<>();

            for (Map.Entry<String, Database> entry : schema.getDbMap().entrySet()) {
                String dbName = entry.getKey();
                Iterator<Map.Entry<String, Table>> iterator = entry.getValue().getTableMap().entrySet().iterator();
                while (iterator.hasNext()) {
                    Map.Entry<String, Table> tableEntry = iterator.next();
                    String tableName = tableEntry.getKey();
                    Table table = tableEntry.getValue();
                    Map<String, String> tableFilterMap = table.getFilterMap();


                    Select selectFrom = QueryBuilder.selectFrom(dbName, tableName).all();
                    if (tableFilterMap != null && !tableFilterMap.keySet().contains("NO-FILTER")) {
                        for (String key: tableFilterMap.keySet()) {
                            String value = tableFilterMap.get(key);
                            long increment = 0;
                            // get increment
                            final RecordOffset recordOffset = offsetStorageReader.readOffset(this.buildRecordPartition(dbName, tableName));
                            if (recordOffset != null &&  recordOffset.getOffset().size() > 0) {
                                final Object offset = recordOffset.getOffset().get(ConstDefine.INCREASE + dbName + tableName);
                                if (offset != null) {
                                    increment = Long.valueOf(offset.toString());
                                }
                            }
                            selectFrom = selectFrom.whereColumn(key).isGreaterThan(QueryBuilder.literal(increment)).allowFiltering();
                        }
                    }


                    SimpleStatement stmt;
                    boolean finishUpdate = false;
                    log.info("trying to execute sql query,{}", selectFrom.asCql());
                    ResultSet result = null;
                    while (!cqlSession.isClosed() && !finishUpdate) {
                        stmt = selectFrom.build();
                        result = cqlSession.execute(stmt);
                        if (result.wasApplied()) {
                            log.info("query columns success, executed cql query {}", selectFrom.asCql());
                        }
                        finishUpdate = true;
                    }

                    List<String> colList = tableEntry.getValue().getColList();
                    List<String> dataTypeList = tableEntry.getValue().getRawDataTypeList();
                    List<ColumnParser> parserList = tableEntry.getValue().getParserList();

                    for (Row row : result) {
                        Table tableWithData = new Table(dbName, tableName);
                        tableWithData.setColList(colList);
                        tableWithData.setRawDataTypeList(dataTypeList);
                        tableWithData.setParserList(parserList);

                        for (String col : colList) {
                            tableWithData.getDataList().add(row.getObject(col));
                        }
                        tableLinkedList.add(tableWithData);
                    }
                }
            }
            list = tableLinkedList;
        } catch (Exception e) {
            log.error("fail to poll data, {}", e);
        }

    }