in v2/processor.go [202:239]
func (p *Processor) start(ctx context.Context, receiverEx *ReceiverEx) error {
logger := getLogger(ctx)
receiverName := receiverEx.name
receiver := receiverEx.sbReceiver
logger.Info(fmt.Sprintf("starting processor %s", receiverName))
messages, err := receiver.ReceiveMessages(ctx, p.options.MaxReceiveCount, nil)
if err != nil {
return fmt.Errorf("processor %s failed to receive messages: %w", receiverName, err)
}
logger.Info(fmt.Sprintf("processor %s received %d messages - initial", receiverName, len(messages)))
processor.Metric.IncMessageReceived(receiverName, float64(len(messages)))
for _, msg := range messages {
p.process(ctx, receiverEx, msg)
}
for ctx.Err() == nil {
select {
case <-time.After(*p.options.ReceiveInterval):
maxMessages := min(p.options.MaxReceiveCount, p.options.MaxConcurrency-len(p.concurrencyTokens))
if ctx.Err() != nil || maxMessages == 0 {
break
}
messages, err := receiver.ReceiveMessages(ctx, maxMessages, nil)
if err != nil {
return fmt.Errorf("processor %s failed to receive messages: %w", receiverName, err)
}
logger.Info(fmt.Sprintf("processor %s received %d messages from processor loop", receiverName, len(messages)))
processor.Metric.IncMessageReceived(receiverName, float64(len(messages)))
for _, msg := range messages {
p.process(ctx, receiverEx, msg)
}
case <-ctx.Done():
logger.Info(fmt.Sprintf("context done, stop receiving from processor %s", receiverName))
break
}
}
logger.Info(fmt.Sprintf("exiting processor %s", receiverName))
return fmt.Errorf("processor %s stopped: %w", receiverName, ctx.Err())
}