plugins/amqp/general_consumer.go (81 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package amqp import ( "fmt" "os" "strconv" "sync/atomic" "github.com/rabbitmq/amqp091-go" "github.com/apache/skywalking-go/plugins/core/operator" "github.com/apache/skywalking-go/plugins/core/tracing" ) const ( ConsumerComponentID = 145 amqpConsumerPrefix = "AMQP/" amqpConsumerSuffix = "/Consumer" tagMQConsumerTag = "mq.consumer_tag" tagMQReplyTo = "mq.reply_to" tagMQCorrelationID = "mq.correlation_id" tagMQArgs = "mq.args" consumerTagLengthMax = 0xFF ) var consumerSeq uint64 var queueConsumerTagMapping = make(map[string]string) func GeneralConsumersSendAfterInvoke(invocation operator.Invocation, results ...interface{}) error { if foundConsumer := results[0].(bool); !foundConsumer { return nil } consumerTag, _ := invocation.Args()[0].(string) delivery, _ := invocation.Args()[1].(*Delivery) operationName := amqpConsumerPrefix + queueConsumerTagMapping[consumerTag] + "/" + consumerTag + amqpConsumerSuffix channel, _ := delivery.Acknowledger.(*nativeChannel) peer := getPeerInfo(channel.connection) span, err := tracing.CreateEntrySpan(operationName, func(headerKey string) (string, error) { header, _ := delivery.Headers[headerKey].(string) return header, nil }, tracing.WithLayer(tracing.SpanLayerMQ), tracing.WithComponent(ConsumerComponentID), tracing.WithTag(tracing.TagMQBroker, peer), tracing.WithTag(tracing.TagMQQueue, queueConsumerTagMapping[consumerTag]), tracing.WithTag(tracing.TagMQMsgID, delivery.MessageId), tracing.WithTag(tagMQConsumerTag, consumerTag), tracing.WithTag(tagMQCorrelationID, delivery.CorrelationId), tracing.WithTag(tagMQReplyTo, delivery.ReplyTo), tracing.WithTag(tagMQArgs, fmt.Sprintf("%v", delivery.Headers)), ) if err != nil { return err } span.SetPeer(peer) span.End() return nil } func GeneralConsumerBeforeInvoke(invocation operator.Invocation, args amqp091.Table) error { queue := invocation.Args()[0].(string) consumerTag := invocation.Args()[1].(string) if consumerTag == "" { consumerTag = uniqueConsumerTag() } queueConsumerTagMapping[consumerTag] = queue return nil } func GeneralConsumerCloseBeforeInvoke(invocation operator.Invocation) error { consumers, _ := invocation.CallerInstance().(*nativeConsumers) consumers.Lock() defer consumers.Unlock() for consumerTag := range consumers.chans { delete(queueConsumerTagMapping, consumerTag) } return nil } func uniqueConsumerTag() string { return commandNameBasedUniqueConsumerTag(os.Args[0]) } func commandNameBasedUniqueConsumerTag(commandName string) string { tagPrefix := "ctag-" tagInfix := commandName tagSuffix := "-" + strconv.FormatUint(atomic.AddUint64(&consumerSeq, 1), 10) if len(tagPrefix)+len(tagInfix)+len(tagSuffix) > consumerTagLengthMax { tagInfix = "streadway/amqp" } return tagPrefix + tagInfix + tagSuffix }