export function processLogPage()

in Clients/AmbrosiaJS/Ambrosia-Node/src/Messages.ts [760:1053]


/**
 * Parses and processes the messages contained in a single log page received from the IC.
 *
 * The page starts with a 24-byte header (committerID at offset 0, page length at 4, checksum at 8, page sequence ID at 16),
 * followed by the messages, which are processed in order.
 *
 * If the outgoing message stream is getting too full, processing of the page is suspended. The page (or, when resuming,
 * just the advanced resume point) is enqueued in _interruptedLogPages, and processInterruptedLogPages() is scheduled via
 * setImmediate() so that I/O with the IC can interleave before processing continues.
 *
 * Message numbering within a page is 1-based. A batch "wrapper" message and each message contained in the batch are
 * numbered individually.
 *
 * @param receiveBuffer The buffer holding the log page (the page header starts at offset 0).
 * @param config The app configuration (supplies the RPC dispatcher, the checkpoint producer, the replica number and logging options).
 * @param startingMessageNumber The 1-based message number to resume processing at. Messages numbered before it are skipped.
 *        0 (the default) processes the page from its first message.
 * @returns -1 if the loop over the page finished. This includes stopping early because the integrated IC is no longer running.
 *          -2 if processing of the page was interrupted.
 *          Otherwise, for a 'Checkpoint' message, the length (possibly 0) of the checkpoint data that the caller must read next.
 * @throws Error if a message is unexpected in the current state, is of an unknown type, or if an app upgrade did not complete.
 */
