private CanalEventParser doInitEventParser()

in instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java [261:422]


    private CanalEventParser doInitEventParser(SourcingType type, List<InetSocketAddress> dbAddresses) {
        CanalEventParser eventParser;
        if (type.isMysql()) {
            MysqlEventParser mysqlEventParser = null;
            if (StringUtils.isNotEmpty(parameters.getRdsAccesskey())
                && StringUtils.isNotEmpty(parameters.getRdsSecretkey())
                && StringUtils.isNotEmpty(parameters.getRdsInstanceId())) {

                mysqlEventParser = new RdsBinlogEventParserProxy();
                ((RdsBinlogEventParserProxy) mysqlEventParser).setAccesskey(parameters.getRdsAccesskey());
                ((RdsBinlogEventParserProxy) mysqlEventParser).setSecretkey(parameters.getRdsSecretkey());
                ((RdsBinlogEventParserProxy) mysqlEventParser).setInstanceId(parameters.getRdsInstanceId());
            } else {
                mysqlEventParser = new MysqlEventParser();
            }
            mysqlEventParser.setDestination(destination);
            // 编码参数
            mysqlEventParser.setConnectionCharset(parameters.getConnectionCharset());
            // 网络相关参数1
            mysqlEventParser.setDefaultConnectionTimeoutInSeconds(parameters.getDefaultConnectionTimeoutInSeconds());
            mysqlEventParser.setSendBufferSize(parameters.getSendBufferSize());
            mysqlEventParser.setReceiveBufferSize(parameters.getReceiveBufferSize());
            // 心跳检查参数
            mysqlEventParser.setDetectingEnable(parameters.getDetectingEnable());
            mysqlEventParser.setDetectingSQL(parameters.getDetectingSQL());
            mysqlEventParser.setDetectingIntervalInSeconds(parameters.getDetectingIntervalInSeconds());
            // 数据库信息参数
            mysqlEventParser.setSlaveId(parameters.getSlaveId());
            if (!CollectionUtils.isEmpty(dbAddresses)) {
                mysqlEventParser.setMasterInfo(new AuthenticationInfo(dbAddresses.get(0),
                    parameters.getDbUsername(),
                    parameters.getDbPassword(),
                    parameters.getDefaultDatabaseName(),
                    parameters.getSslInfo()));

                if (dbAddresses.size() > 1) {
                    mysqlEventParser.setStandbyInfo(new AuthenticationInfo(dbAddresses.get(1),
                        parameters.getDbUsername(),
                        parameters.getDbPassword(),
                        parameters.getDefaultDatabaseName(),
                        parameters.getSslInfo()));
                }
            }

            if (!CollectionUtils.isEmpty(parameters.getPositions())) {
                EntryPosition masterPosition = JsonUtils.unmarshalFromString(parameters.getPositions().get(0),
                    EntryPosition.class);
                // binlog位置参数
                mysqlEventParser.setMasterPosition(masterPosition);

                if (parameters.getPositions().size() > 1) {
                    EntryPosition standbyPosition = JsonUtils.unmarshalFromString(parameters.getPositions().get(1),
                        EntryPosition.class);
                    mysqlEventParser.setStandbyPosition(standbyPosition);
                }
            }
            mysqlEventParser.setFallbackIntervalInSeconds(parameters.getFallbackIntervalInSeconds());
            mysqlEventParser.setProfilingEnabled(false);
            mysqlEventParser.setFilterTableError(parameters.getFilterTableError());
            mysqlEventParser.setParallel(parameters.getParallel());
            mysqlEventParser.setIsGTIDMode(BooleanUtils.toBoolean(parameters.getGtidEnable()));
            mysqlEventParser.setMultiStreamEnable(parameters.getMultiStreamEnable());
            // tsdb
            if (parameters.getTsdbSnapshotInterval() != null) {
                mysqlEventParser.setTsdbSnapshotInterval(parameters.getTsdbSnapshotInterval());
            }
            if (parameters.getTsdbSnapshotExpire() != null) {
                mysqlEventParser.setTsdbSnapshotExpire(parameters.getTsdbSnapshotExpire());
            }
            boolean tsdbEnable = BooleanUtils.toBoolean(parameters.getTsdbEnable());
            // manager启动模式默认使用mysql tsdb机制
            final String tsdbSpringXml = "classpath:spring/tsdb/mysql-tsdb.xml";
            if (tsdbEnable) {
                mysqlEventParser.setTableMetaTSDBFactory(new DefaultTableMetaTSDBFactory() {

                    @Override
                    public void destory(String destination) {
                        TableMetaTSDBBuilder.destory(destination);
                    }

                    @Override
                    public TableMetaTSDB build(String destination, String springXml) {
                        synchronized (CanalInstanceWithManager.class) {
                            try {
                                System.setProperty("canal.instance.tsdb.url", parameters.getTsdbJdbcUrl());
                                System.setProperty("canal.instance.tsdb.dbUsername", parameters.getTsdbJdbcUserName());
                                System.setProperty("canal.instance.tsdb.dbPassword", parameters.getTsdbJdbcPassword());
                                System.setProperty("canal.instance.destination", destination);

                                return TableMetaTSDBBuilder.build(destination, tsdbSpringXml);
                            } finally {
                                // reset
                                System.setProperty("canal.instance.destination", "");
                                System.setProperty("canal.instance.tsdb.url", "");
                                System.setProperty("canal.instance.tsdb.dbUsername", "");
                                System.setProperty("canal.instance.tsdb.dbPassword", "");
                            }
                        }
                    }
                });
                mysqlEventParser.setTsdbJdbcUrl(parameters.getTsdbJdbcUrl());
                mysqlEventParser.setTsdbJdbcUserName(parameters.getTsdbJdbcUserName());
                mysqlEventParser.setTsdbJdbcPassword(parameters.getTsdbJdbcPassword());
                mysqlEventParser.setTsdbSpringXml(tsdbSpringXml);
                mysqlEventParser.setEnableTsdb(tsdbEnable);
            }
            eventParser = mysqlEventParser;
        } else if (type.isLocalBinlog()) {
            LocalBinlogEventParser localBinlogEventParser = new LocalBinlogEventParser();
            localBinlogEventParser.setDestination(destination);
            localBinlogEventParser.setBufferSize(parameters.getReceiveBufferSize());
            localBinlogEventParser.setConnectionCharset(parameters.getConnectionCharset());
            localBinlogEventParser.setDirectory(parameters.getLocalBinlogDirectory());
            localBinlogEventParser.setProfilingEnabled(false);
            localBinlogEventParser.setDetectingEnable(parameters.getDetectingEnable());
            localBinlogEventParser.setDetectingIntervalInSeconds(parameters.getDetectingIntervalInSeconds());
            localBinlogEventParser.setFilterTableError(parameters.getFilterTableError());
            localBinlogEventParser.setParallel(parameters.getParallel());
            // 数据库信息,反查表结构时需要
            if (!CollectionUtils.isEmpty(dbAddresses)) {
                localBinlogEventParser.setMasterInfo(new AuthenticationInfo(dbAddresses.get(0),
                    parameters.getDbUsername(),
                    parameters.getDbPassword(),
                    parameters.getDefaultDatabaseName(),
                    parameters.getSslInfo()));
            }

            eventParser = localBinlogEventParser;
        } else if (type.isOracle()) {
            throw new CanalException("unsupport SourcingType for " + type);
        } else {
            throw new CanalException("unsupport SourcingType for " + type);
        }

        // add transaction support at 2012-12-06
        if (eventParser instanceof AbstractEventParser) {
            AbstractEventParser abstractEventParser = (AbstractEventParser) eventParser;
            abstractEventParser.setTransactionSize(parameters.getTransactionSize());
            abstractEventParser.setLogPositionManager(initLogPositionManager());
            abstractEventParser.setAlarmHandler(getAlarmHandler());
            abstractEventParser.setEventSink(getEventSink());

            if (StringUtils.isNotEmpty(filter)) {
                AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(filter);
                abstractEventParser.setEventFilter(aviaterFilter);
            }

            // 设置黑名单
            if (StringUtils.isNotEmpty(parameters.getBlackFilter())) {
                AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(parameters.getBlackFilter());
                abstractEventParser.setEventBlackFilter(aviaterFilter);
            }
        }
        if (eventParser instanceof MysqlEventParser) {
            MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser;

            // 初始化haController,绑定与eventParser的关系,haController会控制eventParser
            CanalHAController haController = initHaController();
            mysqlEventParser.setHaController(haController);
        }
        return eventParser;
    }