function SQS()

in src/plugins/AWS2SQSPlugin.ts [40:167]


    /**
     * Tracing wrapper around the original `_SQS` factory/constructor.
     *
     * Builds the client by delegating to `_SQS` with the caller's `this` and arguments, then replaces
     * three methods on the returned object:
     *
     * - `sendMessage` / `sendMessageBatch`: create an exit span and add a `__revdTraceId` message
     *   attribute (`<injected trace header>/<local hostname>`) to a copy of the request.
     * - `receiveMessage`: always requests the `__revdTraceId` attribute, removes `MaxNumberOfMessages`
     *   from the request, and, when messages come back, swaps the polling exit span for an entry span
     *   linked to the producer's trace context.
     *
     * Caller-supplied request objects are never mutated; only shallow copies are modified.
     *
     * @returns the client object produced by `_SQS`, with the instrumented methods installed.
     */
    function SQS(this: any) {
      const sqs = _SQS.apply(this, arguments);

      // Name of the message attribute used to carry the trace context between producer and consumer.
      const TRACE_ATTR = '__revdTraceId';

      /** Returns the text after the last '/' of a queue URL; used as the queue name in operation names. */
      const queueNameOf = (queueUrl: string): string => queueUrl.slice(queueUrl.lastIndexOf('/') + 1);

      /** Builds the attribute value carrying the injected trace header followed by '/' and this host's name. */
      const traceAttribute = (span: Span) => ({
        DataType: 'String',
        StringValue: `${span.inject().value}/${hostname()}`,
      });

      /**
       * Replaces `sqs[name]` with a version that opens an exit span and forwards to the original method.
       *
       * @param name       method name on the client (also used in the span's operation name).
       * @param addTraceId returns the request to send, i.e. a copy of `params` carrying the trace attribute.
       */
      function instrumentSend(name: string, addTraceId: (params: any, span: Span) => any): void {
        const original = sqs[name];

        sqs[name] = function (this: any, params: any, callback: any) {
          const operation = `AWS/SQS/${name}/${queueNameOf(params.QueueUrl)}`;
          const span = ContextManager.current.newExitSpan(operation, Component.AWS_SQS, Component.HTTP);

          span.component = Component.AWS_SQS;
          span.layer = SpanLayer.MQ;

          return execute(span, this, original, addTraceId(params, span), callback, 'mqBroker');
        };
      }

      instrumentSend('sendMessage', (params: any, span: Span) => ({
        ...params,
        // Copy the caller's attributes (if any) before adding ours so their object stays untouched.
        MessageAttributes: { ...params.MessageAttributes, [TRACE_ATTR]: traceAttribute(span) },
      }));

      instrumentSend('sendMessageBatch', (params: any, span: Span) => {
        // Inject once per batch; every entry carries the same trace attribute value.
        const traceAttr = { [TRACE_ATTR]: traceAttribute(span) };
        const request = { ...params };

        // A missing or malformed `Entries` is forwarded unchanged instead of throwing here.
        // NOTE(review): assumes the wrapped SDK method reports the invalid request itself - confirm.
        if (Array.isArray(request.Entries)) {
          request.Entries = request.Entries.map((entry: any) => ({
            ...entry,
            MessageAttributes: { ...entry.MessageAttributes, ...traceAttr },
          }));
        }

        return request;
      });

      /**
       * Reads the trace id from a message body that itself is JSON containing
       * `MessageAttributes.__revdTraceId.Value`.
       * NOTE(review): this shape looks like an SNS notification delivered to SQS - confirm.
       *
       * @returns the value when it is a string, otherwise `undefined` (including when the body is not JSON).
       */
      function traceIdFromBody(body: any): string | undefined {
        try {
          const value = JSON.parse(body)?.MessageAttributes?.[TRACE_ATTR]?.Value;

          return typeof value === 'string' ? value : undefined;
        } catch {
          // Body is not JSON: there is simply no trace context to pick up.
          return undefined;
        }
      }

      const originalReceive = sqs.receiveMessage;

      sqs.receiveMessage = function (this: any, params: any, callback: any) {
        const request = { ...params };
        const requestedNames = request.MessageAttributeNames;

        request.MessageAttributeNames = requestedNames ? requestedNames.concat([TRACE_ATTR]) : [TRACE_ATTR];

        delete request.MaxNumberOfMessages; // limit to 1 message in order to be able to link all Exit and Entry spans

        const queueUrl = request.QueueUrl;
        const operation = `AWS/SQS/receiveMessage/${queueNameOf(queueUrl)}`;
        // Exit span covering the poll itself; replaced by an entry span below if a message is returned.
        const pollSpan = ContextManager.current.newExitSpan(`${operation}<check>`, Component.AWS_SQS, Component.HTTP);

        pollSpan.component = Component.AWS_SQS;
        pollSpan.layer = SpanLayer.MQ;

        /**
         * Hook handed to `execute`; should always be called on success only, with no err.
         *
         * When the response contains messages, removes the injected trace attribute from them, stops
         * `span`, and returns a newly started entry span. Otherwise returns `span` unchanged.
         */
        function beforeCB(this: any, span: Span, err: any, res: any): Span {
          const messages = res?.Messages;

          if (!messages?.length) return span;

          // The caller asked for no attributes at all, so everything present was added for tracing.
          const stripAll = !requestedNames || !requestedNames.length;
          let traceId: string | undefined;

          // should only be 1; if there are more, the last message's trace id is the one used
          for (const msg of messages) {
            const attrs = msg.MessageAttributes;

            // Start clean for every message so a value from a previous one can never leak through.
            traceId = undefined;

            if (attrs !== undefined || !config.awsSQSCheckBody) {
              const value = attrs?.[TRACE_ATTR]?.StringValue;

              if (typeof value === 'string' && value) {
                traceId = value;

                // Hide the injected attribute so the application sees only what it asked for.
                if (stripAll) {
                  delete msg.MD5OfMessageAttributes;
                  delete msg.MessageAttributes;
                } else {
                  delete attrs[TRACE_ATTR];

                  if (!Object.keys(attrs).length) {
                    delete msg.MD5OfMessageAttributes;
                    delete msg.MessageAttributes;
                  }
                }
              }
            } else {
              // No SQS-level attributes: look inside the body. Nothing is stripped in this case,
              // the body is delivered to the application exactly as received.
              traceId = traceIdFromBody(msg.Body);
            }
          }

          let peer = 'Unknown';
          let carrier: ContextCarrier | undefined = undefined;

          if (traceId) {
            // The sender appended '/<hostname>' after the injected header; split on the last '/'.
            const idx = traceId.lastIndexOf('/');

            if (idx !== -1) {
              peer = traceId.slice(idx + 1);
              carrier = ContextCarrier.from({ sw8: traceId.slice(0, idx) });
            }
          }

          span.stop();

          const entrySpan = ContextManager.current.newEntrySpan(operation, carrier);

          entrySpan.component = Component.AWS_SQS;
          entrySpan.layer = SpanLayer.MQ;
          entrySpan.peer = peer;

          entrySpan.tag(Tag.mqBroker(queueUrl));

          entrySpan.start();

          return entrySpan;
        }

        return execute(pollSpan, this, originalReceive, request, callback, 'mqBroker', beforeCB);
      };

      return sqs;
    }