in src/StreamingContext.js [28:83]
/**
 * Builds the streaming context object for a single invocation.
 *
 * The returned object exposes:
 *  - `callbackWaitsForEmptyEventLoop` (getter/setter, initially `true`), which
 *    controls how the next invocation is scheduled once the stream owner
 *    calls `scheduleNext`;
 *  - `createStream(callback)`, which may be called at most once per context.
 *
 * @param {object} client - Must provide
 *   `getStreamForInvocationResponse(id, callback, options)` returning an
 *   object with `request` and `responseDone` properties.
 * @param {*} id - Invocation identifier, forwarded untouched to `client`.
 * @param {Function} scheduleNext - Called with no arguments to schedule the
 *   next invocation.
 * @param {*} options - Forwarded untouched to `client`.
 * @returns {object} The streaming context described above.
 */
module.exports.build = function (client, id, scheduleNext, options) {
  // Backing store for the `callbackWaitsForEmptyEventLoop` accessor pair.
  let waitForEmptyEventLoop = true;
  // Flipped once a stream was successfully obtained from the client.
  let isStreamCreated = false;

  // Either schedules the next invocation right away, or hands the scheduling
  // over to BeforeExitListener so it happens later.
  // NOTE(review): presumably the listener fires on the process `beforeExit`
  // event, i.e. once the event loop has drained - confirm in BeforeExitListener.
  const scheduleNextNow = () => {
    verbose('StreamingContext::scheduleNextNow entered');

    if (waitForEmptyEventLoop) {
      BeforeExitListener.set(() => {
        setImmediate(() => {
          scheduleNext();
        });
      });
      return;
    }

    scheduleNext();
  };

  /**
   * Requests the response stream for this invocation from the client.
   *
   * @param {Function} callback - Forwarded to
   *   `client.getStreamForInvocationResponse`.
   * @returns {{fail: Function, responseStream: *, rapidResponse: *, scheduleNext: Function}}
   * @throws {InvalidStreamingOperation} When a stream was already created for
   *   this context.
   */
  const createStream = (callback) => {
    if (isStreamCreated) {
      throw new InvalidStreamingOperation(
        'Cannot create stream for the same StreamingContext more than once.',
      );
    }

    const { request, responseDone } = client.getStreamForInvocationResponse(
      id,
      callback,
      options,
    );
    // Only marked as created after the client call returned, so a throwing
    // client leaves the context usable for another attempt.
    isStreamCreated = true;
    vverbose('StreamingContext::createStream stream created');

    return {
      // Logs the error, then delegates to tryCallFail with the response stream.
      fail: (err, failCallback) => {
        structuredConsole.logError('Invoke Error', err);
        tryCallFail(request, err, failCallback);
      },
      responseStream: request,
      rapidResponse: responseDone,
      // Resets BeforeExitListener before applying the scheduling policy above.
      scheduleNext: () => {
        verbose('StreamingContext::createStream scheduleNext');
        BeforeExitListener.reset();
        scheduleNextNow();
      },
    };
  };

  return {
    get callbackWaitsForEmptyEventLoop() {
      return waitForEmptyEventLoop;
    },
    set callbackWaitsForEmptyEventLoop(value) {
      waitForEmptyEventLoop = value;
    },
    createStream,
  };
};