public static CommandsForKey fromBytes()

in accord-core/src/main/java/accord/local/cfk/Serialize.java [715:1043]


    public static CommandsForKey fromBytes(RoutingKey key, ByteBuffer in)
    {
        if (!in.hasRemaining())
            return null;

        in = in.duplicate();
        int commandCount = VIntCoding.readUnsignedVInt32(in) - 1;
        if (commandCount <= 0)
        {
            if (commandCount == -1)
                return new CommandsForKey(key);

            long maxUniqueHlc = VIntCoding.readUnsignedVInt(in);
            return new CommandsForKey(key).updateUniqueHlc(maxUniqueHlc);
        }

        TxnId[] txnIds = cachedTxnIds().get(commandCount);
        int[] commandFlags = cachedInts().getInts(commandCount);
        TxnInfo[] txns = new TxnInfo[commandCount];
        int nodeIdCount = VIntCoding.readUnsignedVInt32(in);
        int bitsPerNodeId = numberOfBitsToRepresent(nodeIdCount);
        long nodeIdMask = (1L << bitsPerNodeId) - 1;
        Object[] nodeIds = cachedAny().get(nodeIdCount);
        {
            int prev = VIntCoding.readUnsignedVInt32(in);
            nodeIds[0] = new Id(prev);
            for (int i = 1 ; i < nodeIdCount ; ++i)
                nodeIds[i] = new Id(prev += VIntCoding.readUnsignedVInt32(in));
        }

        int globalFlags = (int) readLeastSignificantBytes(3, in);
        int headerByteCount, hlcBytesLookup;
        {
            headerByteCount = 1 + ((globalFlags >>> 14) & 0x3);
            hlcBytesLookup = setHlcByteDeltas((globalFlags >>> 16) & 0x3, (globalFlags >>> 18) & 0x3, (globalFlags >>> 20) & 0x3, (globalFlags >>> 22) & 0x3);
        }

        long flagHistory = EMPTY_FLAG_HISTORY;
        QuickBounds bounds;
        long prevEpoch, prevHlc, maxUniqueHlc = 0;
        {
            long startEpoch, endEpoch;
            {
                startEpoch = VIntCoding.readUnsignedVInt(in);
                boolean hasEndEpoch = (startEpoch & 1) == 1;
                startEpoch /= 2;
                if (hasEndEpoch) endEpoch = startEpoch + VIntCoding.readUnsignedVInt(in);
                else endEpoch = Long.MAX_VALUE;
            }

            bounds = NO_BOUNDS_INFO.withEpochs(startEpoch, endEpoch);
            prevEpoch = startEpoch + VIntCoding.readUnsignedVInt(in);
            prevHlc = VIntCoding.readUnsignedVInt(in);
            {
                int flags = ((globalFlags & HAS_BOUNDS_FLAGS_HEADER_BIT) == 0) ? RX_FLAGS : VIntCoding.readUnsignedVInt32(in);
                Id node = (Id)nodeIds[VIntCoding.readUnsignedVInt32(in)];
                bounds = bounds.withGcBeforeBeforeAtLeast(TxnId.fromValues(prevEpoch, prevHlc, flags, node));
            }
            if (0 != (globalFlags & HAS_BOOTSTRAPPED_AT_HEADER_BIT))
            {
                long epoch = prevEpoch + VIntCoding.readUnsignedVInt(in);
                long hlc = prevHlc + VIntCoding.readVInt(in);
                int flags = ((globalFlags & HAS_BOUNDS_FLAGS_HEADER_BIT) == 0) ? RX_FLAGS : VIntCoding.readUnsignedVInt32(in);
                Id node = (Id)nodeIds[VIntCoding.readUnsignedVInt32(in)];
                bounds = bounds.withBootstrappedAtLeast(TxnId.fromValues(epoch, hlc, flags, node));
            }
            if (0 != (globalFlags & HAS_MAX_HLC_HEADER_BIT))
                maxUniqueHlc = bounds.gcBefore.hlc() + VIntCoding.readVInt(in);
        }
        int prunedBeforeIndex = VIntCoding.readUnsignedVInt32(in) - 1;

        for (int i = 0 ; i < commandCount ; ++i)
        {
            long header = readLeastSignificantBytes(headerByteCount, in);
            header |= 1L << (8 * headerByteCount); // marker so we know where to shift-left most-significant bytes to
            int commandDecodeFlags = (int)(header & 0xF);
            InternalStatus status = DECODE_STATUS[commandDecodeFlags];
            header >>>= 4;
            commandDecodeFlags <<= 6;

            {
                int overrideMask = (globalFlags >>> HAS_STATUS_OVERRIDES_HEADER_BIT_SHIFT) & 0x1;
                int statusOverrides = (int) (header & overrideMask);
                header >>>= overrideMask;
                commandDecodeFlags |= statusOverrides << 3;

                int statusFlags = status.flags ^ statusOverrides;
                int hasExecuteAt = (statusFlags >>> 1) & (globalFlags >>> HAS_EXECUTE_AT_HEADER_BIT_SHIFT);
                int hasMissingDeps = (statusFlags & 0x1) & (globalFlags >>> HAS_MISSING_DEPS_HEADER_BIT_SHIFT);
                commandDecodeFlags |= ((int)header & hasExecuteAt) << 1;
                header >>>= hasExecuteAt;
                commandDecodeFlags |= ((int)header & hasMissingDeps);
                header >>>= hasMissingDeps;
                int ballotMask = status.hasBallot ? ((globalFlags >>> HAS_BALLOT_HEADER_BIT_SHIFT) & 0x1) : 0;
                commandDecodeFlags |= ((int)header & ballotMask) << 2;
                header >>>= ballotMask;
                commandFlags[i] = commandDecodeFlags;
            }

            int encodedFlags = (int)header & 7;
            header >>>= 3;

            boolean hlcIsNegative = false;
            long epoch = prevEpoch;
            int readEpochBytes = 0;
            {
                boolean hasEpochDelta = (header & 1) == 1;
                header >>>= 1;
                if (hasEpochDelta)
                {
                    hlcIsNegative = (header & 1) == 1;
                    header >>>= 1;

                    int epochFlag = ((int)header & 0x3);
                    header >>>= 2;
                    switch (epochFlag)
                    {
                        default: throw new AssertionError("Unexpected value not 0-3");
                        case 0: ++epoch; break;
                        case 1: epoch += (header & 0xf); header >>>= 4; break;
                        case 2: readEpochBytes = 1; break;
                        case 3: readEpochBytes = 4; break;
                    }
                }
            }

            Id node = (Id)nodeIds[(int)(header & nodeIdMask)];
            header >>>= bitsPerNodeId;

            int readHlcBytes = getHlcBytes(hlcBytesLookup, (int)(header & 0x3));
            header >>>= 2;

            long hlc = header;
            {
                long highestBit = Long.highestOneBit(hlc);
                hlc ^= highestBit;
                int hlcShift = Long.numberOfTrailingZeros(highestBit);
                hlc |= readLeastSignificantBytes(readHlcBytes, in) << hlcShift;
            }
            if (hlcIsNegative)
                hlc = -1-hlc;
            hlc += prevHlc;

            int decodedFlags;
            switch (encodedFlags)
            {
                case 0: decodedFlags = in.get() & 0xff; break;
                case 1: decodedFlags = in.getShort() & 0xffff; break;
                default: decodedFlags = selectFlagHistory(encodedFlags, flagHistory);
            }
            if (readEpochBytes > 0)
                epoch += readEpochBytes == 1 ? (in.get() & 0xff) : in.getInt();

            TxnId txnId = TxnId.fromValues(epoch, hlc, decodedFlags, node);
            txnIds[i] = txnId;
            flagHistory = updateFlagHistory(decodedFlags, encodedFlags, flagHistory);

            prevEpoch = epoch;
            prevHlc = hlc;
        }

        int unmanagedPendingCommitCount = VIntCoding.readUnsignedVInt32(in);
        int unmanagedCount = unmanagedPendingCommitCount + VIntCoding.readUnsignedVInt32(in);
        Unmanaged[] unmanageds;
        if (unmanagedCount == 0)
        {
            unmanageds = NO_PENDING_UNMANAGED;
        }
        else
        {
            unmanageds = new Unmanaged[unmanagedCount];
            Unmanaged.Pending pending = unmanagedPendingCommitCount == 0 ? Unmanaged.Pending.APPLY : Unmanaged.Pending.COMMIT;
            int bytesPerNodeId = (bitsPerNodeId + 7) / 8;
            for (int i = 0 ; i < unmanagedCount ; ++i)
            {
                TxnId txnId = readTxnId(in, nodeIds, bytesPerNodeId);
                Timestamp waitingUntil = readTimestamp(in, nodeIds, bytesPerNodeId);
                unmanageds[i] = new Unmanaged(pending, txnId, waitingUntil);
                if (--unmanagedPendingCommitCount == 0) pending = Unmanaged.Pending.APPLY;
            }
        }

        if (((globalFlags & HAS_EXECUTE_AT_HEADER_BIT) | (globalFlags & HAS_MISSING_DEPS_HEADER_BIT) | (globalFlags & HAS_BALLOT_HEADER_BIT)) != 0)
        {
            TxnId[] missingIdBuffer = cachedTxnIds().get(8);
            int missingIdCount = 0, maxIdBufferCount = 0;
            int bitsPerTxnId = numberOfBitsToRepresent(commandCount);
            int txnIdMask = (1 << bitsPerTxnId) - 1;
            int bitsPerMissingId = bitsPerTxnId + 1;

            int decodeExecuteAtBits = (globalFlags & HAS_EXECUTE_AT_HEADER_BIT) != 0 ? in.getShort() & 0xffff : 0;
            int bitsPerExecuteAtEpoch = decodeExecuteAtBits >>> 10;
            int bitsPerExecuteAtHlc = 1 + ((decodeExecuteAtBits >>> 4) & 0x3f);
            int bitsPerExecuteAtFlags = decodeExecuteAtBits & 0xf;
            if (bitsPerExecuteAtFlags == 15) bitsPerExecuteAtFlags = 16;
            int bitsPerExecuteAt = bitsPerExecuteAtEpoch + bitsPerExecuteAtHlc + bitsPerExecuteAtFlags + bitsPerNodeId;

            long executeAtEpochMask = bitsPerExecuteAtEpoch == 0 ? 0 : (-1L >>> (64 - bitsPerExecuteAtEpoch));
            long executeAtHlcMask = (-1L >>> (64 - bitsPerExecuteAtHlc));
            long executeAtFlagsMask = bitsPerExecuteAtFlags == 0 ? 0 : (-1L >>> (64 - bitsPerExecuteAtFlags));

            int decodeBallotBits = (globalFlags & HAS_BALLOT_HEADER_BIT) != 0 ? in.getShort() & 0xffff : 0;
            int bitsPerBallotEpoch = decodeBallotBits >>> 10;
            int bitsPerBallotHlc = 1 + ((decodeBallotBits >>> 4) & 0x3f);
            int bitsPerBallotFlags = decodeBallotBits & 0xf;
            if (bitsPerBallotFlags == 15) bitsPerBallotFlags = 16;
            int bitsPerBallot = bitsPerBallotEpoch + bitsPerBallotHlc + bitsPerBallotFlags + bitsPerNodeId;

            long ballotEpochMask = bitsPerBallotEpoch == 0 ? 0 : (-1L >>> (64 - bitsPerBallotEpoch));
            long ballotHlcMask = (-1L >>> (64 - bitsPerBallotHlc));
            long ballotFlagsMask = bitsPerBallotFlags == 0 ? 0 : (-1L >>> (64 - bitsPerBallotFlags));

            Ballot prevBallot = null;
            final BitUtils.BitReader reader = new BitUtils.BitReader();
            for (int i = 0 ; i < commandCount ; ++i)
            {
                TxnId txnId = txnIds[i];
                int commandDecodeFlags = commandFlags[i];
                Timestamp executeAt = txnId;
                if ((commandDecodeFlags & HAS_EXECUTE_AT_HEADER_BIT) != 0)
                {
                    long epoch, hlc;
                    int flags;
                    Id id;
                    if (bitsPerExecuteAt <= 64)
                    {
                        long executeAtBits = reader.read(bitsPerExecuteAt, in);
                        epoch = txnId.epoch() + (executeAtBits & executeAtEpochMask);
                        executeAtBits >>>= bitsPerExecuteAtEpoch;
                        hlc = txnId.hlc() + (executeAtBits & executeAtHlcMask);
                        executeAtBits >>>= bitsPerExecuteAtHlc;
                        flags = (int)(executeAtBits & executeAtFlagsMask);
                        executeAtBits >>>= bitsPerExecuteAtFlags;
                        id = (Id)nodeIds[(int)(executeAtBits & nodeIdMask)];
                    }
                    else
                    {
                        epoch = txnId.epoch() + reader.read(bitsPerExecuteAtEpoch, in);
                        hlc = txnId.hlc() + reader.read(bitsPerExecuteAtHlc, in);
                        flags = (int) reader.read(bitsPerExecuteAtFlags, in);
                        id = (Id)nodeIds[(int)(reader.read(bitsPerNodeId, in))];
                    }
                    executeAt = Timestamp.fromValues(epoch, hlc, flags, id);
                }

                TxnId[] missing = NO_TXNIDS;
                if ((commandDecodeFlags & HAS_MISSING_DEPS_HEADER_BIT) != 0)
                {
                    int prev = -1;
                    while (true)
                    {
                        if (missingIdCount == missingIdBuffer.length)
                            missingIdBuffer = cachedTxnIds().resize(missingIdBuffer, missingIdCount, missingIdCount * 2);

                        int next = (int) reader.read(bitsPerMissingId, in);
                        Invariants.require(next > prev);
                        missingIdBuffer[missingIdCount++] = txnIds[next & txnIdMask];
                        if (next >= commandCount)
                            break; // finished this array
                        prev = next;
                    }

                    missing = Arrays.copyOf(missingIdBuffer, missingIdCount);
                    maxIdBufferCount = missingIdCount;
                    missingIdCount = 0;
                }

                Ballot ballot = Ballot.ZERO;
                if ((commandDecodeFlags & HAS_BALLOT_HEADER_BIT) != 0)
                {
                    if (prevBallot == null)
                    {
                        long msb = reader.read(64, in);
                        long lsb = reader.read(64, in);
                        Id id = (Id)nodeIds[(int)(reader.read(bitsPerNodeId, in))];
                        ballot = Ballot.fromBits(msb, lsb, id);
                    }
                    else
                    {
                        long epoch, hlc;
                        int flags;
                        Id id;
                        if (bitsPerBallot <= 64)
                        {
                            long ballotBits = reader.read(bitsPerBallot, in);
                            epoch = prevBallot.epoch() + decodeZigZag64(ballotBits & ballotEpochMask);
                            ballotBits >>>= bitsPerBallotEpoch;
                            hlc = prevBallot.hlc() + decodeZigZag64(ballotBits & ballotHlcMask);
                            ballotBits >>>= bitsPerBallotHlc;
                            flags = (int)(ballotBits & ballotFlagsMask);
                            ballotBits >>>= bitsPerBallotFlags;
                            id = (Id)nodeIds[(int)(ballotBits & nodeIdMask)];
                        }
                        else
                        {
                            epoch = prevBallot.epoch() + decodeZigZag64(reader.read(bitsPerBallotEpoch, in));
                            hlc = prevBallot.hlc() + decodeZigZag64(reader.read(bitsPerBallotHlc, in));
                            flags = (int) reader.read(bitsPerBallotFlags, in);
                            id = (Id)nodeIds[(int)(reader.read(bitsPerNodeId, in))];
                        }
                        ballot = Ballot.fromValues(epoch, hlc, flags, id);
                    }

                    prevBallot = ballot;
                }

                int statusIndex = commandDecodeFlags >>> 6;
                InternalStatus status = DECODE_STATUS[statusIndex];
                int statusOverrides = (commandDecodeFlags >>> 3) & 0x1;
                txns[i] = create(bounds, txnId, status, statusOverrides, executeAt, missing, ballot);
            }

            cachedTxnIds().forceDiscard(missingIdBuffer, maxIdBufferCount);
        }
        else
        {
            for (int i = 0 ; i < commandCount ; ++i)
            {
                int commandDecodeFlags = commandFlags[i];
                int statusIndex = commandDecodeFlags >>> 6;
                InternalStatus status = DECODE_STATUS[statusIndex];
                int statusOverrides = (commandDecodeFlags >>> 3) & 0x1;
                txns[i] = create(bounds, txnIds[i], status, statusOverrides, txnIds[i], NO_TXNIDS, Ballot.ZERO);
            }
        }
        cachedTxnIds().forceDiscard(txnIds, commandCount);
        cachedAny().forceDiscard(nodeIds, nodeIdCount);
        return CommandsForKey.SerializerSupport.create(key, txns, maxUniqueHlc, unmanageds, prunedBeforeIndex == -1 ? TxnId.NONE : txns[prunedBeforeIndex], bounds);
    }