lib/instrumentation/modules/aws-sdk/sqs.js (288 lines of code) (raw):

/* * Copyright Elasticsearch B.V. and other contributors where applicable. * Licensed under the BSD 2-Clause License; you may not use this file except in * compliance with the BSD 2-Clause License. */ 'use strict'; const { URL } = require('url'); const { MAX_MESSAGES_PROCESSED_FOR_TRACE_CONTEXT, } = require('../../../constants'); const OPERATIONS_TO_ACTIONS = { deleteMessage: 'delete', deleteMessageBatch: 'delete_batch', receiveMessage: 'poll', sendMessageBatch: 'send_batch', sendMessage: 'send', unknown: 'unknown', }; const OPERATIONS = Object.keys(OPERATIONS_TO_ACTIONS); const TYPE = 'messaging'; const SUBTYPE = 'sqs'; const queueMetrics = new Map(); const MAX_SQS_MESSAGE_ATTRIBUTES = 10; // Returns Message Queue action from AWS SDK method name function getActionFromRequest(request) { request = request || {}; const operation = request.operation ? request.operation : 'unknown'; const action = OPERATIONS_TO_ACTIONS[operation]; return action; } // Returns preposition to use in span name // // POLL from ... // SEND to ... function getToFromFromOperation(operation) { let result = 'from'; if (operation === 'sendMessage' || operation === 'sendMessageBatch') { result = 'to'; } return result; } // Parses queue/topic name from AWS queue URL function getQueueNameFromRequest(request) { const unknown = 'unknown'; if (!request || !request.params || !request.params.QueueUrl) { return unknown; } try { const url = new URL(request.params.QueueUrl); return url.pathname.split('/').pop(); } catch (e) { return unknown; } } // Parses region name from AWS service configuration function getRegionFromRequest(request) { const region = request && request.service && request.service.config && request.service.config.region; return region || ''; } function getAddressFromRequest(request) { return ( request && request.service && request.service.endpoint && request.service.endpoint.hostname ); } function getPortFromRequest(request) { return ( request && request.service && request.service.endpoint && request.service.endpoint.port ); } // Creates message destination context suitable for setDestinationContext function getMessageDestinationContextFromRequest(request) { const destination = { address: getAddressFromRequest(request), port: getPortFromRequest(request), cloud: { region: getRegionFromRequest(request), }, }; return destination; } // create message context suitable for setMessageContext function getMessageContextFromRequest(request) { const message = { queue: { name: getQueueNameFromRequest(request), }, }; return message; } // Record queue related metrics // // Creates metric collector objects on first run, and // updates their data with data from received messages function recordMetrics(queueName, data, agent) { const messages = data && data.Messages; if (!messages || messages.length < 1) { return; } if (!queueMetrics.get(queueName)) { const collector = agent._metrics.createQueueMetricsCollector(queueName); if (!collector) { return; } queueMetrics.set(queueName, collector); } const metrics = queueMetrics.get(queueName); for (const message of messages) { const sentTimestamp = message.Attributes && message.Attributes.SentTimestamp; const delay = new Date().getTime() - sentTimestamp; metrics.updateStats(delay); } } // Creates the span name from request information function getSpanNameFromRequest(request) { const action = getActionFromRequest(request); const toFrom = getToFromFromOperation(request.operation); const queueName = getQueueNameFromRequest(request); const name = `${SUBTYPE.toUpperCase()} ${action.toUpperCase()} ${toFrom} ${queueName}`; return name; } // Extract span links from up to 1000 messages in this batch. // https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-messaging.md#receiving-trace-context // // A span link is created from a `traceparent` message attribute in a message. // `msg.messageAttributes` is of the form: // { <attribute-name>: { DataType: <attr-type>, StringValue: <attr-value>, ... } } // For example: // { traceparent: { DataType: 'String', StringValue: 'test-traceparent' } } function getSpanLinksFromResponseData(data) { if (!data || !data.Messages || data.Messages.length === 0) { return null; } const links = []; const limit = Math.min( data.Messages.length, MAX_MESSAGES_PROCESSED_FOR_TRACE_CONTEXT, ); for (let i = 0; i < limit; i++) { const attrs = data.Messages[i].MessageAttributes; if (!attrs) { continue; } let traceparent; const attrNames = Object.keys(attrs); for (let j = 0; j < attrNames.length; j++) { const attrVal = attrs[attrNames[j]]; if (attrVal.DataType !== 'String') { continue; } const attrNameLc = attrNames[j].toLowerCase(); if (attrNameLc === 'traceparent') { traceparent = attrVal.StringValue; break; } } if (traceparent) { links.push({ context: traceparent }); } } return links; } function shouldIgnoreRequest(request, agent) { const operation = request && request.operation; // are we interested in this operation/method call? if (OPERATIONS.indexOf(operation) === -1) { return true; } // is the named queue on our ignore list? if (agent._conf && agent._conf.ignoreMessageQueuesRegExp) { const queueName = getQueueNameFromRequest(request); for (const rule of agent._conf.ignoreMessageQueuesRegExp) { if (rule.test(queueName)) { return true; } } } return false; } function propagateTraceContextInMessageAttributes( parentSpan, msgParams, log, queueUrl, ) { const msgAttrs = msgParams.MessageAttributes; if ( msgAttrs && Object.keys(msgAttrs).length + 2 > MAX_SQS_MESSAGE_ATTRIBUTES ) { log.warn( { QueueUrl: queueUrl }, 'cannot propagate trace-context with SQS message, too many MessageAttributes', ); } else { parentSpan.propagateTraceContextHeaders( msgAttrs, function (msgAttrs, name, value) { if (name.startsWith('elastic-')) { return; } msgAttrs[name] = { DataType: 'String', StringValue: value }; }, ); } } // Main entrypoint for SQS instrumentation // // Must call (or one of its function calls must call) the // `orig` function/method function instrumentationSqs( orig, origArguments, request, AWS, agent, { version, enabled }, ) { if (shouldIgnoreRequest(request, agent)) { return orig.apply(request, origArguments); } const ins = agent._instrumentation; const log = agent.logger; const action = getActionFromRequest(request); const name = getSpanNameFromRequest(request); const parentRunContext = ins.currRunContext(); const span = ins.createSpan(name, TYPE, SUBTYPE, action, { exitSpan: true }); // W3C trace-context propagation. const parent = span || parentRunContext.currSpan() || parentRunContext.currTransaction(); if (parent) { if (!request.params || typeof request.params !== 'object') { // Pass. This is a sanity guard against bogus user input. } else if (request.operation === 'sendMessage') { const paramsCopy = Object.assign({}, request.params); paramsCopy.MessageAttributes = Object.assign( {}, paramsCopy.MessageAttributes, ); request.params = paramsCopy; propagateTraceContextInMessageAttributes( parent, request.params, log, request.params.QueueUrl, ); } else if ( request.operation === 'sendMessageBatch' && Array.isArray(request.params.Entries) ) { const paramsCopy = Object.assign({}, request.params); paramsCopy.Entries = paramsCopy.Entries.map((msgParams) => { const msgParamsCopy = Object.assign({}, msgParams); msgParamsCopy.MessageAttributes = Object.assign( {}, msgParamsCopy.MessageAttributes, ); propagateTraceContextInMessageAttributes( parent, msgParamsCopy, log, request.params.QueueUrl, ); return msgParamsCopy; }); request.params = paramsCopy; } } if (!span) { return orig.apply(request, origArguments); } span._setDestinationContext(getMessageDestinationContextFromRequest(request)); span.setMessageContext(getMessageContextFromRequest(request)); const onComplete = function (response) { if (response && response.error) { agent.captureError(response.error); } const receiveMsgData = request.operation === 'receiveMessage' && response && response.data; if (receiveMsgData) { const links = getSpanLinksFromResponseData(receiveMsgData); if (links) { span.addLinks(links); } } span.end(); if (receiveMsgData) { recordMetrics(getQueueNameFromRequest(request), receiveMsgData, agent); } }; // Bind onComplete to the span's run context so that `captureError` picks // up the correct currentSpan. const spanRunContext = parentRunContext.enterSpan(span); request.on( 'complete', ins.bindFunctionToRunContext(spanRunContext, onComplete), ); const cb = origArguments[origArguments.length - 1]; if (typeof cb === 'function') { origArguments[origArguments.length - 1] = ins.bindFunctionToRunContext( parentRunContext, cb, ); } return ins.withRunContext(spanRunContext, orig, request, ...origArguments); } module.exports = { instrumentationSqs, // exported for tests getToFromFromOperation, getActionFromRequest, getQueueNameFromRequest, getRegionFromRequest, getMessageDestinationContextFromRequest, shouldIgnoreRequest, };