in Clients/AmbrosiaJS/Ambrosia-Node/src/Messages.ts [760:1053]
/**
 * Parses one log page from `receiveBuffer` and processes, in order, each message it contains.
 *
 * Header fields read here: the page length (Int32 at offset 4) and the page sequence ID (Int64 at offset 16);
 * the first message starts at offset 24.
 *
 * Processing can be interrupted part-way through the page when the outgoing message stream is getting full. In that
 * case the page (or, on a re-interruption, just the new message number) is queued in `_interruptedLogPages`,
 * `processInterruptedLogPages()` is scheduled via `setImmediate()`, and -2 is returned.
 *
 * @param receiveBuffer The buffer holding the log page (the page header starts at offset 0).
 * @param config The configuration supplying `checkpointProducer`, `dispatcher`, `lbOptions` and `replicaNumber`.
 * @param startingMessageNumber The 1-based number of the message (within the page) at which to start processing;
 * lower-numbered messages are parsed but skipped. The default (0) processes the page from its first message.
 * @returns -1 if the loop over the page ran to its end (or was exited early because the IC is no longer running),
 * -2 if processing of the page was interrupted, otherwise the length (in bytes, can be 0) of the checkpoint
 * announced by a 'Checkpoint' message.
 * @throws Error if the page contains a message of unknown type, if a 'BecomingPrimary', 'UpgradeTakeCheckpoint' or
 * 'UpgradeService' message arrives while recovery is not running, or if the app's upgrade handlers did not complete the upgrade.
 */
