pkg/rules/amqp091/consume_setup.go (56 lines of code) (raw):
// Copyright (c) 2025 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package amqp091
import (
"context"
_ "unsafe"
"github.com/alibaba/opentelemetry-go-auto-instrumentation/pkg/api"
amqp "github.com/rabbitmq/amqp091-go"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
)
//go:linkname consumeOnEnter github.com/rabbitmq/amqp091-go.consumeOnEnter
func consumeOnEnter(call api.CallContext,
_ interface{},
tag string,
msg *amqp.Delivery,
) {
request := RabbitRequest{
operationName: "receive",
destinationName: msg.Exchange + ":" + msg.RoutingKey,
messageId: msg.MessageId,
bodySize: int64(len(msg.Body)),
conversationID: msg.CorrelationId,
headers: msg.Headers,
}
ctx := context.Background()
var attributes []attribute.KeyValue
attributes = append(attributes,
semconv.MessagingRabbitmqDestinationRoutingKey(msg.RoutingKey),
semconv.MessagingRabbitmqMessageDeliveryTag(int(msg.DeliveryTag)), attribute.KeyValue{
Key: semconv.MessagingOperationTypeKey,
Value: attribute.StringValue(request.operationName),
}, attribute.KeyValue{
Key: "messaging.rabbitmq.message.consumer_tag",
Value: attribute.StringValue(msg.ConsumerTag),
},
)
ctx = RabbitMQConsumeInstrumenter.Start(ctx, request, trace.WithAttributes(attributes...))
data := make(map[string]interface{})
data["ctx"] = ctx
data["rabbitMQ_consume_request"] = request
call.SetData(data)
}
//go:linkname consumeOnExit github.com/rabbitmq/amqp091-go.consumeOnExit
func consumeOnExit(call api.CallContext, b bool) {
data, ok := call.GetData().(map[string]interface{})
if !ok {
return
}
ctx, ok := data["ctx"].(context.Context)
if !ok {
return
}
request, ok := data["rabbitMQ_consume_request"].(RabbitRequest)
if !ok {
return
}
RabbitMQConsumeInstrumenter.End(ctx, request, nil, nil)
}