function createResponseStream()

in src/ResponseStream.js [123:285]


function createResponseStream(options) {
  let status = STATUS_READY;
  let req = undefined;

  const headers = {
    [HEADER_RESPONSE_MODE]: VALUE_STREAMING,
    Trailer: [TRAILER_NAME_ERROR_TYPE, TRAILER_NAME_ERROR_BODY],
    'Content-Type': options.contentType
      ? options.contentType
      : DEFAULT_CONTENT_TYPE,
  };

  let responseDoneResolve;
  let responseDoneReject;
  let responseDonePromise = new Promise((resolve, reject) => {
    responseDoneResolve = resolve;
    responseDoneReject = reject;
  });

  let headersDoneResolve;
  let headersDoneReject;
  let headersDonePromise = new Promise((resolve, reject) => {
    headersDoneResolve = resolve;
    headersDoneReject = reject;
  });

  const agent = options.httpOptions.agent;
  agent.createConnection = (opts, connectionListener) => {
    return net.createConnection(
      {
        ...opts,
        highWaterMark: options?.httpOptions?.highWaterMark,
      },
      connectionListener,
    );
  };

  req = options.httpOptions.http.request(
    {
      http: options.httpOptions.http,
      contentType: options.httpOptions.contentType,
      method: options.httpOptions.method,
      hostname: options.httpOptions.hostname,
      port: options.httpOptions.port,
      path: options.httpOptions.path,
      headers,
      agent: agent,
    },
    (res) => {
      headersDoneResolve({
        statusCode: res.statusCode,
        statusMessage: res.statusMessage,
        headers: res.headers,
      });

      let buf = undefined;

      res.on('data', (chunk) => {
        buf = typeof buf === 'undefined' ? chunk : Buffer.concat([buf, chunk]);
      });

      res.on('aborted', (err) => {
        if (responseDoneReject) {
          responseDoneReject(err);
        }
        req.destroy(err);
      });

      res.on('end', () => {
        vvverbose('rapid response', buf ? buf.toString() : 'buf undefined');

        responseDoneResolve(buf);
      });
    },
  );

  req.on('error', (err) => {
    if (headersDoneReject) {
      headersDoneReject(err);
    }

    if (responseDoneReject) {
      responseDoneReject(err);
    }

    req.destroy(err);
  });

  const origEnd = req.end.bind(req);
  req.end = function (cb) {
    origEnd(cb);
  };

  req.setContentType = function (contentType) {
    if (status !== STATUS_READY) {
      throw new InvalidStreamingOperation('Cannot set content-type, too late.');
    }
    req.setHeader('Content-Type', contentType);
  };

  const origWrite = req.write.bind(req);
  req.write = function (chunk, encoding, callback) {
    vvverbose(
      'ResponseStream::write',
      chunk.length,
      'callback:',
      typeof callback,
    );

    if (
      typeof chunk !== 'string' &&
      !Buffer.isBuffer(chunk) &&
      chunk?.constructor !== Uint8Array
    ) {
      chunk = JSON.stringify(chunk);
    }

    if (
      status === STATUS_READY &&
      typeof this._onBeforeFirstWrite === 'function'
    ) {
      this._onBeforeFirstWrite((ch) => origWrite(ch));
    }

    // First write shall open the connection.
    const ret = origWrite(chunk, encoding, callback);

    vvverbose('ResponseStream::origWrite', ret);
    vvverbose('ResponseStream::write outputData len', this.outputData.length);
    vvverbose('ResponseStream::write outputSize', this.outputSize);

    if (status === STATUS_READY) {
      status = STATUS_WRITE_CALLED;
    }

    return ret;
  };

  const request = writableStreamOnly(req);

  // This will close the stream after sending the error.
  addFailWeakProp(request, function (err, callback) {
    verbose('ResponseStream::fail err:', err);

    const error = toRapidResponse(err);

    // Send error as trailers.
    req.addTrailers({
      [TRAILER_NAME_ERROR_TYPE]: error.errorType,
      [TRAILER_NAME_ERROR_BODY]: Buffer.from(JSON.stringify(error)).toString(
        'base64',
      ),
    });

    req.end(callback);
  });

  return {
    request,
    headersDone: headersDonePromise, // Not required to be awaited.
    responseDone: responseDonePromise,
  };
}