async handleOnceStreaming()

in src/Runtime.js [93:157]


  async handleOnceStreaming() {
    let { bodyJson, headers } = await this.client.nextInvocation();

    let invokeContext = new InvokeContext(headers);
    invokeContext.updateLoggingContext();

    let streamingContext = StreamingContext.build(
      this.client,
      invokeContext.invokeId,
      this.scheduleIteration.bind(this),
      this.handlerMetadata?.highWaterMark
        ? { highWaterMark: this.handlerMetadata.highWaterMark }
        : undefined,
    );

    const {
      responseStream,
      rapidResponse,
      scheduleNext,
      fail: ctxFail,
    } = streamingContext.createStream();
    delete streamingContext.createStream;

    try {
      this._setErrorCallbacks(invokeContext.invokeId);
      this._setStreamingExitListener(invokeContext.invokeId, responseStream);

      const ctx = invokeContext.attachEnvironmentData(streamingContext);

      verbose('Runtime::handleOnceStreaming', 'invoking handler');
      const event = JSON.parse(bodyJson);
      const handlerResult = this.handler(event, responseStream, ctx);
      verbose('Runtime::handleOnceStreaming', 'handler returned');

      if (!_isPromise(handlerResult)) {
        verbose('Runtime got non-promise response');
        ctxFail('Streaming does not support non-async handlers.', scheduleNext);

        return;
      }

      const result = await handlerResult;
      if (typeof result !== 'undefined') {
        console.warn('Streaming handlers ignore return values.');
      }
      verbose('Runtime::handleOnceStreaming result is awaited.');

      // await for the rapid response if present.
      if (rapidResponse) {
        const res = await rapidResponse;
        vverbose('RAPID response', res);
      }

      if (!responseStream.writableFinished) {
        ctxFail('Response stream is not finished.', scheduleNext);
        return;
      }

      // Handler returned and response has ended.
      scheduleNext();
    } catch (err) {
      verbose('Runtime::handleOnceStreaming::finally stream destroyed');
      ctxFail(err, scheduleNext);
    }
  }