export function processLogPage(receiveBuffer: Buffer, config: Configuration.AmbrosiaConfig, startingMessageNumber: number = 0): number
{
    // Parse the header
    // Note: committerID can be negative
    // let committerID: number = DataFormat.readInt32Fixed(receiveBuffer, 0); // receiveBuffer.readInt32LE(0);
    const logPageLength: number = DataFormat.readInt32Fixed(receiveBuffer, 4); // receiveBuffer.readInt32LE(4);
    // let checkSum: bigint = DataFormat.readInt64Fixed(receiveBuffer, 8); // receiveBuffer.readBigInt64LE(8);
    const pageSequenceID: bigint = DataFormat.readInt64Fixed(receiveBuffer, 16); // receiveBuffer.readBigInt64LE(16);
    let pos: number = 24; // The first message starts immediately after the 24-byte page header
    let messageNumberInPage: number = 0;

    // Parse/process the message(s)
    while (pos < logPageLength)
    {
        messageNumberInPage++;

        // Check if this and/or previous log pages have resulted in too much outgoing data. If so, we need to interrupt processing
        // to allow I/O with the IC to occur. This can happen if the [user provided] incoming RPC message handlers generate large
        // and/or numerous outgoing messages (eg. in a tight loop).
        // The user can still easily write code that overwhelms the output stream's buffer (which will make
        // OutgoingMessageStream.checkInternalBuffer() fail), but this at least helps mitigate the problem. That matters most
        // during recovery, when messages originally sent "spaced out" arrive extremely rapidly in a consecutive batch of log pages.
        // See bug #194 for more details.
        if (IC.isOutgoingMessageStreamGettingFull(0.75)) // Allow "space" for the current message to add outgoing messages [although whatever space we leave can still end up being insufficient]
        {
            if (startingMessageNumber === 0)
            {
                // Initial interruption: take a private copy of the page, since receiveBuffer may be reused by the caller
                const pageBuffer: Buffer = Buffer.alloc(logPageLength);
                receiveBuffer.copy(pageBuffer, 0, 0, logPageLength);
                _interruptedLogPages.push({ pageBuffer: pageBuffer, startingMessageNumber: messageNumberInPage });
                Utils.traceLog(Utils.TraceFlag.LogPageInterruption, `Outgoing message stream is getting full (${IC.outgoingMessageStreamBacklog()} bytes); interrupting log page ${pageSequenceID} (${logPageLength} bytes) at message ${messageNumberInPage}`);
            }
            else if (messageNumberInPage > startingMessageNumber)
            {
                // Re-interruption [ie. we are being called via processInterruptedLogPages()] after making additional progress through the page.
                // Rather than needlessly re-allocate/copy and re-enqueue the same page, we just communicate [to processInterruptedLogPages()]
                // that the startingMessageNumber has advanced. (A re-interruption without progress is a no-op.)
                _interruptedLogPages.push({ pageBuffer: _emptyPageBuffer, startingMessageNumber: messageNumberInPage });
                Utils.traceLog(Utils.TraceFlag.LogPageInterruption, `Outgoing message stream is getting full (${IC.outgoingMessageStreamBacklog()} bytes); re-interrupting log page ${pageSequenceID} at message ${messageNumberInPage}`);
            }

            // Give the outgoing message stream a chance to be flushed to the IC (ie. allow I/O with the IC to interleave)
            setImmediate(() => processInterruptedLogPages(config));
            return (-2); // The page was interrupted
        }

        // Parse the message "header"
        const msg: MessageMetaData = new MessageMetaData(receiveBuffer, pos);
        pos = msg.startOfDataIndex;

        // If a startingMessageNumber was specified, then skip messages until we reach it
        if (startingMessageNumber > 0)
        {
            if (messageNumberInPage < startingMessageNumber)
            {
                if ((msg.messageType === MessageType.RPCBatch) || (msg.messageType === MessageType.CountReplayableRPCBatchByte))
                {
                    // Step INTO the batch rather than over it. The messages it contains are numbered individually (see the
                    // RPCBatch case below), so the resume point can lie inside this batch. Jumping to msg.endOfMessageIndex
                    // here would silently skip those contained messages.
                    pos += DataFormat.readVarInt32(receiveBuffer, pos).length; // rpcCount
                    if (msg.messageType === MessageType.CountReplayableRPCBatchByte)
                    {
                        pos += DataFormat.readVarInt32(receiveBuffer, pos).length; // forkRpcCount
                    }
                }
                else
                {
                    pos = msg.endOfMessageIndex;
                }
                continue;
            }
            if (messageNumberInPage === startingMessageNumber)
            {
                Utils.traceLog(Utils.TraceFlag.LogPageInterruption, `Resuming processing of interrupted log page ${pageSequenceID} at message ${startingMessageNumber}`);
            }
        }

        if (Utils.canLog(Utils.LoggingLevel.Verbose)) // We add this check because this is a high-volume code path, and Utils.log() is expensive
        {
            const showBytes: boolean = config.lbOptions.debugOutputLogging; // For debugging
            Utils.log(`Received '${MessageType[msg.messageType]}' (${msg.totalLength} bytes)` + (showBytes ? `: ${Utils.makeDisplayBytes(receiveBuffer, msg.startOfMessageIndex, msg.totalLength)}` : ""));
        }

        // A [user-supplied] handler for a received message (typically an RPC) may have called IC.stop(), in which case we can't continue to process messages in the log page
        if (Configuration.loadedConfig().isIntegratedIC && !IC.isRunning())
        {
            break;
        }

        // Parse the message "data" section
        switch (msg.messageType)
        {
            case MessageType.BecomingPrimary: // No data
                IC.isPrimary(true);
                IC.emitAppEvent(AppEventType.BecomingPrimary);
                IC.checkSelfConnection();

                // An active/active secondary (including the checkpointing secondary) remains in constant recovery until
                // it becomes the primary (although only a non-checkpointing secondary can ever become the primary)
                if (_isRecoveryRunning)
                {
                    _isRecoveryRunning = false;
                    IC.onRecoveryComplete();
                    IC.emitAppEvent(AppEventType.RecoveryComplete);
                }
                else
                {
                    throw new Error(`This [non-checkpointing] active/active secondary (instance '${IC.instanceName()}' replica #${config.replicaNumber}) was not in recovery as expected`);
                }
                break;

            case MessageType.UpgradeTakeCheckpoint: // No data
            case MessageType.UpgradeService: // No data
            {
                const mode: AppUpgradeMode = (msg.messageType === MessageType.UpgradeTakeCheckpoint) ? AppUpgradeMode.Live : AppUpgradeMode.Test;

                if (!_isRecoveryRunning)
                {
                    // A Checkpoint must have preceded this message
                    throw new Error(`Unexpected message '${MessageType[msg.messageType]}'; a '${MessageType[MessageType.Checkpoint]}' message should have preceded this message`);
                }
                if (mode === AppUpgradeMode.Live)
                {
                    _isRecoveryRunning = false;
                    IC.onRecoveryComplete();
                    IC.emitAppEvent(AppEventType.RecoveryComplete);
                }

                // Tell the app to upgrade. Note: The handlers for these events MUST execute synchronously (ie. they must not return until the upgrade is complete).
                Utils.log(`Upgrading app (state and code) [in '${AppUpgradeMode[mode]}' mode]...`, null, Utils.LoggingLevel.Minimal);
                IC.clearUpgradeFlags();
                IC.emitAppEvent(AppEventType.UpgradeState, mode);
                IC.emitAppEvent(AppEventType.UpgradeCode, mode);
                if (!IC.checkUpgradeFlags())
                {
                    throw new Error(`Upgrade incomplete: Either IC.upgrade() and/or the upgrade() method of your AmbrosiaAppState instance was not called`);
                }
                Utils.log("Upgrade of state and code complete", null, Utils.LoggingLevel.Minimal);

                if (mode === AppUpgradeMode.Test)
                {
                    // When doing a 'test mode' upgrade there will be no 'completion' message from the IC signalling when
                    // message playback is complete, so we will not end up emitting a 'RecoveryComplete' event to the app
                    // (and neither will the 'UpgradeComplete' event be emitted)
                    Utils.log("Running recovery after app upgrade...", null, Utils.LoggingLevel.Minimal);
                }

                if (mode === AppUpgradeMode.Live)
                {
                    // Take a checkpoint (of the [now upgraded] app state)
                    const outgoingCheckpoint: Streams.OutgoingCheckpoint = config.checkpointProducer();
                    const checkpointMessage: Uint8Array = makeCheckpointMessage(outgoingCheckpoint.length);
                    IC.sendMessage(checkpointMessage, MessageType.Checkpoint, IC.instanceName());
                    IC.sendCheckpoint(outgoingCheckpoint, () =>
                    // Note: This lambda runs ONLY if sendCheckpoint() succeeds
                    {
                        IC.isPrimary(true);
                        IC.emitAppEvent(AppEventType.BecomingPrimary);
                        IC.checkSelfConnection();

                        // The "live" upgrade is not actually complete at this point: it is complete after the next 'TakeCheckpoint' message is
                        // received (which will usually be the next message received, but there can also be other messages that come before it).
                        // Note: Another 'tell' that the upgrade has completed is that the <instanceTable>.CurrentVersion will change to the upgradeVersion.
                        _completeLiveUpgradeAtNextTakeCheckpoint = true;
                    });
                }
                break;
            }

            case MessageType.TakeBecomingPrimaryCheckpoint: // No data
            {
                const outgoingCheckpoint: Streams.OutgoingCheckpoint = config.checkpointProducer();
                const checkpointMessage: Uint8Array = makeCheckpointMessage(outgoingCheckpoint.length);

                if (!_isRecoveryRunning)
                {
                    const initialMessage: Uint8Array = makeInitialMessage(StringEncoding.toUTF8Bytes(Root.languageBindingVersion())); // Note: We could send any arbitrary bytes
                    IC.sendMessage(initialMessage, MessageType.InitialMessage, IC.instanceName());
                }
                else
                {
                    _isRecoveryRunning = false;
                    IC.onRecoveryComplete();
                    IC.emitAppEvent(AppEventType.RecoveryComplete);
                }

                IC.sendMessage(checkpointMessage, MessageType.Checkpoint, IC.instanceName());
                IC.sendCheckpoint(outgoingCheckpoint, () =>
                // Note: This lambda runs ONLY if sendCheckpoint() succeeds
                {
                    IC.isPrimary(true);
                    IC.emitAppEvent(AppEventType.BecomingPrimary);
                    IC.checkSelfConnection();
                });
                break;
            }

            case MessageType.InitialMessage:
                // The data of an InitialMessage is whatever we set it to be in our 'TakeBecomingPrimaryCheckpoint' response. We simply ignore it.
                IC.emitAppEvent(AppEventType.FirstStart);
                updateReplayStats(msg.messageType);
                break;

            case MessageType.RPC:
            {
                const rpc: IncomingRPC = new IncomingRPC(receiveBuffer, msg.startOfDataIndex, msg.endOfMessageIndex);
                config.dispatcher(rpc);
                updateReplayStats(msg.messageType, rpc);
                break;
            }

            case MessageType.TakeCheckpoint: // No data
            {
                // TODO: JonGold needs to address issue #158 ("Clarify Ambrosia protocol") since the C# LB has (obsolete?) code that:
                //       a) Handles 'TakeCheckpoint' at startup (a case which is not in the protocol spec), and
                //       b) Sends an 'InitialMessage' in response to a TakeCheckpoint (again, this is not in the protocol spec)
                //       The protocol spec can be found here: https://github.com/microsoft/AMBROSIA/blob/master/CONTRIBUTING/AMBROSIA_client_network_protocol.md#communication-protocols
                const outgoingCheckpoint: Streams.OutgoingCheckpoint = config.checkpointProducer();
                const checkpointMessage: Uint8Array = makeCheckpointMessage(outgoingCheckpoint.length);
                IC.sendMessage(checkpointMessage, MessageType.Checkpoint, IC.instanceName());
                IC.sendCheckpoint(outgoingCheckpoint, () =>
                // Note: This lambda runs ONLY if sendCheckpoint() succeeds
                {
                    if (!_completeLiveUpgradeAtNextTakeCheckpoint)
                    {
                        return;
                    }
                    try
                    {
                        _completeLiveUpgradeAtNextTakeCheckpoint = false;
                        // The "live" upgrade is now complete, so update the ambrosiaConfig.json file so that we're prepared for the next restart.
                        // However, we only do this if the upgrade was requested via the ambrosiaConfig.json. If the upgrade was requested via an
                        // explicit call to "Ambrosia.exe RegisterInstance" then we assume the user is handling the upgrade process manually (or
                        // via an 'upgrade orchestration service' they built).
                        if (Configuration.loadedConfig().isLiveUpgradeRequested)
                        {
                            Configuration.loadedConfig().updateSetting("appVersion", Configuration.loadedConfig().upgradeVersion);
                            Configuration.loadedConfig().updateSetting("activeCode", Configuration.ActiveCodeType[Configuration.ActiveCodeType.VNext]);
                            // The IC doesn't update the registered currentVersion after the upgrade (it only updates <instanceTable>.CurrentVersion),
                            // so we have to re-register at the next restart to update it [without this, AmbrosiaConfigFile.initializeAsync() will throw]
                            Configuration.loadedConfig().updateSetting("autoRegister", true);
                        }
                        else
                        {
                            // Note: Because the upgrade wasn't requested via ambrosiaConfig.json, we can't report the upgradeVersion that currentVersion must be set to
                            Utils.log(`Warning: You must re-register the '${IC.instanceName()}' instance (to update --currentVersion) to finish the upgrade`);
                        }
                        Utils.log("Upgrade complete", null, Utils.LoggingLevel.Minimal);
                        // The app may want to do its own post-upgrade work (eg. custom handling of re-registration / code-switching, or signalling
                        // to any orchestrating infrastructure that may be managing an active/active upgrade).
                        // Note: Because this event is raised only as a consequence of a non-replayable message (UpgradeTakeCheckpoint), whatever
                        //       actions (if any) the app takes when responding to it are not part of a deterministic chain.
                        IC.emitAppEvent(AppEventType.UpgradeComplete);
                    }
                    catch (error: unknown)
                    {
                        Utils.log(Utils.makeError(error));
                    }
                });
                break;
            }

            case MessageType.Checkpoint:
            {
                const checkpointLengthVarInt: DataFormat.varIntResult = DataFormat.readVarInt64(receiveBuffer, pos);
                const checkpointLength: number = checkpointLengthVarInt.value;
                Utils.log(`Starting recovery [load checkpoint (${checkpointLength} bytes) and replay messages]...`);
                _isRecoveryRunning = true;

                // We keep track of the number of received/resent messages during recovery
                IC._counters.remoteSentMessageCount = IC._counters.receivedMessageCount = IC._counters.receivedForkMessageCount = IC._counters.sentForkMessageCount = 0;

                // By returning the checkpoint length we tell the caller to read the next checkpointLength bytes as checkpoint data.
                // Note: It's safe to return here as a 'Checkpoint' will always be the only message in the log page.
                //       The caller will skip to the first byte AFTER the current log page [whose length will include
                //       checkpointLengthVarInt.length] so there's no need to also pass back checkpointLengthVarInt.length.
                return (checkpointLength); // Note: Can be 0
            }

            case MessageType.RPCBatch:
            case MessageType.CountReplayableRPCBatchByte:
            {
                const rpcCountVarInt: DataFormat.varIntResult = DataFormat.readVarInt32(receiveBuffer, pos);
                const rpcCount: number = rpcCountVarInt.value;
                pos += rpcCountVarInt.length;
                if (msg.messageType === MessageType.CountReplayableRPCBatchByte)
                {
                    const forkRpcCountVarInt: DataFormat.varIntResult = DataFormat.readVarInt32(receiveBuffer, pos);
                    const forkRpcCount: number = forkRpcCountVarInt.value;
                    pos += forkRpcCountVarInt.length;
                    Utils.log(`Processing RPC batch (of ${rpcCount} messages, ${forkRpcCount} of which are replayable)...`);
                }
                else
                {
                    Utils.log(`Processing RPC batch (of ${rpcCount} messages)...`);
                }
                // 'pos' now points at the first message contained in the batch, so we deliberately do NOT advance to msg.endOfMessageIndex:
                // each contained message is parsed (and numbered) by subsequent iterations of the loop.
                // Note: We'll let the processing of each contained message update _replayedMessageCount and _receivedForkMessageCount, so we don't
                //       call updateReplayStats() here (effectively, we ignore the batch "wrapper" for the purpose of tracking replay stats)
                continue;
            }

            default:
                throw new Error(`Log page ${pageSequenceID} contains a message of unknown type (${msg.messageType}); message: ${Utils.makeDisplayBytes(receiveBuffer, msg.startOfMessageIndex, msg.totalLength)}`);
        }

        pos = msg.endOfMessageIndex;
    }

    return (-1); // The page was fully processed
}