function getChoppedStreamHandler()

in lib/apm-client/http-apm-client/index.js [1043:1383]


function getChoppedStreamHandler(client, onerror) {
  // Make a request to the apm-server intake API.
  // https://www.elastic.co/guide/en/apm/server/current/events-api.html
  //
  // In normal operation this works as follows:
  // - The StreamChopper (`this._chopper`) calls this function with a newly
  //   created Gzip stream, to which it writes encoded event data.
  // - It `gzipStream.end()`s the stream when:
  //   (a) approximately `apiRequestSize` of data have been written,
  //   (b) `apiRequestTime` seconds have passed, or
  //   (c) `_chopper.chop()` is explicitly called via `client.flush()`,
  //       e.g. used by the Node.js APM agent after `client.sendError()`.
  // - This function makes the HTTP POST to the apm-server, pipes the gzipStream
  //   to it, and waits for the completion of the request and the apm-server
  //   response.
  // - Then it calls the given `next` callback to signal StreamChopper that
  //   another chopped stream can be created when there is more to send.
  //
  // Of course, things can go wrong. Here are the known ways this pipeline can
  // conclude.
  // - intake response success - A successful response from the APM server. This
  //   is the normal operation case described above.
  // - gzipStream error - An "error" event on the gzip stream.
  // - intake request error - An "error" event on the intake HTTP request, e.g.
  //   ECONNREFUSED or ECONNRESET.
  // - intakeResTimeout - A timer started *after* we are finished sending data
  //   to the APM server, by which we require a response (including its body).
  //   By default this is 10s -- a long time, to allow for a slow or distant
  //   apm-server. If this timeout is hit, the APM server is already
  //   misbehaving, so the long wait does not make matters worse.
  // - serverTimeout - An idle timeout value (default 30s) set on the socket.
  //   This is a catch-all fallback for an otherwise wedged connection. If this
  //   is being hit, there is some major issue in the application (possibly a
  //   bug in the APM agent).
  // - process completion - The Client takes pains to always `.unref()` its
  //   handles so that it never keeps a process open that is otherwise ready
  //   to exit. When the process is ready to exit, the following happens:
  //    - The "beforeExit" handler above will call `client._gracefulExit()` ...
  //    - ... which calls `client._ref()` to *hold the process open* to
  //      complete this request, and `client.end()` to end the `gzipStream` so
  //      this request can complete soon.
  //    - We then expect this request to complete quickly, after which the
  //      process finishes exiting. One subtlety: if the APM server is not
  //      responding, we wait only the shorter `intakeResTimeoutOnEnd`
  //      (by default 1s).
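  //
  // For orientation, a hedged sketch of how this handler is typically wired
  // up with the `stream-chopper` package (the option values here are
  // illustrative, not copied from this file's constructor code):
  //
  //    const StreamChopper = require('stream-chopper');
  //    client._chopper = new StreamChopper({
  //      size: client._conf.apiRequestSize, // case (a) above
  //      time: client._conf.apiRequestTime, // case (b) above
  //      type: StreamChopper.overflow,
  //      transform() {
  //        return zlib.createGzip();
  //      },
  //    }).on('stream', getChoppedStreamHandler(client, onerror));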
  return function makeIntakeRequest(gzipStream, next) {
    const reqId = crypto.randomBytes(16).toString('hex');
    const log = client._log.child({ reqId });
    const startTime = process.hrtime();
    const timeline = [];
    let bytesWritten = 0;
    let intakeRes;
    let intakeReqSocket = null;
    let intakeResTimer = null;
    let intakeRequestGracefulExitCalled = false;
    const intakeResTimeout = client._conf.intakeResTimeout;
    const intakeResTimeoutOnEnd = client._conf.intakeResTimeoutOnEnd;

    // `_activeIntakeReq` is used to coordinate the callback to `client.flush(cb)`.
    client._activeIntakeReq = true;

    // Handle conclusion of this intake request. Each "part" is expected to call
    // `completePart()` at least once -- multiple calls are okay for cases like
    // the "error" and "close" events on a stream being called. When a part
    // errors or all parts are completed, then we can conclude.
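    // For example, a typical successful request concludes via the sequence:
    //    completePart('gzipStream')      // gzip "end"
    //    completePart('intakeReq')       // request "finish"
    //    completePart('intakeRes')       // response "end"
    // while, e.g., an ECONNREFUSED concludes immediately with a single:
    //    completePart('intakeReq', err)  // request "error"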
    let concluded = false;
    const completedFromPart = {
      gzipStream: false,
      intakeReq: false,
      intakeRes: false,
    };
    let numToComplete = Object.keys(completedFromPart).length;
    const completePart = (part, err) => {
      log.trace({ err, concluded }, 'completePart %s', part);
      timeline.push([
        deltaMs(startTime),
        `completePart ${part}`,
        err && err.message,
      ]);
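      // Each timeline entry is `[msSinceStart, eventName, errMessageIfAny]`,
      // e.g. `[12.3, 'completePart gzipStream', undefined]` (values
      // illustrative).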
      assert(part in completedFromPart, `'${part}' is in completedFromPart`);

      if (concluded) {
        return;
      }

      // If this is the final part to complete, then we are ready to conclude.
      let allPartsCompleted = false;
      if (!completedFromPart[part]) {
        completedFromPart[part] = true;
        numToComplete--;
        if (numToComplete === 0) {
          allPartsCompleted = true;
        }
      }
      if (!err && !allPartsCompleted) {
        return;
      }

      // Conclude.
      concluded = true;
      if (err) {
        // There was an error: clean up resources.

        // Note that in Node v8, destroying the gzip stream results in it
        // emitting an "error" event as follows. No harm, however.
        //    Error: gzip stream error: zlib binding closed
        //      at Gzip._transform (zlib.js:369:15)
        //      ...
        destroyStream(gzipStream);
        intakeReq.destroy();
        if (intakeResTimer) {
          log.trace('cancel intakeResTimer');
          clearTimeout(intakeResTimer);
          intakeResTimer = null;
        }
      }
      client._intakeRequestGracefulExitFn = null;

      client.sent = client._numEventsEnqueued;
      client._activeIntakeReq = false;
      const backoffDelayMs = client._getBackoffDelay(!!err);
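      // Note: `_getBackoffDelay` (defined elsewhere in this file) yields 0
      // after a success and a growing delay after consecutive errors. Per the
      // Elastic APM agents' transport spec the schedule is roughly
      // `min(errorCount, 6) ** 2` seconds plus jitter (0s, 1s, 4s, ... up to
      // 36s) -- that exact formula is an assumption here, not read from this
      // file.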
      if (err) {
        log.trace(
          { timeline, bytesWritten, backoffDelayMs, err },
          'conclude intake request: error',
        );
        onerror(err);
      } else {
        log.trace(
          { timeline, bytesWritten, backoffDelayMs },
          'conclude intake request: success',
        );
      }
      if (client._onIntakeReqConcluded) {
        client._onIntakeReqConcluded();
        client._onIntakeReqConcluded = null;
      }

      if (backoffDelayMs > 0) {
        setTimeout(next, backoffDelayMs).unref();
      } else {
        setImmediate(next);
      }
    };

    // Provide a function on the client with which it can signal this intake
    // request to gracefully shut down, i.e. finish up quickly.
    client._intakeRequestGracefulExitFn = () => {
      intakeRequestGracefulExitCalled = true;
      if (intakeReqSocket) {
        log.trace('_intakeRequestGracefulExitFn: re-ref intakeReqSocket');
        intakeReqSocket.ref();
      }
      if (intakeResTimer) {
        log.trace(
          '_intakeRequestGracefulExitFn: reset intakeResTimer to short timeout',
        );
        clearTimeout(intakeResTimer);
        intakeResTimer = setTimeout(() => {
          completePart(
            'intakeRes',
            new Error(
              'intake response timeout: APM server did not respond ' +
                `within ${
                  intakeResTimeoutOnEnd / 1000
                }s of graceful exit signal`,
            ),
          );
        }, intakeResTimeoutOnEnd).unref();
      }
    };

    // Start the request and set its timeout.
    const intakeReq = client._transportRequest(client._conf.requestIntake);
    if (Number.isFinite(client._conf.serverTimeout)) {
      intakeReq.setTimeout(client._conf.serverTimeout);
    }
    // TODO: log intakeReq and intakeRes when
    // https://github.com/elastic/ecs-logging-nodejs/issues/67 is implemented.
    log.trace('intake request start');

    // Handle events on the intake request.
    // https://nodejs.org/api/http.html#http_http_request_options_callback
    // documents the events emitted on the req and res objects in various
    // scenarios.
    intakeReq.on('timeout', () => {
      log.trace('intakeReq "timeout"');
      // `.destroy(err)` will result in an "error" event.
      intakeReq.destroy(
        new Error(
          `APM Server response timeout (${client._conf.serverTimeout}ms)`,
        ),
      );
    });

    intakeReq.on('socket', function (socket) {
      intakeReqSocket = socket;
      // Unref the socket for this request so that the Client does not keep
      // the node process running if it otherwise would be done. (This is
      // tested by the "unref-client" test in test/side-effects.js.)
      //
      // The HTTP keep-alive agent will unref sockets when unused, and ref them
      // during a request. Given that the normal makeIntakeRequest behaviour
      // is to keep a request open for up to 10s (`apiRequestTime`), we must
      // manually unref the socket.
      //
      // The exception is when in a Lambda environment, where we *do* want to
      // keep the node process running to complete this intake request.
      // Otherwise a 'beforeExit' event can be sent, which the Lambda runtime
      // interprets as "the Lambda handler callback was never called".
      if (!isLambdaExecutionEnvironment && !intakeRequestGracefulExitCalled) {
        log.trace('intakeReq "socket": unref it');
        intakeReqSocket.unref();
      }
    });
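    // (Node.js semantics reminder: an `unref()`ed socket does not keep the
    // event loop alive; `socket.ref()` -- used by the graceful-exit function
    // above -- reverses that.)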

    intakeReq.on('response', (intakeRes_) => {
      intakeRes = intakeRes_;
      log.trace(
        { statusCode: intakeRes.statusCode, reqFinished: intakeReq.finished },
        'intakeReq "response"',
      );
      let err;
      const chunks = [];

      if (!intakeReq.finished) {
        // Premature response from APM server. Typically this is for errors
        // like "queue is full", for which the response body will be parsed
        // below. However, set an `err` as a fallback for the unexpected case
        // where the premature response has a 2xx status code.
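        // A typical premature error response is an HTTP 503 whose body has
        // the events intake API error shape, roughly (illustrative values):
        //    {"accepted": 0, "errors": [{"message": "queue is full"}]}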
        if (intakeRes.statusCode >= 200 && intakeRes.statusCode < 300) {
          err = new Error(
            `premature apm-server response with statusCode=${intakeRes.statusCode}`,
          );
        }
        // There is no point (though no harm) in sending more data to the APM
        // server. In case reading the error response body takes a while, pause
        // the gzip stream until it is destroyed in `completePart()`.
        gzipStream.pause();
      }

      // Handle events on the intake response.
      intakeRes.on('error', (intakeResErr) => {
        // I am not aware of a way to get an "error" event on the
        // IncomingMessage (see also https://stackoverflow.com/q/53691119), but
        // handling it here is preferable to an uncaughtException.
        intakeResErr = wrapError(intakeResErr, 'intake response error event');
        completePart('intakeRes', intakeResErr);
      });
      intakeRes.on('data', (chunk) => {
        chunks.push(chunk);
      });
      // intakeRes.on('close', () => { log.trace('intakeRes "close"') })
      // intakeRes.on('aborted', () => { log.trace('intakeRes "aborted"') })
      intakeRes.on('end', () => {
        log.trace('intakeRes "end"');
        if (intakeResTimer) {
          clearTimeout(intakeResTimer);
          intakeResTimer = null;
        }
        if (intakeRes.statusCode < 200 || intakeRes.statusCode > 299) {
          err = processIntakeErrorResponse(intakeRes, Buffer.concat(chunks));
        }
        completePart('intakeRes', err);
      });
    });

    // intakeReq.on('abort', () => { log.trace('intakeReq "abort"') })
    // intakeReq.on('close', () => { log.trace('intakeReq "close"') })
    intakeReq.on('finish', () => {
      log.trace('intakeReq "finish"');
      completePart('intakeReq');
    });
    intakeReq.on('error', (err) => {
      log.trace('intakeReq "error"');
      completePart('intakeReq', err);
    });

    // Handle events on the gzip stream.
    gzipStream.on('data', (chunk) => {
      bytesWritten += chunk.length;
    });
    gzipStream.on('error', (gzipErr) => {
      log.trace('gzipStream "error"');
      gzipErr = wrapError(gzipErr, 'gzip stream error');
      completePart('gzipStream', gzipErr);
    });
    gzipStream.on('finish', () => {
      // If the apm-server is not reading its input and the gzip data is large
      // enough to fill buffers, then the gzip stream will emit "finish", but
      // not "end". Therefore, this "finish" event is the best indicator that
      // the ball is now in the apm-server's court.
      //
      // We now start a timer waiting on the response, provided we still expect
      // one (we don't if the request has already errored out, e.g.
      // ECONNREFUSED) and it hasn't already completed (e.g. if it replied
      // quickly with "queue is full").
      log.trace('gzipStream "finish"');
      if (!completedFromPart.intakeReq && !completedFromPart.intakeRes) {
        const timeout =
          client._writableState.ending || intakeRequestGracefulExitCalled
            ? intakeResTimeoutOnEnd
            : intakeResTimeout;
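        // With the default config this selects 10000ms (`intakeResTimeout`)
        // in normal operation, or 1000ms (`intakeResTimeoutOnEnd`) when the
        // client is ending or a graceful exit has been signaled.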
        log.trace({ timeout }, 'start intakeResTimer');
        intakeResTimer = setTimeout(() => {
          completePart(
            'intakeRes',
            new Error(
              'intake response timeout: APM server did not respond ' +
                `within ${timeout / 1000}s of gzip stream finish`,
            ),
          );
        }, timeout).unref();
      }
    });
    // Watch the gzip "end" event for its completion, because the "close"
    // event that we would prefer to use *does not get emitted* for the
    // `client.sendSpan(callback) + client.flush()` test case on node v12
    // only.
    gzipStream.on('end', () => {
      log.trace('gzipStream "end"');
      completePart('gzipStream');
    });
    // gzipStream.on('close', () => { log.trace('gzipStream "close"') })

    // Hook up writing data to a file (only intended for local debugging).
    // Append the intake data to `payloadLogFile`, if given. This is only
    // intended for local debugging because it can have a significant perf
    // impact.
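    // For example (hypothetical local-debugging usage): create the client
    // with `payloadLogFile: '/tmp/intake-payload.ndjson'`, then inspect the
    // decoded ndjson with something like:
    //    tail -f /tmp/intake-payload.ndjson | jq .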
    if (client._conf.payloadLogFile) {
      const payloadLogStream = fs.createWriteStream(
        client._conf.payloadLogFile,
        { flags: 'a' },
      );
      gzipStream.pipe(zlib.createGunzip()).pipe(payloadLogStream);
    }

    // Send the metadata object (always first) and hook up the streams.
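    // Per the events intake API, the request body is ndjson with a metadata
    // line first, e.g. (fields elided):
    //    {"metadata":{"service":{"name":"...","agent":{...}}}}
    //    {"transaction":{...}}
    //    {"span":{...}}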
    assert(client._encodedMetadata, 'client._encodedMetadata is set');
    gzipStream.write(client._encodedMetadata);
    gzipStream.pipe(intakeReq);
  };
}
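
For reference, a minimal sketch of what `client._transportRequest(client._conf.requestIntake)` amounts to with the default HTTP transport. The concrete values below are assumptions for illustration (the endpoint path and headers follow the documented events intake API; the real `requestIntake` options are built elsewhere in this file):

const http = require('http');

// POST gzipped ndjson to the events intake v2 endpoint; the handler above
// then does `gzipStream.pipe(intakeReq)` to supply the request body.
const intakeReq = http.request({
  method: 'POST',
  hostname: 'localhost', // assumption: a locally running apm-server
  port: 8200,
  path: '/intake/v2/events',
  headers: {
    'Content-Type': 'application/x-ndjson',
    'Content-Encoding': 'gzip',
  },
});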