plugins/segmentio-kafka/reader_interceptor.go (48 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package segmentiokafka import ( "strings" "github.com/segmentio/kafka-go" "github.com/apache/skywalking-go/plugins/core/operator" "github.com/apache/skywalking-go/plugins/core/tracing" ) const ( kafkaReaderPrefix = "Kafka/" kafkaReaderSuffix = "/Consumer" kafkaReaderComponentID = 41 semicolon = ";" ) type ReaderInterceptor struct { } func (r *ReaderInterceptor) BeforeInvoke(invocation operator.Invocation) error { return nil } func (r *ReaderInterceptor) AfterInvoke(invocation operator.Invocation, result ...interface{}) error { reader := invocation.CallerInstance().(*kafka.Reader) brokers := strings.Join(reader.Config().Brokers, semicolon) message := result[0].(kafka.Message) topic := message.Topic operationName := kafkaReaderPrefix + topic + kafkaReaderSuffix span, err := tracing.CreateEntrySpan(operationName, func(headerKey string) (string, error) { for _, header := range message.Headers { if header.Key == headerKey { return string(header.Value), nil } } return "", nil }, tracing.WithLayer(tracing.SpanLayerMQ), tracing.WithComponent(kafkaReaderComponentID), tracing.WithTag(tracing.TagMQBroker, brokers), tracing.WithTag(tracing.TagMQTopic, topic), ) if err != nil { return err } if err, ok := result[1].(error); ok { span.Tag(tracing.TagMQStatus, err.Error()) span.Error(err.Error()) } span.SetPeer(brokers) span.End() return nil }