in streamer/streamer.go [224:310]
// start claims a table task for this worker and runs the streaming pipeline
// for it: resolve the output topic, set up the changelog consumer and the
// output encoder, optionally stream a consistent snapshot, and finally (when
// cfg.ChangelogBuffer is set) stream the table from the changelog consumer.
//
// It returns true when the function runs to the end (even if streamTable
// reported a failure, which is only counted in the error metric) or when
// fetching the task failed with a MySQL lock wait timeout. Every other
// failure, and lockTable reporting no table, returns false.
func (s *Streamer) start(cfg *config.AppConfig) bool {
	var err error

	s.clusterLock = lock.Create(state.GetDBAddr())
	defer s.clusterLock.Close()

	// The hostname is only used as a prefix of the worker ID, so a lookup
	// failure is logged rather than treated as fatal.
	host, hostErr := os.Hostname()
	if hostErr != nil {
		log.E(hostErr)
	}
	w := log.GenWorkerID()
	s.workerID = host + "." + w

	s.row, err = state.GetTableTask(s.workerID, cfg.LockExpireTimeout)
	if err != nil {
		// A lock wait timeout is reported to the caller as true, not as a failure.
		if util.MySQLError(err, mysql.ER_LOCK_WAIT_TIMEOUT) {
			return true
		}
		log.E(err)
		return false
	}

	if !s.lockTable() {
		log.Debugf("Finished streamer: No free tables to work on")
		return false
	}

	s.log = log.WithFields(log.Fields{"worker_id": w, "service": s.row.Service, "db": s.row.DB, "table": s.row.Table, "version": s.row.Version})
	s.metrics = metrics.NewStreamerMetrics(s.getTag())
	s.log.Debugf("Started event streamer")
	s.metrics.NumWorkers.Inc()
	defer func() {
		// The closure reads the function-level err at return time, so only
		// failure paths that leave err non-nil are counted here. The
		// consumer-setup and snapshot failures below return with err == nil.
		if err != nil {
			s.metrics.Errors.Inc(1)
		}
		s.metrics.NumWorkers.Dec()
	}()

	// Event Streamer worker has successfully acquired a lock on a table. Proceed further
	// Each Event Streamer handles events from all partitions from Input buffer for a table
	s.topic, err = cfg.GetOutputTopicName(s.row.Service, s.row.DB, s.row.Table, s.row.Input, s.row.Output, s.row.Version, s.row.SnapshottedAt)
	if log.E(err) {
		return false
	}

	s.stateUpdateInterval = cfg.StateUpdateInterval
	s.log.Debugf("Will be streaming to output topic: %v", s.topic)

	consumer, res := s.setupChangelogConsumer(cfg)
	if !res {
		return false
	}

	// closeConsumer releases the changelog consumer on a failure path. The
	// nil guard is applied uniformly because the consumer may be nil here
	// (presumably when no changelog buffer is configured — verify against
	// setupChangelogConsumer).
	closeConsumer := func() {
		if consumer != nil {
			log.E(consumer.CloseOnFailure())
		}
	}

	s.outEncoder, err = encoder.Create(s.row.OutputFormat, s.row.Service, s.row.DB, s.row.Table, s.row.Input, s.row.Output, s.row.Version)
	if log.EL(s.log, err) {
		closeConsumer()
		return false
	}

	// The snapshot is streamed only when the task asks for one and it has not
	// been disabled through the table params.
	needSnapshot := s.row.NeedSnapshot && !s.row.Params.NoSnapshot
	if needSnapshot && !s.streamFromConsistentSnapshot(cfg.ThrottleTargetMB, cfg.ThrottleTargetIOPS) {
		closeConsumer()
		return false
	}

	// ClusterConcurrency is limited for the duration of snapshot only.
	// NOTE(review): the deferred Close above runs again on return, so Close
	// is presumably safe to call twice — confirm.
	s.clusterLock.Close()

	if cfg.ChangelogBuffer {
		// Transit format encoder, aka envelope encoder.
		// It must be per table to be able to decode schematized events.
		s.envEncoder, err = encoder.Create(encoder.Internal.Type(), s.row.Service, s.row.DB, s.row.Table, s.row.Input, s.row.Output, s.row.Version)
		if log.EL(s.log, err) {
			closeConsumer()
			return false
		}

		// A streaming failure is counted but does not change the return value.
		if !s.streamTable(consumer) {
			s.metrics.Errors.Inc(1)
		}
	}

	s.log.Debugf("Finished streamer")
	return true
}