private async _request()

in src/Transport.ts [328:674]


  private async _request<TResponse = unknown, TContext = any> (params: TransportRequestParams, options?: TransportRequestOptionsWithMeta, otelSpan?: Span): Promise<TransportResult<TResponse, TContext>>
  private async _request<TResponse = unknown> (params: TransportRequestParams, options?: TransportRequestOptions, otelSpan?: Span): Promise<TResponse>
  private async _request (params: TransportRequestParams, options: TransportRequestOptions = {}, otelSpan?: Span): Promise<any> {
    const connectionParams: ConnectionRequestParams = {
      method: params.method,
      path: params.path
    }

    const meta: TransportResult['meta'] = {
      context: null,
      request: {
        params: connectionParams,
        options,
        id: options.id ?? this[kGenerateRequestId](params, options)
      },
      name: this[kName],
      connection: null,
      attempts: 0,
      aborted: false
    }

    const returnMeta = options.meta ?? false

    if (this[kContext] != null && options.context != null) {
      meta.context = Object.assign({}, this[kContext], options.context)
    } else if (this[kContext] !== null) {
      meta.context = this[kContext]
    } else if (options.context != null) {
      meta.context = options.context
    }

    const result: TransportResult = {
      // the default body value can't be `null`
      // as it's a valid JSON value
      body: undefined,
      statusCode: 0,
      headers: {},
      meta,
      get warnings () {
        if (this.headers?.warning == null) {
          return null
        }
        const { warning } = this.headers
        // if multiple HTTP headers have the same name, Undici represents them as an array
        const warnings: string[] = Array.isArray(warning) ? warning : [warning]
        return warnings
          .flatMap(w => w.split(/(?!\B"[^"]*),(?![^"]*"\B)/))
          .filter((warning) => warning.match(/^\d\d\d Elasticsearch-/))
      }
    }

    // We should not retry if we are sending a stream body, because we should store in memory
    // a copy of the stream to be able to send it again, but since we don't know in advance
    // the size of the stream, we risk to take too much memory.
    // Furthermore, copying every time the stream is very a expensive operation.
    const maxRetries = isStream(params.body ?? params.bulkBody) ? 0 : (typeof options.maxRetries === 'number' ? options.maxRetries : this[kMaxRetries])
    const compression = typeof options.compression === 'boolean' ? options.compression : this[kCompression]
    const signal = options.signal
    const maxResponseSize = options.maxResponseSize ?? this[kMaxResponseSize]
    const maxCompressedResponseSize = options.maxCompressedResponseSize ?? this[kMaxCompressedResponseSize]

    const errorOptions: ErrorOptions = {
      redaction: typeof options.redaction === 'object' ? options.redaction : this[kRedaction]
    }

    this[kDiagnostic].emit('serialization', null, result)
    const headers = Object.assign({}, this[kHeaders], lowerCaseHeaders(options.headers))

    if (options.opaqueId !== undefined) {
      headers['x-opaque-id'] = typeof this[kOpaqueIdPrefix] === 'string'
        ? this[kOpaqueIdPrefix] + options.opaqueId // eslint-disable-line
        : options.opaqueId
    }

    // handle json body
    if (params.body != null) {
      if (shouldSerialize(params.body)) {
        try {
          connectionParams.body = this[kSerializer].serialize(params.body)
        } catch (err: any) {
          this[kDiagnostic].emit('request', err, result)
          throw err
        }
        headers['content-type'] = headers['content-type'] ?? this[kJsonContentType]
        headers.accept = headers.accept ?? this[kJsonContentType]
      } else {
        if (params.body !== '') {
          headers['content-type'] = headers['content-type'] ?? 'text/plain'
          headers.accept = headers.accept ?? this[kAcceptHeader]
        }
        connectionParams.body = params.body
      }

    // handle ndjson body
    } else if (params.bulkBody != null) {
      if (shouldSerialize(params.bulkBody)) {
        try {
          connectionParams.body = this[kSerializer].ndserialize(params.bulkBody as Array<Record<string, any>>)
        } catch (err: any) {
          this[kDiagnostic].emit('request', err, result)
          throw err
        }
      } else {
        connectionParams.body = params.bulkBody
      }

      if (connectionParams.body !== '') {
        headers['content-type'] = headers['content-type'] ?? this[kNdjsonContentType]
        headers.accept = headers.accept ?? this[kJsonContentType]
      }
    }

    // serializes the querystring
    if (options.querystring == null) {
      connectionParams.querystring = this[kSerializer].qserialize(params.querystring)
    } else {
      connectionParams.querystring = this[kSerializer].qserialize(
        Object.assign({}, params.querystring, options.querystring)
      )
    }

    // handle compression
    if (connectionParams.body !== '' && connectionParams.body != null) {
      if (isStream(connectionParams.body)) {
        if (compression) {
          headers['content-encoding'] = 'gzip'
          connectionParams.body = connectionParams.body.pipe(createGzip())
        }
      } else if (compression) {
        try {
          connectionParams.body = await gzip(connectionParams.body)
        } catch (err: any) {
          /* istanbul ignore next */
          this[kDiagnostic].emit('request', err, result)
          /* istanbul ignore next */
          throw err
        }
        headers['content-encoding'] = 'gzip'
        headers['content-length'] = '' + Buffer.byteLength(connectionParams.body) // eslint-disable-line
      } else {
        headers['content-length'] = '' + Buffer.byteLength(connectionParams.body) // eslint-disable-line
      }
    }

    headers.accept = headers.accept ?? this[kAcceptHeader]
    connectionParams.headers = headers
    while (meta.attempts <= maxRetries) {
      try {
        if (signal?.aborted) { // eslint-disable-line
          throw new RequestAbortedError('Request has been aborted by the user', result, errorOptions)
        }

        meta.connection = this.getConnection({
          requestId: meta.request.id,
          context: meta.context
        })
        if (meta.connection === null) {
          throw new NoLivingConnectionsError('There are no living connections', result, errorOptions)
        }

        // generate required OpenTelemetry attributes from the request URL
        const requestUrl = meta.connection.url
        otelSpan?.setAttributes({
          'url.full': requestUrl.toString(),
          'server.address': requestUrl.hostname
        })
        if (requestUrl.port === '') {
          if (requestUrl.protocol === 'https:') {
            otelSpan?.setAttribute('server.port', 443)
          } else if (requestUrl.protocol === 'http:') {
            otelSpan?.setAttribute('server.port', 80)
          }
        } else if (requestUrl.port !== '9200') {
          otelSpan?.setAttribute('server.port', parseInt(requestUrl.port, 10))
        }

        this[kDiagnostic].emit('request', null, result)

        // set timeout defaults
        let timeout = options.requestTimeout ?? this[kRequestTimeout] ?? undefined
        if (timeout != null) timeout = toMs(timeout)

        // perform the actual http request
        let { statusCode, headers, body } = await meta.connection.request(connectionParams, {
          requestId: meta.request.id,
          name: this[kName],
          context: meta.context,
          maxResponseSize,
          maxCompressedResponseSize,
          signal,
          timeout,
          ...(options.asStream === true ? { asStream: true } : null)
        })
        result.statusCode = statusCode
        result.headers = headers

        if (headers['x-found-handling-cluster'] != null) {
          otelSpan?.setAttribute('db.elasticsearch.cluster.name', headers['x-found-handling-cluster'])
        }

        if (headers['x-found-handling-instance'] != null) {
          otelSpan?.setAttribute('db.elasticsearch.node.name', headers['x-found-handling-instance'])
        }

        if (this[kProductCheck] != null && headers['x-elastic-product'] !== this[kProductCheck] && statusCode >= 200 && statusCode < 300) {
          /* eslint-disable @typescript-eslint/prefer-ts-expect-error */
          // @ts-ignore
          throw new ProductNotSupportedError(this[kProductCheck], result, errorOptions)
          /* eslint-enable @typescript-eslint/prefer-ts-expect-error */
        }

        if (options.asStream === true) {
          result.body = body
          this[kDiagnostic].emit('response', null, result)
          return returnMeta ? result : body
        }

        const contentEncoding = (headers['content-encoding'] ?? '').toLowerCase()
        if (contentEncoding.includes('gzip') || contentEncoding.includes('deflate')) {
          body = await unzip(body)
        }

        if (Buffer.isBuffer(body) && !isBinary(headers['content-type'] ?? '')) {
          body = body.toString()
        }

        const isHead = params.method === 'HEAD'
        // we should attempt the payload deserialization only if:
        //    - a `content-type` is defined and is equal to `application/json`
        //    - the request is not a HEAD request
        //    - the payload is not an empty string
        if (headers['content-type'] !== undefined &&
            (headers['content-type']?.includes('application/json') ||
             headers['content-type']?.includes('application/vnd.elasticsearch+json')) &&
             !isHead && body !== '') { // eslint-disable-line
          result.body = this[kSerializer].deserialize(body as string)
        } else {
          // cast to boolean if the request method was HEAD and there was no error
          result.body = isHead && statusCode < 400 ? true : body
        }

        // we should ignore the statusCode if the user has configured the `ignore` field with
        // the statusCode we just got or if the request method is HEAD and the statusCode is 404
        const ignoreStatusCode = (Array.isArray(options.ignore) && options.ignore.includes(statusCode)) ||
          (isHead && statusCode === 404)

        if (!ignoreStatusCode && (statusCode === 502 || statusCode === 503 || statusCode === 504)) {
          // if the statusCode is 502/3/4 we should run our retry strategy
          // and mark the connection as dead
          this[kConnectionPool].markDead(meta.connection)
          // retry logic
          if (meta.attempts < maxRetries) {
            meta.attempts++
            debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
            continue
          }
        } else {
          // everything has worked as expected, let's mark
          // the connection as alive (or confirm it)
          this[kConnectionPool].markAlive(meta.connection)
        }

        if (!ignoreStatusCode && statusCode >= 400) {
          throw new ResponseError(result, errorOptions)
        } else {
          // cast to boolean if the request method was HEAD
          if (isHead && statusCode === 404) {
            result.body = false
          }
          this[kDiagnostic].emit('response', null, result)
          return returnMeta ? result : result.body
        }
      } catch (error: any) {
        switch (error.name) {
          // should not retry
          case 'ProductNotSupportedError':
          case 'NoLivingConnectionsError':
          case 'DeserializationError':
          case 'ResponseError':
            this[kDiagnostic].emit('response', error, result)
            throw error
          case 'RequestAbortedError': {
            meta.aborted = true
            // Wrap the error to get a clean stack trace
            const wrappedError = new RequestAbortedError(error.message, result, errorOptions)
            this[kDiagnostic].emit('response', wrappedError, result)
            throw wrappedError
          }
          // should maybe retry
          // @ts-expect-error `case` fallthrough is intentional: should retry if retryOnTimeout is true
          case 'TimeoutError':
            if (!this[kRetryOnTimeout]) {
              const wrappedError = new TimeoutError(error.message, result, errorOptions)
              this[kDiagnostic].emit('response', wrappedError, result)
              throw wrappedError
            }
          // should retry
          // eslint-disable-next-line no-fallthrough
          case 'ConnectionError': {
            // if there is an error in the connection
            // let's mark the connection as dead
            this[kConnectionPool].markDead(meta.connection as Connection)

            if (this[kSniffOnConnectionFault]) {
              this.sniff({
                reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT,
                requestId: meta.request.id,
                context: meta.context
              })
            }

            // retry logic
            if (meta.attempts < maxRetries) {
              meta.attempts++
              debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)

              // don't use exponential backoff until retrying on each node
              if (meta.attempts >= this[kConnectionPool].size) {
                // exponential backoff on retries, with jitter
                const backoff = options.retryBackoff ?? this[kRetryBackoff]
                const backoffWait = backoff(0, 4, meta.attempts)
                if (backoffWait > 0) {
                  await setTimeout(backoffWait * 1000)
                }
              }

              continue
            }

            // Wrap the error to get a clean stack trace
            const wrappedError = error.name === 'TimeoutError'
              ? new TimeoutError(error.message, result, errorOptions)
              : new ConnectionError(error.message, result, errorOptions)
            this[kDiagnostic].emit('response', wrappedError, result)
            throw wrappedError
          }

          // edge cases, such as bad compression
          default:
            this[kDiagnostic].emit('response', error, result)
            throw error
        }
      }
    }

    return returnMeta ? result : result.body
  }