in src/setupEventStream.ts [47:125]
async function handleMessage(inMsg: rpc.StreamingMessage): Promise<void> {
const outMsg: rpc.IStreamingMessage = {
requestId: inMsg.requestId,
};
let eventHandler: EventHandler | undefined;
let request: SupportedRequest | undefined;
try {
const eventName = inMsg.content;
switch (eventName) {
case 'functionEnvironmentReloadRequest':
eventHandler = new FunctionEnvironmentReloadHandler();
break;
case 'functionLoadRequest':
eventHandler = new FunctionLoadHandler();
break;
case 'invocationRequest':
eventHandler = new InvocationHandler();
break;
case 'workerInitRequest':
eventHandler = new WorkerInitHandler();
break;
case 'workerTerminate':
// Worker terminate request is a special request which gracefully shuts down worker
// It doesn't have a response so we don't have an EventHandler class for it
await terminateWorker(nonNullProp(inMsg, eventName));
return;
case 'workerStatusRequest':
// Worker sends the host empty response to evaluate the worker's latency
// The response doesn't even allow a `result` property, which is why we don't implement an EventHandler class
outMsg.workerStatusResponse = {};
worker.eventStream.write(outMsg);
return;
case 'functionsMetadataRequest':
eventHandler = new FunctionsMetadataHandler();
break;
case 'closeSharedMemoryResourcesRequest':
case 'fileChangeEventRequest':
case 'functionLoadRequestCollection':
case 'invocationCancel':
case 'startStream':
case 'workerHeartbeat':
// Not yet implemented
return;
default:
throw new AzFuncSystemError(`Worker had no handler for message '${eventName}'`);
}
request = nonNullProp(inMsg, eventName);
const response = await eventHandler.handleEvent(request);
response.result = { status: rpc.StatusResult.Status.Success };
outMsg[eventHandler.responseName] = response;
} catch (err) {
const error = ensureErrorType(err);
if (error.isAzureFunctionsSystemError && !error.loggedOverRpc) {
worker.log({
message: error.message,
level: LogLevel.Error,
logCategory: LogCategory.System,
});
}
if (eventHandler && request) {
const response = eventHandler.getDefaultResponse(request);
response.result = {
status: rpc.StatusResult.Status.Failure,
exception: {
message: error.message,
stackTrace: error.stack,
},
};
outMsg[eventHandler.responseName] = response;
}
}
if (eventHandler) {
worker.eventStream.write(outMsg);
}
}