func()

in internal/satellite/module/sender/sender.go [193:222]


// shutdown0 closes every input channel of the sender and then drains the
// buffers before returning.
//
// One goroutine per entry in s.buffers calls s.consume on that buffer. The
// method returns as soon as all of those goroutines have finished, or once
// module.ShutdownHookTime has elapsed, whichever happens first. On timeout it
// makes one more synchronous s.consume pass over every buffer before
// returning.
func (s *Sender) shutdown0() {
	log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is closing")
	for _, in := range s.inputs {
		close(in)
	}

	// The WaitGroup counter must match the number of goroutines started
	// below, which is one per buffer. Sizing it from any other collection
	// risks a negative-counter panic in Done or a Wait that never returns.
	var wg sync.WaitGroup
	wg.Add(len(s.buffers))
	for partition := range s.buffers {
		go func(p int) {
			defer wg.Done()
			s.consume(s.buffers[p])
		}(partition)
	}

	// finished is only ever closed, never sent on, so it needs no buffer.
	finished := make(chan struct{})
	go func() {
		wg.Wait()
		close(finished)
	}()

	// A one-shot timer is enough for a single deadline; stop it on exit so
	// its resources are released promptly when draining finishes early.
	timeout := time.NewTimer(module.ShutdownHookTime)
	defer timeout.Stop()

	select {
	case <-timeout.C:
		// NOTE(review): the drain goroutines above may still be running
		// s.consume on these same buffers at this point — confirm that
		// consume tolerates concurrent calls on one buffer.
		for _, buffer := range s.buffers {
			s.consume(buffer)
		}
	case <-finished:
	}
}