func()

in streamer/streamer.go [224:310]


// start performs one streamer run for a single table task.
//
// The sequence is:
//  1. create the cluster lock handle and build this worker's ID,
//  2. fetch a table task from state and lock the table,
//  3. resolve the output topic, set up the changelog consumer and the
//     output encoder,
//  4. stream a consistent snapshot when the task asks for one,
//  5. when cfg.ChangelogBuffer is set, create the envelope encoder and
//     stream the table's changelog.
//
// It returns false when the task cannot be fetched (other than on a MySQL
// lock wait timeout), no table could be locked, or any setup/snapshot step
// fails. It returns true on a lock wait timeout while fetching the task and
// whenever the run reaches the end, including when streamTable reports
// failure (that case is only counted in the Errors metric).
// NOTE(review): the caller presumably uses the result to decide whether the
// worker should keep running - confirm against the call site.
func (s *Streamer) start(cfg *config.AppConfig) bool {
	// err is shared with the deferred metrics hook below, so every later
	// assignment must use "=" and never shadow it with ":=".
	var err error

	s.clusterLock = lock.Create(state.GetDBAddr())
	defer s.clusterLock.Close()

	// A failed hostname lookup is not fatal: the worker ID is still built
	// from whatever name was returned, but the failure is logged instead of
	// being silently discarded.
	host, hostErr := os.Hostname()
	log.E(hostErr)
	worker := log.GenWorkerID()
	s.workerID = host + "." + worker

	s.row, err = state.GetTableTask(s.workerID, cfg.LockExpireTimeout)
	if err != nil {
		// A lock wait timeout is not logged and is reported as success.
		if util.MySQLError(err, mysql.ER_LOCK_WAIT_TIMEOUT) {
			return true
		}
		log.E(err)
		return false
	}

	if !s.lockTable() {
		log.Debugf("Finished streamer: No free tables to work on")
		return false
	}

	s.log = log.WithFields(log.Fields{"worker_id": worker, "service": s.row.Service, "db": s.row.DB, "table": s.row.Table, "version": s.row.Version})
	s.metrics = metrics.NewStreamerMetrics(s.getTag())

	s.log.Debugf("Started event streamer")

	s.metrics.NumWorkers.Inc()
	defer func() {
		// Only failures that left err set are counted here; paths that
		// return false with a nil err are not.
		if err != nil {
			s.metrics.Errors.Inc(1)
		}
		s.metrics.NumWorkers.Dec()
	}()

	// Event Streamer worker has successfully acquired a lock on a table. Proceed further
	// Each Event Streamer handles events from all partitions from Input buffer for a table
	s.topic, err = cfg.GetOutputTopicName(s.row.Service, s.row.DB, s.row.Table, s.row.Input, s.row.Output, s.row.Version, s.row.SnapshottedAt)
	if log.E(err) {
		return false
	}
	s.stateUpdateInterval = cfg.StateUpdateInterval

	s.log.Debugf("Will be streaming to output topic: %v", s.topic)

	consumer, ok := s.setupChangelogConsumer(cfg)
	if !ok {
		return false
	}

	// closeConsumer releases the changelog consumer on a failure path. The
	// consumer may be nil, so the call is guarded.
	closeConsumer := func() {
		if consumer != nil {
			log.E(consumer.CloseOnFailure())
		}
	}

	s.outEncoder, err = encoder.Create(s.row.OutputFormat, s.row.Service, s.row.DB, s.row.Table, s.row.Input, s.row.Output, s.row.Version)
	if log.EL(s.log, err) {
		closeConsumer()
		return false
	}

	// The snapshot is streamed only when the task needs one and it has not
	// been disabled through the task parameters.
	wantSnapshot := s.row.NeedSnapshot && !s.row.Params.NoSnapshot
	if wantSnapshot && !s.streamFromConsistentSnapshot(cfg.ThrottleTargetMB, cfg.ThrottleTargetIOPS) {
		closeConsumer()
		return false
	}

	// ClusterConcurrency is limited for the duration of snapshot only.
	// The deferred Close above still runs on return, so Close is invoked a
	// second time on this path.
	// NOTE(review): confirm Close is safe to call twice.
	s.clusterLock.Close()

	if cfg.ChangelogBuffer {
		// Transit format encoder, aka envelope encoder.
		// It must be per table to be able to decode schematized events.
		s.envEncoder, err = encoder.Create(encoder.Internal.Type(), s.row.Service, s.row.DB, s.row.Table, s.row.Input, s.row.Output, s.row.Version)
		if log.EL(s.log, err) {
			closeConsumer()
			return false
		}

		// A streaming failure is counted but does not change the result.
		if !s.streamTable(consumer) {
			s.metrics.Errors.Inc(1)
		}
	}

	s.log.Debugf("Finished streamer")

	return true
}