in internal/satellite/module/sender/sender.go [193:222]
func (s *Sender) shutdown0() {
log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is closing")
for _, in := range s.inputs {
close(in)
}
var wg sync.WaitGroup
finished := make(chan struct{}, 1)
wg.Add(len(s.flushChannel))
for partition := range s.buffers {
go func(p int) {
defer wg.Done()
s.consume(s.buffers[p])
}(partition)
}
go func() {
wg.Wait()
close(finished)
}()
ticker := time.NewTicker(module.ShutdownHookTime)
select {
case <-ticker.C:
for _, buffer := range s.buffers {
s.consume(buffer)
}
return
case <-finished:
return
}
}