src/setupEventStream.ts (102 lines of code) (raw):

// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. import { AzureFunctionsRpcMessages as rpc } from '../azure-functions-language-worker-protobuf/src/rpc'; import { AzFuncSystemError, ensureErrorType } from './errors'; import { EventHandler, SupportedRequest } from './eventHandlers/EventHandler'; import { FunctionEnvironmentReloadHandler } from './eventHandlers/FunctionEnvironmentReloadHandler'; import { FunctionLoadHandler } from './eventHandlers/FunctionLoadHandler'; import { FunctionsMetadataHandler } from './eventHandlers/FunctionsMetadataHandler'; import { InvocationHandler } from './eventHandlers/InvocationHandler'; import { terminateWorker } from './eventHandlers/terminateWorker'; import { WorkerInitHandler } from './eventHandlers/WorkerInitHandler'; import { systemError } from './utils/Logger'; import { nonNullProp } from './utils/nonNull'; import { worker } from './WorkerContext'; import LogCategory = rpc.RpcLog.RpcLogCategory; import LogLevel = rpc.RpcLog.Level; /** * Configures handlers for incoming gRPC messages on the client * * This should have a way to handle all incoming gRPC messages. * This includes all incoming StreamingMessage types (exclude *Response types and RpcLog type) */ export function setupEventStream(): void { worker.eventStream.on('data', (msg) => { void handleMessage(msg); }); worker.eventStream.on('error', function (err) { systemError(`Worker encountered event stream error: `, err); throw new AzFuncSystemError(err); }); // wrap event stream write to validate message correctness const oldWrite = worker.eventStream.write; worker.eventStream.write = function checkWrite(msg) { const msgError = rpc.StreamingMessage.verify(msg); if (msgError) { systemError(`Worker malformed message`, msgError); throw new AzFuncSystemError(msgError); } oldWrite.apply(worker.eventStream, [msg]); }; } async function handleMessage(inMsg: rpc.StreamingMessage): Promise<void> { const outMsg: rpc.IStreamingMessage = { requestId: inMsg.requestId, }; let eventHandler: EventHandler | undefined; let request: SupportedRequest | undefined; try { const eventName = inMsg.content; switch (eventName) { case 'functionEnvironmentReloadRequest': eventHandler = new FunctionEnvironmentReloadHandler(); break; case 'functionLoadRequest': eventHandler = new FunctionLoadHandler(); break; case 'invocationRequest': eventHandler = new InvocationHandler(); break; case 'workerInitRequest': eventHandler = new WorkerInitHandler(); break; case 'workerTerminate': // Worker terminate request is a special request which gracefully shuts down worker // It doesn't have a response so we don't have an EventHandler class for it await terminateWorker(nonNullProp(inMsg, eventName)); return; case 'workerStatusRequest': // Worker sends the host empty response to evaluate the worker's latency // The response doesn't even allow a `result` property, which is why we don't implement an EventHandler class outMsg.workerStatusResponse = {}; worker.eventStream.write(outMsg); return; case 'functionsMetadataRequest': eventHandler = new FunctionsMetadataHandler(); break; case 'closeSharedMemoryResourcesRequest': case 'fileChangeEventRequest': case 'functionLoadRequestCollection': case 'invocationCancel': case 'startStream': case 'workerHeartbeat': // Not yet implemented return; default: throw new AzFuncSystemError(`Worker had no handler for message '${eventName}'`); } request = nonNullProp(inMsg, eventName); const response = await eventHandler.handleEvent(request); response.result = { status: rpc.StatusResult.Status.Success }; outMsg[eventHandler.responseName] = response; } catch (err) { const error = ensureErrorType(err); if (error.isAzureFunctionsSystemError && !error.loggedOverRpc) { worker.log({ message: error.message, level: LogLevel.Error, logCategory: LogCategory.System, }); } if (eventHandler && request) { const response = eventHandler.getDefaultResponse(request); response.result = { status: rpc.StatusResult.Status.Failure, exception: { message: error.message, stackTrace: error.stack, }, }; outMsg[eventHandler.responseName] = response; } } if (eventHandler) { worker.eventStream.write(outMsg); } }