function dispatcher()

in Clients/AmbrosiaJS/Ambrosia-Node/src/ICTest.ts [133:379]


    /**
     * Handles a single message dispatched to this test app.
     *
     * Two message types are processed (any other type is ignored):
     * - `RPC`: Routed by `methodID` (200 = Fork perf test message, `IC.POST_METHOD_ID` = post methods,
     *   3 = greetings, 33 = named test operations selected by the "opName" param, 204 = report version).
     * - `AppEvent`: Routed by `eventType`.
     *
     * The methodID 200 path is handled first, outside the try/catch, because it is a very high-frequency
     * message during perf-testing. For all other messages, any error thrown while processing is caught and logged
     * (it is not re-thrown).
     *
     * Note: This function is deliberately NOT async (see the "runForkPerfTest" notes below).
     *
     * @param message The dispatched message to handle.
     */
    function dispatcher(message: Messages.DispatchedMessage): void
    {
        const loggingPrefix: string = "Dispatcher";

        // Special case: This is a very high-frequency message [used during perf-testing], so we handle it immediately
        if (message.type === Messages.DispatchedMessageType.RPC)
        {
            let rpc: Messages.IncomingRPC = message as Messages.IncomingRPC;
            if (rpc.methodID === 200)
            {
                // Fork perf test. The 16-byte payload layout [as written by "runForkPerfTest" below] is:
                //   Bytes 0..7  : Start time of the test (Int64, little-endian); only written for the last message
                //   Bytes 8..11 : Iteration number, 1-based (Int32, little-endian); written for every message
                //   Bytes 12..15: Total number of iterations (Int32, little-endian); written for the first message
                let rawParams: Uint8Array = rpc.getRawParams();
                // Create a view over the raw params [so no data is copied]. The byteOffset/byteLength MUST be supplied, because
                // a Uint8Array can be a window into a larger ArrayBuffer, in which case offset 0 of the ArrayBuffer is not offset 0
                // of the params.
                let buffer: Buffer = Buffer.from(rawParams.buffer, rawParams.byteOffset, rawParams.byteLength);
                let iteration: number = buffer.readInt32LE(8); // We always need to read this [it changes with every message]
                if (_maxPerfIteration === -1)
                {
                    _maxPerfIteration = buffer.readInt32LE(12); // This is only sent with the first message
                }

                // Note: This check is also done for the first message, so that a test consisting of a single message
                //       [where the first message is also the last message] still reports its result
                if (iteration === _maxPerfIteration)
                {
                    let startTime: number = Number(buffer.readBigInt64LE(0)); // This is only sent with the last message
                    let elapsedMs: number = Date.now() - startTime;
                    let requestsPerSecond: number = (_maxPerfIteration / elapsedMs) * 1000;
                    Utils.log(`startTime: ${Utils.getTime(startTime)}, iteration: ${iteration}, elapsedMs: ${elapsedMs}, RPS = ${requestsPerSecond.toFixed(2)}`, null, Utils.LoggingLevel.Minimal);
                }
                // else if (iteration % 5000 === 0)
                // {
                //     Utils.log(`Received message #${iteration}`, null, Utils.LoggingLevel.Minimal);
                // }
                return;
            }
        }

        try
        {
            switch (message.type)
            {
                case Messages.DispatchedMessageType.RPC:
                    let rpc: Messages.IncomingRPC = message as Messages.IncomingRPC;

                    if (Utils.canLog(Utils.LoggingLevel.Verbose)) // We add this check because this is a high-volume code path, and rpc.makeDisplayParams() is expensive
                    {
                        Utils.log(`Received ${Messages.RPCType[rpc.rpcType]} RPC call for ${rpc.methodID === IC.POST_METHOD_ID ? `post method '${IC.getPostMethodName(rpc)}' (version ${IC.getPostMethodVersion(rpc)})` : `method ID ${rpc.methodID}`} with params ${rpc.makeDisplayParams()}`, loggingPrefix);
                    }
                    
                    switch (rpc.methodID)
                    {
                        // Note: To get to this point, the post method has been verified as published
                        case IC.POST_METHOD_ID:
                            // Post methods get their own try/catch so that a failure is reported back to the caller [via IC.postError()]
                            try
                            {
                                let methodName: string = IC.getPostMethodName(rpc);
                                let methodVersion: number = IC.getPostMethodVersion(rpc); // Use this to do version-specific method behavior [not currently used]
                        
                                switch (methodName)
                                {
                                    case "ComputePI":
                                        // The "digits" arg is optional; when omitted we default to 10 digits
                                        let digits: { count: number } = IC.getPostMethodArg(rpc, "digits?") ?? { count: 10 };
                                        let pi: number = Number.parseFloat(Math.PI.toFixed(digits.count));
                                        IC.postResult<number>(rpc, pi);
                                        break;
                                    case "joinNames":
                                        let namesSet: Set<string> = IC.getPostMethodArg(rpc, "namesSet");
                                        let namesArray: string[] = IC.getPostMethodArg(rpc, "namesArray");
                                        // Falsy names [eg. null, undefined, ""] are rendered as "null"
                                        IC.postResult<string>(rpc, [...namesSet, ...namesArray].map(v => v || "null").join(","));
                                        break;
                                    case "postTimeoutTest":
                                        // The "resultDelayInMs" arg is optional; when omitted [or not positive] no result is ever posted
                                        let resultDelayInMs: number = IC.getPostMethodArg(rpc, "resultDelayInMs?") ?? -1;
                                        if (resultDelayInMs > 0)
                                        {
                                            // Simulate a delay at the destination instance [although this an imperfect simulation since it delays the send, not the receive]
                                            setTimeout(() => IC.postResult<void>(rpc), resultDelayInMs);
                                        }
                                        else
                                        {
                                            // To [perfectly] simulate an infinite "delay" at the destination we simply don't call IC.postResult()
                                        }
                                        break;
                                    default:
                                        let errorMsg: string = `Post method '${methodName}' is not implemented`;
                                        Utils.log(`(${errorMsg})`, loggingPrefix);
                                        IC.postError(rpc, new Error(errorMsg));
                                        break;
                                }
                            }
                            catch (error: unknown)
                            {
                                const err: Error = Utils.makeError(error);
                                Utils.log(err);
                                IC.postError(rpc, err);
                            }
                            break;

                        case 3:
                            let name: string = rpc.getJsonParam("name");
                            greetingsMethod(name);
                            break;
    
                        case 33:
                            // A multiplexed method: the "opName" param selects which test operation to run
                            let opName: string = rpc.getJsonParam("opName");
                            switch (opName)
                            {
                                case "attachToBadInstance":
                                    // This is to help investigate bug #187
                                    IC.callFork("serverTen", 3, { name: "Foo!" });
                                    break;
                                case "sendLargeMessage":
                                    const sizeInKB: number = parseInt(rpc.getJsonParam("sizeInKB"), 10);
                                    // Without this check a missing/non-numeric size would silently send an empty message [because "x".repeat(NaN) is ""]
                                    if (Number.isNaN(sizeInKB) || (sizeInKB < 0))
                                    {
                                        Utils.log(`Error: Invalid 'sizeInKB' value for Impulse operation '${opName}'`);
                                        break;
                                    }
                                    IC.echo_Post("x".repeat(1024 * sizeInKB), "sendLargeMessage");
                                    break;
                                case "requestCheckpoint":
                                    IC.requestCheckpoint();
                                    break;
                                case "reportVersion":
                                    // The version is sent as raw UTF-8 bytes, and is decoded by the methodID 204 handler below
                                    IC.callFork(config.icInstanceName, 204, StringEncoding.toUTF8Bytes(Root.languageBindingVersion()));
                                    break;
                                case "implicitBatch":
                                    IC.callFork(config.icInstanceName, 3, { name: "BatchedMsg1" });
                                    IC.callFork(config.icInstanceName, 3, { name: "BatchedMsg2" });
                                    break;
                                case "explicitBatch":
                                    IC.queueFork(config.icInstanceName, 3, { name: "John" });
                                    IC.queueFork(config.icInstanceName, 3, { name: "Paul" });
                                    IC.queueFork(config.icInstanceName, 3, { name: "George" });
                                    IC.queueFork(config.icInstanceName, 3, { name: "Ringo" });
                                    IC.flushQueue();
                                    break;
                
                                case "runForkPerfTest":
                                    // To get the best performance:
                                    // 1) Set the 'outputLoggingLevel' to 'Minimal', and 'outputLogDestination' to 'File'.
                                    // 2) The dispatcher() function should NOT be async.
                                    // 3) Messages should be batched using RPCBatch.
                                    // 4) The IC binary must be a release build (not debug).
                                    // 5) Run the test OUTSIDE of the debugger.
                                    if (Configuration.loadedConfig().lbOptions.outputLoggingLevel !== Utils.LoggingLevel.Minimal)
                                    {
                                        Utils.log("Incorrect configuration for test: 'outputLoggingLevel' must be 'Minimal'");
                                        break;
                                    }
                                    const maxIteration: number = 1000000;
                                    const batchSize: number = 10000;
                                    Utils.log(`Starting Fork performance test [${maxIteration.toLocaleString()} messages in batches of ${batchSize.toLocaleString()}]...`, null, Utils.LoggingLevel.Minimal);
                                    let startTime: number = Date.now();
                                    // A single 16-byte buffer is re-used for every message [see the methodID 200 handler above for the layout].
                                    // NOTE(review): This presumably relies on IC.queueFork() capturing the buffer contents at call time - confirm.
                                    let buffer: Buffer = Buffer.alloc(16);
                                    let methodID: number = 200;
                                    _maxPerfIteration = -1; // Tells the methodID 200 handler to read the iteration count from the next message it receives

                                    sendBatch(0, batchSize, maxIteration);

                                    /**
                                     * Queues (then flushes) one batch of perf test messages, then schedules the next batch [if any]
                                     * using setImmediate() rather than looping or recursing directly.
                                     * @param startID The 0-based index of the first message in this batch.
                                     * @param batchSize The maximum number of messages to send in this batch.
                                     * @param messagesRemaining The number of messages still to be sent [including this batch].
                                     */
                                    function sendBatch(startID: number, batchSize: number, messagesRemaining: number): void
                                    {
                                        batchSize = Math.min(batchSize, messagesRemaining); // The final batch may be smaller

                                        for (let i = startID; i < startID + batchSize; i++)
                                        {
                                            if (i === maxIteration - 1)
                                            {
                                                // Last message: include the start time so that the receiver can compute the elapsed time
                                                buffer.writeBigInt64LE(BigInt(startTime), 0);
                                            }
                                            buffer.writeInt32LE(i + 1, 8); // The iteration number is 1-based
                                            if (i === 0)
                                            {
                                                // First message: include the total number of iterations
                                                buffer.writeInt32LE(maxIteration, 12);
                                            }
                                            IC.queueFork(config.icInstanceName, methodID, buffer);
                                        }

                                        IC.flushQueue();
                                        // Utils.log(`Sent batch of ${batchSize} messages`, null, Utils.LoggingLevel.Minimal);

                                        messagesRemaining -= batchSize;
                                        if (messagesRemaining > 0)
                                        {
                                            setImmediate(sendBatch, startID + batchSize, batchSize, messagesRemaining);
                                        }
                                    }
                                    break;
                                default:
                                    Utils.log(`Error: Unknown Impulse operation '${opName}'`);
                                    break;
                            }
                            break;
                        
                        case 204:
                            // The params are the raw UTF-8 bytes of the version string [see "reportVersion" above]
                            let rawParams: Uint8Array = rpc.getRawParams();
                            let lbVersion = StringEncoding.fromUTF8Bytes(rawParams);
                            // let lbVersion: string = rpc.jsonParams["languageBindingVersion"];
                            reportVersion(lbVersion);
                            break;

                        default:
                            Utils.log(`(No method is associated with methodID ${rpc.methodID})`, loggingPrefix);
                            break;
                    }
                    break;

                case Messages.DispatchedMessageType.AppEvent:
                    let appEvent: Messages.AppEvent = message as Messages.AppEvent;

                    switch (appEvent.eventType)
                    {
                        case Messages.AppEventType.ICConnected:
                            // Note: Types and methods are published in this handler so that they're available regardless of the 'icHostingMode'
                            publishEntities(); 
                            break;
                        case Messages.AppEventType.ICStarting:
                            break;
                        case Messages.AppEventType.ICStopped:
                            const exitCode: number = appEvent.args[0]; // Read from the event args, but not currently used
                            stopICTest();
                            break;
                        case Messages.AppEventType.BecomingPrimary:
                            Utils.log("Normal app processing can begin", loggingPrefix);
                            _canAcceptKeyStrokes = true;
                            break;
                        case Messages.AppEventType.UpgradeState:
                            {
                                const upgradeMode: Messages.AppUpgradeMode = appEvent.args[0]; // Read from the event args, but not currently used
                                _appState = _appState.upgrade<AppStateVNext>(AppStateVNext);
                                break;
                            }
                        case Messages.AppEventType.UpgradeCode:
                            {
                                const upgradeMode: Messages.AppUpgradeMode = appEvent.args[0]; // Read from the event args, but not currently used
                                IC.upgrade(messageDispatcher, checkpointProducer, checkpointConsumer, postResultDispatcher); // A no-op code upgrade
                                break;
                            }
                        case Messages.AppEventType.FirstStart:
                            IC.echo_Post(Date.now(), "now");
                            break;
                    }
                    break;
            }
        }
        catch (error: unknown)
        {
            // Identify the failing message [for an AppEvent we include the event type name] then log the error; the error is not re-thrown
            let messageName: string = (message.type === Messages.DispatchedMessageType.AppEvent) ? `AppEvent:${Messages.AppEventType[(message as Messages.AppEvent).eventType]}` : Messages.DispatchedMessageType[message.type];
            Utils.log(`Error: Failed to process ${messageName} message`);
            Utils.log(Utils.makeError(error));
        }
    }