in lib/apm-client/http-apm-client/index.js [1043:1383]
function getChoppedStreamHandler(client, onerror) {
  // Make a request to the apm-server intake API.
  // https://www.elastic.co/guide/en/apm/server/current/events-api.html
  //
  // In normal operation this works as follows:
  // - The StreamChopper (`this._chopper`) calls this function with a newly
  //   created Gzip stream, to which it writes encoded event data.
  // - It `gzipStream.end()`s the stream when:
  //   (a) approximately `apiRequestSize` of data have been written,
  //   (b) `apiRequestTime` seconds have passed, or
  //   (c) `_chopper.chop()` is explicitly called via `client.flush()`,
  //       e.g. used by the Node.js APM agent after `client.sendError()`.
  // - This function makes the HTTP POST to the apm-server, pipes the gzipStream
  //   to it, and waits for the completion of the request and the apm-server
  //   response.
  // - Then it calls the given `next` callback to signal StreamChopper that
  //   another chopped stream can be created when there is more to send.
  //
  // Of course, things can go wrong. Here are the known ways this pipeline can
  // conclude.
  // - intake response success - A successful response from the APM server. This
  //   is the normal operation case described above.
  // - gzipStream error - An "error" event on the gzip stream.
  // - intake request error - An "error" event on the intake HTTP request, e.g.
  //   ECONNREFUSED or ECONNRESET.
  // - intakeResTimeout - A timer started *after* we are finished sending data
  //   to the APM server by which we require a response (including its body). By
  //   default this is 10s -- a very long time to allow for a slow or distant
  //   apm-server. If we hit this, the APM server is problematic anyway, so the
  //   long delay does not add to the problems.
  // - serverTimeout - An idle timeout value (default 30s) set on the socket.
  //   This is a catch-all fallback for an otherwise wedged connection. If this
  //   is being hit, there is some major issue in the application (possibly a
  //   bug in the APM agent).
  // - process completion - The Client takes pains to always `.unref()` its
  //   handles so that it never keeps the using process open if that process is
  //   otherwise ready to exit. When the process is ready to exit, the following
  //   happens:
  //   - The "beforeExit" handler above will call `client._gracefulExit()` ...
  //   - ... which calls `client._ref()` to *hold the process open* to
  //     complete this request, and `client.end()` to end the `gzipStream` so
  //     this request can complete soon.
  //   - We then expect this request to complete quickly and the process will
  //     then finish exiting. A subtlety is that if the APM server is not
  //     responding, then we'll wait only the shorter `intakeResTimeoutOnEnd`
  //     (by default 1s).
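  //
  // For reference, the data written to the gzip stream is the intake API's
  // newline-delimited JSON, with the metadata object always first -- roughly
  // (illustrative values only):
  //   {"metadata":{"service":{"name":"my-service","agent":{...}},...}}
  //   {"transaction":{"id":"...","trace_id":"...","duration":42.5,...}}
  //   {"span":{...}}
  //   {"error":{...}}
  // See `client._encodedMetadata` (written first, below) for the actual
  // metadata line.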
  return function makeIntakeRequest(gzipStream, next) {
    const reqId = crypto.randomBytes(16).toString('hex');
    const log = client._log.child({ reqId });
    const startTime = process.hrtime();
    const timeline = [];
    let bytesWritten = 0;
    let intakeRes;
    let intakeReqSocket = null;
    let intakeResTimer = null;
    let intakeRequestGracefulExitCalled = false;
    const intakeResTimeout = client._conf.intakeResTimeout;
    const intakeResTimeoutOnEnd = client._conf.intakeResTimeoutOnEnd;

    // `_activeIntakeReq` is used to coordinate the callback to `client.flush(cb)`.
    client._activeIntakeReq = true;

    // Handle conclusion of this intake request. Each "part" is expected to call
    // `completePart()` at least once -- multiple calls are okay for cases like
    // the "error" and "close" events on a stream both being emitted. When a part
    // errors or all parts are completed, then we can conclude.
    let concluded = false;
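    // The three "parts" are: the gzip stream that carries the encoded events,
    // the HTTP request to the intake API, and the APM server's response.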
    const completedFromPart = {
      gzipStream: false,
      intakeReq: false,
      intakeRes: false,
    };
    let numToComplete = Object.keys(completedFromPart).length;
    const completePart = (part, err) => {
      log.trace({ err, concluded }, 'completePart %s', part);
      timeline.push([
        deltaMs(startTime),
        `completePart ${part}`,
        err && err.message,
      ]);
      assert(part in completedFromPart, `'${part}' is in completedFromPart`);
      if (concluded) {
        return;
      }

      // If this is the final part to complete, then we are ready to conclude.
      let allPartsCompleted = false;
      if (!completedFromPart[part]) {
        completedFromPart[part] = true;
        numToComplete--;
        if (numToComplete === 0) {
          allPartsCompleted = true;
        }
      }
      if (!err && !allPartsCompleted) {
        return;
      }

      // Conclude.
      concluded = true;
      if (err) {
        // There was an error: clean up resources.
        // Note that in Node v8, destroying the gzip stream results in it
        // emitting an "error" event as follows. No harm, however.
        //    Error: gzip stream error: zlib binding closed
        //        at Gzip._transform (zlib.js:369:15)
        //        ...
        destroyStream(gzipStream);
        intakeReq.destroy();
        if (intakeResTimer) {
          log.trace('cancel intakeResTimer');
          clearTimeout(intakeResTimer);
          intakeResTimer = null;
        }
      }

      client._intakeRequestGracefulExitFn = null;
      client.sent = client._numEventsEnqueued;
      client._activeIntakeReq = false;
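      // Compute the delay before the next intake request may start. The
      // backoff schedule itself lives in `client._getBackoffDelay()`; the
      // argument tells it whether this request errored.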
      const backoffDelayMs = client._getBackoffDelay(!!err);
      if (err) {
        log.trace(
          { timeline, bytesWritten, backoffDelayMs, err },
          'conclude intake request: error',
        );
        onerror(err);
      } else {
        log.trace(
          { timeline, bytesWritten, backoffDelayMs },
          'conclude intake request: success',
        );
      }
      if (client._onIntakeReqConcluded) {
        client._onIntakeReqConcluded();
        client._onIntakeReqConcluded = null;
      }

      if (backoffDelayMs > 0) {
        setTimeout(next, backoffDelayMs).unref();
      } else {
        setImmediate(next);
      }
    };

    // Provide a function on the client for it to signal this intake request
    // to gracefully shut down, i.e. finish up quickly.
    client._intakeRequestGracefulExitFn = () => {
      intakeRequestGracefulExitCalled = true;
      if (intakeReqSocket) {
        log.trace('_intakeRequestGracefulExitFn: re-ref intakeReqSocket');
        intakeReqSocket.ref();
      }
      if (intakeResTimer) {
        log.trace(
          '_intakeRequestGracefulExitFn: reset intakeResTimer to short timeout',
        );
        clearTimeout(intakeResTimer);
        intakeResTimer = setTimeout(() => {
          completePart(
            'intakeRes',
            new Error(
              'intake response timeout: APM server did not respond ' +
                `within ${
                  intakeResTimeoutOnEnd / 1000
                }s of graceful exit signal`,
            ),
          );
        }, intakeResTimeoutOnEnd).unref();
      }
    };

    // Start the request and set its timeout.
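    // (`client._transportRequest` is set up elsewhere in this file; it is
    // presumably an `http.request`/`https.request`-style function chosen from
    // the configured `serverUrl`.)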
    const intakeReq = client._transportRequest(client._conf.requestIntake);
    if (Number.isFinite(client._conf.serverTimeout)) {
      intakeReq.setTimeout(client._conf.serverTimeout);
    }
    // TODO: log intakeReq and intakeRes when
    // https://github.com/elastic/ecs-logging-nodejs/issues/67 is implemented.
    log.trace('intake request start');

    // Handle events on the intake request.
    // https://nodejs.org/api/http.html#http_http_request_options_callback
    // documents the events emitted on the req and res objects for different
    // scenarios.
    intakeReq.on('timeout', () => {
      log.trace('intakeReq "timeout"');
      // `.destroy(err)` will result in an "error" event.
      intakeReq.destroy(
        new Error(
          `APM Server response timeout (${client._conf.serverTimeout}ms)`,
        ),
      );
    });
    intakeReq.on('socket', function (socket) {
      intakeReqSocket = socket;
      // Unref the socket for this request so that the Client does not keep
      // the node process running if it otherwise would be done. (This is
      // tested by the "unref-client" test in test/side-effects.js.)
      //
      // The HTTP keep-alive agent will unref sockets when unused, and ref them
      // during a request. Given that the normal makeIntakeRequest behaviour
      // is to keep a request open for up to 10s (`apiRequestTime`), we must
      // manually unref the socket.
      //
      // The exception is when in a Lambda environment, where we *do* want to
      // keep the node process running to complete this intake request.
      // Otherwise a 'beforeExit' event can be sent, which the Lambda runtime
      // interprets as "the Lambda handler callback was never called".
      if (!isLambdaExecutionEnvironment && !intakeRequestGracefulExitCalled) {
        log.trace('intakeReq "socket": unref it');
        intakeReqSocket.unref();
      }
    });
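    // Note: the APM server may respond before the request body has finished
    // sending (e.g. a "queue is full" error), which the "response" handler
    // below detects via `intakeReq.finished`.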
    intakeReq.on('response', (intakeRes_) => {
      intakeRes = intakeRes_;
      log.trace(
        { statusCode: intakeRes.statusCode, reqFinished: intakeReq.finished },
        'intakeReq "response"',
      );
      let err;
      const chunks = [];

      if (!intakeReq.finished) {
        // Premature response from APM server. Typically this is for errors
        // like "queue is full", for which the response body will be parsed
        // below. However, set an `err` as a fallback for the unexpected case
        // that this happens with a 2xx response.
        if (intakeRes.statusCode >= 200 && intakeRes.statusCode < 300) {
          err = new Error(
            `premature apm-server response with statusCode=${intakeRes.statusCode}`,
          );
        }
        // There is no point (though no harm) in sending more data to the APM
        // server. In case reading the error response body takes a while, pause
        // the gzip stream until it is destroyed in `completePart()`.
        gzipStream.pause();
      }

      // Handle events on the intake response.
      intakeRes.on('error', (intakeResErr) => {
        // I am not aware of a way to get an "error" event on the
        // IncomingMessage (see also https://stackoverflow.com/q/53691119), but
        // handling it here is preferable to an uncaughtException.
        intakeResErr = wrapError(intakeResErr, 'intake response error event');
        completePart('intakeRes', intakeResErr);
      });
      intakeRes.on('data', (chunk) => {
        chunks.push(chunk);
      });
      // intakeRes.on('close', () => { log.trace('intakeRes "close"') })
      // intakeRes.on('aborted', () => { log.trace('intakeRes "aborted"') })
      intakeRes.on('end', () => {
        log.trace('intakeRes "end"');
        if (intakeResTimer) {
          clearTimeout(intakeResTimer);
          intakeResTimer = null;
        }
        if (intakeRes.statusCode < 200 || intakeRes.statusCode > 299) {
          err = processIntakeErrorResponse(intakeRes, Buffer.concat(chunks));
        }
        completePart('intakeRes', err);
      });
    });
    // intakeReq.on('abort', () => { log.trace('intakeReq "abort"') })
    // intakeReq.on('close', () => { log.trace('intakeReq "close"') })
    intakeReq.on('finish', () => {
      log.trace('intakeReq "finish"');
      completePart('intakeReq');
    });
    intakeReq.on('error', (err) => {
      log.trace('intakeReq "error"');
      completePart('intakeReq', err);
    });

    // Handle events on the gzip stream.
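    // `bytesWritten` counts the compressed bytes sent to the intake request;
    // it is only used in the trace logging when this request concludes.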
    gzipStream.on('data', (chunk) => {
      bytesWritten += chunk.length;
    });
    gzipStream.on('error', (gzipErr) => {
      log.trace('gzipStream "error"');
      gzipErr = wrapError(gzipErr, 'gzip stream error');
      completePart('gzipStream', gzipErr);
    });
    gzipStream.on('finish', () => {
      // If the apm-server is not reading its input and the gzip data is large
      // enough to fill buffers, then the gzip stream will emit "finish", but
      // not "end". Therefore, this "finish" event is the best indicator that
      // the ball is now in the apm-server's court.
      //
      // We now start a timer waiting on the response, provided we still expect
      // one (we don't if the request has already errored out, e.g.
      // ECONNREFUSED) and it hasn't already completed (e.g. if it replied
      // quickly with "queue is full").
      log.trace('gzipStream "finish"');
      if (!completedFromPart.intakeReq && !completedFromPart.intakeRes) {
        const timeout =
          client._writableState.ending || intakeRequestGracefulExitCalled
            ? intakeResTimeoutOnEnd
            : intakeResTimeout;
        log.trace({ timeout }, 'start intakeResTimer');
        intakeResTimer = setTimeout(() => {
          completePart(
            'intakeRes',
            new Error(
              'intake response timeout: APM server did not respond ' +
                `within ${timeout / 1000}s of gzip stream finish`,
            ),
          );
        }, timeout).unref();
      }
    });

    // Watch the gzip "end" event for its completion, because the "close" event
    // that we would prefer to use *does not get emitted* for the
    // `client.sendSpan(callback) + client.flush()` test case on Node v12 only.
    gzipStream.on('end', () => {
      log.trace('gzipStream "end"');
      completePart('gzipStream');
    });
    // gzipStream.on('close', () => { log.trace('gzipStream "close"') })

    // Append the (gunzipped) intake data to `payloadLogFile`, if given. This is
    // only intended for local debugging, because it can have a significant
    // perf impact.
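    // (For example, configuring the client with
    // `payloadLogFile: '/tmp/apm-intake.ndjson'` -- a hypothetical path --
    // would append each request's uncompressed ndjson payload to that file.)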
    if (client._conf.payloadLogFile) {
      const payloadLogStream = fs.createWriteStream(
        client._conf.payloadLogFile,
        { flags: 'a' },
      );
      gzipStream.pipe(zlib.createGunzip()).pipe(payloadLogStream);
    }

    // Send the metadata object (always first) and hook up the streams.
    assert(client._encodedMetadata, 'client._encodedMetadata is set');
    gzipStream.write(client._encodedMetadata);
    gzipStream.pipe(intakeReq);
  };
}
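
// For context: the handler returned above is used elsewhere in this file as
// the StreamChopper "stream" event listener -- roughly (illustrative sketch
// only, not the exact wiring; the error-handler name is a placeholder):
//   this._chopper = new StreamChopper({ /* size/time/transform options */ })
//     .on('stream', getChoppedStreamHandler(this, errorHandler));
// StreamChopper passes each newly chopped gzip stream plus a `next` callback
// to `makeIntakeRequest` above.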