func (sc *ServiceCanal) Start(c pipeline.Collector) error

in plugins/input/canal/input_canal.go [690:853]


func (sc *ServiceCanal) Start(c pipeline.Collector) error {
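	// Reset error tracking, arm the shutdown channel so Stop can signal this
	// goroutine, and register with the wait group so Stop can wait for it to exit.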
	sc.lastErrorCount = 0
	sc.shutdown = make(chan struct{}, 1)
	sc.waitGroup.Add(1)
	defer sc.waitGroup.Done()
	sc.collector = c

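	// Create the underlying canal instance; shouldShutdown is true if shutdown
	// was signaled while newCanal was running.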
	shouldShutdown, err := sc.newCanal()
	if err != nil {
		logger.Error(sc.context.GetRuntimeContext(),
			"CANAL_START_ALARM", "service_canal plugin only supports ROW mode", err)
		return err
	}
	if shouldShutdown {
		logger.Info(sc.context.GetRuntimeContext(), "service_canal quit because shutdown is signaled during newCanal")
		return nil
	}

	// Check whether GTID mode is available on the server; if not, use binlog-file mode.
	// Although retry logic is added here, one case is not handled: sc.canal becoming
	// invalid. In that case, this loop will run continuously and the user will keep
	// receiving alarms (rare case, ignored for now).
	for {
		shouldRetry := false
		sc.isGTIDEnabled, shouldRetry, err = sc.getGTIDMode()
		if nil == err {
			break
		}
		err = fmt.Errorf("Check GTID mode failed, error: %v", err)
		logger.Warning(sc.context.GetRuntimeContext(), "CANAL_START_ALARM", err.Error())
		if shouldRetry {
			if util.RandomSleep(time.Second*5, 0.1, sc.shutdown) {
				sc.canal.Close()
				logger.Info(sc.context.GetRuntimeContext(), "service_canal quit because shutdown during getGTIDMode")
				return nil
			}
		}
	}
	if sc.isGTIDEnabled {
		logger.Info(sc.context.GetRuntimeContext(), "GTID mode is enabled, use it as checkpoint")
	} else {
		logger.Info(sc.context.GetRuntimeContext(), "GTID mode is not supported or disabled, use binlog-file as checkpoint")
	}

	// Initialize the checkpoint; if the server does not support GTID mode or it
	// is disabled, clear the GTID in the checkpoint.
	sc.initCheckPoint()
	if !sc.isGTIDEnabled && sc.checkpoint.GTID != "" {
		sc.checkpoint.GTID = ""
	}
	logger.Infof(sc.context.GetRuntimeContext(), "Checkpoint initialized: %v", sc.checkpoint)

	// Construct the start synchronization position from the GTID or binlog-file.
	// Prefer the GTID if it exists and is valid, otherwise fall back to the
	// binlog-file position. If both are missing and StartFromBegining is not set,
	// fetch the latest position from the server.
	var gtid mysql.GTIDSet
	var startPos mysql.Position
	if sc.checkpoint.GTID != "" {
		gtid, err = mysql.ParseGTIDSet(sc.Flavor, sc.checkpoint.GTID)
		if err != nil {
			logger.Error(sc.context.GetRuntimeContext(), "CANAL_START_ALARM", "Parse GTID error, clear it",
				sc.checkpoint.GTID, err)
			gtid = nil
			sc.checkpoint.GTID = ""
		}
	}
	if nil == gtid && sc.checkpoint.FileName != "" {
		startPos.Name = sc.checkpoint.FileName
		startPos.Pos = sc.checkpoint.Offset
	}
	if nil == gtid && 0 == len(startPos.Name) && !sc.StartFromBegining {
		if sc.isGTIDEnabled {
			gtid, err = sc.getLatestGTID()
			if err != nil {
				logger.Warning(sc.context.GetRuntimeContext(), "CANAL_START_ALARM", "Call getLatestGTID failed, error", err)
			}
		}
		if gtid == nil {
			startPos = sc.GetBinlogLatestPos()
		}
		logger.Infof(sc.context.GetRuntimeContext(), "Get latest checkpoint GTID: %v Position: %v", gtid, startPos)
	}

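	// Run canal in a background goroutine; it reports its terminal error (nil on
	// a normal exit) through sc.lastErrorChan, which drives the loop below.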
	if gtid != nil {
		go sc.runCanalByGTID(gtid)
	} else {
		go sc.runCanal(startPos)
	}

ForBlock:
	for {
		select {
		case <-sc.shutdown:
			sc.canal.Close() // will trigger OnPosSynced with force=true
			logger.Info(sc.context.GetRuntimeContext(), "service_canal quit because of shutting down, checkpoint",
				sc.checkpoint)
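			// Wait for the canal goroutine to publish its final result so it has
			// fully stopped before Start returns.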
			<-sc.lastErrorChan
			return nil
		case err = <-sc.lastErrorChan:
			sc.canal.Close() // will trigger OnPosSynced with force=true

			if nil == err {
				logger.Info(sc.context.GetRuntimeContext(), "Canal returns normally, break loop")
				break ForBlock
			}

			// Sleep a while and process error.
			if util.RandomSleep(time.Second*5, 0.1, sc.shutdown) {
				logger.Info(sc.context.GetRuntimeContext(), "Shutdown is signaled during sleep, break loop")
				break ForBlock
			}
			errStr := err.Error()
			logger.Error(sc.context.GetRuntimeContext(), "CANAL_RUNTIME_ALARM", "Restart canal because of error", err)

			// Get the latest position from the server and restart.
			//
			// Risk of losing data: if the new master has processed more requests and
			// updated gtid_executed before the plugin reconnects after an HA switch,
			// there is a gap between the local checkpoint and the latest checkpoint
			// fetched from the server (data loss).
			//
			// TODO: use the local checkpoint instead of fetching from the server.
			// That solution must handle an invalid local checkpoint to avoid an
			// infinite loop: if the user's configuration is wrong or the local
			// checkpoint is corrupted, canal will keep returning errors and
			// re-entering this logic.
			var gtid mysql.GTIDSet
			var startPos mysql.Position
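			// MySQL error 1236 (ER_MASTER_FATAL_ERROR_READING_BINLOG) means the server
			// can no longer serve the requested binlog position or GTID (e.g. purged
			// logs), so reset to the latest position/GTID on the server.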
			if strings.Contains(errStr, "ERROR 1236") {
				logger.Infof(sc.context.GetRuntimeContext(), "Reset binlog with config %v, GTID mode status: %v",
					sc.config, sc.isGTIDEnabled)
				if sc.isGTIDEnabled {
					gtid, err = sc.getLatestGTID()
					if err != nil {
						logger.Warning(sc.context.GetRuntimeContext(), "CANAL_RUNTIME_ALARM",
							"getLatestGTID failed duration restarting", err)
					}
				}
				if nil == gtid {
					startPos = sc.GetBinlogLatestPos()
				}
			}
			shouldShutdown, err = sc.newCanal()
			if err != nil {
				logger.Info(sc.context.GetRuntimeContext(), "newCanal returns error, break loop", err)
				break ForBlock
			}
			if shouldShutdown {
				logger.Info(sc.context.GetRuntimeContext(), "Shutdown is signaled during newCanal, break loop")
				break ForBlock
			}
			if gtid != nil {
				go sc.runCanalByGTID(gtid)
			} else {
				go sc.runCanal(startPos)
			}
			continue ForBlock
		}
	}
	logger.Info(sc.context.GetRuntimeContext(), "Break from loop, waiting for shutting down, checkpoint", sc.checkpoint)
	<-sc.shutdown
	return err
}
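
For reference, here is a minimal, self-contained sketch (not part of the plugin) of the supervise-and-restart pattern Start uses: a worker goroutine sends exactly one terminal error on a channel, and the supervisor restarts it after a short sleep or exits when shutdown is signaled. Names such as runWorker, supervise, and errCh are illustrative stand-ins, not real plugin APIs; the real code additionally closes the canal instance to force the worker to stop.

package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// runWorker stands in for runCanal/runCanalByGTID: it always sends exactly one
// value (its terminal error, possibly nil) on errCh before returning.
func runWorker(errCh chan<- error) {
	time.Sleep(200 * time.Millisecond)
	if rand.Intn(2) == 0 {
		errCh <- errors.New("simulated binlog error")
		return
	}
	errCh <- nil
}

// supervise mirrors the ForBlock loop in Start: restart on error, stop on a
// shutdown signal or on a normal (nil) return from the worker.
func supervise(shutdown <-chan struct{}) error {
	errCh := make(chan error, 1)
	go runWorker(errCh)
	for {
		select {
		case <-shutdown:
			<-errCh // wait for the worker's final result before returning
			return nil
		case err := <-errCh:
			if err == nil {
				return nil // worker finished normally
			}
			fmt.Println("restarting worker after error:", err)
			select {
			case <-shutdown: // crude stand-in for util.RandomSleep's shutdown check
				return nil
			case <-time.After(time.Second):
			}
			go runWorker(errCh)
		}
	}
}

func main() {
	shutdown := make(chan struct{})
	go func() {
		time.Sleep(2 * time.Second)
		close(shutdown)
	}()
	fmt.Println("supervisor exited:", supervise(shutdown))
}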