func()

in pkg/run/run.go [348:433]


// Run drives the Group through its stages: configuration, pre-run and serve.
//
// It first calls RunConfig; if that reports an interruption or an error, Run
// returns RunConfig's error as-is (nil when merely interrupted) and does
// nothing else. Any error produced after that point is logged through
// g.log.Fatal() by a deferred hook before Run returns.
//
// The roles reported by the entries of g.rr (skipping nil entries and
// ROLE_UNSPECIFIED) are sorted, deduplicated, logged and handed to every
// PreRunner via the context under common.ContextNodeRolesKey. A nil ctx is
// replaced by context.Background().
//
// Each non-nil entry of g.s is then added to the internal run group g.r.
// g.readyCh is closed once every slot of g.s is accounted for, i.e. its
// Serve() call has returned a notify channel or the slot was nil and skipped.
// Run finally blocks in g.r.Run() and returns its result.
func (g *Group) Run(ctx context.Context) (err error) {
	// run config registration and flag parsing stages
	if interrupted, errRun := g.RunConfig(); interrupted || errRun != nil {
		return errRun
	}
	// Registered after the config stage on purpose: only failures from the
	// stages below go through the fatal logger.
	defer func() {
		if err != nil {
			g.log.Fatal().Err(err).Stack().Msg("unexpected exit")
		}
	}()

	// Collect the roles declared by the registered role providers.
	rr := make([]databasev1.Role, 0, len(g.rr))
	for idx := range g.rr {
		if g.rr[idx] == nil || g.rr[idx].Role() == databasev1.Role_ROLE_UNSPECIFIED {
			continue
		}
		rr = append(rr, g.rr[idx].Role())
	}
	// Sort so that equal roles become adjacent, then compact in place: a value
	// is kept only when it differs from the last one kept.
	sort.Slice(rr, func(i, j int) bool {
		return rr[i] < rr[j]
	})
	uniq := rr[:0]
	for _, role := range rr {
		if len(uniq) == 0 || uniq[len(uniq)-1] != role {
			uniq = append(uniq, role)
		}
	}
	rr = uniq

	g.log.Info().Interface("roles", rr).Msg("the node will run as")

	if ctx == nil {
		ctx = context.Background()
	}
	// The roles context is identical for every PreRunner, so build it once.
	preRunCtx := context.WithValue(ctx, common.ContextNodeRolesKey, rr)
	// execute pre run stage and exit on error
	for idx := range g.p {
		// a PreRunner might have been deregistered during Run
		if g.p[idx] == nil {
			continue
		}
		startTime := time.Now()
		if err := g.p[idx].PreRun(preRunCtx); err != nil {
			return errors.WithMessage(err, fmt.Sprintf("pre-run module[%s]", g.p[idx].Name()))
		}
		g.log.Info().Dur("elapsed", time.Since(startTime)).Str("name", g.p[idx].Name()).Msg("pre-run completed")
	}

	// swg tracks one unit per slot of g.s; readyCh is closed when all slots
	// have been released.
	swg := &sync.WaitGroup{}
	swg.Add(len(g.s))
	go func() {
		swg.Wait()
		close(g.readyCh)
	}()
	// feed our registered services to our internal run.Group
	for idx := range g.s {
		// a Service might have been deregistered during Run
		s := g.s[idx]
		if s == nil {
			// Release the slot reserved by swg.Add above; otherwise the
			// wait group never drains and readyCh is never closed.
			swg.Done()
			continue
		}
		// Per-iteration copy so the interrupt closure below reports this
		// service's position even on toolchains where the range variable is
		// shared across iterations (before Go 1.22).
		ran := uint32(idx + 1)

		g.log.Debug().Uint32("total", uint32(len(g.s))).Uint32("ran", ran).Str("name", s.Name()).Msg("serve")
		g.r.Add(func() error {
			notify := s.Serve()
			// Serve has returned its notify channel: count this slot as ready.
			swg.Done()
			<-notify
			return nil
		}, func(_ error) {
			g.log.Debug().Uint32("total", uint32(len(g.s))).Uint32("ran", ran).Str("name", s.Name()).Msg("stopping")
			startTime := time.Now()
			s.GracefulStop()
			g.log.Info().Dur("elapsed", time.Since(startTime)).Str("name", s.Name()).Msg("stopped")
		})
	}

	// start registered services and block
	return g.r.Run()
}