eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalCheckPositionMgr.java [120:150]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Assembles the full-sync position descriptor for one table.
     *
     * <p>For every primary key column the minimum and maximum values are looked up (each lookup
     * receives the values already resolved for the preceding key columns). The current key map
     * starts at the minimum values; when {@code recordPosition} holds non-blank primary key
     * records that parse to a {@link TableFullPosition} with a non-empty current key map, that
     * recorded map replaces it.
     *
     * @param dataSource      data source handed to the key and row-count lookups
     * @param tableDefinition table whose primary keys, schema name and table name are read
     * @param recordPosition  previously recorded position, may be {@code null}
     * @return a new {@link JobRdbFullPosition} carrying schema, table name, row count, the
     *         serialized key position and, if {@code recordPosition} is present, its percent
     * @throws SQLException propagated from the underlying lookups
     */
    private JobRdbFullPosition fetchTableInfo(DataSource dataSource, MySQLTableDef tableDefinition, JobRdbFullPosition recordPosition)
        throws SQLException {
        final TableFullPosition tablePosition = new TableFullPosition();
        final Map<String, Object> resolvedMinKeys = new LinkedHashMap<>();
        final Map<String, Object> resolvedMaxKeys = new LinkedHashMap<>();

        for (String keyColumn : tableDefinition.getPrimaryKeys()) {
            // Each lookup sees only the bounds of the key columns handled before this one.
            final Object lowerBound = fetchMinPrimaryKey(dataSource, tableDefinition, resolvedMinKeys, keyColumn);
            final Object upperBound = fetchMaxPrimaryKey(dataSource, tableDefinition, resolvedMaxKeys, keyColumn);
            resolvedMinKeys.put(keyColumn, lowerBound);
            resolvedMaxKeys.put(keyColumn, upperBound);

            // The current key defaults to the lower bound.
            tablePosition.getCurPrimaryKeyCols().put(keyColumn, lowerBound);
            tablePosition.getMinPrimaryKeyCols().put(keyColumn, lowerBound);
            tablePosition.getMaxPrimaryKeyCols().put(keyColumn, upperBound);
        }

        final JobRdbFullPosition result = new JobRdbFullPosition();
        final boolean hasRecord = recordPosition != null;

        if (hasRecord && StringUtils.isNotBlank(recordPosition.getPrimaryKeyRecords())) {
            final TableFullPosition saved = JsonUtils.parseObject(recordPosition.getPrimaryKeyRecords(), TableFullPosition.class);
            final boolean hasSavedCursor = saved != null
                && saved.getCurPrimaryKeyCols() != null
                && !saved.getCurPrimaryKeyCols().isEmpty();
            if (hasSavedCursor) {
                // Prefer the recorded current key over the freshly computed minimum.
                tablePosition.setCurPrimaryKeyCols(saved.getCurPrimaryKeyCols());
            }
        }
        if (hasRecord) {
            result.setPercent(recordPosition.getPercent());
        }

        final long totalRows = queryCurTableRowCount(dataSource, tableDefinition);
        result.setMaxCount(totalRows);
        result.setSchema(tableDefinition.getSchemaName());
        result.setTableName(tableDefinition.getTableName());
        result.setPrimaryKeyRecords(JsonUtils.toJSONString(tablePosition));
        return result;
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalFullPositionMgr.java [121:151]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Builds the {@link JobRdbFullPosition} describing where a full sync of one table stands.
     *
     * <p>The minimum and maximum value of each primary key column are fetched in declaration
     * order, with every lookup given the bounds already collected for earlier columns. The
     * current key map is initialised to the minimum values and is overwritten by the current
     * key map found in {@code recordPosition} when that record's primary key records are
     * non-blank and parse to a {@link TableFullPosition} with a non-empty current key map.
     *
     * @param dataSource      data source passed through to the key and row-count lookups
     * @param tableDefinition supplies the primary key names, schema name and table name
     * @param recordPosition  earlier recorded position; {@code null} is accepted
     * @return a freshly created position holding schema, table name, row count, the JSON
     *         form of the key position, and the recorded percent if a record was supplied
     * @throws SQLException propagated from the underlying lookups
     */
    private JobRdbFullPosition fetchTableInfo(DataSource dataSource, MySQLTableDef tableDefinition, JobRdbFullPosition recordPosition)
        throws SQLException {
        final TableFullPosition keyPosition = new TableFullPosition();
        final Map<String, Object> minSoFar = new LinkedHashMap<>();
        final Map<String, Object> maxSoFar = new LinkedHashMap<>();

        for (String column : tableDefinition.getPrimaryKeys()) {
            // Lookups for this column only see bounds of the columns processed earlier.
            final Object smallest = fetchMinPrimaryKey(dataSource, tableDefinition, minSoFar, column);
            final Object largest = fetchMaxPrimaryKey(dataSource, tableDefinition, maxSoFar, column);
            minSoFar.put(column, smallest);
            maxSoFar.put(column, largest);

            // Until a recorded value says otherwise, the current key is the minimum.
            keyPosition.getCurPrimaryKeyCols().put(column, smallest);
            keyPosition.getMinPrimaryKeyCols().put(column, smallest);
            keyPosition.getMaxPrimaryKeyCols().put(column, largest);
        }

        final JobRdbFullPosition tableInfo = new JobRdbFullPosition();
        final boolean recordPresent = recordPosition != null;

        if (recordPresent && StringUtils.isNotBlank(recordPosition.getPrimaryKeyRecords())) {
            final TableFullPosition recorded = JsonUtils.parseObject(recordPosition.getPrimaryKeyRecords(), TableFullPosition.class);
            final boolean recordedCursorUsable = recorded != null
                && recorded.getCurPrimaryKeyCols() != null
                && !recorded.getCurPrimaryKeyCols().isEmpty();
            if (recordedCursorUsable) {
                // The recorded current key takes precedence over the computed minimum.
                keyPosition.setCurPrimaryKeyCols(recorded.getCurPrimaryKeyCols());
            }
        }
        if (recordPresent) {
            tableInfo.setPercent(recordPosition.getPercent());
        }

        final long currentRowCount = queryCurTableRowCount(dataSource, tableDefinition);
        tableInfo.setMaxCount(currentRowCount);
        tableInfo.setSchema(tableDefinition.getSchemaName());
        tableInfo.setTableName(tableDefinition.getTableName());
        tableInfo.setPrimaryKeyRecords(JsonUtils.toJSONString(keyPosition));
        return tableInfo;
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



