in src/Runtime.js [93:157]
async handleOnceStreaming() {
let { bodyJson, headers } = await this.client.nextInvocation();
let invokeContext = new InvokeContext(headers);
invokeContext.updateLoggingContext();
let streamingContext = StreamingContext.build(
this.client,
invokeContext.invokeId,
this.scheduleIteration.bind(this),
this.handlerMetadata?.highWaterMark
? { highWaterMark: this.handlerMetadata.highWaterMark }
: undefined,
);
const {
responseStream,
rapidResponse,
scheduleNext,
fail: ctxFail,
} = streamingContext.createStream();
delete streamingContext.createStream;
try {
this._setErrorCallbacks(invokeContext.invokeId);
this._setStreamingExitListener(invokeContext.invokeId, responseStream);
const ctx = invokeContext.attachEnvironmentData(streamingContext);
verbose('Runtime::handleOnceStreaming', 'invoking handler');
const event = JSON.parse(bodyJson);
const handlerResult = this.handler(event, responseStream, ctx);
verbose('Runtime::handleOnceStreaming', 'handler returned');
if (!_isPromise(handlerResult)) {
verbose('Runtime got non-promise response');
ctxFail('Streaming does not support non-async handlers.', scheduleNext);
return;
}
const result = await handlerResult;
if (typeof result !== 'undefined') {
console.warn('Streaming handlers ignore return values.');
}
verbose('Runtime::handleOnceStreaming result is awaited.');
// await for the rapid response if present.
if (rapidResponse) {
const res = await rapidResponse;
vverbose('RAPID response', res);
}
if (!responseStream.writableFinished) {
ctxFail('Response stream is not finished.', scheduleNext);
return;
}
// Handler returned and response has ended.
scheduleNext();
} catch (err) {
verbose('Runtime::handleOnceStreaming::finally stream destroyed');
ctxFail(err, scheduleNext);
}
}