in src/Transport.ts [330:674]
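/**
 * Performs a single logical request: serializes the body and querystring, selects
 * a connection from the pool, executes the HTTP call, and handles compression,
 * response deserialization, retries, and error wrapping.
 * Resolves with the response body, or with the full `TransportResult` when
 * `options.meta` is true.
 */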
private async _request (params: TransportRequestParams, options: TransportRequestOptions = {}, otelSpan?: Span): Promise<any> {
const connectionParams: ConnectionRequestParams = {
method: params.method,
path: params.path
}
const meta: TransportResult['meta'] = {
context: null,
request: {
params: connectionParams,
options,
id: options.id ?? this[kGenerateRequestId](params, options)
},
name: this[kName],
connection: null,
attempts: 0,
aborted: false
}
const returnMeta = options.meta ?? false
if (this[kContext] != null && options.context != null) {
meta.context = Object.assign({}, this[kContext], options.context)
} else if (this[kContext] !== null) {
meta.context = this[kContext]
} else if (options.context != null) {
meta.context = options.context
}
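// response container: filled in as the request progresses and shared with every
// diagnostic event emitted below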
const result: TransportResult = {
// the default body value can't be `null`,
// because `null` is itself a valid JSON value
body: undefined,
statusCode: 0,
headers: {},
meta,
get warnings () {
if (this.headers?.warning == null) {
return null
}
const { warning } = this.headers
// if multiple HTTP headers have the same name, Undici represents them as an array
const warnings: string[] = Array.isArray(warning) ? warning : [warning]
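// split comma-separated warning values (ignoring commas inside quoted strings)
// and keep only the warnings generated by Elasticsearch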
return warnings
.flatMap(w => w.split(/(?!\B"[^"]*),(?![^"]*"\B)/))
.filter((warning) => warning.match(/^\d\d\d Elasticsearch-/))
}
}
// We should not retry if we are sending a stream body: to send it again we would
// need to keep an in-memory copy of the stream, and since we don't know its size
// in advance we risk consuming too much memory.
// Furthermore, copying the stream on every attempt would be a very expensive operation.
const maxRetries = isStream(params.body ?? params.bulkBody) ? 0 : (typeof options.maxRetries === 'number' ? options.maxRetries : this[kMaxRetries])
const compression = typeof options.compression === 'boolean' ? options.compression : this[kCompression]
const signal = options.signal
const maxResponseSize = options.maxResponseSize ?? this[kMaxResponseSize]
const maxCompressedResponseSize = options.maxCompressedResponseSize ?? this[kMaxCompressedResponseSize]
const errorOptions: ErrorOptions = {
redaction: typeof options.redaction === 'object' ? options.redaction : this[kRedaction]
}
this[kDiagnostic].emit('serialization', null, result)
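// merge transport-level headers with per-request headers; per-request values win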
const headers = Object.assign({}, this[kHeaders], lowerCaseHeaders(options.headers))
if (options.opaqueId !== undefined) {
headers['x-opaque-id'] = typeof this[kOpaqueIdPrefix] === 'string'
? this[kOpaqueIdPrefix] + options.opaqueId // eslint-disable-line
: options.opaqueId
}
// handle json body
if (params.body != null) {
if (shouldSerialize(params.body)) {
try {
connectionParams.body = this[kSerializer].serialize(params.body)
} catch (err: any) {
this[kDiagnostic].emit('request', err, result)
throw err
}
headers['content-type'] = headers['content-type'] ?? this[kJsonContentType]
headers.accept = headers.accept ?? this[kJsonContentType]
} else {
if (params.body !== '') {
headers['content-type'] = headers['content-type'] ?? 'text/plain'
headers.accept = headers.accept ?? this[kAcceptHeader]
}
connectionParams.body = params.body
}
// handle ndjson body
} else if (params.bulkBody != null) {
if (shouldSerialize(params.bulkBody)) {
try {
connectionParams.body = this[kSerializer].ndserialize(params.bulkBody as Array<Record<string, any>>)
} catch (err: any) {
this[kDiagnostic].emit('request', err, result)
throw err
}
} else {
connectionParams.body = params.bulkBody
}
if (connectionParams.body !== '') {
headers['content-type'] = headers['content-type'] ?? this[kNdjsonContentType]
headers.accept = headers.accept ?? this[kJsonContentType]
}
}
// serialize the querystring, merging `options.querystring` over `params.querystring` when provided
if (options.querystring == null) {
connectionParams.querystring = this[kSerializer].qserialize(params.querystring)
} else {
connectionParams.querystring = this[kSerializer].qserialize(
Object.assign({}, params.querystring, options.querystring)
)
}
// handle compression
if (connectionParams.body !== '' && connectionParams.body != null) {
if (isStream(connectionParams.body)) {
if (compression) {
headers['content-encoding'] = 'gzip'
connectionParams.body = connectionParams.body.pipe(createGzip())
}
} else if (compression) {
try {
connectionParams.body = await gzip(connectionParams.body)
} catch (err: any) {
/* istanbul ignore next */
this[kDiagnostic].emit('request', err, result)
/* istanbul ignore next */
throw err
}
headers['content-encoding'] = 'gzip'
headers['content-length'] = '' + Buffer.byteLength(connectionParams.body) // eslint-disable-line
} else {
headers['content-length'] = '' + Buffer.byteLength(connectionParams.body) // eslint-disable-line
}
}
headers.accept = headers.accept ?? this[kAcceptHeader]
connectionParams.headers = headers
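// request/retry loop: each attempt may pick a different connection; retriable
// failures increment `meta.attempts` and continue, everything else throws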
while (meta.attempts <= maxRetries) {
try {
if (signal?.aborted) { // eslint-disable-line
throw new RequestAbortedError('Request has been aborted by the user', result, errorOptions)
}
meta.connection = this.getConnection({
requestId: meta.request.id,
context: meta.context
})
if (meta.connection === null) {
throw new NoLivingConnectionsError('There are no living connections', result, errorOptions)
}
// generate required OpenTelemetry attributes from the request URL
const requestUrl = meta.connection.url
otelSpan?.setAttributes({
'url.full': requestUrl.toString(),
'server.address': requestUrl.hostname
})
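// derive `server.port` from the protocol default or the explicit port, omitting
// it only when it is the default Elasticsearch port (9200)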
if (requestUrl.port === '') {
if (requestUrl.protocol === 'https:') {
otelSpan?.setAttribute('server.port', 443)
} else if (requestUrl.protocol === 'http:') {
otelSpan?.setAttribute('server.port', 80)
}
} else if (requestUrl.port !== '9200') {
otelSpan?.setAttribute('server.port', parseInt(requestUrl.port, 10))
}
this[kDiagnostic].emit('request', null, result)
// set timeout defaults
let timeout = options.requestTimeout ?? this[kRequestTimeout] ?? undefined
if (timeout != null) timeout = toMs(timeout)
// perform the actual http request
let { statusCode, headers, body } = await meta.connection.request(connectionParams, {
requestId: meta.request.id,
name: this[kName],
context: meta.context,
maxResponseSize,
maxCompressedResponseSize,
signal,
timeout,
...(options.asStream === true ? { asStream: true } : null)
})
result.statusCode = statusCode
result.headers = headers
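// record the cluster and node identifiers returned by the server, when present,
// as OpenTelemetry attributes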
if (headers['x-found-handling-cluster'] != null) {
otelSpan?.setAttribute('db.elasticsearch.cluster.name', headers['x-found-handling-cluster'])
}
if (headers['x-found-handling-instance'] != null) {
otelSpan?.setAttribute('db.elasticsearch.node.name', headers['x-found-handling-instance'])
}
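// on 2xx responses, verify that the `x-elastic-product` header matches the
// expected product before trusting the payload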
if (this[kProductCheck] != null && headers['x-elastic-product'] !== this[kProductCheck] && statusCode >= 200 && statusCode < 300) {
/* eslint-disable @typescript-eslint/prefer-ts-expect-error */
// @ts-ignore
throw new ProductNotSupportedError(this[kProductCheck], result, errorOptions)
/* eslint-enable @typescript-eslint/prefer-ts-expect-error */
}
if (options.asStream === true) {
result.body = body
this[kDiagnostic].emit('response', null, result)
return returnMeta ? result : body
}
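// decompress the body if the server compressed it, and convert buffers to
// strings unless the content-type is binary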
const contentEncoding = (headers['content-encoding'] ?? '').toLowerCase()
if (contentEncoding.includes('gzip') || contentEncoding.includes('deflate')) {
body = await unzip(body)
}
if (Buffer.isBuffer(body) && !isBinary(headers['content-type'] ?? '')) {
body = body.toString()
}
const isHead = params.method === 'HEAD'
// we should attempt the payload deserialization only if:
// - a `content-type` is defined and includes `application/json` or `application/vnd.elasticsearch+json`
// - the request is not a HEAD request
// - the payload is not an empty string
if (headers['content-type'] !== undefined &&
(headers['content-type']?.includes('application/json') ||
headers['content-type']?.includes('application/vnd.elasticsearch+json')) &&
!isHead && body !== '') { // eslint-disable-line
result.body = this[kSerializer].deserialize(body as string)
} else {
// cast to boolean if the request method was HEAD and there was no error
result.body = isHead && statusCode < 400 ? true : body
}
// ignore the statusCode if the user listed it in the `ignore` option,
// or if the request method is HEAD and the statusCode is 404
const ignoreStatusCode = (Array.isArray(options.ignore) && options.ignore.includes(statusCode)) ||
(isHead && statusCode === 404)
if (!ignoreStatusCode && (statusCode === 502 || statusCode === 503 || statusCode === 504)) {
// if the statusCode is 502/3/4 we should run our retry strategy
// and mark the connection as dead
this[kConnectionPool].markDead(meta.connection)
// retry logic
if (meta.attempts < maxRetries) {
meta.attempts++
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
continue
}
} else {
// everything has worked as expected, let's mark
// the connection as alive (or confirm it)
this[kConnectionPool].markAlive(meta.connection)
}
if (!ignoreStatusCode && statusCode >= 400) {
throw new ResponseError(result, errorOptions)
} else {
// cast to boolean if the request method was HEAD
if (isHead && statusCode === 404) {
result.body = false
}
this[kDiagnostic].emit('response', null, result)
return returnMeta ? result : result.body
}
} catch (error: any) {
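// classify the failure: some error types are never retried, connection errors are
// retried, and timeouts are retried only when `retryOnTimeout` is enabled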
switch (error.name) {
// should not retry
case 'ProductNotSupportedError':
case 'NoLivingConnectionsError':
case 'DeserializationError':
case 'ResponseError':
this[kDiagnostic].emit('response', error, result)
throw error
case 'RequestAbortedError': {
meta.aborted = true
// Wrap the error to get a clean stack trace
const wrappedError = new RequestAbortedError(error.message, result, errorOptions)
this[kDiagnostic].emit('response', wrappedError, result)
throw wrappedError
}
// should maybe retry
// @ts-expect-error `case` fallthrough is intentional: should retry if retryOnTimeout is true
case 'TimeoutError':
if (!this[kRetryOnTimeout]) {
const wrappedError = new TimeoutError(error.message, result, errorOptions)
this[kDiagnostic].emit('response', wrappedError, result)
throw wrappedError
}
// should retry
// eslint-disable-next-line no-fallthrough
case 'ConnectionError': {
// if there is an error in the connection
// let's mark the connection as dead
this[kConnectionPool].markDead(meta.connection as Connection)
if (this[kSniffOnConnectionFault]) {
this.sniff({
reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT,
requestId: meta.request.id,
context: meta.context
})
}
// retry logic
if (meta.attempts < maxRetries) {
meta.attempts++
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
// don't apply exponential backoff until each node has been tried at least once
if (meta.attempts >= this[kConnectionPool].size) {
// exponential backoff on retries, with jitter
const backoff = options.retryBackoff ?? this[kRetryBackoff]
const backoffWait = backoff(0, 4, meta.attempts)
if (backoffWait > 0) {
await setTimeout(backoffWait * 1000)
}
}
continue
}
// Wrap the error to get a clean stack trace
const wrappedError = error.name === 'TimeoutError'
? new TimeoutError(error.message, result, errorOptions)
: new ConnectionError(error.message, result, errorOptions)
this[kDiagnostic].emit('response', wrappedError, result)
throw wrappedError
}
// edge cases, such as bad compression
default:
this[kDiagnostic].emit('response', error, result)
throw error
}
}
}
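// fallback return; in practice every attempt above returns, throws, or retries
// before the loop can exit normally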
return returnMeta ? result : result.body
}