function wrapProducerSendBatch()

in lib/instrumentation/modules/kafkajs.js [368:456]


  function wrapProducerSendBatch(origSendBatch) {
    return async function (batch) {
      let span;
      let topicForContext;
      let shouldIgnoreBatch = true;
      const messages = batch.topicMessages || [];
      const topics = new Set();

      // Remove possible topic duplications
      for (const msg of messages) {
        topics.add(msg.topic);
      }

      for (const t of topics) {
        const topicIgnored = shouldIgnoreTopic(t, config);

        shouldIgnoreBatch = shouldIgnoreBatch && topicIgnored;

        // When a topic is not ignored we keep a copy for context unless
        // we find a 2nd topic also not ignored.
        if (!topicIgnored) {
          if (topicForContext) {
            topicForContext = undefined;
            break;
          }
          topicForContext = t;
        }
      }

      if (!shouldIgnoreBatch) {
        const suffix = topicForContext ? ` to ${topicForContext}` : '';
        span = ins.createSpan(`${NAME} SEND${suffix}`, TYPE, SUBTYPE, 'send', {
          exitSpan: true,
        });
      }

      // W3C trace-context propagation.
      const runContext = ins.currRunContext();
      const parentSpan =
        span || runContext.currSpan() || runContext.currTransaction();

      if (parentSpan && batch.topicMessages) {
        batch.topicMessages.forEach((topicMessage) => {
          topicMessage.messages.forEach((msg) => {
            const newHeaders = Object.assign({}, msg.headers);
            parentSpan.propagateTraceContextHeaders(
              newHeaders,
              function (carrier, name, value) {
                if (name.startsWith('elastic-')) {
                  return;
                }
                carrier[name] = value;
              },
            );
            msg.headers = newHeaders;
          });
        });
      }

      if (!span) {
        return origSendBatch.apply(this, arguments);
      }

      if (topicForContext) {
        // We do not add headers or body because:
        // - `record.messages` is a list
        // - spec says is for transactions (https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-messaging.md#transaction-context-fields)
        span.setMessageContext({ queue: { name: topicForContext } });
      }
      span.setServiceTarget(SUBTYPE, topicForContext);

      let result, err;
      try {
        result = await origSendBatch.apply(this, arguments);
      } catch (ex) {
        // Save the error for use in `finally` below, but re-throw it to
        // not impact code flow.
        err = ex;
        throw ex;
      } finally {
        span.setOutcome(
          err ? constants.OUTCOME_FAILURE : constants.OUTCOME_SUCCESS,
        );
        span.end();
      }

      return result;
    };
  }