export function processLogPage(receiveBuffer: Buffer, config: Configuration.AmbrosiaConfig, startingMessageNumber: number = 0): number
{
    // Parse the header
    // Note: committerID can be negative
    // let committerID: number = DataFormat.readInt32Fixed(receiveBuffer, 0); // receiveBuffer.readInt32LE(0);
    const logPageLength: number = DataFormat.readInt32Fixed(receiveBuffer, 4); // receiveBuffer.readInt32LE(4);
    // let checkSum: bigint = DataFormat.readInt64Fixed(receiveBuffer, 8); // receiveBuffer.readBigInt64LE(8);
    const pageSequenceID: bigint = DataFormat.readInt64Fixed(receiveBuffer, 16); // receiveBuffer.readBigInt64LE(16);
    let pos: number = 24; // Index of the first message (immediately after the header)
    let messageNumberInPage: number = 0; // 1-based once inside the loop (it is incremented before it is used)

    // Parse/process the message(s)
    while (pos < logPageLength)
    {
        messageNumberInPage++;

        // Check if this and/or previous log pages have resulted in too much outgoing data (in which case we need to interrupt it to allow I/O with the IC to occur).
        // This can happen if the [user provided] incoming RPC message handlers are generating large and/or a high number of outgoing messages (eg. in a tight loop).
        // The user can still easily write code that overwhelms the output stream's buffer (which will result in OutgoingMessageStream.checkInternalBuffer() failing),
        // but this at least helps mitigate the problem - especially during recovery, when messages previously sent "spaced out" are arriving extremely rapidly in a
        // consecutive batch of log pages. See bug #194 for more details.
        // Note: This check runs BEFORE the current message is parsed, so the message number we record is that of the first not-yet-processed message.
        if (IC.isOutgoingMessageStreamGettingFull(0.75)) // Allow "space" for the current message to add outgoing messages [although whatever space we leave can still end up being insufficient]
        {
            if (startingMessageNumber === 0)
            {
                // Initial interruption: take a private copy of the page, since we return to the caller while the page is still only partially processed
                const pageBuffer: Buffer = Buffer.alloc(logPageLength);
                receiveBuffer.copy(pageBuffer, 0, 0, logPageLength);
                _interruptedLogPages.push({ pageBuffer: pageBuffer, startingMessageNumber: messageNumberInPage });
                Utils.traceLog(Utils.TraceFlag.LogPageInterruption, `Outgoing message stream is getting full (${IC.outgoingMessageStreamBacklog()} bytes); interrupting log page ${pageSequenceID} (${logPageLength} bytes) at message ${messageNumberInPage}`);
            }
            else
            {
                // Re-interruption [ie. we are being called via processInterruptedLogPages()].
                // This is a no-op unless we've made additional progress through the page.
                if (messageNumberInPage > startingMessageNumber)
                {
                    // Rather than needlessly re-allocate/copy and re-enqueue the same page, we just need to communicate [to processInterruptedLogPages()] that the startingMessageNumber has advanced
                    _interruptedLogPages.push({ pageBuffer: _emptyPageBuffer, startingMessageNumber: messageNumberInPage });
                    Utils.traceLog(Utils.TraceFlag.LogPageInterruption, `Outgoing message stream is getting full (${IC.outgoingMessageStreamBacklog()} bytes); re-interrupting log page ${pageSequenceID} at message ${messageNumberInPage}`);
                }
            }
            // Give the outgoing message stream a chance to be flushed to the IC (ie. allow I/O with the IC to interleave)
            setImmediate(() => processInterruptedLogPages(config));
            return (-2); // The page was interrupted
        }

        // Parse the message "header"
        const msg: MessageMetaData = new MessageMetaData(receiveBuffer, pos);
        pos = msg.startOfDataIndex;

        // If a startingMessageNumber was specified, then skip messages until we reach it
        if (startingMessageNumber > 0)
        {
            if (messageNumberInPage < startingMessageNumber)
            {
                pos = msg.endOfMessageIndex;
                continue;
            }
            if (messageNumberInPage === startingMessageNumber)
            {
                Utils.traceLog(Utils.TraceFlag.LogPageInterruption, `Resuming processing of interrupted log page ${pageSequenceID} at message ${startingMessageNumber}`);
            }
        }

        if (Utils.canLog(Utils.LoggingLevel.Verbose)) // We add this check because this is a high-volume code path, and Utils.log() is expensive
        {
            const showBytes: boolean = config.lbOptions.debugOutputLogging; // For debugging
            Utils.log(`Received '${MessageType[msg.messageType]}' (${msg.totalLength} bytes)` + (showBytes ? `: ${Utils.makeDisplayBytes(receiveBuffer, msg.startOfMessageIndex, msg.totalLength)}` : ""));
        }

        // A [user-supplied] handler for a received message (typically an RPC) may have called IC.stop(), in which case we can't continue to process messages in the log page
        if (Configuration.loadedConfig().isIntegratedIC && !IC.isRunning())
        {
            break; // Leaves the 'while' loop (so we return -1 below)
        }

        // Parse the message "data" section.
        // Note: Each case body is wrapped in its own block so that its declarations are scoped to that case only.
        switch (msg.messageType)
        {
            case MessageType.BecomingPrimary: // No data
            {
                IC.isPrimary(true);
                IC.emitAppEvent(AppEventType.BecomingPrimary);
                IC.checkSelfConnection();
                // An active/active secondary (including the checkpointing secondary) remains in constant recovery until
                // it becomes the primary (although only a non-checkpointing secondary can ever become the primary)
                if (_isRecoveryRunning)
                {
                    _isRecoveryRunning = false;
                    IC.onRecoveryComplete();
                    IC.emitAppEvent(AppEventType.RecoveryComplete);
                }
                else
                {
                    throw new Error(`This [non-checkpointing] active/active secondary (instance '${IC.instanceName()}' replica #${config.replicaNumber}) was not in recovery as expected`);
                }
                break;
            }

            case MessageType.UpgradeTakeCheckpoint: // No data
            case MessageType.UpgradeService: // No data
            {
                // 'UpgradeTakeCheckpoint' is a "live" upgrade; 'UpgradeService' is a "test" upgrade
                const mode: AppUpgradeMode = (msg.messageType === MessageType.UpgradeTakeCheckpoint) ? AppUpgradeMode.Live : AppUpgradeMode.Test;

                if (!_isRecoveryRunning)
                {
                    // A Checkpoint must have preceded this message
                    throw new Error(`Unexpected message '${MessageType[msg.messageType]}'; a '${MessageType[MessageType.Checkpoint]}' message should have preceded this message`);
                }
                else
                {
                    if (mode === AppUpgradeMode.Live)
                    {
                        _isRecoveryRunning = false;
                        IC.onRecoveryComplete();
                        IC.emitAppEvent(AppEventType.RecoveryComplete);
                    }
                }

                // Tell the app to upgrade. Note: The handlers for these events MUST execute synchronously (ie. they must not return until the upgrade is complete).
                Utils.log(`Upgrading app (state and code) [in '${AppUpgradeMode[mode]}' mode]...`, null, Utils.LoggingLevel.Minimal);
                IC.clearUpgradeFlags();
                IC.emitAppEvent(AppEventType.UpgradeState, mode);
                IC.emitAppEvent(AppEventType.UpgradeCode, mode);
                if (!IC.checkUpgradeFlags())
                {
                    throw new Error(`Upgrade incomplete: Either IC.upgrade() and/or the upgrade() method of your AmbrosiaAppState instance was not called`);
                }
                else
                {
                    Utils.log("Upgrade of state and code complete", null, Utils.LoggingLevel.Minimal);
                }

                if (mode === AppUpgradeMode.Test)
                {
                    // When doing a 'test mode' upgrade there will be no 'completion' message from the IC signalling when
                    // message playback is complete, so we will not end up emitting a 'RecoveryComplete' event to the app
                    // (and neither will the 'UpgradeComplete' event be emitted)
                    Utils.log("Running recovery after app upgrade...", null, Utils.LoggingLevel.Minimal);
                }

                if (mode === AppUpgradeMode.Live)
                {
                    // Take a checkpoint (of the [now upgraded] app state)
                    const outgoingCheckpoint: Streams.OutgoingCheckpoint = config.checkpointProducer();
                    const checkpointMessage: Uint8Array = makeCheckpointMessage(outgoingCheckpoint.length);
                    IC.sendMessage(checkpointMessage, MessageType.Checkpoint, IC.instanceName());
                    IC.sendCheckpoint(outgoingCheckpoint, () =>
                    // Note: This lambda runs ONLY if sendCheckpoint() succeeds
                    {
                        IC.isPrimary(true);
                        IC.emitAppEvent(AppEventType.BecomingPrimary);
                        IC.checkSelfConnection();
                        // The "live" upgrade is not actually complete at this point: it is complete after the next 'TakeCheckpoint' message is
                        // received (which will usually be the next message received, but there can also be other messages that come before it).
                        // Note: Another 'tell' that the upgrade has completed is that the <instanceTable>.CurrentVersion will change to the upgradeVersion.
                        _completeLiveUpgradeAtNextTakeCheckpoint = true;
                    });
                }
                break;
            }

            case MessageType.TakeBecomingPrimaryCheckpoint: // No data
            {
                // Note: The checkpoint is produced BEFORE the 'InitialMessage' is sent / the 'RecoveryComplete' event is emitted
                const outgoingCheckpoint: Streams.OutgoingCheckpoint = config.checkpointProducer();
                const checkpointMessage: Uint8Array = makeCheckpointMessage(outgoingCheckpoint.length);

                if (!_isRecoveryRunning)
                {
                    const initialMessage: Uint8Array = makeInitialMessage(StringEncoding.toUTF8Bytes(Root.languageBindingVersion())); // Note: We could send any arbitrary bytes
                    IC.sendMessage(initialMessage, MessageType.InitialMessage, IC.instanceName());
                }
                else
                {
                    _isRecoveryRunning = false;
                    IC.onRecoveryComplete();
                    IC.emitAppEvent(AppEventType.RecoveryComplete);
                }

                IC.sendMessage(checkpointMessage, MessageType.Checkpoint, IC.instanceName());
                IC.sendCheckpoint(outgoingCheckpoint, () =>
                // Note: This lambda runs ONLY if sendCheckpoint() succeeds
                {
                    IC.isPrimary(true);
                    IC.emitAppEvent(AppEventType.BecomingPrimary);
                    IC.checkSelfConnection();
                });
                break;
            }

            case MessageType.InitialMessage:
            {
                // The data of an InitialMessage is whatever we set it to be in our 'TakeBecomingPrimaryCheckpoint' response. We simply ignore it.
                IC.emitAppEvent(AppEventType.FirstStart);
                updateReplayStats(msg.messageType);
                break;
            }

            case MessageType.RPC:
            {
                const rpc: IncomingRPC = new IncomingRPC(receiveBuffer, msg.startOfDataIndex, msg.endOfMessageIndex);
                config.dispatcher(rpc);
                updateReplayStats(msg.messageType, rpc);
                break;
            }

            case MessageType.TakeCheckpoint: // No data
            {
                // TODO: JonGold needs to address issue #158 ("Clarify Ambrosia protocol") since the C# LB has (obsolete?) code that:
                //       a) Handles 'TakeCheckpoint' at startup (a case which is not in the protocol spec), and
                //       b) Sends an 'InitialMessage' in response to a TakeCheckpoint (again, this is not in the protocol spec)
                //       The protocol spec can be found here: https://github.com/microsoft/AMBROSIA/blob/master/CONTRIBUTING/AMBROSIA_client_network_protocol.md#communication-protocols
                const outgoingCheckpoint: Streams.OutgoingCheckpoint = config.checkpointProducer();
                const checkpointMessage: Uint8Array = makeCheckpointMessage(outgoingCheckpoint.length);
                IC.sendMessage(checkpointMessage, MessageType.Checkpoint, IC.instanceName());
                IC.sendCheckpoint(outgoingCheckpoint, () =>
                // Note: This lambda runs ONLY if sendCheckpoint() succeeds
                {
                    if (_completeLiveUpgradeAtNextTakeCheckpoint)
                    {
                        try
                        {
                            _completeLiveUpgradeAtNextTakeCheckpoint = false;

                            // The "live" upgrade is now complete, so update the ambrosiaConfig.json file so that we're prepared for the next restart.
                            // However, we only do this if the upgrade was requested via the ambrosiaConfig.json. If the upgrade was requested via an
                            // explicit call to "Ambrosia.exe RegisterInstance" then we assume the user is handling the upgrade process manually (or
                            // via an 'upgrade orchestration service' they built).
                            if (Configuration.loadedConfig().isLiveUpgradeRequested)
                            {
                                Configuration.loadedConfig().updateSetting("appVersion", Configuration.loadedConfig().upgradeVersion);
                                Configuration.loadedConfig().updateSetting("activeCode", Configuration.ActiveCodeType[Configuration.ActiveCodeType.VNext]);
                                // The IC doesn't update the registered currentVersion after the upgrade (it only updates <instanceTable>.CurrentVersion),
                                // so we have to re-register at the next restart to update it [without this, AmbrosiaConfigFile.initializeAsync() will throw]
                                Configuration.loadedConfig().updateSetting("autoRegister", true);
                            }
                            else
                            {
                                // Note: Because the upgrade wasn't requested via ambrosiaConfig.json, we can't report the upgradeVersion that currentVersion must be set to
                                Utils.log(`Warning: You must re-register the '${IC.instanceName()}' instance (to update --currentVersion) to finish the upgrade`);
                            }
                            Utils.log("Upgrade complete", null, Utils.LoggingLevel.Minimal);

                            // The app may want to do its own post-upgrade work (eg. custom handling of re-registration / code-switching, or signalling
                            // to any orchestrating infrastructure that may be managing an active/active upgrade).
                            // Note: Because this event is raised only as a consequence of a non-replayable message (UpgradeTakeCheckpoint), whatever
                            //       actions (if any) the app takes when responding to it are not part of a deterministic chain.
                            IC.emitAppEvent(AppEventType.UpgradeComplete);
                        }
                        catch (error: unknown)
                        {
                            // A failure while finalizing the upgrade is logged rather than re-thrown from this callback
                            Utils.log(Utils.makeError(error));
                        }
                    }
                });
                break;
            }

            case MessageType.Checkpoint:
            {
                // At this point 'pos' is msg.startOfDataIndex, which is where the (varInt encoded) checkpoint length is read from
                const checkpointLengthVarInt: DataFormat.varIntResult = DataFormat.readVarInt64(receiveBuffer, pos);
                const checkpointLength: number = checkpointLengthVarInt.value;
                Utils.log(`Starting recovery [load checkpoint (${checkpointLength} bytes) and replay messages]...`);
                _isRecoveryRunning = true;
                // We keep track of the number of received/resent messages during recovery
                IC._counters.remoteSentMessageCount = IC._counters.receivedMessageCount = IC._counters.receivedForkMessageCount = IC._counters.sentForkMessageCount = 0;
                // By returning the checkpoint length we tell the caller to read the next checkpointLength bytes as checkpoint data.
                // Note: It's safe to return here as a 'Checkpoint' will always be the only message in the log page.
                //       The caller will skip to the first byte AFTER the current log page [whose length will include
                //       checkpointLengthVarInt.numBytesRead] so there's no need to also pass back checkpointLengthVarInt.numBytesRead.
                return (checkpointLength); // Note: Can be 0
            }

            case MessageType.RPCBatch:
            case MessageType.CountReplayableRPCBatchByte:
            {
                const rpcCountVarInt: DataFormat.varIntResult = DataFormat.readVarInt32(receiveBuffer, pos);
                const rpcCount: number = rpcCountVarInt.value;
                pos += rpcCountVarInt.length;

                if (msg.messageType === MessageType.CountReplayableRPCBatchByte)
                {
                    // This batch type carries a second count (the number of replayable messages) immediately after the first
                    const forkRpcCountVarInt: DataFormat.varIntResult = DataFormat.readVarInt32(receiveBuffer, pos);
                    const forkRpcCount: number = forkRpcCountVarInt.value;
                    pos += forkRpcCountVarInt.length;
                    Utils.log(`Processing RPC batch (of ${rpcCount} messages, ${forkRpcCount} of which are replayable)...`);
                }
                else
                {
                    Utils.log(`Processing RPC batch (of ${rpcCount} messages)...`);
                }
                // Note: We'll let the processing of each contained message update _replayedMessageCount and _receivedForkMessageCount, so we don't
                //       call updateReplayStats() here (effectively, we ignore the batch "wrapper" for the purpose of tracking replay stats)
                // By using 'continue' we deliberately bypass the "pos = msg.endOfMessageIndex" assignment below, so the next iteration of the
                // loop parses the message that starts at 'pos' (ie. immediately after the count(s) we just read)
                continue;
            }

            default:
                throw new Error(`Log page ${pageSequenceID} contains a message of unknown type (${msg.messageType}); message: ${Utils.makeDisplayBytes(receiveBuffer, msg.startOfMessageIndex, msg.totalLength)}`);
        }

        // Move to the start of the next message in the page
        pos = msg.endOfMessageIndex;
    }

    return (-1); // The page was fully processed
}