in Clients/AmbrosiaJS/Ambrosia-Node/src/ICTest.ts [133:379]
/**
 * Handles a single dispatched message (an incoming RPC or an AppEvent) for the IC test app.
 *
 * Processing order:
 * 1. Fast path: an RPC with method ID 200 (the Fork perf test) is handled immediately, outside the
 *    try/catch and without any logging, and the function then returns.
 * 2. Otherwise the message is routed by `message.type`:
 *    - RPC: post methods ("ComputePI", "joinNames", "postTimeoutTest"), method 3 (greeting),
 *      method 33 (named test operations selected by the "opName" parameter) and method 204 (version report).
 *    - AppEvent: lifecycle events (ICConnected, ICStopped, BecomingPrimary, UpgradeState, UpgradeCode, FirstStart).
 *
 * Perf-test payload layout (16 bytes, little-endian), as written by the "runForkPerfTest" operation below:
 * - bytes 0..7  : start time in ms (Int64); only written for the last message
 * - bytes 8..11 : 1-based iteration number (Int32); written for every message
 * - bytes 12..15: total number of iterations (Int32); written with the first message
 *
 * Any error thrown while processing a message in step 2 is caught and logged; it is not re-thrown.
 * Errors thrown inside a post method are additionally returned to the caller via IC.postError().
 *
 * Note: This function is deliberately synchronous (see the "runForkPerfTest" notes below).
 *
 * @param message The dispatched message to process.
 */
