eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalCheckPositionMgr.java [95:118]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Populates {@code positions} for the tables listed in the source connector configuration.
     *
     * <p>For each configured database/table pair the table definition is looked up through {@code tableMgr}.
     * Tables without a definition are logged and skipped. A table whose stored position is missing or not
     * yet finished gets its entry replaced with the result of {@code fetchTableInfo}; a table whose stored
     * position is already finished is left untouched.
     *
     * <p>Any exception raised while handling one table is logged and the loop moves on to the next table.
     */
    private void initPositions() {
        for (RdbDBDefinition database : config.getSourceConnectorConfig().getDatabases()) {
            for (RdbTableDefinition table : database.getTables()) {
                try {
                    RdbSimpleTable tableKey = new RdbSimpleTable(database.getSchemaName(), table.getTableName());

                    RdbTableDefinition resolvedDefinition = tableMgr.getTable(tableKey);
                    if (resolvedDefinition == null) {
                        log.error("db [{}] table [{}] definition is null", database.getSchemaName(), table.getTableName());
                        continue;
                    }
                    log.info("init position of data [{}] table [{}]", database.getSchemaName(), table.getTableName());

                    JobRdbFullPosition knownPosition = positions.get(tableKey);
                    if (knownPosition != null && knownPosition.isFinished()) {
                        // A finished position is kept as it is; nothing to fetch for this table.
                        continue;
                    }

                    // The cast happens inside the try, so a definition of another type ends up in the catch below.
                    positions.put(tableKey,
                        fetchTableInfo(DatabaseConnection.sourceDataSource, (MySQLTableDef) resolvedDefinition, knownPosition));
                } catch (Exception e) {
                    // Log with the throwable and carry on with the remaining tables.
                    log.error("process schema [{}] table [{}] position fail", database.getSchemaName(), table.getTableName(), e);
                }
            }
        }
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalFullPositionMgr.java [96:119]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Fills {@code positions} with an entry per configured table that still needs one.
     *
     * <p>Iterates every table of every database returned by the source connector configuration. When
     * {@code tableMgr} has no definition for a table, an error is logged and the table is skipped.
     * Otherwise the currently stored position is read; unless it exists and reports itself finished,
     * it is replaced by whatever {@code fetchTableInfo} returns for that table.
     *
     * <p>Each table is handled inside its own try/catch, so an exception for one table is logged and
     * does not prevent the following tables from being processed.
     */
    private void initPositions() {
        for (RdbDBDefinition database : config.getSourceConnectorConfig().getDatabases()) {
            for (RdbTableDefinition table : database.getTables()) {
                try {
                    RdbSimpleTable lookupKey = new RdbSimpleTable(database.getSchemaName(), table.getTableName());
                    RdbTableDefinition definition = tableMgr.getTable(lookupKey);

                    if (definition != null) {
                        log.info("init position of data [{}] table [{}]", database.getSchemaName(), table.getTableName());

                        JobRdbFullPosition previous = positions.get(lookupKey);
                        boolean alreadyFinished = previous != null && previous.isFinished();
                        if (!alreadyFinished) {
                            // The previous (possibly null) position is handed to fetchTableInfo as its third argument.
                            positions.put(lookupKey,
                                fetchTableInfo(DatabaseConnection.sourceDataSource, (MySQLTableDef) definition, previous));
                        }
                    } else {
                        log.error("db [{}] table [{}] definition is null", database.getSchemaName(), table.getTableName());
                    }
                } catch (Exception e) {
                    // One failing table must not abort the loop: record the failure and continue.
                    log.error("process schema [{}] table [{}] position fail", database.getSchemaName(), table.getTableName(), e);
                }
            }
        }
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



