func()

in internal/client.go [413:539]


// Start registers one more user of the client and, on the first call only,
// launches the client's background maintenance goroutines.
//
// Every call increments c.instanceCount. The body guarded by c.once runs a
// single time and:
//   - registers the ACL interceptor on c.remoteClient when credentials are set;
//   - starts periodic loops that update the name server address, refresh topic
//     route info, clean offline brokers and send heartbeats, persist consumer
//     offsets, and trigger rebalancing.
//
// Each loop exits once c.done becomes readable (presumably closed on shutdown
// — confirm against the shutdown path). The initial start-up delay of a loop
// is also interrupted by c.done, so no task is executed after the client has
// been stopped and no goroutine lingers for the remainder of its delay.
func (c *rmqClient) Start() {
	atomic.AddInt32(&c.instanceCount, 1)
	c.once.Do(func() {
		if !c.option.Credentials.IsEmpty() {
			c.remoteClient.RegisterInterceptor(remote.ACLInterceptor(c.option.Credentials))
		}

		// logStopped emits the per-loop shutdown message tagged with the client ID.
		logStopped := func(msg string) {
			rlog.Info(msg, map[string]interface{}{
				"clientID": c.ClientID(),
			})
		}

		// waitOrDone blocks for d and reports true, or reports false as soon
		// as c.done becomes readable. Unlike time.Sleep it never outlives the
		// client's stop signal.
		waitOrDone := func(d time.Duration) bool {
			timer := time.NewTimer(d)
			defer timer.Stop()
			select {
			case <-timer.C:
				return true
			case <-c.done:
				return false
			}
		}

		// tickUntilDone runs op on every tick of interval until c.done becomes
		// readable, then logs stopMsg and returns.
		tickUntilDone := func(interval time.Duration, stopMsg string, op func()) {
			ticker := time.NewTicker(interval)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					op()
				case <-c.done:
					logStopped(stopMsg)
					return
				}
			}
		}

		// schedule starts a recovered goroutine that waits initialDelay, runs
		// op once, and then keeps running it every interval until stopped.
		schedule := func(initialDelay, interval time.Duration, stopMsg string, op func()) {
			go primitive.WithRecover(func() {
				if !waitOrDone(initialDelay) {
					// Stopped before the first run: skip op entirely.
					logStopped(stopMsg)
					return
				}
				op()
				tickUntilDone(interval, stopMsg, op)
			})
		}

		// schedule update name server address
		schedule(10*time.Second, 2*time.Minute,
			"The RMQClient stopping update name server domain info.",
			func() {
				c.GetNameSrv().UpdateNameServerAddress()
			})

		// schedule update route info
		schedule(10*time.Millisecond, _PullNameServerInterval,
			"The RMQClient stopping update topic route info.",
			func() {
				c.UpdateTopicRouteInfo()
			})

		// schedule offline broker cleanup and heartbeat
		schedule(time.Second, _HeartbeatBrokerInterval,
			"The RMQClient stopping clean off line broker and heart beat",
			func() {
				c.GetNameSrv().cleanOfflineBroker()
				c.SendHeartbeatToAllBrokerWithLock()
			})

		// schedule persist offset
		schedule(10*time.Second, _PersistOffsetInterval,
			"The RMQClient stopping persist offset",
			func() {
				c.consumerMap.Range(func(key, value interface{}) bool {
					consumer := value.(InnerConsumer)
					if err := consumer.PersistConsumerOffset(); err != nil {
						rlog.Error("persist offset failed", map[string]interface{}{
							rlog.LogKeyUnderlayError: err,
						})
					}
					// Keep iterating: one failing consumer must not block the rest.
					return true
				})
			})

		// schedule rebalance; it has no initial delay and no immediate first
		// run, so the first rebalance happens on the first tick.
		go primitive.WithRecover(func() {
			tickUntilDone(_RebalanceInterval, "The RMQClient stopping do rebalance", func() {
				c.RebalanceIfNotPaused()
			})
		})
	})
}