in src/checkoutservice/main.go [492:557]
func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.OrderResult) {
message, err := proto.Marshal(result)
if err != nil {
log.Errorf("Failed to marshal message to protobuf: %+v", err)
return
}
msg := sarama.ProducerMessage{
Topic: kafka.Topic,
Value: sarama.ByteEncoder(message),
}
// Inject tracing info into message
span := createProducerSpan(ctx, &msg)
defer span.End()
// Send message and handle response
startTime := time.Now()
select {
case cs.KafkaProducerClient.Input() <- &msg:
log.Infof("Message sent to Kafka: %v", msg)
select {
case successMsg := <-cs.KafkaProducerClient.Successes():
span.SetAttributes(
attribute.Bool("messaging.kafka.producer.success", true),
attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())),
attribute.KeyValue(semconv.MessagingKafkaMessageOffset(int(successMsg.Offset))),
)
log.Infof("Successful to write message. offset: %v, duration: %v", successMsg.Offset, time.Since(startTime))
case errMsg := <-cs.KafkaProducerClient.Errors():
span.SetAttributes(
attribute.Bool("messaging.kafka.producer.success", false),
attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())),
)
span.SetStatus(otelcodes.Error, errMsg.Err.Error())
log.Errorf("Failed to write message: %v", errMsg.Err)
case <-ctx.Done():
span.SetAttributes(
attribute.Bool("messaging.kafka.producer.success", false),
attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())),
)
span.SetStatus(otelcodes.Error, "Context cancelled: "+ctx.Err().Error())
log.Warnf("Context canceled before success message received: %v", ctx.Err())
}
case <-ctx.Done():
span.SetAttributes(
attribute.Bool("messaging.kafka.producer.success", false),
attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())),
)
span.SetStatus(otelcodes.Error, "Failed to send: "+ctx.Err().Error())
log.Errorf("Failed to send message to Kafka within context deadline: %v", ctx.Err())
return
}
ffValue := cs.getIntFeatureFlag(ctx, "kafkaQueueProblems")
if ffValue > 0 {
log.Infof("Warning: FeatureFlag 'kafkaQueueProblems' is activated, overloading queue now.")
for i := 0; i < ffValue; i++ {
go func(i int) {
cs.KafkaProducerClient.Input() <- &msg
_ = <-cs.KafkaProducerClient.Successes()
}(i)
}
log.Infof("Done with #%d messages for overload simulation.", ffValue)
}
}