in internal/satellite/module/sender/sender.go [109:145]
// store runs the buffering loop for one partition until ctx is cancelled.
//
// Events received from s.inputs[partition] are added to s.buffers[partition].
// The current buffer is handed to s.flushChannel[partition] and replaced with
// a fresh one when either:
//   - its length reaches s.config.MaxBufferSize after an Add, or
//   - the flush ticker fires and it holds at least s.config.MinFlushEvents.
//
// The ticker period is s.config.FlushTime interpreted as milliseconds;
// defaultSenderFlushTime is used when the configured value is not positive.
//
// While s.blocking is 1 the loop neither reads input nor flushes; it re-checks
// the flag every 100ms and still exits promptly when ctx is cancelled.
//
// wg.Done is called when the routine returns.
func (s *Sender) store(ctx context.Context, partition int, wg *sync.WaitGroup) {
	defer wg.Done()
	defer log.Logger.WithField("pipe", s.config.PipeName).Infof("store routine closed")

	flushTime := s.config.FlushTime
	if flushTime <= 0 {
		flushTime = defaultSenderFlushTime
	}
	timeTicker := time.NewTicker(time.Duration(flushTime) * time.Millisecond)
	// Release the ticker's resources once the routine exits.
	defer timeTicker.Stop()

	// flush hands the current buffer to the flush channel and starts a new one.
	// The send blocks until the receiving side accepts the buffer.
	flush := func() {
		s.flushChannel[partition] <- s.buffers[partition]
		s.buffers[partition] = buffer.NewBatchBuffer(s.config.MaxBufferSize)
	}

	for {
		// blocking output when disconnecting.
		if atomic.LoadInt32(&s.blocking) == 1 {
			// Back off before re-checking the flag, but keep honoring
			// cancellation so shutdown is not stalled by a disconnect.
			select {
			case <-ctx.Done():
				return
			case <-time.After(100 * time.Millisecond):
			}
			log.Logger.WithField("pipe", s.config.PipeName).
				Debugf("the client connection is disconnect, blocking the buffer")
			continue
		}

		select {
		case <-ctx.Done():
			return
		case <-timeTicker.C:
			// Periodic flush: only worthwhile once enough events are buffered.
			if s.buffers[partition].Len() >= s.config.MinFlushEvents {
				flush()
			}
		case e := <-s.inputs[partition]:
			// Nil events are skipped.
			// NOTE(review): a closed input channel also yields nil here on
			// every receive — confirm the shutdown ordering cancels ctx too.
			if e == nil {
				continue
			}
			s.buffers[partition].Add(e)
			// Size-triggered flush as soon as the buffer is full.
			if s.buffers[partition].Len() == s.config.MaxBufferSize {
				flush()
			}
		}
	}
}