function wrapEachMessage()

in lib/instrumentation/modules/kafkajs.js [99:184]


  /**
   * Wrap a kafkajs `eachMessage` handler so that every handled message is
   * recorded as its own transaction.
   *
   * Messages whose topic matches `shouldIgnoreTopic(topic, config)` are passed
   * straight to the original handler without starting a transaction.
   *
   * @param {Function} origEachMessage - The handler being wrapped. It is called
   *   with the same `this` and the same arguments the wrapper receives.
   * @returns {Function} An async handler that resolves with whatever
   *   `origEachMessage` resolves with, and rejects with the very same value
   *   `origEachMessage` throws or rejects with.
   */
  function wrapEachMessage(origEachMessage) {
    // According to `kafkajs` types a header value might be
    // a string or Buffer
    // https://github.com/tulios/kafkajs/blob/ff3b1117f316d527ae170b550bc0f772614338e9/types/index.d.ts#L148
    // Any other kind of value yields `undefined` and is left out.
    const headerToString = (value) => {
      if (typeof value === 'string') {
        return value;
      }
      if (value instanceof Buffer) {
        return value.toString('utf-8');
      }
      return undefined;
    };

    return async function (payload) {
      const { topic, message } = payload;

      if (shouldIgnoreTopic(topic, config)) {
        return origEachMessage.apply(this, arguments);
      }

      // For distributed tracing this instrumentation is going to check
      // the headers defined by opentelemetry and ignore the proprietary
      // `elasticapmtraceparent` header
      // https://github.com/elastic/apm/blob/main/specs/agents/tracing-distributed-tracing.md#binary-fields
      const opts = {};
      const childOf = headerToString(
        message.headers && message.headers.traceparent,
      );
      if (childOf !== undefined) {
        opts.childOf = childOf;
      }
      const tracestate = headerToString(
        message.headers && message.headers.tracestate,
      );
      if (tracestate !== undefined) {
        opts.tracestate = tracestate;
      }

      const trans = ins.startTransaction(
        `${NAME} RECEIVE from ${topic}`,
        TYPE,
        opts,
      );

      const messageCtx = { queue: { name: topic } };
      if (
        config.captureBody === 'all' ||
        config.captureBody === 'transactions'
      ) {
        messageCtx.body = message.value?.toString();
      }

      if (message.headers && config.captureHeaders) {
        // Make sure there is no sensitive data
        // and transform non-redacted buffers
        messageCtx.headers = redactKeysFromObject(
          message.headers,
          config.sanitizeFieldNamesRegExp,
        );
        for (const key of Object.keys(messageCtx.headers)) {
          const value = messageCtx.headers[key];
          if (value instanceof Buffer) {
            messageCtx.headers[key] = value.toString('utf-8');
          }
        }
      }

      if (message.timestamp) {
        // Time elapsed between the message timestamp and now.
        messageCtx.age = {
          ms: Date.now() - Number(message.timestamp),
        };
      }

      trans.setMessageContext(messageCtx);

      // Track completion with a flag instead of inspecting the thrown value:
      // a handler may reject with a falsy value (`Promise.reject()`,
      // `throw 0`, ...) and that must still be reported as a failure.
      let succeeded = false;
      try {
        const result = await origEachMessage.apply(this, arguments);
        succeeded = true;
        return result;
      } finally {
        // Runs on both paths; any error from the handler keeps propagating
        // untouched so the caller's code flow is not impacted.
        trans.setOutcome(
          succeeded ? constants.OUTCOME_SUCCESS : constants.OUTCOME_FAILURE,
        );
        trans.end();
      }
    };
  }