function dispatcher(message: Messages.DispatchedMessage): void
{
    const loggingPrefix: string = "Dispatcher";

    // Special case: This is a very high-frequency message [used during perf-testing], so we handle it immediately
    if (message.type === Messages.DispatchedMessageType.RPC)
    {
        let rpc: Messages.IncomingRPC = message as Messages.IncomingRPC;

        if (rpc.methodID === 200)
        {
            // Fork perf test
            // Create a view over the raw params [so no data is copied]. The view MUST honor the Uint8Array's own
            // byteOffset/byteLength: wrapping only '.buffer' would expose the entire backing ArrayBuffer, so the
            // fixed offsets read below would be wrong whenever the params are a window into a larger buffer.
            const rawParams: Uint8Array = rpc.getRawParams();
            let buffer: Buffer = Buffer.from(rawParams.buffer, rawParams.byteOffset, rawParams.byteLength);
            let iteration: number = buffer.readInt32LE(8); // We always need to read this [it changes with every message]

            if (_maxPerfIteration === -1)
            {
                _maxPerfIteration = buffer.readInt32LE(12); // This is only sent with the first message
            }
            else
            {
                if (iteration === _maxPerfIteration)
                {
                    let startTime: number = Number(buffer.readBigInt64LE(0)); // This is only sent with the last message
                    let elapsedMs: number = Date.now() - startTime;
                    let requestsPerSecond: number = (_maxPerfIteration / elapsedMs) * 1000;
                    Utils.log(`startTime: ${Utils.getTime(startTime)}, iteration: ${iteration}, elapsedMs: ${elapsedMs}, RPS = ${requestsPerSecond.toFixed(2)}`, null, Utils.LoggingLevel.Minimal);
                }
                else
                {
                    // if (iteration % 5000 === 0)
                    // {
                    //     Utils.log(`Received message #${iteration}`, null, Utils.LoggingLevel.Minimal);
                    // }
                }
            }
            return;
        }
    }

    try
    {
        switch (message.type)
        {
            case Messages.DispatchedMessageType.RPC:
                let rpc: Messages.IncomingRPC = message as Messages.IncomingRPC;

                if (Utils.canLog(Utils.LoggingLevel.Verbose)) // We add this check because this is a high-volume code path, and rpc.makeDisplayParams() is expensive
                {
                    Utils.log(`Received ${Messages.RPCType[rpc.rpcType]} RPC call for ${rpc.methodID === IC.POST_METHOD_ID ? `post method '${IC.getPostMethodName(rpc)}' (version ${IC.getPostMethodVersion(rpc)})` : `method ID ${rpc.methodID}`} with params ${rpc.makeDisplayParams()}`, loggingPrefix);
                }

                switch (rpc.methodID)
                {
                    // Note: To get to this point, the post method has been verified as published
                    case IC.POST_METHOD_ID:
                        // A failure inside a post method is logged AND reported back to the caller as a post error
                        try
                        {
                            let methodName: string = IC.getPostMethodName(rpc);
                            let methodVersion: number = IC.getPostMethodVersion(rpc); // Use this to do version-specific method behavior

                            switch (methodName)
                            {
                                case "ComputePI":
                                    // The "digits" argument is optional; PI defaults to 10 decimal places
                                    let digits: { count: number } = IC.getPostMethodArg(rpc, "digits?") ?? { count: 10 };
                                    let pi: number = Number.parseFloat(Math.PI.toFixed(digits.count));
                                    IC.postResult<number>(rpc, pi);
                                    break;
                                case "joinNames":
                                    let namesSet: Set<string> = IC.getPostMethodArg(rpc, "namesSet");
                                    let namesArray: string[] = IC.getPostMethodArg(rpc, "namesArray");
                                    // Falsy entries (eg. null or an empty string) are rendered as the text "null"
                                    IC.postResult<string>(rpc, [...namesSet, ...namesArray].map(v => v || "null").join(","));
                                    break;
                                case "postTimeoutTest":
                                    let resultDelayInMs: number = IC.getPostMethodArg(rpc, "resultDelayInMs?") ?? -1;
                                    if (resultDelayInMs > 0)
                                    {
                                        // Simulate a delay at the destination instance [although this an imperfect simulation since it delays the send, not the receive]
                                        setTimeout(() => IC.postResult<void>(rpc), resultDelayInMs);
                                    }
                                    else
                                    {
                                        // To [perfectly] simulate an infinite "delay" at the destination we simply don't call IC.postResult()
                                    }
                                    break;
                                default:
                                    let errorMsg: string = `Post method '${methodName}' is not implemented`;
                                    Utils.log(`(${errorMsg})`, loggingPrefix);
                                    IC.postError(rpc, new Error(errorMsg));
                                    break;
                            }
                        }
                        catch (error: unknown)
                        {
                            const err: Error = Utils.makeError(error);
                            Utils.log(err);
                            IC.postError(rpc, err);
                        }
                        break;

                    case 3:
                        let name: string = rpc.getJsonParam("name");
                        greetingsMethod(name);
                        break;

                    case 33:
                        // The operation to run is selected by the "opName" JSON parameter
                        let opName: string = rpc.getJsonParam("opName");
                        switch (opName)
                        {
                            case "attachToBadInstance":
                                // This is to help investigate bug #187
                                IC.callFork("serverTen", 3, { name: "Foo!" });
                                break;
                            case "sendLargeMessage":
                                const sizeInKB: number = parseInt(rpc.getJsonParam("sizeInKB"));
                                IC.echo_Post("x".repeat(1024 * sizeInKB), "sendLargeMessage");
                                break;
                            case "requestCheckpoint":
                                IC.requestCheckpoint();
                                break;
                            case "reportVersion":
                                // The version is sent as raw UTF-8 bytes to method 204 (handled below)
                                IC.callFork(config.icInstanceName, 204, StringEncoding.toUTF8Bytes(Root.languageBindingVersion()));
                                break;
                            case "implicitBatch":
                                IC.callFork(config.icInstanceName, 3, { name: "BatchedMsg1" });
                                IC.callFork(config.icInstanceName, 3, { name: "BatchedMsg2" });
                                break;
                            case "explicitBatch":
                                IC.queueFork(config.icInstanceName, 3, { name: "John" });
                                IC.queueFork(config.icInstanceName, 3, { name: "Paul" });
                                IC.queueFork(config.icInstanceName, 3, { name: "George" });
                                IC.queueFork(config.icInstanceName, 3, { name: "Ringo" });
                                IC.flushQueue();
                                break;
                            case "runForkPerfTest":
                                // To get the best performance:
                                // 1) Set the 'outputLoggingLevel' to 'Minimal', and 'outputLogDestination' to 'File'.
                                // 2) The dispatcher() function should NOT be async.
                                // 3) Messages should be batched using RPCBatch.
                                // 4) The IC binary must be a release build (not debug).
                                // 5) Run the test OUTSIDE of the debugger.
                                if (Configuration.loadedConfig().lbOptions.outputLoggingLevel !== Utils.LoggingLevel.Minimal)
                                {
                                    Utils.log("Incorrect configuration for test: 'outputLoggingLevel' must be 'Minimal'");
                                    break;
                                }

                                const maxIteration: number = 1000000;
                                const batchSize: number = 10000;
                                Utils.log(`Starting Fork performance test [${maxIteration.toLocaleString()} messages in batches of ${batchSize.toLocaleString()}]...`, null, Utils.LoggingLevel.Minimal);

                                let startTime: number = Date.now();
                                let buffer: Buffer = Buffer.alloc(16); // A single payload buffer that is re-used [and partially re-written] for every message
                                let methodID: number = 200;

                                _maxPerfIteration = -1; // Reset, so that the receiving fast path re-reads the total from the first message
                                sendBatch(0, batchSize, maxIteration);

                                /**
                                 * Queues and flushes one batch of perf-test messages, then schedules the next batch
                                 * [via setImmediate()] until no messages remain.
                                 */
                                function sendBatch(startID: number, batchSize: number, messagesRemaining: number): void
                                {
                                    batchSize = Math.min(batchSize, messagesRemaining); // The final batch may be smaller

                                    for (let i = startID; i < startID + batchSize; i++)
                                    {
                                        if (i === maxIteration - 1)
                                        {
                                            buffer.writeBigInt64LE(BigInt(startTime), 0); // Only the last message carries the start time
                                        }
                                        buffer.writeInt32LE(i + 1, 8); // The iteration number is 1-based
                                        if (i === 0)
                                        {
                                            buffer.writeInt32LE(maxIteration, 12); // Written with the first message [and never cleared afterwards]
                                        }
                                        IC.queueFork(config.icInstanceName, methodID, buffer);
                                    }
                                    IC.flushQueue();
                                    // Utils.log(`Sent batch of ${batchSize} messages`, null, Utils.LoggingLevel.Minimal);

                                    messagesRemaining -= batchSize;
                                    if (messagesRemaining > 0)
                                    {
                                        setImmediate(sendBatch, startID + batchSize, batchSize, messagesRemaining);
                                    }
                                }
                                break;
                            default:
                                Utils.log(`Error: Unknown Impulse operation '${opName}'`);
                                break;
                        }
                        break;

                    case 204:
                        let rawParams: Uint8Array = rpc.getRawParams();
                        let lbVersion = StringEncoding.fromUTF8Bytes(rawParams);
                        // let lbVersion: string = rpc.jsonParams["languageBindingVersion"];
                        reportVersion(lbVersion);
                        break;

                    default:
                        Utils.log(`(No method is associated with methodID ${rpc.methodID})`, loggingPrefix);
                        break;
                }
                break;

            case Messages.DispatchedMessageType.AppEvent:
                let appEvent: Messages.AppEvent = message as Messages.AppEvent;

                switch (appEvent.eventType)
                {
                    case Messages.AppEventType.ICConnected:
                        // Note: Types and methods are published in this handler so that they're available regardless of the 'icHostingMode'
                        publishEntities();
                        break;
                    case Messages.AppEventType.ICStarting:
                        break;
                    case Messages.AppEventType.ICStopped:
                        const exitCode: number = appEvent.args[0]; // Read but not otherwise used here
                        stopICTest();
                        break;
                    case Messages.AppEventType.BecomingPrimary:
                        Utils.log("Normal app processing can begin", loggingPrefix);
                        _canAcceptKeyStrokes = true;
                        break;
                    case Messages.AppEventType.UpgradeState:
                        {
                            const upgradeMode: Messages.AppUpgradeMode = appEvent.args[0]; // Read but not otherwise used here
                            _appState = _appState.upgrade<AppStateVNext>(AppStateVNext);
                            break;
                        }
                    case Messages.AppEventType.UpgradeCode:
                        {
                            const upgradeMode: Messages.AppUpgradeMode = appEvent.args[0]; // Read but not otherwise used here
                            IC.upgrade(messageDispatcher, checkpointProducer, checkpointConsumer, postResultDispatcher); // A no-op code upgrade
                            break;
                        }
                    case Messages.AppEventType.FirstStart:
                        IC.echo_Post(Date.now(), "now");
                        break;
                }
                break;
        }
    }
    catch (error: unknown)
    {
        // Name the failing message: AppEvents are reported with their event type, other messages with their message type
        let messageName: string = (message.type === Messages.DispatchedMessageType.AppEvent) ? `AppEvent:${Messages.AppEventType[(message as Messages.AppEvent).eventType]}` : Messages.DispatchedMessageType[message.type];
        Utils.log(`Error: Failed to process ${messageName} message`);
        Utils.log(Utils.makeError(error));
    }
}