func()

in internal/satellite/module/sender/sender.go [110:146]


func (s *Sender) store(ctx context.Context, partition int, wg *sync.WaitGroup) {
	defer wg.Done()
	defer log.Logger.WithField("pipe", s.config.PipeName).Infof("store routine closed")
	childCtx, _ := context.WithCancel(ctx) // nolint
	flushTime := s.config.FlushTime
	if flushTime <= 0 {
		flushTime = defaultSenderFlushTime
	}
	timeTicker := time.NewTicker(time.Duration(flushTime) * time.Millisecond)
	for {
		// blocking output when disconnecting.
		if atomic.LoadInt32(&s.blocking) == 1 {
			time.Sleep(100 * time.Millisecond)
			log.Logger.WithField("pipe", s.config.PipeName).
				Debugf("the client connection is disconnect, blocking the buffer")
			continue
		}
		select {
		case <-childCtx.Done():
			return
		case <-timeTicker.C:
			if s.buffers[partition].Len() >= s.config.MinFlushEvents {
				s.flushChannel[partition] <- s.buffers[partition]
				s.buffers[partition] = buffer.NewBatchBuffer(s.config.MaxBufferSize)
			}
		case e := <-s.inputs[partition]:
			if e == nil {
				continue
			}
			s.buffers[partition].Add(e)
			if s.buffers[partition].Len() == s.config.MaxBufferSize {
				s.flushChannel[partition] <- s.buffers[partition]
				s.buffers[partition] = buffer.NewBatchBuffer(s.config.MaxBufferSize)
			}
		}
	}
}