func()

in v2/processor.go [202:239]


func (p *Processor) start(ctx context.Context, receiverEx *ReceiverEx) error {
	logger := getLogger(ctx)
	receiverName := receiverEx.name
	receiver := receiverEx.sbReceiver
	logger.Info(fmt.Sprintf("starting processor %s", receiverName))
	messages, err := receiver.ReceiveMessages(ctx, p.options.MaxReceiveCount, nil)
	if err != nil {
		return fmt.Errorf("processor %s failed to receive messages: %w", receiverName, err)
	}
	logger.Info(fmt.Sprintf("processor %s received %d messages - initial", receiverName, len(messages)))
	processor.Metric.IncMessageReceived(receiverName, float64(len(messages)))
	for _, msg := range messages {
		p.process(ctx, receiverEx, msg)
	}
	for ctx.Err() == nil {
		select {
		case <-time.After(*p.options.ReceiveInterval):
			maxMessages := min(p.options.MaxReceiveCount, p.options.MaxConcurrency-len(p.concurrencyTokens))
			if ctx.Err() != nil || maxMessages == 0 {
				break
			}
			messages, err := receiver.ReceiveMessages(ctx, maxMessages, nil)
			if err != nil {
				return fmt.Errorf("processor %s failed to receive messages: %w", receiverName, err)
			}
			logger.Info(fmt.Sprintf("processor %s received %d messages from processor loop", receiverName, len(messages)))
			processor.Metric.IncMessageReceived(receiverName, float64(len(messages)))
			for _, msg := range messages {
				p.process(ctx, receiverEx, msg)
			}
		case <-ctx.Done():
			logger.Info(fmt.Sprintf("context done, stop receiving from processor %s", receiverName))
			break
		}
	}
	logger.Info(fmt.Sprintf("exiting processor %s", receiverName))
	return fmt.Errorf("processor %s stopped: %w", receiverName, ctx.Err())
}