in plugins/input/canal/input_canal.go [690:853]
func (sc *ServiceCanal) Start(c pipeline.Collector) error {
sc.lastErrorCount = 0
sc.shutdown = make(chan struct{}, 1)
sc.waitGroup.Add(1)
defer sc.waitGroup.Done()
sc.collector = c
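// Create the canal client; newCanal also reports whether a shutdown signal
// arrived while it was running.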
shouldShutdown, err := sc.newCanal()
if err != nil {
logger.Error(sc.context.GetRuntimeContext(),
"CANAL_START_ALARM", "service_canal plugin only supports ROW mode", err)
return err
}
if shouldShutdown {
logger.Info(sc.context.GetRuntimeContext(), "service_canal quit because shutdown is signaled during newCanal")
return nil
}
// Check whether GTID mode is available on the server; if not, fall back to binlog-file mode.
// Although retry logic is added here, it cannot handle the case where sc.canal
// becomes invalid. In that case, this loop will keep running and the user will
// keep receiving alarms (a rare case, ignored for now).
for {
shouldRetry := false
sc.isGTIDEnabled, shouldRetry, err = sc.getGTIDMode()
if nil == err {
break
}
err = fmt.Errorf("Check GTID mode failed, error: %v", err)
logger.Warning(sc.context.GetRuntimeContext(), "CANAL_START_ALARM", err.Error())
if shouldRetry {
if util.RandomSleep(time.Second*5, 0.1, sc.shutdown) {
sc.canal.Close()
logger.Info(sc.context.GetRuntimeContext(), "service_canal quit because shutdown during getGTIDMode")
return nil
}
}
}
if sc.isGTIDEnabled {
logger.Info(sc.context.GetRuntimeContext(), "GTID mode is enabled, use it as checkpoint")
} else {
logger.Info(sc.context.GetRuntimeContext(), "GTID mode is not supported or disabled, use binlog-file as checkpoint")
}
// Initialize the checkpoint. If the server does not support GTID mode or it is
// disabled, clear the GTID in the checkpoint.
sc.initCheckPoint()
if !sc.isGTIDEnabled && sc.checkpoint.GTID != "" {
sc.checkpoint.GTID = ""
}
logger.Infof(sc.context.GetRuntimeContext(), "Checkpoint initialized: %v", sc.checkpoint)
// Construct the start synchronization position from the GTID or the binlog file.
// Prefer the GTID if it exists and is valid; otherwise use the binlog file name
// and offset. If both are missing and StartFromBegining is not set, get the
// latest position from the server.
var gtid mysql.GTIDSet
var startPos mysql.Position
if sc.checkpoint.GTID != "" {
gtid, err = mysql.ParseGTIDSet(sc.Flavor, sc.checkpoint.GTID)
if err != nil {
logger.Error(sc.context.GetRuntimeContext(), "CANAL_START_ALARM", "Parse GTID error, clear it",
sc.checkpoint.GTID, err)
gtid = nil
sc.checkpoint.GTID = ""
}
}
if nil == gtid && sc.checkpoint.FileName != "" {
startPos.Name = sc.checkpoint.FileName
startPos.Pos = sc.checkpoint.Offset
}
if nil == gtid && 0 == len(startPos.Name) && !sc.StartFromBegining {
if sc.isGTIDEnabled {
gtid, err = sc.getLatestGTID()
if err != nil {
logger.Warning(sc.context.GetRuntimeContext(), "CANAL_START_ALARM", "Call getLatestGTID failed, error", err)
}
}
if gtid == nil {
startPos = sc.GetBinlogLatestPos()
}
logger.Infof(sc.context.GetRuntimeContext(), "Get latest checkpoint GTID: %v Position: %v", gtid, startPos)
}
if gtid != nil {
go sc.runCanalByGTID(gtid)
} else {
go sc.runCanal(startPos)
}
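// Main loop: wait for either a shutdown signal or a terminating error from the
// canal goroutine, and restart canal after recoverable errors.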
ForBlock:
for {
select {
case <-sc.shutdown:
sc.canal.Close() // will trigger OnPosSynced with force=true
logger.Info(sc.context.GetRuntimeContext(), "service_canal quit because of shutting down, checkpoint",
sc.checkpoint)
<-sc.lastErrorChan
return nil
case err = <-sc.lastErrorChan:
sc.canal.Close() // will trigger OnPosSynced with force=true
if nil == err {
logger.Info(sc.context.GetRuntimeContext(), "Canal returns normally, break loop")
break ForBlock
}
// Sleep for a while, then process the error.
if util.RandomSleep(time.Second*5, 0.1, sc.shutdown) {
logger.Info(sc.context.GetRuntimeContext(), "Shutdown is signaled during sleep, break loop")
break ForBlock
}
errStr := err.Error()
logger.Error(sc.context.GetRuntimeContext(), "CANAL_RUNTIME_ALARM", "Restart canal because of error", err)
// Get the latest position from the server and restart.
//
// Risk of losing data: if the new master processes more requests and updates
// gtid_executed before the plugin reconnects to it after an HA switch, there is
// a gap between the local checkpoint and the latest checkpoint fetched from the
// server (data loss).
//
// TODO: use the local checkpoint instead of fetching from the server.
// That solution has to handle the case where the local checkpoint is invalid,
// to avoid looping forever: if the user's configuration is wrong or the local
// checkpoint is corrupted, canal will keep returning an error and re-entering
// this logic.
var gtid mysql.GTIDSet
var startPos mysql.Position
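// MySQL error 1236 (ER_MASTER_FATAL_ERROR_READING_BINLOG) means the requested
// binlog position or GTID is no longer available on the server, so reset the
// start point to the server's latest position.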
if strings.Contains(errStr, "ERROR 1236") {
logger.Infof(sc.context.GetRuntimeContext(), "Reset binlog with config %v, GTID mode status: %v",
sc.config, sc.isGTIDEnabled)
if sc.isGTIDEnabled {
gtid, err = sc.getLatestGTID()
if err != nil {
logger.Warning(sc.context.GetRuntimeContext(), "CANAL_RUNTIME_ALARM",
"getLatestGTID failed during restart", err)
}
}
if nil == gtid {
startPos = sc.GetBinlogLatestPos()
}
}
shouldShutdown, err = sc.newCanal()
if err != nil {
logger.Info(sc.context.GetRuntimeContext(), "newCanal returns error, break loop", err)
break ForBlock
}
if shouldShutdown {
logger.Info(sc.context.GetRuntimeContext(), "Shutdown is signaled during newCanal, break loop")
break ForBlock
}
if gtid != nil {
go sc.runCanalByGTID(gtid)
} else {
go sc.runCanal(startPos)
}
continue ForBlock
}
}
logger.Info(sc.context.GetRuntimeContext(), "Break from loop, waiting for shutting down, checkpoint", sc.checkpoint)
<-sc.shutdown
return err
}
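The retry paths above back off with util.RandomSleep, a jittered sleep that aborts early when shutdown is signaled. The helper below is a minimal sketch of that pattern under the assumptions visible in the calls above (a base duration plus a jitter ratio, returning true if shutdown fired during the sleep); the name randomSleep, the package name, and the body are illustrative, not the plugin's actual util implementation.

package util // illustrative package name, not the plugin's real util package

import (
	"math/rand"
	"time"
)

// randomSleep sleeps for roughly base, randomized by +/- base*jitterRatio,
// and returns true if the shutdown channel fires before the timer does.
// Sketch only: the semantics are inferred from how util.RandomSleep is called in Start.
func randomSleep(base time.Duration, jitterRatio float64, shutdown <-chan struct{}) bool {
	// Pick a duration in [base*(1-jitterRatio), base*(1+jitterRatio)].
	delta := time.Duration((rand.Float64()*2 - 1) * jitterRatio * float64(base))
	timer := time.NewTimer(base + delta)
	defer timer.Stop()
	select {
	case <-shutdown:
		return true // shutdown was signaled during the sleep
	case <-timer.C:
		return false
	}
}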