in src/plugins/AWS2SQSPlugin.ts [40:167]
/**
 * Tracing wrapper around the SQS client constructor.
 *
 * Delegates construction to `_SQS` (defined outside this block; presumably the
 * original SDK constructor — verify against the enclosing plugin code) and then
 * patches three methods on the returned client:
 *
 * - `sendMessage` / `sendMessageBatch`: run inside an exit span and add a
 *   `__revdTraceId` message attribute holding `<injected trace value>/<hostname>`.
 * - `receiveMessage`: asks for the `__revdTraceId` attribute in addition to what
 *   the caller requested, runs inside an exit span named `<operation><check>`, and
 *   when messages arrive swaps that span for an entry span linked to the sender.
 *
 * Caller-supplied `params` objects are never mutated; shallow copies are made
 * before anything is added or removed.
 *
 * @returns the patched client instance produced by `_SQS`.
 */
function SQS(this: any) {
  const sqs = _SQS.apply(this, arguments);

  // Name of the message attribute used to carry the trace id between peers.
  const TRACE_ATTRIBUTE = '__revdTraceId';

  /** Returns the part of the queue URL after its last '/', used in operation names. */
  const queueNameOf = (queueUrl: string): string => queueUrl.slice(queueUrl.lastIndexOf('/') + 1);

  /** Creates an exit span for `operation` with the SQS component and MQ layer set. */
  const newSQSExitSpan = (operation: string): Span => {
    const exitSpan = ContextManager.current.newExitSpan(operation, Component.AWS_SQS, Component.HTTP);
    exitSpan.component = Component.AWS_SQS;
    exitSpan.layer = SpanLayer.MQ;
    return exitSpan;
  };

  /** Builds the `<injected trace value>/<hostname>` string stored in the trace attribute. */
  const traceValueFor = (span: Span): string => `${span.inject().value}/${hostname()}`;

  /**
   * Reads a trace id out of a JSON message body of the shape
   * `{ MessageAttributes: { __revdTraceId: { Value } } }`.
   * NOTE(review): this looks like the envelope of messages relayed through
   * another service (e.g. SNS) — confirm against the producers in use.
   *
   * @returns the trace id, or `undefined` when the body is not JSON, lacks the
   *          attribute, or the value is not a non-empty string.
   */
  const traceIdFromBody = (body: string): string | undefined => {
    try {
      const value = JSON.parse(body)?.MessageAttributes?.[TRACE_ATTRIBUTE]?.Value;
      return typeof value === 'string' && value ? value : undefined;
    } catch {
      // Body is not JSON: there is simply no trace id to extract.
      return undefined;
    }
  };

  /**
   * Replaces `sqs[name]` with a wrapper that opens an exit span named
   * `AWS/SQS/<name>/<queue>` and forwards to the original method via `execute`,
   * passing the params returned by `addTraceId` instead of the caller's object.
   */
  function instrumentSend(name: string, addTraceId: (params: any, span: Span) => any): void {
    const _func = sqs[name];

    sqs[name] = function (this: any, params: any, callback: any) {
      const span = newSQSExitSpan(`AWS/SQS/${name}/${queueNameOf(params.QueueUrl)}`);

      return execute(span, this, _func, addTraceId(params, span), callback, 'mqBroker');
    };
  }

  instrumentSend('sendMessage', (params: any, span: Span) =>
    // Copy both levels so neither the caller's params nor its attribute map is touched.
    Object.assign({}, params, {
      MessageAttributes: Object.assign({}, params.MessageAttributes, {
        [TRACE_ATTRIBUTE]: { DataType: 'String', StringValue: traceValueFor(span) },
      }),
    }),
  );

  instrumentSend('sendMessageBatch', (params: any, span: Span) => {
    // Inject once; every entry of the batch carries the same trace value.
    const traceValue = traceValueFor(span);

    return Object.assign({}, params, {
      Entries: params.Entries.map((entry: any) =>
        Object.assign({}, entry, {
          // A fresh attribute map per entry, so entries never share mutable state.
          MessageAttributes: Object.assign({}, entry.MessageAttributes, {
            [TRACE_ATTRIBUTE]: { DataType: 'String', StringValue: traceValue },
          }),
        }),
      ),
    });
  });

  const _receiveMessage = sqs.receiveMessage;

  sqs.receiveMessage = function (this: any, params: any, callback: any) {
    params = Object.assign({}, params);

    const requestedAttributeNames = params.MessageAttributeNames;

    params.MessageAttributeNames = requestedAttributeNames
      ? requestedAttributeNames.concat([TRACE_ATTRIBUTE])
      : [TRACE_ATTRIBUTE];
    delete params.MaxNumberOfMessages; // limit to 1 message in order to be able to link all Exit and Entry spans

    const queueUrl = params.QueueUrl;
    const operation = `AWS/SQS/receiveMessage/${queueNameOf(queueUrl)}`;
    const span = newSQSExitSpan(`${operation}<check>`);

    /**
     * Hook handed to `execute`; should always be called on success only, with no err.
     * When the response holds messages, the trace attribute is stripped from them,
     * the `<check>` exit span is stopped and a started entry span (linked to the
     * sender when a usable trace id was found) is returned in its place.
     * Otherwise the span passed in is returned untouched.
     */
    function beforeCB(this: any, span: Span, err: any, res: any): Span {
      if (!res?.Messages?.length) {
        return span;
      }

      // If the caller requested no attributes, whatever came back was requested by
      // this wrapper only, so the attribute fields are removed wholesale.
      const stripAllAttributes = !requestedAttributeNames || !requestedAttributeNames.length;
      let traceId: string | undefined;

      // should only be 1
      for (const msg of res.Messages) {
        // Reset per message so a value from a previous message can never leak over.
        traceId = undefined;

        if (msg.MessageAttributes !== undefined || !config.awsSQSCheckBody) {
          const value = msg.MessageAttributes?.[TRACE_ATTRIBUTE]?.StringValue;

          if (typeof value === 'string' && value) {
            traceId = value;

            // Hide the tracing attribute from the caller.
            if (stripAllAttributes) {
              delete msg.MD5OfMessageAttributes;
              delete msg.MessageAttributes;
            } else {
              delete msg.MessageAttributes[TRACE_ATTRIBUTE];

              if (!Object.keys(msg.MessageAttributes).length) {
                delete msg.MD5OfMessageAttributes;
                delete msg.MessageAttributes;
              }
            }
          }
        } else {
          // No attributes on the message itself: fall back to the JSON body.
          // Nothing is stripped here because the delivered message has no attribute map.
          traceId = traceIdFromBody(msg.Body);
        }
      }

      let peer = 'Unknown';
      let carrier: ContextCarrier | undefined = undefined;

      if (traceId) {
        // Stored format is `<trace value>/<sender hostname>`; split on the last '/'.
        const idx = traceId.lastIndexOf('/');

        if (idx !== -1) {
          peer = traceId.slice(idx + 1);
          carrier = ContextCarrier.from({ sw8: traceId.slice(0, idx) });
        }
      }

      span.stop();

      const entrySpan = ContextManager.current.newEntrySpan(operation, carrier);

      entrySpan.component = Component.AWS_SQS;
      entrySpan.layer = SpanLayer.MQ;
      entrySpan.peer = peer;
      entrySpan.tag(Tag.mqBroker(queueUrl));
      entrySpan.start();

      return entrySpan;
    }

    return execute(span, this, _receiveMessage, params, callback, 'mqBroker', beforeCB);
  };

  return sqs;
}