lib/apm-client/http-apm-client/index.js (1,384 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and other contributors where applicable.
* Licensed under the BSD 2-Clause License; you may not use this file except in
* compliance with the BSD 2-Clause License.
*/
'use strict';
const assert = require('assert');
const crypto = require('crypto');
const fs = require('fs');
const http = require('http');
const https = require('https');
const util = require('util');
const { performance } = require('perf_hooks');
const { URL } = require('url');
const zlib = require('zlib');
const HttpAgentKeepAlive = require('agentkeepalive');
const HttpsAgentKeepAlive = HttpAgentKeepAlive.HttpsAgent;
const Filters = require('object-filter-sequence');
const querystring = require('querystring');
const Writable = require('readable-stream').Writable;
const getContainerInfo = require('./container-info');
const eos = require('end-of-stream');
const semver = require('semver');
const streamToBuffer = require('fast-stream-to-buffer');
const StreamChopper = require('stream-chopper');
const { detectHostname } = require('./detect-hostname');
const ndjson = require('./ndjson');
const { NoopLogger } = require('./logging');
const truncate = require('./truncate');
const { getCentralConfigIntervalS } = require('./central-config');
// Export the constructor under its public name. The function itself is
// named `Client` throughout this file.
module.exports = {
  HttpApmClient: Client,
};
// Marker values written into the client stream to request special flush
// handling: a plain flush, or the flush at the end of a Lambda invocation.
const kFlush = Symbol('flush');
const kLambdaEndFlush = Symbol('lambdaEndFlush');
const FLUSH_MARKERS = new Set([kFlush, kLambdaEndFlush]);

// Return true if `obj` is one of the special flush marker symbols.
function isFlushMarker(obj) {
  return FLUSH_MARKERS.has(obj);
}
// Options the Client constructor requires; `config()` throws if any is missing.
const requiredOpts = ['agentName', 'agentVersion', 'serviceName', 'userAgent'];

// Get handles on uninstrumented functions for making HTTP(S) requests before
// the APM agent has a chance to wrap them. This allows the Client to make
// requests to APM server without interfering with the APM agent's tracing
// of the user application.
const httpGet = http.get;
const httpRequest = http.request;
const httpsGet = https.get;
const httpsRequest = https.request;

// Container/pod info detected synchronously at module load; used in
// `config()` to default `containerId` and `kubernetesPod*` settings.
const containerInfo = getContainerInfo.sync();

// True when running inside an AWS Lambda function.
const isLambdaExecutionEnvironment = !!process.env.AWS_LAMBDA_FUNCTION_NAME;

// All sockets on the agent are unreffed when they are created. This means that
// when the user process's event loop is done, and these are the only handles
// left, the process 'beforeExit' event will be emitted. By listening for this
// we can make sure to end the requests properly before process exit. This way
// we don't keep the process running until the `time` timeout happens.
//
// An exception to this is AWS Lambda which, in some cases (sync function
// handlers that use a callback), will wait for 'beforeExit' to freeze the
// Lambda instance VM *for later re-use*. This means we never want to shutdown
// the `Client` on 'beforeExit'.
const clientsToAutoEnd = [];
if (!isLambdaExecutionEnvironment) {
  process.once('beforeExit', function () {
    clientsToAutoEnd.forEach(function (client) {
      if (!client) {
        // Clients remove themselves from the array when they end.
        return;
      }
      client._gracefulExit();
    });
  });
}

util.inherits(Client, Writable);

// Symbols identifying the type of event object given to `_write`/`_encode`.
Client.encoding = Object.freeze({
  METADATA: Symbol('metadata'),
  TRANSACTION: Symbol('transaction'),
  SPAN: Symbol('span'),
  ERROR: Symbol('error'),
  METRICSET: Symbol('metricset'),
});
/**
 * Create a client for communicating with APM server: sending event data
 * (transactions, spans, errors, metricsets) via the intake API, and
 * optionally polling for central configuration.
 *
 * The client is an object-mode Writable stream. It may start corked and only
 * uncork once metadata is ready (see the extra-metadata branches below).
 *
 * @param {Object} opts - Configuration options; see `Client.prototype.config`
 *    for handling and defaults. Must include the `requiredOpts` fields
 *    (`agentName`, `agentVersion`, `serviceName`, `userAgent`).
 */
function Client(opts) {
  // Support calling without `new`.
  if (!(this instanceof Client)) return new Client(opts);

  Writable.call(this, { objectMode: true });

  this._corkTimer = null;
  this._agent = null;
  this._activeIntakeReq = false;
  this._onIntakeReqConcluded = null;
  this._transport = null;
  this._configTimer = null;
  this._backoffReconnectCount = 0;
  this._intakeRequestGracefulExitFn = null; // set in makeIntakeRequest
  this._encodedMetadata = null;
  this._cloudMetadata = null;
  this._extraMetadata = null;
  this._metadataFilters = new Filters();
  // _lambdaActive indicates if a Lambda function invocation is active. It is
  // only meaningful if `isLambdaExecutionEnvironment`.
  this._lambdaActive = false;
  // Whether to forward `.lambdaRegisterTransaction()` calls to the Lambda
  // extension. This will be set false if a previous attempt failed.
  this._lambdaShouldRegisterTransactions = true;

  // Internal runtime stats for developer debugging/tuning.
  this._numEvents = 0; // number of events given to the client
  this._numEventsDropped = 0; // number of events dropped because overloaded
  this._numEventsEnqueued = 0; // number of events written through to chopper
  this.sent = 0; // number of events sent to APM server (not necessarily accepted)
  this._slowWriteBatch = {
    // data on slow or the slowest _writeBatch
    numOver10Ms: 0,
    // Data for the slowest _writeBatch:
    encodeTimeMs: 0,
    fullTimeMs: 0,
    numEvents: 0,
    numBytes: 0,
  };

  this.config(opts);
  this._log = this._conf.logger || new NoopLogger();

  // `_apmServerVersion` is one of:
  //  - `undefined`: the version has not yet been fetched
  //  - `null`: the APM server version is unknown, could not be determined
  //  - a semver.SemVer instance
  this._apmServerVersion = this._conf.apmServerVersion
    ? new semver.SemVer(this._conf.apmServerVersion)
    : undefined;
  if (!this._apmServerVersion) {
    this._fetchApmServerVersion();
  }

  // At most one of the three "extra metadata" mechanisms may be configured.
  const numExtraMdOpts = [
    this._conf.cloudMetadataFetcher,
    this._conf.expectExtraMetadata,
    this._conf.extraMetadata,
  ].reduce((accum, curr) => (curr ? accum + 1 : accum), 0);
  if (numExtraMdOpts > 1) {
    throw new Error(
      'it is an error to configure a Client with more than one of "cloudMetadataFetcher", "expectExtraMetadata", or "extraMetadata"',
    );
  } else if (this._conf.cloudMetadataFetcher) {
    // Start stream in corked mode, uncork when cloud metadata is fetched and
    // assigned. Also, the _maybeUncork will not uncork until _encodedMetadata
    // is set.
    this._log.trace('corking (cloudMetadataFetcher)');
    this.cork();
    this._fetchAndEncodeMetadata(() => {
      // _fetchAndEncodeMetadata will have set/memoized the encoded
      // metadata to the _encodedMetadata property.

      // This reverses the cork() call in the constructor above. "Maybe" uncork,
      // in case the client has been destroyed before this callback is called.
      this._maybeUncork();
      this._log.trace('uncorked (cloudMetadataFetcher)');

      // the `cloud-metadata` event allows listeners to know when the
      // agent has finished fetching and encoding its metadata for the
      // first time
      this.emit('cloud-metadata', this._encodedMetadata);
    });
  } else if (this._conf.expectExtraMetadata) {
    // Uncorking will happen in the expected `.setExtraMetadata()` call.
    this._log.trace('corking (expectExtraMetadata)');
    this.cork();
  } else if (this._conf.extraMetadata) {
    this.setExtraMetadata(this._conf.extraMetadata);
  } else {
    this._resetEncodedMetadata();
  }

  // The chopper splits the continuous stream of encoded events into gzipped
  // chunks; each chunk becomes one intake API request (see
  // `getChoppedStreamHandler`).
  this._chopper = new StreamChopper({
    size: this._conf.size,
    time: this._conf.time,
    type: StreamChopper.overflow,
    transform() {
      return zlib.createGzip({
        level: zlib.constants.Z_BEST_SPEED,
      });
    },
  });
  const onIntakeError = (err) => {
    if (this.destroyed === false) {
      this.emit('request-error', err);
    }
  };
  this._chopper.on('stream', getChoppedStreamHandler(this, onIntakeError));

  // We don't expect the chopper stream to end until the client is ending.
  // Make sure to clean up if this does happen unexpectedly.
  const fail = () => {
    if (this._writableState.ending === false) this.destroy();
  };
  eos(this._chopper, fail);

  // Register for auto-end on process 'beforeExit' (no-op array in Lambda).
  this._index = clientsToAutoEnd.length;
  clientsToAutoEnd.push(this);

  // The 'beforeExit' event is significant in Lambda invocation completion
  // handling, so we log it for debugging.
  if (isLambdaExecutionEnvironment && this._log.isLevelEnabled('trace')) {
    process.prependListener('beforeExit', () => {
      this._log.trace('process "beforeExit"');
    });
  }

  if (this._conf.centralConfig) {
    this._pollConfig();
  }
}
/**
 * Return a snapshot of the client's internal runtime counters, intended for
 * developer debugging and tuning.
 *
 * @returns {Object}
 */
Client.prototype._getStats = function () {
  const snapshot = {
    numEvents: this._numEvents,
    numEventsDropped: this._numEventsDropped,
    numEventsEnqueued: this._numEventsEnqueued,
    numEventsSent: this.sent,
    slowWriteBatch: this._slowWriteBatch,
    backoffReconnectCount: this._backoffReconnectCount,
  };
  return snapshot;
};
/**
 * (Re)configure the client. Called from the constructor and again on any
 * reconfiguration; `opts` is merged into the existing configuration, so
 * unspecified options keep their previous (or default) values.
 *
 * @param {Object} opts
 * @throws {Error} if any of the `requiredOpts` is missing, or if the
 *    `serverUrl` protocol is not http(s).
 */
Client.prototype.config = function (opts) {
  this._conf = Object.assign(this._conf || {}, opts);

  this._conf.globalLabels = normalizeGlobalLabels(this._conf.globalLabels);

  const missing = requiredOpts.filter((name) => !this._conf[name]);
  if (missing.length > 0)
    throw new Error('Missing required option(s): ' + missing.join(', '));

  // default values
  if (!this._conf.size && this._conf.size !== 0) this._conf.size = 750 * 1024;
  if (!this._conf.time && this._conf.time !== 0) this._conf.time = 10000;
  if (!this._conf.serverTimeout && this._conf.serverTimeout !== 0)
    this._conf.serverTimeout = 15000;
  if (!this._conf.serverUrl) this._conf.serverUrl = 'http://127.0.0.1:8200';
  if (!this._conf.truncateKeywordsAt) this._conf.truncateKeywordsAt = 1024;
  if (!this._conf.truncateStringsAt) this._conf.truncateStringsAt = 1024;
  if (!this._conf.truncateCustomKeysAt) this._conf.truncateCustomKeysAt = 1024;
  if (!this._conf.truncateLongFieldsAt) this._conf.truncateLongFieldsAt = 10000;
  // The deprecated `truncateErrorMessagesAt` will be honored if specified.
  if (!this._conf.bufferWindowTime) this._conf.bufferWindowTime = 20;
  if (!this._conf.bufferWindowSize) this._conf.bufferWindowSize = 50;
  if (!this._conf.maxQueueSize) this._conf.maxQueueSize = 1024;
  if (!this._conf.intakeResTimeout) this._conf.intakeResTimeout = 10000;
  if (!this._conf.intakeResTimeoutOnEnd)
    this._conf.intakeResTimeoutOnEnd = 1000;
  this._conf.keepAlive = this._conf.keepAlive !== false;
  this._conf.centralConfig = this._conf.centralConfig || false;
  // The `in` checks allow explicitly-configured falsy values (e.g. 0).
  if (!('keepAliveMsecs' in this._conf)) this._conf.keepAliveMsecs = 1000;
  if (!('maxSockets' in this._conf)) this._conf.maxSockets = Infinity;
  if (!('maxFreeSockets' in this._conf)) this._conf.maxFreeSockets = 256;
  if (!('freeSocketTimeout' in this._conf)) this._conf.freeSocketTimeout = 4000;

  // processed values
  this._conf.serverUrl = new URL(this._conf.serverUrl);
  this._conf.detectedHostname = detectHostname();

  // Default container/pod identification from info detected at module load.
  if (containerInfo) {
    if (!this._conf.containerId && containerInfo.containerId) {
      this._conf.containerId = containerInfo.containerId;
    }
    if (!this._conf.kubernetesPodUID && containerInfo.podId) {
      this._conf.kubernetesPodUID = containerInfo.podId;
    }
    if (!this._conf.kubernetesPodName && containerInfo.podId) {
      // https://kubernetes.io/docs/concepts/workloads/pods/#working-with-pods
      // suggests a pod name should just be the shorter "DNS label", and my
      // guess is k8s defaults a pod name to just the *short* hostname, not
      // the FQDN.
      this._conf.kubernetesPodName = this._conf.detectedHostname.split(
        '.',
        1,
      )[0];
    }
  }

  // Select the transport (and the uninstrumented request functions captured
  // at module load) matching the serverUrl protocol.
  let AgentKeepAlive;
  switch (this._conf.serverUrl.protocol) {
    case 'http:':
      this._transport = http;
      this._transportRequest = httpRequest;
      this._transportGet = httpGet;
      AgentKeepAlive = HttpAgentKeepAlive;
      break;
    case 'https:':
      this._transport = https;
      this._transportRequest = httpsRequest;
      this._transportGet = httpsGet;
      AgentKeepAlive = HttpsAgentKeepAlive;
      break;
    default:
      throw new Error('Unknown protocol ' + this._conf.serverUrl.protocol);
  }

  // Only reset `this._agent` if the serverUrl has changed to avoid
  // unnecessarily abandoning keep-alive connections.
  if (!this._agent || (opts && 'serverUrl' in opts)) {
    if (this._agent) {
      this._agent.destroy();
    }
    this._agent = new AgentKeepAlive({
      keepAlive: this._conf.keepAlive,
      keepAliveMsecs: this._conf.keepAliveMsecs,
      freeSocketTimeout: this._conf.freeSocketTimeout,
      timeout: this._conf.serverTimeout,
      maxSockets: this._conf.maxSockets,
      maxFreeSockets: this._conf.maxFreeSockets,
    });
  }

  // http request options
  this._conf.requestIntake = getIntakeRequestOptions(this._conf, this._agent);
  this._conf.requestConfig = getConfigRequestOptions(this._conf, this._agent);
  this._conf.requestSignalLambdaEnd = getSignalLambdaEndRequestOptions(
    this._conf,
    this._agent,
  );
  this._conf.requestRegisterTransaction = getRegisterTransactionRequestOptions(
    this._conf,
    this._agent,
  );

  // fixes bug where cached/memoized _encodedMetadata wouldn't be
  // updated when client was reconfigured
  if (this._encodedMetadata) {
    this._resetEncodedMetadata();
  }
};
/**
 * Merge additional metadata into what is sent to APM Server on intake
 * requests. May be called repeatedly; each call deep-merges into any
 * previously supplied extra metadata.
 *
 * When the client was created with `expectExtraMetadata: true`, this call
 * also (maybe) uncorks the client so intake requests can begin.
 */
Client.prototype.setExtraMetadata = function (extraMetadata) {
  if (this._extraMetadata) {
    metadataMergeDeep(this._extraMetadata, extraMetadata);
  } else {
    this._extraMetadata = extraMetadata;
  }
  this._resetEncodedMetadata();

  if (this._conf.expectExtraMetadata) {
    this._log.trace('maybe uncork (expectExtraMetadata)');
    this._maybeUncork();
  }
};
/**
 * Register a function used to filter the "metadata" object before it is sent
 * to APM server. See the APM Agent `addMetadataFilter` documentation:
 * https://www.elastic.co/guide/en/apm/agent/nodejs/current/agent-api.html#apm-add-metadata-filter
 *
 * @param {Function} fn - A filter taking and returning a metadata object.
 */
Client.prototype.addMetadataFilter = function (fn) {
  assert.strictEqual(typeof fn, 'function', 'fn arg must be a function');
  this._metadataFilters.push(fn);
  // Re-encode immediately if metadata was already encoded; otherwise the new
  // filter is picked up by the first `_resetEncodedMetadata()` call.
  if (this._encodedMetadata) {
    this._resetEncodedMetadata();
  }
};
/**
 * (Re)build and encode the metadata document from `this._conf`,
 * `this._cloudMetadata`, `this._extraMetadata` and any registered metadata
 * filters, storing the result on `this._encodedMetadata`.
 *
 * @throws {Error} if the metadata object cannot be encoded.
 */
Client.prototype._resetEncodedMetadata = function () {
  // Rebuild from scratch on every call, deep-cloning the stored cloud/extra
  // pieces so that (a) adding `.cloud` and (b) filtering never mutate the
  // originals. This path is not perf-sensitive, so JSON-cycle cloning is fine.
  let md = metadataFromConf(this._conf, this);
  if (this._cloudMetadata) {
    md.cloud = deepClone(this._cloudMetadata);
  }
  if (this._extraMetadata) {
    metadataMergeDeep(md, deepClone(this._extraMetadata));
  }

  // Apply filters registered via the APM agent's `apm.addMetadataFilter()`.
  if (this._metadataFilters && this._metadataFilters.length > 0) {
    md = this._metadataFilters.process(md);
  }

  // This is the only code path that should set `_encodedMetadata`.
  this._encodedMetadata = this._encode(
    { metadata: md },
    Client.encoding.METADATA,
  );
  if (!this._encodedMetadata) {
    // The APM client cannot function without encoded metadata. Handling this
    // could be improved (e.g. log details and disable the APM agent), but a
    // hard error suffices: a metadata encoding failure has never been
    // observed in practice.
    throw new Error(
      'could not encode metadata (trace-level logging will include details)',
    );
  }
  this._log.trace(
    { _encodedMetadata: this._encodedMetadata },
    '_resetEncodedMetadata',
  );
};
/**
 * Poll APM server for central configuration, then schedule the next poll
 * (based on the response's max-age; see `_scheduleNextConfigPoll`).
 * Emits 'config' with the parsed config object on success, 'request-error'
 * on request or parse failures.
 */
Client.prototype._pollConfig = function () {
  const opts = this._conf.requestConfig;
  if (this._conf.lastConfigEtag) {
    // Ask the server to respond 304 if config is unchanged since last poll.
    opts.headers['If-None-Match'] = this._conf.lastConfigEtag;
  }

  const req = this._transportGet(opts, (res) => {
    res.on('error', (err) => {
      // Not sure this event can ever be emitted, but just in case
      res.destroy(err);
    });

    this._scheduleNextConfigPoll(getMaxAge(res));

    // Spec: https://github.com/elastic/apm/blob/main/specs/agents/configuration.md#dealing-with-errors
    if (res.statusCode === 304) {
      this._log.trace('_pollConfig: no new central config since last poll');
      res.resume();
      return;
    } else if (res.statusCode === 403) {
      this._log.debug('_pollConfig: central config not enabled in APM Server');
      res.resume();
      return;
    } else if (res.statusCode === 404) {
      // Either a very old APM server, or early fully-managed (aka serverless).
      this._log.debug(
        '_pollConfig: APM server does not support central config',
      );
      res.resume();
      return;
    }

    streamToBuffer(res, (err, buf) => {
      if (err) {
        this.emit('request-error', processConfigErrorResponse(res, buf, err));
        return;
      }

      if (res.statusCode === 200) {
        // 200: New config available (or no config for the given service.name / service.environment)
        const etag = res.headers.etag;
        if (etag) this._conf.lastConfigEtag = etag;

        let config;
        try {
          config = JSON.parse(buf);
        } catch (parseErr) {
          this.emit(
            'request-error',
            processConfigErrorResponse(res, buf, parseErr),
          );
          return;
        }
        this.emit('config', config);
      } else {
        this.emit('request-error', processConfigErrorResponse(res, buf));
      }
    });
  });

  req.on('error', (err) => {
    // Network-level failure: still schedule a retry before reporting.
    this._scheduleNextConfigPoll();
    this.emit('request-error', err);
  });
};
/**
 * Schedule the next central-config poll, unless one is already pending.
 *
 * @param {Number} [seconds] - Suggested delay (e.g. from the response's
 *    Cache-Control max-age); normalized by `getCentralConfigIntervalS`.
 */
Client.prototype._scheduleNextConfigPoll = function (seconds) {
  if (this._configTimer !== null) {
    // A poll is already scheduled; keep it.
    return;
  }

  const delayMs = getCentralConfigIntervalS(seconds) * 1000;
  this._configTimer = setTimeout(() => {
    this._configTimer = null;
    this._pollConfig();
  }, delayMs);

  // Don't let the pending poll hold the process open.
  this._configTimer.unref();
};
// Re-`ref()` all of the agent's open socket handles (they are unreffed on
// creation) so an in-flight intake request can hold the process open until
// it completes.
Client.prototype._ref = function () {
  for (const remote of Object.keys(this._agent.sockets)) {
    for (const socket of this._agent.sockets[remote]) {
      socket.ref();
    }
  }
};
/**
 * Writable-stream `_write` implementation: encode a single event object (or
 * handle a flush marker) and pass the encoded chunk to the stream chopper.
 *
 * @param {Object|Symbol} obj - An event object (e.g. `{span: {...}}`) or one
 *    of the flush marker symbols.
 * @param {Symbol} enc - One of `Client.encoding.*`.
 * @param {Function} cb - Must always be called exactly once per write.
 */
Client.prototype._write = function (obj, enc, cb) {
  if (isFlushMarker(obj)) {
    this._writeFlush(obj, cb);
  } else {
    const t = process.hrtime();
    const chunk = this._encode(obj, enc);
    if (!chunk) {
      // Encoding failed (already logged in `_encode`): drop the event, but
      // *always* invoke the write callback. Returning without calling `cb`
      // violates the Writable contract and stalls the stream's internal
      // buffer, so no further events would ever be processed.
      cb();
      return;
    }
    this._numEventsEnqueued++;
    this._chopper.write(chunk, cb);
    this._log.trace(
      {
        fullTimeMs: deltaMs(t),
        numEvents: 1,
        numBytes: chunk.length,
      },
      '_write: encode object',
    );
  }
};
/**
 * Writable-stream `_writev`: handle a queue of buffered writes. Events are
 * encoded and written to the chopper in batches of at most
 * MAX_WRITE_BATCH_SIZE, and flush markers interleaved in the queue are
 * handled in their original order via `_writeFlush`.
 *
 * @param {Array<{chunk: (Object|Symbol), encoding: Symbol}>} objs
 * @param {Function} cb - Called once the whole queue has been processed.
 */
Client.prototype._writev = function (objs, cb) {
  // Limit the size of individual writes to manageable batches, primarily to
  // limit large sync pauses due to `_encode`ing in `_writeBatch`. This value
  // is not particularly well tuned. It was selected to get sync pauses under
  // 10ms on a developer machine.
  const MAX_WRITE_BATCH_SIZE = 32;

  let offset = 0;

  // Process one batch starting at `offset`. Passed as the completion
  // callback of each partial batch, so it re-runs until the queue is drained.
  const processBatch = () => {
    if (this.destroyed) {
      cb();
      return;
    }

    // Find the first flush marker (if any) within the next batch window.
    let flushIdx = -1;
    const limit = Math.min(objs.length, offset + MAX_WRITE_BATCH_SIZE);
    for (let i = offset; i < limit; i++) {
      if (isFlushMarker(objs[i].chunk)) {
        flushIdx = i;
        break;
      }
    }

    if (
      offset === 0 &&
      flushIdx === -1 &&
      objs.length <= MAX_WRITE_BATCH_SIZE
    ) {
      // A shortcut if there is no flush marker and the whole `objs` fits in a batch.
      this._writeBatch(objs, cb);
    } else if (flushIdx === -1) {
      // No flush marker in this batch.
      this._writeBatch(
        objs.slice(offset, limit),
        limit === objs.length ? cb : processBatch,
      );
      offset = limit;
    } else if (flushIdx > offset) {
      // There are some events in the queue before a flush marker.
      this._writeBatch(objs.slice(offset, flushIdx), processBatch);
      offset = flushIdx;
    } else if (flushIdx === objs.length - 1) {
      // The next item is a flush marker, and it is the *last* item in the queue.
      this._writeFlush(objs[flushIdx].chunk, cb);
    } else {
      // The next item in the queue is a flush.
      this._writeFlush(objs[flushIdx].chunk, processBatch);
      offset++;
    }
  };

  processBatch();
};
/**
 * Encode and write a batch of events (excluding specially handled "flush"
 * events) to the stream chopper as one concatenated chunk.
 *
 * @param {Array<{chunk: Object, encoding: Symbol}>} objs
 * @param {Function} cb - Called when the chunk has been accepted by the
 *    chopper, or immediately if nothing in the batch could be encoded.
 */
Client.prototype._writeBatch = function (objs, cb) {
  const t = process.hrtime();
  const chunks = [];
  for (var i = 0; i < objs.length; i++) {
    const obj = objs[i];
    const encoded = this._encode(obj.chunk, obj.encoding);
    if (encoded) {
      chunks.push(encoded);
    }
  }
  if (chunks.length === 0) {
    // Every event in the batch failed to encode (each failure was logged in
    // `_encode`). The stream contract requires that the callback always be
    // invoked; returning without calling `cb` would wedge the Writable's
    // internal queue indefinitely.
    cb();
    return;
  }
  const chunk = chunks.join('');
  const encodeTimeMs = deltaMs(t);

  this._numEventsEnqueued += chunks.length;
  this._chopper.write(chunk, cb);
  const fullTimeMs = deltaMs(t);

  // Track the slowest batch seen, plus a count of slow (>10ms) batches, for
  // the internal stats exposed via `_getStats`.
  if (fullTimeMs > this._slowWriteBatch.fullTimeMs) {
    this._slowWriteBatch.encodeTimeMs = encodeTimeMs;
    this._slowWriteBatch.fullTimeMs = fullTimeMs;
    this._slowWriteBatch.numEvents = objs.length;
    this._slowWriteBatch.numBytes = chunk.length;
  }
  if (fullTimeMs > 10) {
    this._slowWriteBatch.numOver10Ms++;
  }
  this._log.trace(
    {
      encodeTimeMs,
      fullTimeMs,
      numEvents: chunks.length,
      numBytes: chunk.length,
    },
    '_writeBatch',
  );
};
/**
 * Handle a flush marker from the write queue: chop the current chopper
 * stream so buffered data is sent, then invoke `cb` once the flush (and, for
 * a Lambda-end flush, the extension signal) has completed.
 *
 * @param {Symbol} flushMarker - `kFlush` or `kLambdaEndFlush`.
 * @param {Function} cb
 */
Client.prototype._writeFlush = function (flushMarker, cb) {
  const isLambdaEnd = flushMarker === kLambdaEndFlush;
  this._log.trace(
    {
      activeIntakeReq: this._activeIntakeReq,
      lambdaEnd: isLambdaEnd,
    },
    '_writeFlush',
  );

  let onFlushed = cb;
  if (isLambdaExecutionEnvironment && isLambdaEnd) {
    // At the end of a Lambda invocation, first signal the Elastic AWS Lambda
    // extension that all data for this invocation has been passed along, and
    // only then call `cb()` so the wrapped Lambda handler can finish.
    onFlushed = () => {
      this._signalLambdaEnd(cb);
    };
  }

  if (this._activeIntakeReq) {
    // An intake request is in flight: remember the callback so it fires when
    // that request concludes.
    this._onIntakeReqConcluded = onFlushed;
    this._chopper.chop();
  } else {
    this._chopper.chop(onFlushed);
  }
};
/**
 * Possibly cork the stream so writes are buffered into a small window
 * (`bufferWindowTime` ms / `bufferWindowSize` events), reducing the number
 * of tiny writes to the chopper. In a Lambda environment, the stream is kept
 * corked whenever no invocation is active.
 */
Client.prototype._maybeCork = function () {
  if (!this._writableState.corked) {
    if (isLambdaExecutionEnvironment && !this._lambdaActive) {
      this.cork();
    } else if (this._conf.bufferWindowTime !== -1) {
      this.cork();
      if (this._corkTimer && this._corkTimer.refresh) {
        // the refresh function was added in Node 10.2.0
        this._corkTimer.refresh();
      } else {
        this._corkTimer = setTimeout(() => {
          this.uncork();
        }, this._conf.bufferWindowTime);
      }
    }
  } else if (this._writableState.length >= this._conf.bufferWindowSize) {
    // Already corked and the buffered window is full: try to flush it.
    this._maybeUncork();
  }
};
/**
 * Uncork the stream (on next tick) if it is safe to do so: metadata must
 * already be encoded and, in a Lambda environment, an invocation must be
 * active.
 */
Client.prototype._maybeUncork = function () {
  if (!this._encodedMetadata) {
    // The client must remain corked until cloud metadata has been
    // fetched-or-skipped.
    return;
  } else if (isLambdaExecutionEnvironment && !this._lambdaActive) {
    // In a Lambda env, we must only uncork when an invocation is active,
    // otherwise we could start an intake request just before the VM is frozen.
    return;
  }

  if (this._writableState.corked) {
    // Wait till next tick, so that the current write that triggered the call
    // to `_maybeUncork` have time to be added to the queue. If we didn't do
    // this, that last write would trigger a single call to `_write`.
    process.nextTick(() => {
      // Re-check the guards: the client may have been destroyed, or the
      // Lambda invocation may have ended, in the meantime.
      if (
        this.destroyed === false &&
        !(isLambdaExecutionEnvironment && !this._lambdaActive)
      ) {
        this.uncork();
      }
    });

    if (this._corkTimer) {
      clearTimeout(this._corkTimer);
      this._corkTimer = null;
    }
  }
};
/**
 * Encode a single event object to an ndjson line, truncating overly long
 * fields per the client configuration.
 *
 * @param {Object} obj - e.g. `{ span: {...} }`, keyed to match `enc`.
 * @param {Symbol} enc - One of `Client.encoding.*`.
 * @returns {String|null} The serialized line, or `null` (after logging a
 *    warning) if the object could not be encoded.
 */
Client.prototype._encode = function (obj, enc) {
  let outAttr;
  let truncFunc;
  if (enc === Client.encoding.SPAN) {
    outAttr = 'span';
    truncFunc = truncate.span;
  } else if (enc === Client.encoding.TRANSACTION) {
    outAttr = 'transaction';
    truncFunc = truncate.transaction;
  } else if (enc === Client.encoding.METADATA) {
    outAttr = 'metadata';
    truncFunc = truncate.metadata;
  } else if (enc === Client.encoding.ERROR) {
    outAttr = 'error';
    truncFunc = truncate.error;
  } else if (enc === Client.encoding.METRICSET) {
    outAttr = 'metricset';
    truncFunc = truncate.metricset;
  }

  let thing;
  if (outAttr !== undefined) {
    thing = obj[outAttr];
  }

  const out = {};
  try {
    // An unknown `enc` leaves `truncFunc` undefined; the resulting TypeError
    // is caught and logged here, same as a truncation failure.
    out[outAttr] = truncFunc(thing, this._conf);
  } catch (err) {
    this._log.warn(
      {
        err,
        // Only log full problematic object at TRACE level to limit noise.
        thing: this._log.isLevelEnabled('trace') ? thing : '[REDACTED]',
        thing_id: thing?.id,
        thing_name: thing?.name,
      },
      `could not encode "${outAttr}" object`,
    );
    return null;
  }
  return ndjson.serialize(out);
};
/**
 * Mark the start of a Lambda function invocation. While an invocation is
 * active the client is allowed to uncork and send data.
 */
Client.prototype.lambdaStart = function () {
  this._lambdaActive = true;
};

/**
 * Indicate whether the APM agent -- when in a Lambda environment -- should
 * bother calling `.lambdaRegisterTransaction(...)`.
 *
 * @returns {boolean}
 */
Client.prototype.lambdaShouldRegisterTransactions = function () {
  return this._lambdaShouldRegisterTransactions;
};
/**
 * Tell the local Lambda extension about the just-started transaction. This
 * allows the extension to report the transaction in certain error cases
 * where the APM agent isn't able to *end* the transaction and report it,
 * e.g. if the function is about to timeout, or if the process crashes.
 *
 * The expected request is as follows, and a 200 status code is expected in
 * response:
 *
 *    POST /register/transaction
 *    Content-Type: application/vnd.elastic.apm.transaction+ndjson
 *    x-elastic-aws-request-id: ${awsRequestId}
 *
 *    {"metadata":{...}}
 *    {"transaction":{...partial transaction data...}}
 *
 * @param {object} trans - a mostly complete APM Transaction object. It should
 *    have a default `outcome` value. `duration` and `result` (and possibly
 *    `outcome`) fields will be set by the Elastic Lambda extension if this
 *    transaction is used.
 * @param {import('crypto').UUID} awsRequestId
 * @returns {Promise|undefined} So this can, and should, be `await`ed.
 *    If returning a promise, it will only resolve, never reject.
 */
Client.prototype.lambdaRegisterTransaction = function (trans, awsRequestId) {
  if (!isLambdaExecutionEnvironment) {
    return;
  }
  if (!this._lambdaShouldRegisterTransactions) {
    return;
  }
  assert(this._encodedMetadata, '_encodedMetadata is set');

  // We expect to be talking to the localhost Elastic Lambda extension, so we
  // want a shorter timeout than `_conf.serverTimeout`.
  const TIMEOUT_MS = 5000;

  const startTime = performance.now();
  return new Promise((resolve, reject) => {
    this._log.trace(
      { awsRequestId, traceId: trans.trace_id, transId: trans.id },
      'lambdaRegisterTransaction start',
    );

    // Single completion point: logs the outcome and always resolves. Any
    // failure also disables future registration attempts for this client.
    const finish = (errOrErrMsg) => {
      const durationMs = performance.now() - startTime;
      if (errOrErrMsg) {
        this._log.debug(
          { awsRequestId, err: errOrErrMsg, durationMs },
          'lambdaRegisterTransaction unsuccessful',
        );
        this._lambdaShouldRegisterTransactions = false;
      } else {
        this._log.trace(
          { awsRequestId, durationMs },
          'lambdaRegisterTransaction success',
        );
      }
      resolve(); // always resolve, never reject
    };

    var out = this._encode({ transaction: trans }, Client.encoding.TRANSACTION);
    if (!out) {
      finish('could not encode transaction');
      return;
    }

    // Every `POST /register/transaction` request must set the
    // `x-elastic-aws-request-id` header. Instead of creating a new options obj
    // each time, we just modify in-place.
    this._conf.requestRegisterTransaction.headers['x-elastic-aws-request-id'] =
      awsRequestId;

    const req = this._transportRequest(
      this._conf.requestRegisterTransaction,
      (res) => {
        res.on('error', (err) => {
          // Not sure this event can ever be emitted, but just in case.
          res.destroy(err);
        });
        // Drain the response body; only the status code matters.
        res.resume();
        if (res.statusCode !== 200) {
          finish(`unexpected response status code: ${res.statusCode}`);
          return;
        }
        res.on('end', function () {
          finish();
        });
      },
    );
    req.setTimeout(TIMEOUT_MS);
    req.on('timeout', () => {
      req.destroy(
        new Error(`timeout (${TIMEOUT_MS}ms) registering lambda transaction`),
      );
    });
    req.on('error', (err) => {
      finish(err);
    });
    req.write(this._encodedMetadata);
    req.write(out);
    req.end();
  });
};
// With the cork/uncork handling on this stream, `this.write`ing on this
// stream when already destroyed will lead to:
//    Error: Cannot call write after a stream was destroyed
// when the `_corkTimer` expires. The send* methods below use this guard to
// silently drop events after destruction.
Client.prototype._isUnsafeToWrite = function () {
  return this.destroyed;
};
/**
 * Count one incoming event and decide whether it must be dropped because the
 * internal write queue has reached `maxQueueSize`.
 *
 * @returns {Boolean} true if the event should be dropped.
 */
Client.prototype._shouldDropEvent = function () {
  this._numEvents++;
  if (this._writableState.length < this._conf.maxQueueSize) {
    return false;
  }
  this._numEventsDropped++;
  return true;
};
/**
 * Queue a span event for sending to APM server. Silently dropped if the
 * client is destroyed or the internal queue is full.
 *
 * @param {Object} span
 * @param {Function} [cb]
 */
Client.prototype.sendSpan = function (span, cb) {
  if (this._isUnsafeToWrite()) return;
  if (this._shouldDropEvent()) return;
  this._maybeCork();
  return this.write({ span }, Client.encoding.SPAN, cb);
};
/**
 * Queue a transaction event for sending to APM server. Silently dropped if
 * the client is destroyed or the internal queue is full.
 *
 * @param {Object} transaction
 * @param {Function} [cb]
 */
Client.prototype.sendTransaction = function (transaction, cb) {
  if (this._isUnsafeToWrite()) return;
  if (this._shouldDropEvent()) return;
  this._maybeCork();
  return this.write({ transaction }, Client.encoding.TRANSACTION, cb);
};
/**
 * Queue an error event for sending to APM server. Silently dropped if the
 * client is destroyed or the internal queue is full.
 *
 * @param {Object} error
 * @param {Function} [cb]
 */
Client.prototype.sendError = function (error, cb) {
  if (this._isUnsafeToWrite()) return;
  if (this._shouldDropEvent()) return;
  this._maybeCork();
  return this.write({ error }, Client.encoding.ERROR, cb);
};
/**
 * Queue a metricset event for sending to APM server. Silently dropped if the
 * client is destroyed or the internal queue is full.
 *
 * @param {Object} metricset
 * @param {Function} [cb]
 */
Client.prototype.sendMetricSet = function (metricset, cb) {
  if (this._isUnsafeToWrite()) return;
  if (this._shouldDropEvent()) return;
  this._maybeCork();
  return this.write({ metricset }, Client.encoding.METRICSET, cb);
};
/**
 * If possible, start a flush of currently queued APM events to APM server.
 *
 * "If possible," because there are some guards on uncorking. See `_maybeUncork`.
 *
 * @param {Object} opts - Optional.
 *    - {Boolean} opts.lambdaEnd - Optional. Default false. Setting this true
 *      tells the client to also handle the end of a Lambda function invocation.
 * @param {Function} cb - Optional. `cb()` will be called when the data has
 *    been sent to APM Server (or failed in the attempt).
 */
Client.prototype.flush = function (opts, cb) {
  // Support `flush(cb)` with no options argument.
  if (typeof opts === 'function') {
    cb = opts;
    opts = {};
  } else if (!opts) {
    opts = {};
  }
  const lambdaEnd = !!opts.lambdaEnd;

  // Write the special "flush" signal. We do this so that the order of writes
  // and flushes are kept. If we were to just flush the client right here, the
  // internal Writable buffer might still contain data that hasn't yet been
  // given to the _write function.
  if (lambdaEnd && isLambdaExecutionEnvironment && this._lambdaActive) {
    // To flush the current data and ensure that subsequently sent events *in
    // the same tick* do not start a new intake request, we must uncork
    // synchronously -- rather than the nextTick uncork done in `_maybeUncork()`.
    assert(
      this._encodedMetadata,
      'client.flush({lambdaEnd:true}) must not be called before metadata has been set',
    );
    const rv = this.write(kLambdaEndFlush, cb);
    this.uncork();
    this._lambdaActive = false;
    return rv;
  } else {
    this._maybeUncork();
    return this.write(kFlush, cb);
  }
};
// A handler that can be called on process "beforeExit" to attempt quick and
// orderly shutdown of the client. It attempts to ensure that the current
// active intake API request to APM server is completed quickly.
Client.prototype._gracefulExit = function () {
  this._log.trace('_gracefulExit');

  // Ask the in-flight intake request (if any) to wrap up quickly.
  if (this._intakeRequestGracefulExitFn) {
    this._intakeRequestGracefulExitFn();
  }

  // Calling _ref here, instead of relying on the _ref call in `_final`,
  // is necessary because `client.end()` does *not* result in the Client's
  // `_final()` being called when the process is exiting.
  this._ref();
  this.end();
};
/**
 * Writable-stream `_final`: runs when `client.end()` has flushed all pending
 * writes. Stops central-config polling, re-refs sockets so the last intake
 * request can complete, and ends the chopper.
 */
Client.prototype._final = function (cb) {
  this._log.trace('_final');
  if (this._configTimer) {
    clearTimeout(this._configTimer);
    this._configTimer = null;
  }
  // Drop the global auto-end reference so this client can be GCed.
  clientsToAutoEnd[this._index] = null;
  this._ref();
  this._chopper.end();
  cb();
};
/**
 * Writable-stream `_destroy`: tear down timers, the chopper and the
 * keep-alive agent, and release the global auto-end reference.
 */
Client.prototype._destroy = function (err, cb) {
  this._log.trace({ err }, '_destroy');

  // Cancel both pending timers, if set.
  for (const timerName of ['_configTimer', '_corkTimer']) {
    if (this[timerName]) {
      clearTimeout(this[timerName]);
      this[timerName] = null;
    }
  }

  // Drop the global auto-end reference so this client can be GCed.
  clientsToAutoEnd[this._index] = null;

  this._chopper.destroy();
  this._agent.destroy();
  cb(err);
};
// Return the appropriate backoff delay (in milliseconds) before a next possible
// request to APM server: min(reconnectCount, 6) ** 2 seconds, +/-10% jitter.
// Spec: https://github.com/elastic/apm/blob/main/specs/agents/transport.md#transport-errors
//
// In a Lambda environment, a backoff delay can be harmful: The backoff
// setTimeout is unref'd, to not hold the process open. A subsequent Lambda
// function invocation during that timer will result in no active handles and
// a process "beforeExit" event. That event is interpreted by the Lambda Runtime
// as "the Lambda function callback was never called", and it terminates the
// function and responds with `null`. The solution is to never backoff in a
// Lambda environment -- we expect and assume the Lambda extension is working,
// and pass responsibility for backoff to the extension.
Client.prototype._getBackoffDelay = function (isErr) {
  let reconnectCount;
  if (isErr && !isLambdaExecutionEnvironment) {
    reconnectCount = this._backoffReconnectCount;
    this._backoffReconnectCount += 1;
  } else {
    reconnectCount = 0;
    this._backoffReconnectCount = 0;
  }

  const baseS = Math.min(reconnectCount, 6) ** 2;
  const jitterS = baseS * (0.2 * Math.random() - 0.1);
  return (baseS + jitterS) * 1000;
};
function getChoppedStreamHandler(client, onerror) {
  // Make a request to the apm-server intake API.
  // https://www.elastic.co/guide/en/apm/server/current/events-api.html
  //
  // In normal operation this works as follows:
  // - The StreamChopper (`this._chopper`) calls this function with a newly
  //   created Gzip stream, to which it writes encoded event data.
  // - It `gzipStream.end()`s the stream when:
  //   (a) approximately `apiRequestSize` of data have been written,
  //   (b) `apiRequestTime` seconds have passed, or
  //   (c) `_chopper.chop()` is explicitly called via `client.flush()`,
  //       e.g. used by the Node.js APM agent after `client.sendError()`.
  // - This function makes the HTTP POST to the apm-server, pipes the gzipStream
  //   to it, and waits for the completion of the request and the apm-server
  //   response.
  // - Then it calls the given `next` callback to signal StreamChopper that
  //   another chopped stream can be created, when there is more to send.
  //
  // Of course, things can go wrong. Here are the known ways this pipeline can
  // conclude.
  // - intake response success - A successful response from the APM server. This
  //   is the normal operation case described above.
  // - gzipStream error - An "error" event on the gzip stream.
  // - intake request error - An "error" event on the intake HTTP request, e.g.
  //   ECONNREFUSED or ECONNRESET.
  // - intakeResTimeout - A timer started *after* we are finished sending data
  //   to the APM server by which we require a response (including its body). By
  //   default this is 10s -- a very long time to allow for a slow or far
  //   apm-server. If we hit this, APM server is problematic anyway, so the
  //   delay doesn't add to the problems.
  // - serverTimeout - An idle timeout value (default 30s) set on the socket.
  //   This is a catch-all fallback for an otherwise wedged connection. If this
  //   is being hit, there is some major issue in the application (possibly a
  //   bug in the APM agent).
  // - process completion - The Client takes pains to always `.unref()` its
  //   handles to never keep a using process open if it is ready to exit. When
  //   the process is ready to exit, the following happens:
  //   - The "beforeExit" handler above will call `client._gracefulExit()` ...
  //   - ... which calls `client._ref()` to *hold the process open* to
  //     complete this request, and `client.end()` to end the `gzipStream` so
  //     this request can complete soon.
  //   - We then expect this request to complete quickly and the process will
  //     then finish exiting. A subtlety is if the APM server is not responding
  //     then we'll wait on the shorter `intakeResTimeoutOnEnd` (by default 1s).
  return function makeIntakeRequest(gzipStream, next) {
    // Unique id used to correlate all log records for this one intake request.
    const reqId = crypto.randomBytes(16).toString('hex');
    const log = client._log.child({ reqId });
    const startTime = process.hrtime();
    // `timeline` accumulates [elapsedMs, event, errMsg] tuples, logged on
    // conclusion to aid post-hoc debugging of the request lifecycle.
    const timeline = [];
    let bytesWritten = 0;
    let intakeRes;
    let intakeReqSocket = null;
    let intakeResTimer = null;
    let intakeRequestGracefulExitCalled = false;
    const intakeResTimeout = client._conf.intakeResTimeout;
    const intakeResTimeoutOnEnd = client._conf.intakeResTimeoutOnEnd;

    // `_activeIntakeReq` is used to coordinate the callback to `client.flush(cb)`.
    client._activeIntakeReq = true;

    // Handle conclusion of this intake request. Each "part" is expected to call
    // `completePart()` at least once -- multiple calls are okay for cases like
    // the "error" and "close" events on a stream being called. When a part
    // errors or all parts are completed, then we can conclude.
    let concluded = false;
    const completedFromPart = {
      gzipStream: false,
      intakeReq: false,
      intakeRes: false,
    };
    let numToComplete = Object.keys(completedFromPart).length;
    const completePart = (part, err) => {
      log.trace({ err, concluded }, 'completePart %s', part);
      timeline.push([
        deltaMs(startTime),
        `completePart ${part}`,
        err && err.message,
      ]);
      assert(part in completedFromPart, `'${part}' is in completedFromPart`);

      // Already concluded (e.g. a later "close" after an earlier "error"):
      // nothing more to do.
      if (concluded) {
        return;
      }

      // If this is the final part to complete, then we are ready to conclude.
      let allPartsCompleted = false;
      if (!completedFromPart[part]) {
        completedFromPart[part] = true;
        numToComplete--;
        if (numToComplete === 0) {
          allPartsCompleted = true;
        }
      }
      if (!err && !allPartsCompleted) {
        return;
      }

      // Conclude.
      concluded = true;
      if (err) {
        // There was an error: clean up resources.
        // Note that in Node v8, destroying the gzip stream results in it
        // emitting an "error" event as follows. No harm, however.
        //    Error: gzip stream error: zlib binding closed
        //      at Gzip._transform (zlib.js:369:15)
        //      ...
        destroyStream(gzipStream);
        intakeReq.destroy();
        if (intakeResTimer) {
          log.trace('cancel intakeResTimer');
          clearTimeout(intakeResTimer);
          intakeResTimer = null;
        }
      }
      client._intakeRequestGracefulExitFn = null;

      client.sent = client._numEventsEnqueued;
      client._activeIntakeReq = false;
      const backoffDelayMs = client._getBackoffDelay(!!err);
      if (err) {
        log.trace(
          { timeline, bytesWritten, backoffDelayMs, err },
          'conclude intake request: error',
        );
        onerror(err);
      } else {
        log.trace(
          { timeline, bytesWritten, backoffDelayMs },
          'conclude intake request: success',
        );
      }
      if (client._onIntakeReqConcluded) {
        client._onIntakeReqConcluded();
        client._onIntakeReqConcluded = null;
      }

      // Signal StreamChopper that the next chopped stream may be created,
      // after any backoff delay. The timer is unref'd so a pending backoff
      // never holds the process open.
      if (backoffDelayMs > 0) {
        setTimeout(next, backoffDelayMs).unref();
      } else {
        setImmediate(next);
      }
    };

    // Provide a function on the client for it to signal this intake request
    // to gracefully shutdown, i.e. finish up quickly.
    client._intakeRequestGracefulExitFn = () => {
      intakeRequestGracefulExitCalled = true;
      if (intakeReqSocket) {
        // Re-ref the socket so the process stays alive until this request
        // completes (it was unref'd in the "socket" handler below).
        log.trace('_intakeRequestGracefulExitFn: re-ref intakeReqSocket');
        intakeReqSocket.ref();
      }
      if (intakeResTimer) {
        // Shorten the response wait to `intakeResTimeoutOnEnd` (default 1s)
        // so a non-responsive APM server does not delay process exit.
        log.trace(
          '_intakeRequestGracefulExitFn: reset intakeResTimer to short timeout',
        );
        clearTimeout(intakeResTimer);
        intakeResTimer = setTimeout(() => {
          completePart(
            'intakeRes',
            new Error(
              'intake response timeout: APM server did not respond ' +
                `within ${
                  intakeResTimeoutOnEnd / 1000
                }s of graceful exit signal`,
            ),
          );
        }, intakeResTimeoutOnEnd).unref();
      }
    };

    // Start the request and set its timeout.
    const intakeReq = client._transportRequest(client._conf.requestIntake);
    if (Number.isFinite(client._conf.serverTimeout)) {
      intakeReq.setTimeout(client._conf.serverTimeout);
    }

    // TODO: log intakeReq and intakeRes when
    // https://github.com/elastic/ecs-logging-nodejs/issues/67 is implemented.
    log.trace('intake request start');

    // Handle events on the intake request.
    // https://nodejs.org/api/http.html#http_http_request_options_callback docs
    // emitted events on the req and res objects for different scenarios.
    intakeReq.on('timeout', () => {
      log.trace('intakeReq "timeout"');
      // `.destroy(err)` will result in an "error" event.
      intakeReq.destroy(
        new Error(
          `APM Server response timeout (${client._conf.serverTimeout}ms)`,
        ),
      );
    });

    intakeReq.on('socket', function (socket) {
      intakeReqSocket = socket;
      // Unref the socket for this request so that the Client does not keep
      // the node process running if it otherwise would be done. (This is
      // tested by the "unref-client" test in test/side-effects.js.)
      //
      // The HTTP keep-alive agent will unref sockets when unused, and ref them
      // during a request. Given that the normal makeIntakeRequest behaviour
      // is to keep a request open for up to 10s (`apiRequestTime`), we must
      // manually unref the socket.
      //
      // The exception is when in a Lambda environment, where we *do* want to
      // keep the node process running to complete this intake request.
      // Otherwise a 'beforeExit' event can be sent, which the Lambda runtime
      // interprets as "the Lambda handler callback was never called".
      if (!isLambdaExecutionEnvironment && !intakeRequestGracefulExitCalled) {
        log.trace('intakeReq "socket": unref it');
        intakeReqSocket.unref();
      }
    });

    intakeReq.on('response', (intakeRes_) => {
      intakeRes = intakeRes_;
      log.trace(
        { statusCode: intakeRes.statusCode, reqFinished: intakeReq.finished },
        'intakeReq "response"',
      );
      let err;
      const chunks = [];

      if (!intakeReq.finished) {
        // Premature response from APM server. Typically this is for errors
        // like "queue is full", for which the response body will be parsed
        // below. However, set an `err` as a fallback for the unexpected case
        // that is with a 2xx response.
        if (intakeRes.statusCode >= 200 && intakeRes.statusCode < 300) {
          err = new Error(
            `premature apm-server response with statusCode=${intakeRes.statusCode}`,
          );
        }
        // There is no point (though no harm) in sending more data to the APM
        // server. In case reading the error response body takes a while, pause
        // the gzip stream until it is destroyed in `completePart()`.
        gzipStream.pause();
      }

      // Handle events on the intake response.
      intakeRes.on('error', (intakeResErr) => {
        // I am not aware of a way to get an "error" event on the
        // IncomingMessage (see also https://stackoverflow.com/q/53691119), but
        // handling it here is preferable to an uncaughtException.
        intakeResErr = wrapError(intakeResErr, 'intake response error event');
        completePart('intakeRes', intakeResErr);
      });
      intakeRes.on('data', (chunk) => {
        chunks.push(chunk);
      });
      // intakeRes.on('close', () => { log.trace('intakeRes "close"') })
      // intakeRes.on('aborted', () => { log.trace('intakeRes "aborted"') })
      intakeRes.on('end', () => {
        log.trace('intakeRes "end"');
        if (intakeResTimer) {
          clearTimeout(intakeResTimer);
          intakeResTimer = null;
        }
        // A non-2xx response body carries error details; parse them into `err`.
        if (intakeRes.statusCode < 200 || intakeRes.statusCode > 299) {
          err = processIntakeErrorResponse(intakeRes, Buffer.concat(chunks));
        }
        completePart('intakeRes', err);
      });
    });

    // intakeReq.on('abort', () => { log.trace('intakeReq "abort"') })
    // intakeReq.on('close', () => { log.trace('intakeReq "close"') })
    intakeReq.on('finish', () => {
      log.trace('intakeReq "finish"');
      completePart('intakeReq');
    });
    intakeReq.on('error', (err) => {
      log.trace('intakeReq "error"');
      completePart('intakeReq', err);
    });

    // Handle events on the gzip stream.
    gzipStream.on('data', (chunk) => {
      bytesWritten += chunk.length;
    });
    gzipStream.on('error', (gzipErr) => {
      log.trace('gzipStream "error"');
      gzipErr = wrapError(gzipErr, 'gzip stream error');
      completePart('gzipStream', gzipErr);
    });
    gzipStream.on('finish', () => {
      // If the apm-server is not reading its input and the gzip data is large
      // enough to fill buffers, then the gzip stream will emit "finish", but
      // not "end". Therefore, this "finish" event is the best indicator that
      // the ball is now in the apm-server's court.
      //
      // We now start a timer waiting on the response, provided we still expect
      // one (we don't if the request has already errored out, e.g.
      // ECONNREFUSED) and it hasn't already completed (e.g. if it replied
      // quickly with "queue is full").
      log.trace('gzipStream "finish"');
      if (!completedFromPart.intakeReq && !completedFromPart.intakeRes) {
        // Use the short timeout if the client is ending or gracefully exiting,
        // so we don't hold up process shutdown waiting on a slow server.
        const timeout =
          client._writableState.ending || intakeRequestGracefulExitCalled
            ? intakeResTimeoutOnEnd
            : intakeResTimeout;
        log.trace({ timeout }, 'start intakeResTimer');
        intakeResTimer = setTimeout(() => {
          completePart(
            'intakeRes',
            new Error(
              'intake response timeout: APM server did not respond ' +
                `within ${timeout / 1000}s of gzip stream finish`,
            ),
          );
        }, timeout).unref();
      }
    });
    // Watch the gzip "end" event for its completion, because the "close" event
    // that we would prefer to use, *does not get emitted* for the
    // `client.sendSpan(callback) + client.flush()` test case with
    // *node v12-only*.
    gzipStream.on('end', () => {
      log.trace('gzipStream "end"');
      completePart('gzipStream');
    });
    // gzipStream.on('close', () => { log.trace('gzipStream "close"') })

    // Hook up writing data to a file (only intended for local debugging).
    // Append the intake data to `payloadLogFile`, if given. This is only
    // intended for local debugging because it can have a significant perf
    // impact.
    if (client._conf.payloadLogFile) {
      const payloadLogStream = fs.createWriteStream(
        client._conf.payloadLogFile,
        { flags: 'a' },
      );
      gzipStream.pipe(zlib.createGunzip()).pipe(payloadLogStream);
    }

    // Send the metadata object (always first) and hook up the streams.
    assert(client._encodedMetadata, 'client._encodedMetadata is set');
    gzipStream.write(client._encodedMetadata);
    gzipStream.pipe(intakeReq);
  };
}
/**
 * Some behaviors in the APM depend on the APM Server version. These are
 * exposed as `Client#supports...` boolean methods.
 *
 * These `Client#supports...` method names, if not always the implementation,
 * intentionally match those from the Java agent:
 * https://github.com/elastic/apm-agent-java/blob/master/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerClient.java#L322-L349
 */
Client.prototype.supportsKeepingUnsampledTransaction = function () {
  // While the server version is unknown (not yet fetched), behave as if we
  // are talking to a pre-8.0 APM Server: there is no harm in sending
  // unsampled transactions to APM Server >=v8.0.
  const ver = this._apmServerVersion;
  return !ver || ver.major < 8;
};
Client.prototype.supportsActivationMethodField = function () {
  // APM server 8.7.0 had a bug where continuing to send `activation_method` is
  // harmful. When the version is unknown, optimistically assume the server
  // isn't v8.7.0.
  const ver = this._apmServerVersion;
  return !ver || semver.gte(ver, '8.7.1');
};
Client.prototype.supportsConfiguredAndDetectedHostname = function () {
  // APM Server >=7.4 accepts the separate configured/detected hostname fields.
  // When the version is unknown, optimistically assume it is >=7.4.
  const ver = this._apmServerVersion;
  return !ver || semver.gte(ver, '7.4.0');
};
/**
 * Signal to the Elastic AWS Lambda extension that a lambda function execution
 * is done.
 * https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing
 *
 * @param {Function} cb() is called when finished. There are no arguments.
 */
Client.prototype._signalLambdaEnd = function (cb) {
  this._log.trace('_signalLambdaEnd start');
  const startTime = performance.now();

  // Log the outcome (with elapsed time), then invoke the caller's callback.
  const done = (errOrErrMsg) => {
    const durationMs = performance.now() - startTime;
    if (errOrErrMsg) {
      this._log.error(
        { err: errOrErrMsg, durationMs },
        '_signalLambdaEnd error',
      );
    } else {
      this._log.trace({ durationMs }, '_signalLambdaEnd success');
    }
    cb();
  };

  // We expect to be talking to the localhost Elastic Lambda extension, so we
  // want a shorter timeout than `_conf.serverTimeout`.
  const TIMEOUT_MS = 5000;

  const req = this._transportRequest(
    this._conf.requestSignalLambdaEnd,
    (res) => {
      res.on('error', (err) => {
        // Not sure this event can ever be emitted, but just in case.
        res.destroy(err);
      });
      // Drain the body; we only care about the status code.
      res.resume();
      if (res.statusCode !== 202) {
        done(`unexpected response status code: ${res.statusCode}`);
        return;
      }
      res.on('end', () => {
        done();
      });
    },
  );
  req.setTimeout(TIMEOUT_MS);
  req.on('timeout', () => {
    req.destroy(
      new Error(`timeout (${TIMEOUT_MS}ms) signaling Lambda invocation done`),
    );
  });
  req.on('error', (err) => {
    done(err);
  });
  req.end();
};
/**
 * Fetch the APM Server version and set `this._apmServerVersion`.
 * https://www.elastic.co/guide/en/apm/server/current/server-info.html
 *
 * If fetching/parsing fails then the APM server version will be set to `null`
 * to indicate "unknown version".
 *
 * Note: on success or failure, `this._resetEncodedMetadata()` is called,
 * because some metadata fields depend on the known server version.
 */
Client.prototype._fetchApmServerVersion = function () {
  // Record "unknown version", refresh encoded metadata, and surface the
  // failure (as a debug log in Lambda, or a "request-error" event otherwise).
  const setVerUnknownAndNotify = (errmsg) => {
    this._apmServerVersion = null; // means "unknown version"
    this._resetEncodedMetadata();

    if (isLambdaExecutionEnvironment) {
      // In a Lambda environment, where the process can be frozen, it is not
      // unusual for this request to hit an error. As long as APM Server version
      // fetching is not critical to tracing of Lambda invocations, then it is
      // preferable to not add an error message to the users log.
      this._log.debug('verfetch: ' + errmsg);
    } else {
      this.emit('request-error', new Error(errmsg));
    }
  };

  const headers = getHeaders(this._conf);
  // Explicitly do *not* pass in `this._agent` -- the keep-alive http.Agent
  // used for intake requests -- because the socket.ref() handling in
  // `Client#_ref()` conflicts with the socket.unref() below.
  const reqOpts = getBasicRequestOptions('GET', '/', headers, this._conf);
  reqOpts.timeout = 30000;

  const req = this._transportGet(reqOpts, (res) => {
    res.on('error', (err) => {
      // Not sure this event can ever be emitted, but just in case
      res.destroy(err);
    });

    if (res.statusCode !== 200) {
      // Drain the body; we only report the unexpected status.
      res.resume();
      setVerUnknownAndNotify(
        `unexpected status from APM Server information endpoint: ${res.statusCode}`,
      );
      return;
    }

    const chunks = [];
    res.on('data', (chunk) => {
      chunks.push(chunk);
    });
    res.on('end', () => {
      if (chunks.length === 0) {
        setVerUnknownAndNotify(
          'APM Server information endpoint returned no body, often this indicates authentication ("apiKey" or "secretToken") is incorrect',
        );
        return;
      }

      let serverInfo;
      try {
        serverInfo = JSON.parse(Buffer.concat(chunks));
      } catch (parseErr) {
        setVerUnknownAndNotify(
          `could not parse APM Server information endpoint body: ${parseErr.message}`,
        );
        return;
      }

      if (serverInfo) {
        // APM Server 7.0.0 dropped the "ok"-level in the info endpoint body.
        const verStr = serverInfo.ok
          ? serverInfo.ok.version
          : serverInfo.version;
        try {
          this._apmServerVersion = new semver.SemVer(verStr);
        } catch (verErr) {
          setVerUnknownAndNotify(
            `could not parse APM Server version "${verStr}": ${verErr.message}`,
          );
          return;
        }
        this._resetEncodedMetadata();
        this._log.debug(
          { apmServerVersion: verStr },
          'fetched APM Server version',
        );
      } else {
        // Body parsed to a falsy value (e.g. `null`).
        setVerUnknownAndNotify(
          `could not determine APM Server version from information endpoint body: ${JSON.stringify(
            serverInfo,
          )}`,
        );
      }
    });
  });

  req.on('socket', (socket) => {
    // Unref our socket to ensure this request does not keep the process alive.
    socket.unref();
  });

  req.on('timeout', () => {
    this._log.trace('_fetchApmServerVersion timeout');
    // `.destroy(err)` will result in an "error" event, handled below.
    req.destroy(
      new Error(`timeout (${reqOpts.timeout}ms) fetching APM Server version`),
    );
  });

  req.on('error', (err) => {
    setVerUnknownAndNotify(`error fetching APM Server version: ${err.message}`);
  });
};
/**
 * Fetches cloud metadata, if any, and encodes metadata (to `_encodedMetadata`).
 *
 * @param {function} cb - Called, with no arguments, when complete.
 */
Client.prototype._fetchAndEncodeMetadata = function (cb) {
  assert(
    this._conf.cloudMetadataFetcher,
    '_fetchAndEncodeMetadata should not be called without a configured cloudMetadataFetcher',
  );
  this._conf.cloudMetadataFetcher.getCloudMetadata((err, cloudMetadata) => {
    if (!err) {
      if (cloudMetadata) {
        this._cloudMetadata = cloudMetadata;
      }
    } else {
      // We ignore this error (other than logging it). A common case, when
      // not running on one of the big 3 clouds, is "all callbacks failed",
      // which is *fine*. Because it is a common "error" we don't log the
      // stack trace.
      this._log.trace('getCloudMetadata err: %s', err);
    }
    this._resetEncodedMetadata();
    cb();
  });
};
// Build http.request options for the events intake endpoint
// (gzipped ndjson POST).
function getIntakeRequestOptions(opts, agent) {
  const headers = Object.assign(getHeaders(opts), {
    'Content-Type': 'application/x-ndjson',
    'Content-Encoding': 'gzip',
  });
  return getBasicRequestOptions(
    'POST',
    '/intake/v2/events',
    headers,
    opts,
    agent,
  );
}
// Build http.request options for the empty-body POST that signals the Elastic
// Lambda extension that the invocation is done (`?flushed=true`).
function getSignalLambdaEndRequestOptions(opts, agent) {
  const headers = Object.assign(getHeaders(opts), {
    'Content-Length': 0,
  });
  return getBasicRequestOptions(
    'POST',
    '/intake/v2/events?flushed=true',
    headers,
    opts,
    agent,
  );
}
// Build http.request options for the Lambda extension's transaction
// registration endpoint.
function getRegisterTransactionRequestOptions(opts, agent) {
  const headers = Object.assign(getHeaders(opts), {
    'Content-Type': 'application/vnd.elastic.apm.transaction+ndjson',
  });
  return getBasicRequestOptions(
    'POST',
    '/register/transaction',
    headers,
    opts,
    agent,
  );
}
// Build http.request options for the central config endpoint, scoped by
// service name and environment.
function getConfigRequestOptions(opts, agent) {
  const query = querystring.stringify({
    'service.name': opts.serviceName,
    'service.environment': opts.environment,
  });
  return getBasicRequestOptions(
    'GET',
    '/config/v1/agents?' + query,
    getHeaders(opts),
    opts,
    agent,
  );
}
// Build the http.request options shared by all APM server endpoints.
// `defaultPath` is appended to any non-root path on `opts.serverUrl`.
function getBasicRequestOptions(method, defaultPath, headers, opts, agent) {
  const basePath = opts.serverUrl.pathname;
  const path = basePath === '/' ? defaultPath : basePath + defaultPath;
  return {
    agent,
    // TLS cert verification is on unless explicitly disabled.
    rejectUnauthorized: opts.rejectUnauthorized !== false,
    ca: opts.serverCaCert,
    hostname: opts.serverUrl.hostname,
    port: opts.serverUrl.port,
    method,
    path,
    headers,
  };
}
// Build the common HTTP headers for talking to the APM server.
// `opts.apiKey` takes precedence over `opts.secretToken` when both are set,
// and any caller-supplied `opts.headers` override the defaults.
function getHeaders(opts) {
  const headers = {};
  if (opts.secretToken) {
    headers.Authorization = 'Bearer ' + opts.secretToken;
  }
  if (opts.apiKey) {
    headers.Authorization = 'ApiKey ' + opts.apiKey;
  }
  headers.Accept = 'application/json';
  headers['User-Agent'] = opts.userAgent;
  return Object.assign(headers, opts.headers);
}
// Build the intake "metadata" object from the client configuration.
// https://www.elastic.co/guide/en/apm/server/current/events-api.html#events-api-metadata
//
// `client` is consulted for its `supports...` methods, because some fields
// depend on the (possibly not-yet-known) APM server version.
function metadataFromConf(opts, client) {
  const payload = {
    service: {
      name: opts.serviceName,
      environment: opts.environment,
      runtime: {
        name: process.release.name,
        version: process.versions.node,
      },
      language: {
        name: 'javascript',
      },
      agent: {
        name: opts.agentName,
        version: opts.agentVersion,
      },
      framework: undefined,
      version: undefined,
      node: undefined,
    },
    process: {
      pid: process.pid,
      ppid: process.ppid,
      title: process.title,
      argv: process.argv,
    },
    system: {
      architecture: process.arch,
      platform: process.platform,
      container: undefined,
      kubernetes: undefined,
    },
    labels: opts.globalLabels,
  };

  // On `system.*hostname` fields:
  // - `hostname` was deprecated in APM server v7.4, replaced by the next two.
  // - Around Elastic v8.9, ECS changed `host.name` to prefer the FQDN,
  //   hence APM agents now prefer FQDN for `detected_hostname`.
  if (client.supportsConfiguredAndDetectedHostname()) {
    payload.system.detected_hostname = opts.detectedHostname;
    if (opts.configuredHostname) {
      payload.system.configured_hostname = opts.configuredHostname;
    }
  } else {
    payload.system.hostname = opts.configuredHostname || opts.detectedHostname;
  }

  if (opts.agentActivationMethod && client.supportsActivationMethodField()) {
    payload.service.agent.activation_method = opts.agentActivationMethod;
  }
  if (opts.serviceNodeName) {
    payload.service.node = { configured_name: opts.serviceNodeName };
  }
  if (opts.serviceVersion) {
    payload.service.version = opts.serviceVersion;
  }
  if (opts.frameworkName || opts.frameworkVersion) {
    payload.service.framework = {
      name: opts.frameworkName,
      version: opts.frameworkVersion,
    };
  }
  if (opts.containerId) {
    payload.system.container = { id: opts.containerId };
  }

  const haveK8sInfo = Boolean(
    opts.kubernetesNodeName ||
      opts.kubernetesNamespace ||
      opts.kubernetesPodName ||
      opts.kubernetesPodUID,
  );
  if (haveK8sInfo) {
    payload.system.kubernetes = {
      namespace: opts.kubernetesNamespace,
      node: opts.kubernetesNodeName
        ? { name: opts.kubernetesNodeName }
        : undefined,
      pod:
        opts.kubernetesPodName || opts.kubernetesPodUID
          ? { name: opts.kubernetesPodName, uid: opts.kubernetesPodUID }
          : undefined,
    };
  }

  return payload;
}
// Destroy the given stream, working around zlib-stream quirks in older Node.
function destroyStream(stream) {
  const isZlibStream =
    stream instanceof zlib.Gzip ||
    stream instanceof zlib.Gunzip ||
    stream instanceof zlib.Deflate ||
    stream instanceof zlib.DeflateRaw ||
    stream instanceof zlib.Inflate ||
    stream instanceof zlib.InflateRaw ||
    stream instanceof zlib.Unzip;

  if (!isZlibStream) {
    // For other streams we assume calling destroy is enough.
    if (typeof stream.destroy === 'function') {
      stream.destroy();
    } else if (typeof stream.emit === 'function') {
      // Node.js 6 regular streams have no destroy(); emitting `close` should
      // trigger almost the same effect.
      stream.emit('close');
    }
    return;
  }

  // Zlib streams doesn't have a destroy function in Node.js 6. On top of
  // that simply calling destroy on a zlib stream in Node.js 8+ will result
  // in a memory leak as the handle isn't closed (an operation normally done
  // by calling close). So until that is fixed, we need to manually close the
  // handle after destroying the stream.
  //
  // PR: https://github.com/nodejs/node/pull/23734
  if (typeof stream.destroy === 'function') {
    // Manually close the stream instead of calling `close()` as that would
    // have emitted 'close' again when calling `destroy()`
    if (stream._handle && typeof stream._handle.close === 'function') {
      stream._handle.close();
      stream._handle = null;
    }
    stream.destroy();
  } else if (typeof stream.close === 'function') {
    stream.close();
  }
}
// Return true iff `value` is an element of `list`.
// Uses the idiomatic `includes()` rather than `indexOf(...) >= 0`.
function oneOf(value, list) {
  return list.includes(value);
}
// Normalize the user-provided `globalLabels` object: string/number/boolean
// values pass through, anything else is stringified.
//
// Fix: use `String(value)` instead of `value.toString()`, so `null` and
// `undefined` label values no longer throw a TypeError (they become the
// strings "null"/"undefined" instead of crashing the client).
//
// Returns undefined if `labels` is falsy, otherwise a new object (the input
// is not mutated).
function normalizeGlobalLabels(labels) {
  if (!labels) return;
  const result = {};
  for (const key of Object.keys(labels)) {
    const value = labels[key];
    result[key] = ['string', 'number', 'boolean'].includes(typeof value)
      ? value
      : String(value);
  }
  return result;
}
// Parse the `max-age` directive (in seconds) out of a response's
// Cache-Control header; returns undefined when absent.
// https://httpwg.org/specs/rfc9111.html#cache-response-directive.max-age
function getMaxAge(res) {
  const cacheControl = res.headers['cache-control'];
  const match = cacheControl ? cacheControl.match(/max-age=(\d+)/i) : null;
  return match ? Number.parseInt(match[1], 10) : undefined;
}
// Wrap the given Error object, including the given message.
//
// Dev Note: Various techniques exist to wrap `Error`s in node.js and JavaScript
// to provide a cause chain, e.g. see
//      https://www.joyent.com/node-js/production/design/errors
// However, I'm not aware of a de facto "winner". Eventually there may be
//      https://github.com/tc39/proposal-error-cause
// For now we will simply prefix the existing error object's `message` property.
// This is simple and preserves the root error `stack`.
function wrapError(err, msg) {
  err.message = `${msg}: ${err.message}`;
  return err;
}
// Build an Error from a non-2xx intake API response. Decorates the Error with
// `code` (the HTTP status) and, when the body is parseable JSON per
// https://www.elastic.co/guide/en/apm/server/current/events-api.html#events-api-errors
// with `accepted`/`errors`; otherwise the raw body goes on `err.response`.
function processIntakeErrorResponse(res, buf) {
  const err = new Error('Unexpected APM Server response');
  err.code = res.statusCode;

  if (buf.length === 0) {
    return err;
  }

  const body = buf.toString('utf8');
  const contentType = res.headers['content-type'];
  if (!contentType || !contentType.startsWith('application/json')) {
    err.response = body;
    return err;
  }

  try {
    const data = JSON.parse(body);
    err.accepted = data.accepted;
    err.errors = data.errors;
    if (!err.errors) err.response = body;
  } catch (e) {
    // Not valid JSON after all; keep the raw body.
    err.response = body;
  }
  return err;
}
// Construct or decorate an Error instance from a failing response from the
// APM server central config endpoint.
//
// @param {IncomingMessage} res
// @param {Buffer|undefined} buf - Optional. A Buffer holding the response body.
// @param {Error|undefined} err - Optional. A cause Error instance.
function processConfigErrorResponse(res, buf, err) {
  // This library doesn't have a pattern for wrapping errors yet, so if
  // we already have an Error instance, we will just decorate it. That preserves
  // the stack of the root cause error.
  const errMsg = 'Unexpected APM Server response when polling config';
  if (err) {
    err.message = errMsg + ': ' + err.message;
  } else {
    err = new Error(errMsg);
  }
  err.code = res.statusCode;

  if (!buf || buf.length === 0) {
    return err;
  }

  const body = buf.toString('utf8');
  const contentType = res.headers['content-type'];
  if (!contentType || !contentType.startsWith('application/json')) {
    err.response = body;
    return err;
  }

  try {
    const response = JSON.parse(body);
    if (typeof response === 'string') {
      err.response = response;
    } else if (
      typeof response === 'object' &&
      response !== null &&
      typeof response.error === 'string'
    ) {
      // `{"error": "..."}` is the typical apm-server error body shape.
      err.response = response.error;
    } else {
      err.response = body;
    }
  } catch (e) {
    // Not valid JSON after all; keep the raw body.
    err.response = body;
  }
  return err;
}
// Return the time difference (in milliseconds) between the given time `t`
// (a 2-tuple as returned by `process.hrtime()`) and now.
function deltaMs(t) {
  const [seconds, nanoseconds] = process.hrtime(t);
  return seconds * 1e3 + nanoseconds / 1e6;
}
/**
 * Performs a deep merge of `source` into `target`. Mutates `target` only but
 * not its objects. Objects are merged, Arrays are not (a source array simply
 * replaces the target value).
 *
 * @author inspired by [eden](https://gist.github.com/ahtcx/0cd94e62691f539160b32ecda18af3d6#gistcomment-2930530)
 */
function metadataMergeDeep(target, source) {
  const isPlainObject = (v) => v && typeof v === 'object' && !Array.isArray(v);
  if (!isPlainObject(target) || !isPlainObject(source)) {
    return source;
  }
  for (const key of Object.keys(source)) {
    const incoming = source[key];
    const existing = target[key];
    if (isPlainObject(existing) && isPlainObject(incoming)) {
      // Recurse into a *copy* so the nested target object is not mutated.
      target[key] = metadataMergeDeep(Object.assign({}, existing), incoming);
    } else {
      target[key] = incoming;
    }
  }
  return target;
}
// Deep-clone via a JSON round-trip. Note the JSON semantics: `undefined` and
// function values are dropped, and Dates become ISO strings -- adequate for
// the plain config/metadata objects cloned here.
function deepClone(obj) {
  const serialized = JSON.stringify(obj);
  return JSON.parse(serialized);
}