function sqsMiddlewareFactory()

in lib/instrumentation/modules/@aws-sdk/client-sqs.js [40:200]


function sqsMiddlewareFactory(client, agent) {
  return [
    {
      middleware: (next, context) => async (args) => {
        const ins = agent._instrumentation;
        const log = agent.logger;
        const span = ins.currSpan();
        const input = args.input;

        // W3C trace-context propagation.
        const commandName = context.commandName.replace('Command', '');
        const runContext = ins.currRunContext();
        const parentSpan =
          span || runContext.currSpan() || runContext.currTransaction();

        if (parentSpan) {
          const toPropagate = [];

          if (commandName === 'SendMessage' && input.MessageAttributes) {
            toPropagate.push(input.MessageAttributes);
          } else if (
            commandName === 'SendMessageBatch' &&
            Array.isArray(input.Entries)
          ) {
            for (const e of input.Entries) {
              if (e && e.MessageAttributes) {
                toPropagate.push(e.MessageAttributes);
              }
            }
          }

          // Though our spec only mentions a 10-message-attribute limit for *SQS*, we'll
          // do the same limit here, because
          // https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
          // mentions the 10-message-attribute limit for SQS subscriptions.
          toPropagate.forEach((msgAttrs) => {
            const attrsCount = Object.keys(msgAttrs).length + 2;
            if (attrsCount > MAX_SQS_MESSAGE_ATTRIBUTES) {
              log.warn(
                { QueueUrl: input.QueueUrl },
                'cannot propagate trace-context with SQS message, too many MessageAttributes',
              );
              return;
            }
            parentSpan.propagateTraceContextHeaders(
              msgAttrs,
              function (msgAttrs, name, value) {
                if (name.startsWith('elastic-')) {
                  return;
                }
                msgAttrs[name] = { DataType: 'String', StringValue: value };
              },
            );
          });
        }

        // Ensure there is a span from the wrapped `client.send()`.
        if (!span || !(span.type === TYPE && span.subtype === SUBTYPE)) {
          return await next(args);
        }

        // Action is not equal to command/operation name, we have to map it
        span.action = OPERATIONS_TO_ACTIONS[commandName] || 'unknown';

        const queueName = getQueueNameFromCommand(args);
        let toFrom = 'from';
        if (span.action === 'send' || span.action === 'send_batch') {
          toFrom = 'to';
        }
        span.name = `SQS ${span.action.toUpperCase()} ${toFrom} ${queueName}`;

        let err;
        let result;
        let response;
        let statusCode;
        try {
          result = await next(args);
          response = result && result.response;
          statusCode = response && response.statusCode;
        } catch (ex) {
          // Save the error for use in `finally` below, but re-throw it to
          // not impact code flow.
          err = ex;

          // This code path happens with a GetObject conditional request
          // that returns a 304 Not Modified.
          statusCode = err && err.$metadata && err.$metadata.httpStatusCode;
          throw ex;
        } finally {
          if (statusCode) {
            span._setOutcomeFromHttpStatusCode(statusCode);
          } else {
            span._setOutcomeFromErrorCapture(OUTCOME_FAILURE);
          }
          if (err && (!statusCode || statusCode >= 400)) {
            agent.captureError(err, { skipOutcome: true });
          }

          // Destination context.
          const region = await client.config.region();
          const service = { type: SUBTYPE };
          const destCtx = { service };

          if (context[elasticAPMStash]) {
            destCtx.address = context[elasticAPMStash].hostname;
            destCtx.port = context[elasticAPMStash].port;
          }

          if (region) {
            destCtx.cloud = { region };
          }

          span._setDestinationContext(destCtx);

          // Message context
          span.setMessageContext({ queue: { name: queueName } });

          const receiveMsgData =
            span.action === 'poll' && result && result.output;
          if (receiveMsgData) {
            // Links
            const links = getSpanLinksFromResponseData(result && result.output);
            if (links) {
              span.addLinks(links);
            }

            // Metrics
            recordMetrics(queueName, receiveMsgData, agent);
          }

          span.end();
        }

        return result;
      },
      options: { step: 'initialize', priority: 'high', name: 'elasticAPMSpan' },
    },
    {
      middleware: (next, context) => async (args) => {
        const req = args.request;
        let port = req.port;

        // Resolve port for HTTP(S) protocols
        if (port === undefined) {
          if (req.protocol === 'https:') {
            port = 443;
          } else if (req.protocol === 'http:') {
            port = 80;
          }
        }

        context[elasticAPMStash] = {
          hostname: req.hostname,
          port,
        };
        return next(args);
      },
      options: { step: 'finalizeRequest', name: 'elasticAPMHTTPInfo' },
    },
  ];
}