in accord-core/src/main/java/accord/local/cfk/Serialize.java [715:1043]
public static CommandsForKey fromBytes(RoutingKey key, ByteBuffer in)
{
if (!in.hasRemaining())
return null;
in = in.duplicate();
int commandCount = VIntCoding.readUnsignedVInt32(in) - 1;
if (commandCount <= 0)
{
if (commandCount == -1)
return new CommandsForKey(key);
long maxUniqueHlc = VIntCoding.readUnsignedVInt(in);
return new CommandsForKey(key).updateUniqueHlc(maxUniqueHlc);
}
TxnId[] txnIds = cachedTxnIds().get(commandCount);
int[] commandFlags = cachedInts().getInts(commandCount);
TxnInfo[] txns = new TxnInfo[commandCount];
int nodeIdCount = VIntCoding.readUnsignedVInt32(in);
int bitsPerNodeId = numberOfBitsToRepresent(nodeIdCount);
long nodeIdMask = (1L << bitsPerNodeId) - 1;
Object[] nodeIds = cachedAny().get(nodeIdCount);
{
int prev = VIntCoding.readUnsignedVInt32(in);
nodeIds[0] = new Id(prev);
for (int i = 1 ; i < nodeIdCount ; ++i)
nodeIds[i] = new Id(prev += VIntCoding.readUnsignedVInt32(in));
}
int globalFlags = (int) readLeastSignificantBytes(3, in);
int headerByteCount, hlcBytesLookup;
{
headerByteCount = 1 + ((globalFlags >>> 14) & 0x3);
hlcBytesLookup = setHlcByteDeltas((globalFlags >>> 16) & 0x3, (globalFlags >>> 18) & 0x3, (globalFlags >>> 20) & 0x3, (globalFlags >>> 22) & 0x3);
}
long flagHistory = EMPTY_FLAG_HISTORY;
QuickBounds bounds;
long prevEpoch, prevHlc, maxUniqueHlc = 0;
{
long startEpoch, endEpoch;
{
startEpoch = VIntCoding.readUnsignedVInt(in);
boolean hasEndEpoch = (startEpoch & 1) == 1;
startEpoch /= 2;
if (hasEndEpoch) endEpoch = startEpoch + VIntCoding.readUnsignedVInt(in);
else endEpoch = Long.MAX_VALUE;
}
bounds = NO_BOUNDS_INFO.withEpochs(startEpoch, endEpoch);
prevEpoch = startEpoch + VIntCoding.readUnsignedVInt(in);
prevHlc = VIntCoding.readUnsignedVInt(in);
{
int flags = ((globalFlags & HAS_BOUNDS_FLAGS_HEADER_BIT) == 0) ? RX_FLAGS : VIntCoding.readUnsignedVInt32(in);
Id node = (Id)nodeIds[VIntCoding.readUnsignedVInt32(in)];
bounds = bounds.withGcBeforeBeforeAtLeast(TxnId.fromValues(prevEpoch, prevHlc, flags, node));
}
if (0 != (globalFlags & HAS_BOOTSTRAPPED_AT_HEADER_BIT))
{
long epoch = prevEpoch + VIntCoding.readUnsignedVInt(in);
long hlc = prevHlc + VIntCoding.readVInt(in);
int flags = ((globalFlags & HAS_BOUNDS_FLAGS_HEADER_BIT) == 0) ? RX_FLAGS : VIntCoding.readUnsignedVInt32(in);
Id node = (Id)nodeIds[VIntCoding.readUnsignedVInt32(in)];
bounds = bounds.withBootstrappedAtLeast(TxnId.fromValues(epoch, hlc, flags, node));
}
if (0 != (globalFlags & HAS_MAX_HLC_HEADER_BIT))
maxUniqueHlc = bounds.gcBefore.hlc() + VIntCoding.readVInt(in);
}
int prunedBeforeIndex = VIntCoding.readUnsignedVInt32(in) - 1;
for (int i = 0 ; i < commandCount ; ++i)
{
long header = readLeastSignificantBytes(headerByteCount, in);
header |= 1L << (8 * headerByteCount); // marker so we know where to shift-left most-significant bytes to
int commandDecodeFlags = (int)(header & 0xF);
InternalStatus status = DECODE_STATUS[commandDecodeFlags];
header >>>= 4;
commandDecodeFlags <<= 6;
{
int overrideMask = (globalFlags >>> HAS_STATUS_OVERRIDES_HEADER_BIT_SHIFT) & 0x1;
int statusOverrides = (int) (header & overrideMask);
header >>>= overrideMask;
commandDecodeFlags |= statusOverrides << 3;
int statusFlags = status.flags ^ statusOverrides;
int hasExecuteAt = (statusFlags >>> 1) & (globalFlags >>> HAS_EXECUTE_AT_HEADER_BIT_SHIFT);
int hasMissingDeps = (statusFlags & 0x1) & (globalFlags >>> HAS_MISSING_DEPS_HEADER_BIT_SHIFT);
commandDecodeFlags |= ((int)header & hasExecuteAt) << 1;
header >>>= hasExecuteAt;
commandDecodeFlags |= ((int)header & hasMissingDeps);
header >>>= hasMissingDeps;
int ballotMask = status.hasBallot ? ((globalFlags >>> HAS_BALLOT_HEADER_BIT_SHIFT) & 0x1) : 0;
commandDecodeFlags |= ((int)header & ballotMask) << 2;
header >>>= ballotMask;
commandFlags[i] = commandDecodeFlags;
}
int encodedFlags = (int)header & 7;
header >>>= 3;
boolean hlcIsNegative = false;
long epoch = prevEpoch;
int readEpochBytes = 0;
{
boolean hasEpochDelta = (header & 1) == 1;
header >>>= 1;
if (hasEpochDelta)
{
hlcIsNegative = (header & 1) == 1;
header >>>= 1;
int epochFlag = ((int)header & 0x3);
header >>>= 2;
switch (epochFlag)
{
default: throw new AssertionError("Unexpected value not 0-3");
case 0: ++epoch; break;
case 1: epoch += (header & 0xf); header >>>= 4; break;
case 2: readEpochBytes = 1; break;
case 3: readEpochBytes = 4; break;
}
}
}
Id node = (Id)nodeIds[(int)(header & nodeIdMask)];
header >>>= bitsPerNodeId;
int readHlcBytes = getHlcBytes(hlcBytesLookup, (int)(header & 0x3));
header >>>= 2;
long hlc = header;
{
long highestBit = Long.highestOneBit(hlc);
hlc ^= highestBit;
int hlcShift = Long.numberOfTrailingZeros(highestBit);
hlc |= readLeastSignificantBytes(readHlcBytes, in) << hlcShift;
}
if (hlcIsNegative)
hlc = -1-hlc;
hlc += prevHlc;
int decodedFlags;
switch (encodedFlags)
{
case 0: decodedFlags = in.get() & 0xff; break;
case 1: decodedFlags = in.getShort() & 0xffff; break;
default: decodedFlags = selectFlagHistory(encodedFlags, flagHistory);
}
if (readEpochBytes > 0)
epoch += readEpochBytes == 1 ? (in.get() & 0xff) : in.getInt();
TxnId txnId = TxnId.fromValues(epoch, hlc, decodedFlags, node);
txnIds[i] = txnId;
flagHistory = updateFlagHistory(decodedFlags, encodedFlags, flagHistory);
prevEpoch = epoch;
prevHlc = hlc;
}
int unmanagedPendingCommitCount = VIntCoding.readUnsignedVInt32(in);
int unmanagedCount = unmanagedPendingCommitCount + VIntCoding.readUnsignedVInt32(in);
Unmanaged[] unmanageds;
if (unmanagedCount == 0)
{
unmanageds = NO_PENDING_UNMANAGED;
}
else
{
unmanageds = new Unmanaged[unmanagedCount];
Unmanaged.Pending pending = unmanagedPendingCommitCount == 0 ? Unmanaged.Pending.APPLY : Unmanaged.Pending.COMMIT;
int bytesPerNodeId = (bitsPerNodeId + 7) / 8;
for (int i = 0 ; i < unmanagedCount ; ++i)
{
TxnId txnId = readTxnId(in, nodeIds, bytesPerNodeId);
Timestamp waitingUntil = readTimestamp(in, nodeIds, bytesPerNodeId);
unmanageds[i] = new Unmanaged(pending, txnId, waitingUntil);
if (--unmanagedPendingCommitCount == 0) pending = Unmanaged.Pending.APPLY;
}
}
if (((globalFlags & HAS_EXECUTE_AT_HEADER_BIT) | (globalFlags & HAS_MISSING_DEPS_HEADER_BIT) | (globalFlags & HAS_BALLOT_HEADER_BIT)) != 0)
{
TxnId[] missingIdBuffer = cachedTxnIds().get(8);
int missingIdCount = 0, maxIdBufferCount = 0;
int bitsPerTxnId = numberOfBitsToRepresent(commandCount);
int txnIdMask = (1 << bitsPerTxnId) - 1;
int bitsPerMissingId = bitsPerTxnId + 1;
int decodeExecuteAtBits = (globalFlags & HAS_EXECUTE_AT_HEADER_BIT) != 0 ? in.getShort() & 0xffff : 0;
int bitsPerExecuteAtEpoch = decodeExecuteAtBits >>> 10;
int bitsPerExecuteAtHlc = 1 + ((decodeExecuteAtBits >>> 4) & 0x3f);
int bitsPerExecuteAtFlags = decodeExecuteAtBits & 0xf;
if (bitsPerExecuteAtFlags == 15) bitsPerExecuteAtFlags = 16;
int bitsPerExecuteAt = bitsPerExecuteAtEpoch + bitsPerExecuteAtHlc + bitsPerExecuteAtFlags + bitsPerNodeId;
long executeAtEpochMask = bitsPerExecuteAtEpoch == 0 ? 0 : (-1L >>> (64 - bitsPerExecuteAtEpoch));
long executeAtHlcMask = (-1L >>> (64 - bitsPerExecuteAtHlc));
long executeAtFlagsMask = bitsPerExecuteAtFlags == 0 ? 0 : (-1L >>> (64 - bitsPerExecuteAtFlags));
int decodeBallotBits = (globalFlags & HAS_BALLOT_HEADER_BIT) != 0 ? in.getShort() & 0xffff : 0;
int bitsPerBallotEpoch = decodeBallotBits >>> 10;
int bitsPerBallotHlc = 1 + ((decodeBallotBits >>> 4) & 0x3f);
int bitsPerBallotFlags = decodeBallotBits & 0xf;
if (bitsPerBallotFlags == 15) bitsPerBallotFlags = 16;
int bitsPerBallot = bitsPerBallotEpoch + bitsPerBallotHlc + bitsPerBallotFlags + bitsPerNodeId;
long ballotEpochMask = bitsPerBallotEpoch == 0 ? 0 : (-1L >>> (64 - bitsPerBallotEpoch));
long ballotHlcMask = (-1L >>> (64 - bitsPerBallotHlc));
long ballotFlagsMask = bitsPerBallotFlags == 0 ? 0 : (-1L >>> (64 - bitsPerBallotFlags));
Ballot prevBallot = null;
final BitUtils.BitReader reader = new BitUtils.BitReader();
for (int i = 0 ; i < commandCount ; ++i)
{
TxnId txnId = txnIds[i];
int commandDecodeFlags = commandFlags[i];
Timestamp executeAt = txnId;
if ((commandDecodeFlags & HAS_EXECUTE_AT_HEADER_BIT) != 0)
{
long epoch, hlc;
int flags;
Id id;
if (bitsPerExecuteAt <= 64)
{
long executeAtBits = reader.read(bitsPerExecuteAt, in);
epoch = txnId.epoch() + (executeAtBits & executeAtEpochMask);
executeAtBits >>>= bitsPerExecuteAtEpoch;
hlc = txnId.hlc() + (executeAtBits & executeAtHlcMask);
executeAtBits >>>= bitsPerExecuteAtHlc;
flags = (int)(executeAtBits & executeAtFlagsMask);
executeAtBits >>>= bitsPerExecuteAtFlags;
id = (Id)nodeIds[(int)(executeAtBits & nodeIdMask)];
}
else
{
epoch = txnId.epoch() + reader.read(bitsPerExecuteAtEpoch, in);
hlc = txnId.hlc() + reader.read(bitsPerExecuteAtHlc, in);
flags = (int) reader.read(bitsPerExecuteAtFlags, in);
id = (Id)nodeIds[(int)(reader.read(bitsPerNodeId, in))];
}
executeAt = Timestamp.fromValues(epoch, hlc, flags, id);
}
TxnId[] missing = NO_TXNIDS;
if ((commandDecodeFlags & HAS_MISSING_DEPS_HEADER_BIT) != 0)
{
int prev = -1;
while (true)
{
if (missingIdCount == missingIdBuffer.length)
missingIdBuffer = cachedTxnIds().resize(missingIdBuffer, missingIdCount, missingIdCount * 2);
int next = (int) reader.read(bitsPerMissingId, in);
Invariants.require(next > prev);
missingIdBuffer[missingIdCount++] = txnIds[next & txnIdMask];
if (next >= commandCount)
break; // finished this array
prev = next;
}
missing = Arrays.copyOf(missingIdBuffer, missingIdCount);
maxIdBufferCount = missingIdCount;
missingIdCount = 0;
}
Ballot ballot = Ballot.ZERO;
if ((commandDecodeFlags & HAS_BALLOT_HEADER_BIT) != 0)
{
if (prevBallot == null)
{
long msb = reader.read(64, in);
long lsb = reader.read(64, in);
Id id = (Id)nodeIds[(int)(reader.read(bitsPerNodeId, in))];
ballot = Ballot.fromBits(msb, lsb, id);
}
else
{
long epoch, hlc;
int flags;
Id id;
if (bitsPerBallot <= 64)
{
long ballotBits = reader.read(bitsPerBallot, in);
epoch = prevBallot.epoch() + decodeZigZag64(ballotBits & ballotEpochMask);
ballotBits >>>= bitsPerBallotEpoch;
hlc = prevBallot.hlc() + decodeZigZag64(ballotBits & ballotHlcMask);
ballotBits >>>= bitsPerBallotHlc;
flags = (int)(ballotBits & ballotFlagsMask);
ballotBits >>>= bitsPerBallotFlags;
id = (Id)nodeIds[(int)(ballotBits & nodeIdMask)];
}
else
{
epoch = prevBallot.epoch() + decodeZigZag64(reader.read(bitsPerBallotEpoch, in));
hlc = prevBallot.hlc() + decodeZigZag64(reader.read(bitsPerBallotHlc, in));
flags = (int) reader.read(bitsPerBallotFlags, in);
id = (Id)nodeIds[(int)(reader.read(bitsPerNodeId, in))];
}
ballot = Ballot.fromValues(epoch, hlc, flags, id);
}
prevBallot = ballot;
}
int statusIndex = commandDecodeFlags >>> 6;
InternalStatus status = DECODE_STATUS[statusIndex];
int statusOverrides = (commandDecodeFlags >>> 3) & 0x1;
txns[i] = create(bounds, txnId, status, statusOverrides, executeAt, missing, ballot);
}
cachedTxnIds().forceDiscard(missingIdBuffer, maxIdBufferCount);
}
else
{
for (int i = 0 ; i < commandCount ; ++i)
{
int commandDecodeFlags = commandFlags[i];
int statusIndex = commandDecodeFlags >>> 6;
InternalStatus status = DECODE_STATUS[statusIndex];
int statusOverrides = (commandDecodeFlags >>> 3) & 0x1;
txns[i] = create(bounds, txnIds[i], status, statusOverrides, txnIds[i], NO_TXNIDS, Ballot.ZERO);
}
}
cachedTxnIds().forceDiscard(txnIds, commandCount);
cachedAny().forceDiscard(nodeIds, nodeIdCount);
return CommandsForKey.SerializerSupport.create(key, txns, maxUniqueHlc, unmanageds, prunedBeforeIndex == -1 ? TxnId.NONE : txns[prunedBeforeIndex], bounds);
}