in pkg/run/run.go [348:433]
func (g *Group) Run(ctx context.Context) (err error) {
// run config registration and flag parsing stages
if interrupted, errRun := g.RunConfig(); interrupted || errRun != nil {
return errRun
}
defer func() {
if err != nil {
g.log.Fatal().Err(err).Stack().Msg("unexpected exit")
}
}()
rr := make([]databasev1.Role, 0, len(g.rr))
for idx := range g.rr {
if g.rr[idx] == nil || g.rr[idx].Role() == databasev1.Role_ROLE_UNSPECIFIED {
continue
}
rr = append(rr, g.rr[idx].Role())
}
// Sort and deduplicate roles
sort.Slice(rr, func(i, j int) bool {
return rr[i] < rr[j]
})
deduplicateRoles := func(rr []databasev1.Role) []databasev1.Role {
if len(rr) < 2 {
return rr
}
// deduplicate roles
rrDedup := make([]databasev1.Role, 0, len(rr))
for i := 0; i < len(rr)-1; i++ {
if rr[i] != rr[i+1] {
rrDedup = append(rrDedup, rr[i])
}
}
rrDedup = append(rrDedup, rr[len(rr)-1])
return rrDedup
}
rr = deduplicateRoles(rr)
g.log.Info().Interface("roles", rr).Msg("the node will run as")
if ctx == nil {
ctx = context.Background()
}
// execute pre run stage and exit on error
for idx := range g.p {
// a PreRunner might have been deregistered during Run
if g.p[idx] == nil {
continue
}
startTime := time.Now()
if err := g.p[idx].PreRun(context.WithValue(ctx, common.ContextNodeRolesKey, rr)); err != nil {
return errors.WithMessage(err, fmt.Sprintf("pre-run module[%s]", g.p[idx].Name()))
}
g.log.Info().Dur("elapsed", time.Since(startTime)).Str("name", g.p[idx].Name()).Msg("pre-run completed")
}
swg := &sync.WaitGroup{}
swg.Add(len(g.s))
go func() {
swg.Wait()
close(g.readyCh)
}()
// feed our registered services to our internal run.Group
for idx := range g.s {
// a Service might have been deregistered during Run
s := g.s[idx]
if s == nil {
continue
}
g.log.Debug().Uint32("total", uint32(len(g.s))).Uint32("ran", uint32(idx+1)).Str("name", s.Name()).Msg("serve")
g.r.Add(func() error {
notify := s.Serve()
swg.Done()
<-notify
return nil
}, func(_ error) {
g.log.Debug().Uint32("total", uint32(len(g.s))).Uint32("ran", uint32(idx+1)).Str("name", s.Name()).Msg("stopping")
startTime := time.Now()
s.GracefulStop()
g.log.Info().Dur("elapsed", time.Since(startTime)).Str("name", s.Name()).Msg("stopped")
})
}
// start registered services and block
return g.r.Run()
}