in internal/client.go [413:539]
// Start records one more user of this client and, on the first call only,
// brings up the client's background maintenance loops.
//
// Every call increments instanceCount. The body guarded by c.once runs a
// single time: it registers the ACL interceptor on the remote client when
// credentials are configured, then launches five goroutines, each wrapped in
// primitive.WithRecover:
//
//   - name server address refresh: first run after 10s, then every 2 minutes
//   - topic route refresh: first run after 10ms, then every _PullNameServerInterval
//   - offline broker cleanup + heartbeat: first run after 1s, then every
//     _HeartbeatBrokerInterval
//   - consumer offset persistence: first run after 10s, then every
//     _PersistOffsetInterval
//   - rebalance: no initial run, every _RebalanceInterval
//
// Each loop returns as soon as c.done is signalled, including while it is
// still waiting out its initial delay, so no task is started after shutdown
// has been requested.
func (c *rmqClient) Start() {
	atomic.AddInt32(&c.instanceCount, 1)
	c.once.Do(func() {
		if !c.option.Credentials.IsEmpty() {
			c.remoteClient.RegisterInterceptor(remote.ACLInterceptor(c.option.Credentials))
		}

		// schedule runs task periodically on its own goroutine until c.done
		// is signalled, logging stopMsg when it exits. When initialDelay is
		// positive the task is first run once after that delay and the
		// periodic ticker starts only after that run has finished; when it is
		// zero the first run happens on the first tick.
		schedule := func(stopMsg string, initialDelay, interval time.Duration, task func()) {
			go primitive.WithRecover(func() {
				logStop := func() {
					rlog.Info(stopMsg, map[string]interface{}{
						"clientID": c.ClientID(),
					})
				}

				if initialDelay > 0 {
					// A timer inside select (rather than time.Sleep) lets a
					// shutdown that arrives during the delay cancel the
					// initial run instead of executing it against a stopped
					// client.
					warmup := time.NewTimer(initialDelay)
					select {
					case <-warmup.C:
					case <-c.done:
						warmup.Stop()
						logStop()
						return
					}
					task()
				}

				ticker := time.NewTicker(interval)
				defer ticker.Stop()
				for {
					select {
					case <-ticker.C:
						task()
					case <-c.done:
						logStop()
						return
					}
				}
			})
		}

		// Refresh the name server address list.
		schedule("The RMQClient stopping update name server domain info.", 10*time.Second, 2*time.Minute, func() {
			c.GetNameSrv().UpdateNameServerAddress()
		})

		// Refresh topic route info.
		schedule("The RMQClient stopping update topic route info.", 10*time.Millisecond, _PullNameServerInterval, func() {
			c.UpdateTopicRouteInfo()
		})

		// Drop offline brokers, then heartbeat the remaining ones.
		schedule("The RMQClient stopping clean off line broker and heart beat", time.Second, _HeartbeatBrokerInterval, func() {
			c.GetNameSrv().cleanOfflineBroker()
			c.SendHeartbeatToAllBrokerWithLock()
		})

		// Persist consumer offsets; a failure for one consumer is logged and
		// does not stop the iteration over the others.
		schedule("The RMQClient stopping persist offset", 10*time.Second, _PersistOffsetInterval, func() {
			c.consumerMap.Range(func(key, value interface{}) bool {
				consumer := value.(InnerConsumer)
				if err := consumer.PersistConsumerOffset(); err != nil {
					rlog.Error("persist offset failed", map[string]interface{}{
						rlog.LogKeyUnderlayError: err,
					})
				}
				return true
			})
		})

		// Rebalance has no initial run; it first fires on the first tick.
		schedule("The RMQClient stopping do rebalance", 0, _RebalanceInterval, func() {
			c.RebalanceIfNotPaused()
		})
	})
}