in accord-core/src/main/java/accord/local/cfk/Serialize.java [137:713]
/**
 * Serialises the given {@link CommandsForKey} (without its key) into a newly allocated {@link ByteBuffer}.
 *
 * The method first makes sizing passes over the contents to compute the exact number of bytes required,
 * allocates a buffer of exactly that size, and then writes; the final invariant check requires that the
 * buffer has been filled exactly, so the sizing and writing logic must be kept in step.
 *
 * Sections, in the order they are written:
 * <ol>
 *   <li>vint(commandCount + 1), vint(nodeIdCount), then the node id table (first id absolute, the rest as deltas)</li>
 *   <li>three bytes of global flags: the HAS_* header bits, (headerBytes - 1) at bit 14, and four 2-bit
 *       hlc byte-length selectors starting at bit 16</li>
 *   <li>the bounds: start/end epoch, redundantBefore, optionally bootstrappedAt and maxUniqueHlc</li>
 *   <li>vint(prunedBeforeIndex + 1)</li>
 *   <li>one record per command: a fixed-width header of headerBytes, a variable number of hlc bytes,
 *       then optional raw flag bytes and optional epoch delta bytes</li>
 *   <li>the unmanaged counts followed by the unmanaged entries</li>
 *   <li>if any are present, a bit-packed stream of executeAt, missing ids and ballots</li>
 * </ol>
 *
 * An empty cfk is encoded as a single zero byte, or as vint(1) followed by maxUniqueHlc when one is present.
 *
 * @param cfk the collection to serialise; it must not be loading pruned, and none of its commands may be PRUNED
 * @return a buffer containing exactly the encoded bytes, positioned at its start
 */
private static ByteBuffer unsafeToBytesWithoutKey(CommandsForKey cfk)
{
Invariants.require(!cfk.isLoadingPruned());
int commandCount = cfk.size();
// empty case: nothing to encode except (optionally) the maxUniqueHlc
if (commandCount == 0)
{
// ByteBuffer.allocate zero-fills, so this is a single zero byte
if (!cfk.hasMaxUniqueHlc())
return ByteBuffer.allocate(1);
int size = 1 + VIntCoding.sizeOfUnsignedVInt(cfk.maxUniqueHlc);
ByteBuffer result = ByteBuffer.allocate(size);
// the leading 1 occupies the position where the non-empty path writes commandCount + 1
VIntCoding.writeUnsignedVInt32(1, result);
VIntCoding.writeUnsignedVInt(cfk.maxUniqueHlc, result);
Invariants.require(!result.hasRemaining());
result.flip();
return result;
}
// scratch array obtained from the int cache; handed back in the finally block at the end of the method
int[] nodeIds = cachedInts().getInts(Math.min(64, Math.max(4, commandCount)));
try
{
// first compute the unique Node Ids and some basic characteristics of the data, such as
// whether we have any missing transactions to encode, any executeAt that are not equal to their TxnId
// and whether there are any non-standard flag bits to encode
int nodeIdCount, missingIdCount = 0, executeAtCount = 0, ballotCount = 0, overrideCount = 0;
int bitsPerExecuteAtEpoch = 0, bitsPerExecuteAtFlags = 0, bitsPerExecuteAtHlc = 1; // to permit us to use full 64 bits and encode in 5 bits we force at least one hlc bit
{
// the node id of redundantBefore is always recorded, followed by that of bootstrappedAt if there is one
nodeIds[0] = cfk.redundantBefore().node.id;
nodeIdCount = 1;
{
TxnId bootstrappedAt = cfk.bootstrappedAt();
if (bootstrappedAt != null)
nodeIds[nodeIdCount++] = bootstrappedAt.node.id;
}
for (int i = 0 ; i < commandCount ; ++i)
{
// each iteration may append up to three ids (txn, executeAt, ballot), so make room first:
// compact the ids collected so far, and grow the array only if that does not free enough space
// (compact is defined elsewhere; presumably it sorts and de-duplicates, returning the new count - confirm)
if (nodeIdCount + 3 >= nodeIds.length)
{
nodeIdCount = compact(nodeIds, nodeIdCount);
if (nodeIdCount > nodeIds.length/2 || nodeIdCount + 3 >= nodeIds.length)
nodeIds = cachedInts().resize(nodeIds, nodeIds.length, Math.max(nodeIdCount + 4, nodeIds.length * 2));
}
TxnInfo txn = cfk.get(i);
Invariants.require(!txn.is(InternalStatus.PRUNED));
overrideCount += txn.statusOverrides();
nodeIds[nodeIdCount++] = txn.node.id;
// reference comparison: only commands whose executeAt is a different object have it encoded separately
if (txn.executeAt != txn)
{
Invariants.require(txn.hasExecuteAt());
nodeIds[nodeIdCount++] = txn.executeAt.node.id;
// track the widest epoch delta, hlc delta and flags so every executeAt can use the same field widths
bitsPerExecuteAtEpoch = Math.max(bitsPerExecuteAtEpoch, numberOfBitsToRepresent(txn.executeAt.epoch() - txn.epoch()));
bitsPerExecuteAtHlc = Math.max(bitsPerExecuteAtHlc, numberOfBitsToRepresent(txn.executeAt.hlc() - txn.hlc()));
bitsPerExecuteAtFlags = Math.max(bitsPerExecuteAtFlags, numberOfBitsToRepresent(txn.executeAt.flags()));
executeAtCount += 1;
}
// only TxnInfoExtra carries missing ids and a ballot
if (txn.getClass() == TxnInfoExtra.class)
{
TxnInfoExtra extra = (TxnInfoExtra) txn;
missingIdCount += extra.missing.length;
Invariants.require(extra.missing.length == 0 || txn.hasDeps());
if (extra.ballot != Ballot.ZERO)
{
Invariants.require(txn.hasBallot());
nodeIds[nodeIdCount++] = extra.ballot.node.id;
ballotCount += 1;
}
}
}
// each unmanaged entry contributes two node ids (its txnId and its waitingUntil)
for (Unmanaged unmanaged : cfk.unmanageds)
{
if (nodeIdCount + 2 >= nodeIds.length)
{
nodeIdCount = compact(nodeIds, nodeIdCount);
if (nodeIdCount > nodeIds.length / 2 || nodeIdCount + 2 >= nodeIds.length)
nodeIds = cachedInts().resize(nodeIds, nodeIds.length, nodeIds.length * 2);
}
nodeIds[nodeIdCount++] = unmanaged.txnId.node.id;
nodeIds[nodeIdCount++] = unmanaged.waitingUntil.node.id;
}
// final compaction: from here on nodeIds[0..nodeIdCount) is the id table that is written out and
// binary searched to translate a node id into its index
nodeIdCount = compact(nodeIds, nodeIdCount);
Invariants.require(nodeIdCount > 0);
}
// We can now use this information to calculate the fixed header size, compute the amount
// of additional space we'll need to store the TxnId and its basic info
int bitsPerNodeId = numberOfBitsToRepresent(nodeIdCount);
// the 10 fixed bits match the write loop below: 4 for status, 3 for the encoded flag bits,
// 1 for "epoch changed" and 2 for the hlc byte-length selector
int minHeaderBits = 10 + bitsPerNodeId + (overrideCount > 0 ? 1 : 0);
// local bit set used only for sizing: bit 0 = any executeAt, bit 1 = any missing ids, bit 2 = any ballot
int headerFlags = (executeAtCount > 0 ? 1 : 0)
| (missingIdCount > 0 ? 2 : 0)
| (ballotCount > 0 ? 4 : 0);
int maxHeaderBits = minHeaderBits;
int totalBytes = 0;
int prunedBeforeIndex = cfk.prunedBefore().equals(TxnId.NONE) ? -1 : cfk.indexOf(cfk.prunedBefore());
long flagHistory = EMPTY_FLAG_HISTORY;
// epoch and hlc of each command are encoded as deltas from the previous one, starting from redundantBefore
long prevEpoch = cfk.redundantBefore().epoch();
long prevHlc = cfk.redundantBefore().hlc();
// histogram of commands by the number of bytes needed for header + hlc payload
// NOTE(review): with headerBytes <= 4 and a 64-bit hlc delta basicBytes could reach 12, which would be out of
// bounds if getInts(12) returns an array of exactly 12 elements - confirm this cannot occur in practice
int[] bytesHistogram = cachedInts().getInts(12);
Arrays.fill(bytesHistogram, 0);
// sizing pass over the commands; this must make the same decisions as the write loop further down
for (int i = 0 ; i < commandCount ; ++i)
{
int headerBits = minHeaderBits;
int payloadBits = 0;
TxnInfo txn = cfk.get(i);
{
long epoch = txn.epoch();
Invariants.require(epoch >= prevEpoch);
long epochDelta = epoch - prevEpoch;
long hlc = txn.hlc();
long hlcDelta = hlc - prevHlc;
if (epochDelta > 0)
{
// a negative hlc delta is only handled when the epoch has advanced; -1 - x is the same as ~x,
// so the magnitude is stored non-negative (the sign is recorded as a header bit when writing)
if (hlcDelta < 0)
hlcDelta = -1 - hlcDelta;
// one bit for the hlc sign plus two bits selecting how the epoch delta is encoded
headerBits += 3;
if (epochDelta > 1)
{
// 2..15 is stored inline in four more header bits, larger deltas in 1 or 4 trailing bytes
if (epochDelta <= 0xf) headerBits += 4;
else if (epochDelta <= 0xff) totalBytes += 1;
else { totalBytes += 4; Invariants.require(epochDelta <= 0xffffffffL); }
}
}
payloadBits += numberOfBitsToRepresent(hlcDelta);
prevEpoch = epoch;
prevHlc = hlc;
}
int flags = txn.flags();
int encodedFlagBits = encodedFlagBits(flags, flagHistory);
flagHistory = updateFlagHistory(flags, encodedFlagBits, flagHistory);
// encodings 0 and 1 mean the flags are written raw after the record, as one or two bytes respectively
if (encodedFlagBits < 2)
totalBytes += 1 + encodedFlagBits;
// the optional header bits exist only if the command's status permits the field AND some command uses it
if (txn.hasExecuteAt())
headerBits += headerFlags & 0x1;
if (txn.hasDeps())
headerBits += (headerFlags >>> 1) & 0x1;
if (txn.hasBallot())
headerBits += (headerFlags >>> 2);
maxHeaderBits = Math.max(headerBits, maxHeaderBits);
int basicBytes = (headerBits + payloadBits + 7)/8;
bytesHistogram[basicBytes]++;
}
// find the smallest and largest populated buckets
int minBasicBytes = -1, maxBasicBytes = 0;
for (int i = 0 ; i < bytesHistogram.length ; ++i)
{
if (bytesHistogram[i] == 0) continue;
if (minBasicBytes == -1) minBasicBytes = i;
maxBasicBytes = i;
}
// convert the populated range into cumulative counts so that percentiles can be binary searched
for (int i = minBasicBytes + 1 ; i <= maxBasicBytes ; ++i)
bytesHistogram[i] += bytesHistogram[i-1];
int globalFlags = (missingIdCount > 0 ? HAS_MISSING_DEPS_HEADER_BIT : 0)
| (executeAtCount > 0 ? HAS_EXECUTE_AT_HEADER_BIT : 0)
| (ballotCount > 0 ? HAS_BALLOT_HEADER_BIT : 0)
| (overrideCount > 0 ? HAS_STATUS_OVERRIDES_HEADER_BIT : 0)
| (cfk.bootstrappedAt() != null ? HAS_BOOTSTRAPPED_AT_HEADER_BIT : 0)
| (hasBoundsFlags(cfk) ? HAS_BOUNDS_FLAGS_HEADER_BIT : 0)
| (cfk.hasMaxUniqueHlc() ? HAS_MAX_HLC_HEADER_BIT : 0)
;
// every command header has the same width, large enough for the widest header seen;
// (headerBytes - 1) is stored at bit 14 of the global flags, so at most 4 bytes are permitted
int headerBytes = (maxHeaderBits+7)/8;
globalFlags |= Invariants.requireArgument(headerBytes - 1, headerBytes <= 4) << 14;
int hlcBytesLookup;
{ // 2bits per size, first value may be zero and remainder may be increments of 1-4;
// only need to be able to encode a distribution of approx. 8 bytes at most, so
// pick lowest number we need first, then next lowest as 25th %ile while ensuring value of 1-4;
// then pick highest number we need, ensuring at least 2 greater than second (leaving room for third)
// then pick third number as 75th %ile, but at least 1 less than highest, and one more than second
// finally, ensure third then second are distributed so that there is no more than a gap of 4 between them and the next
int l0 = Math.max(0, Math.min(3, minBasicBytes - headerBytes));
// a negative binarySearch result is (-(insertion point) - 1); -1 - result recovers the insertion point
int l1 = Arrays.binarySearch(bytesHistogram, minBasicBytes, maxBasicBytes, commandCount/4);
l1 = Math.max(l0+1, Math.min(l0+4, (l1 < 0 ? -1 - l1 : l1) - headerBytes));
int l3 = Math.max(l1+2, maxBasicBytes - headerBytes);
int l2 = Arrays.binarySearch(bytesHistogram, minBasicBytes, maxBasicBytes,(3*commandCount)/4);
l2 = Math.max(l1+1, Math.min(l3-1, (l2 < 0 ? -1 -l2 : l2) - headerBytes));
while (l3-l2 > 4) ++l2;
while (l2-l1 > 4) ++l1;
hlcBytesLookup = setHlcBytes(l0, l1, l2, l3);
// stored in the global flags from bit 16: l0 as-is, then each later level as (increment - 1), 2 bits apiece
globalFlags |= (l0 | ((l1-(1+l0))<<2) | ((l2-(1+l1))<<4) | ((l3-(1+l2))<<6)) << 16;
}
int hlcFlagLookup = hlcBytesLookupToHlcFlagLookup(hlcBytesLookup);
// add the per-command record sizes: for each bucket, the number of commands in it multiplied by
// the header width plus the hlc byte length selected for that bucket
totalBytes += bytesHistogram[minBasicBytes] * (headerBytes + getHlcBytes(hlcBytesLookup, getHlcFlag(hlcFlagLookup, minBasicBytes - headerBytes)));
// the histogram is cumulative here, so the count for a bucket is its difference from the previous bucket
for (int i = minBasicBytes + 1 ; i <= maxBasicBytes ; ++i)
totalBytes += (bytesHistogram[i] - bytesHistogram[i-1]) * (headerBytes + getHlcBytes(hlcBytesLookup, getHlcFlag(hlcFlagLookup, i - headerBytes)));
// size of the leading counts and of the node id table
totalBytes += sizeOfUnsignedVInt(commandCount + 1);
totalBytes += sizeOfUnsignedVInt(nodeIdCount);
totalBytes += sizeOfUnsignedVInt(nodeIds[0]);
for (int i = 1 ; i < nodeIdCount ; ++i)
totalBytes += sizeOfUnsignedVInt(nodeIds[i] - nodeIds[i - 1]);
// the three bytes of global flags
totalBytes += 3;
// the histogram is no longer needed (note this is only reached on the non-exceptional path)
cachedInts().forceDiscard(bytesHistogram);
// size of the bounds section; mirrors the corresponding write block below
{
Timestamp redundantBefore = cfk.redundantBefore();
TxnId bootstrappedAt = cfk.bootstrappedAt();
// reset the delta base to redundantBefore; these values are also relied upon by the write pass below
prevEpoch = redundantBefore.epoch();
prevHlc = redundantBefore.hlc();
{
QuickBounds bounds = cfk.bounds();
long start = bounds.startEpoch;
long end = bounds.endEpoch;
// the low bit of the first value records whether an end epoch follows (Long.MAX_VALUE means none)
totalBytes += sizeOfUnsignedVInt(start * 2 + (end == Long.MAX_VALUE ? 0 : 1));
if (end != Long.MAX_VALUE)
totalBytes += sizeOfUnsignedVInt(end - start);
totalBytes += sizeOfUnsignedVInt(prevEpoch - start);
}
totalBytes += sizeOfUnsignedVInt(prevHlc);
if (0 != (globalFlags & HAS_BOUNDS_FLAGS_HEADER_BIT))
totalBytes += sizeOfUnsignedVInt(redundantBefore.flags());
totalBytes += sizeOfUnsignedVInt(Arrays.binarySearch(nodeIds, 0, nodeIdCount, redundantBefore.node.id));
if (bootstrappedAt != null)
{
totalBytes += sizeOfUnsignedVInt(bootstrappedAt.epoch() - prevEpoch);
totalBytes += sizeOfVInt(bootstrappedAt.hlc() - prevHlc);
if (0 != (globalFlags & HAS_BOUNDS_FLAGS_HEADER_BIT))
totalBytes += sizeOfUnsignedVInt(bootstrappedAt.flags());
totalBytes += sizeOfUnsignedVInt(Arrays.binarySearch(nodeIds, 0, nodeIdCount, bootstrappedAt.node.id));
}
if (0 != (globalFlags & HAS_MAX_HLC_HEADER_BIT))
totalBytes += sizeOfVInt(cfk.maxUniqueHlc - prevHlc);
}
// stored +1 so that "no prunedBefore" (-1) is encoded as zero
totalBytes += sizeOfUnsignedVInt(prunedBeforeIndex + 1);
int bitsPerBallotEpoch = 0, bitsPerBallotHlc = 1, bitsPerBallotFlags = 0;
// size of the trailing bit-packed stream of executeAt, missing ids and ballots
if ((missingIdCount | executeAtCount | ballotCount) > 0)
{
if (ballotCount > 0)
{
// every ballot after the first is encoded as a zig-zag delta from the previous ballot,
// so find the widest delta for each field
Ballot prevBallot = null;
for (int i = 0 ; i < commandCount ; ++i)
{
TxnInfo txn = cfk.get(i);
if (txn.getClass() != TxnInfoExtra.class) continue;
if (!txn.hasBallot()) continue;
TxnInfoExtra extra = (TxnInfoExtra) txn;
if (extra.ballot == Ballot.ZERO) continue;
if (prevBallot != null)
{
bitsPerBallotEpoch = Math.max(bitsPerBallotEpoch, numberOfBitsToRepresent(encodeZigZag64(extra.ballot.epoch() - prevBallot.epoch())));
bitsPerBallotHlc = Math.max(bitsPerBallotHlc, numberOfBitsToRepresent(encodeZigZag64(extra.ballot.hlc() - prevBallot.hlc())));
bitsPerBallotFlags = Math.max(bitsPerBallotFlags, numberOfBitsToRepresent(extra.ballot.flags()));
}
prevBallot = extra.ballot;
}
totalBytes += 2; // encode bit widths
}
if (executeAtCount > 0)
totalBytes += 2; // encode bit widths
// account for encoding missing id stream
int missingIdBits = 1 + numberOfBitsToRepresent(commandCount);
int executeAtBits = bitsPerNodeId
+ bitsPerExecuteAtEpoch
+ bitsPerExecuteAtHlc
+ bitsPerExecuteAtFlags;
int ballotBits = bitsPerNodeId
+ bitsPerBallotEpoch
+ bitsPerBallotHlc
+ bitsPerBallotFlags;
// the first ballot is written in full (two 64-bit words plus its node index), the rest as deltas;
// the whole stream is rounded up to a whole number of bytes
totalBytes += (missingIdBits * missingIdCount
+ executeAtBits * executeAtCount
+ (ballotCount > 0 ? ballotBits * (ballotCount - 1) + bitsPerNodeId + 128 : 0)
+ 7)/8;
}
// count unmanaged bytes
int unmanagedPendingCommitCount = 0;
{
int bytesPerNodeId = (bitsPerNodeId + 7) / 8;
for (int i = 0 ; i < cfk.unmanagedCount() ; ++i)
{
Unmanaged unmanaged = cfk.getUnmanaged(i);
if (unmanaged.pending == Unmanaged.Pending.COMMIT)
++unmanagedPendingCommitCount;
// two timestamps per entry (txnId and waitingUntil); presumably writeTxnId/writeTimestamp each
// emit TIMESTAMP_BASE_SIZE + bytesPerNodeId bytes - those helpers are defined elsewhere
totalBytes += 2 * (TIMESTAMP_BASE_SIZE + bytesPerNodeId);
}
}
totalBytes += sizeOfUnsignedVInt(unmanagedPendingCommitCount);
totalBytes += sizeOfUnsignedVInt(cfk.unmanagedCount() - unmanagedPendingCommitCount);
// sizing is complete: allocate exactly totalBytes and write each section in order
ByteBuffer out = ByteBuffer.allocate(totalBytes);
VIntCoding.writeUnsignedVInt32(commandCount + 1, out);
VIntCoding.writeUnsignedVInt32(nodeIdCount, out);
VIntCoding.writeUnsignedVInt32(nodeIds[0], out);
for (int i = 1 ; i < nodeIdCount ; ++i) // TODO (desired): can encode more efficiently as a stream of N bit integers
VIntCoding.writeUnsignedVInt32(nodeIds[i] - nodeIds[i-1], out);
writeLeastSignificantBytes(globalFlags, 3, out);
// bounds section; prevEpoch and prevHlc still hold the redundantBefore values assigned during sizing
{
TxnId redundantBefore = cfk.redundantBefore();
Timestamp bootstrappedAt = cfk.bootstrappedAt();
QuickBounds bounds = cfk.bounds();
long start = bounds.startEpoch;
long end = bounds.endEpoch;
VIntCoding.writeUnsignedVInt((start << 1) | (end != Long.MAX_VALUE ? 1 : 0), out);
if (end != Long.MAX_VALUE)
VIntCoding.writeUnsignedVInt(end - start, out);
VIntCoding.writeUnsignedVInt(prevEpoch - start, out);
VIntCoding.writeUnsignedVInt(prevHlc, out);
if (0 != (globalFlags & HAS_BOUNDS_FLAGS_HEADER_BIT))
VIntCoding.writeUnsignedVInt32(redundantBefore.flags(), out);
// node ids are written as their index in the node id table
VIntCoding.writeUnsignedVInt32(Arrays.binarySearch(nodeIds, 0, nodeIdCount, redundantBefore.node.id), out);
if (bootstrappedAt != null)
{
VIntCoding.writeUnsignedVInt(bootstrappedAt.epoch() - prevEpoch, out);
VIntCoding.writeVInt(bootstrappedAt.hlc() - prevHlc, out);
if (0 != (globalFlags & HAS_BOUNDS_FLAGS_HEADER_BIT))
VIntCoding.writeUnsignedVInt(bootstrappedAt.flags(), out);
VIntCoding.writeUnsignedVInt(Arrays.binarySearch(nodeIds, 0, nodeIdCount, bootstrappedAt.node.id), out);
}
if (0 != (globalFlags & HAS_MAX_HLC_HEADER_BIT))
VIntCoding.writeVInt(cfk.maxUniqueHlc - prevHlc, out);
}
VIntCoding.writeUnsignedVInt32(prunedBeforeIndex + 1, out);
// restart the flag history so this pass computes the same encodedFlagBits as the sizing pass did
flagHistory = EMPTY_FLAG_HISTORY;
int flagsPlus = globalFlags & COMMAND_HEADER_BIT_FLAGS_MASK;
// TODO (desired): check this loop compiles correctly to only branch on epoch case, for binarySearch and flushing
for (int i = 0 ; i < commandCount ; ++i)
{
TxnInfo txn = cfk.get(i);
// the header is assembled in 'bits' from the least significant bit upwards; the status ordinal
// starts at bit 0 and the next field starts at bit 4
long bits = txn.status().ordinal();
int bitIndex = 4;
int statusHasExecuteAt = txn.hasExecuteAt() ? 1 : 0;
int statusHasDeps = txn.hasDeps() ? 1 : 0;
int statusHasBallot = txn.hasBallot() ? 1 : 0;
int statusOverrides = txn.statusOverrides();
// each optional bit is OR'd in unconditionally, but its position is only consumed (bitIndex advanced)
// when the matching global header bit is set and, for the three status-dependent fields, when the
// command's status permits the field - the same rule used to count headerBits during sizing
bits |= ((long)statusOverrides) << bitIndex;
bitIndex += 1 & (flagsPlus >>> HAS_STATUS_OVERRIDES_HEADER_BIT_SHIFT);
long hasExecuteAt = txn.executeAt != txn ? 1 : 0;
Invariants.require(hasExecuteAt <= statusHasExecuteAt);
bits |= hasExecuteAt << bitIndex;
bitIndex += statusHasExecuteAt & (flagsPlus >>> HAS_EXECUTE_AT_HEADER_BIT_SHIFT);
long hasMissingIds = txn.getClass() == TxnInfoExtra.class && ((TxnInfoExtra)txn).missing != NO_TXNIDS ? 1 : 0;
bits |= hasMissingIds << bitIndex;
bitIndex += statusHasDeps & (flagsPlus >>> HAS_MISSING_DEPS_HEADER_BIT_SHIFT);
long hasBallot = txn.getClass() == TxnInfoExtra.class && ((TxnInfoExtra)txn).ballot != Ballot.ZERO ? 1 : 0;
bits |= hasBallot << bitIndex;
bitIndex += statusHasBallot & (flagsPlus >>> HAS_BALLOT_HEADER_BIT_SHIFT);
int encodedFlagBits;
{
// three bits describing how this command's flags are encoded
int flags = txn.flags();
encodedFlagBits = encodedFlagBits(flags, flagHistory);
flagHistory = updateFlagHistory(flags, encodedFlagBits, flagHistory);
bits |= (long)encodedFlagBits << bitIndex;
bitIndex += 3;
}
long hlcBits;
int extraEpochDeltaBytes = 0;
{
long epoch = txn.epoch();
long delta = epoch - prevEpoch;
long hlc = txn.hlc();
hlcBits = hlc - prevHlc;
if (delta == 0)
{
// same epoch: a single zero bit
bitIndex++;
}
else
{
// epoch changed: a one bit, then the hlc sign bit, then a 2-bit selector for the epoch delta:
// 0 = delta of exactly one, 1 = delta in the next four header bits,
// 2 = delta in one trailing byte, 3 = delta in four trailing bytes
bits |= 1L << bitIndex++;
if (hlcBits < 0)
{
hlcBits = -1 - hlcBits;
bits |= 1L << bitIndex;
}
bitIndex++;
if (delta > 1)
{
if (delta <= 0xf)
{
bits |= 1L << bitIndex;
bits |= delta << (bitIndex + 2);
bitIndex += 4;
}
else
{
bits |= (delta <= 0xff ? 2L : 3L) << bitIndex;
extraEpochDeltaBytes = Ints.checkedCast(delta);
}
}
bitIndex += 2;
}
prevEpoch = epoch;
prevHlc = hlc;
}
// index of the command's node id in the node id table
bits |= ((long)Arrays.binarySearch(nodeIds, 0, nodeIdCount, txn.node.id)) << bitIndex;
bitIndex += bitsPerNodeId;
// the low bits of the hlc delta fill whatever remains of the header above the 2-bit hlc flag;
// the bits that fit in the header are then shifted away, and the hlc flag selects how many
// further bytes are written for the remainder
bits |= hlcBits << (bitIndex + 2);
hlcBits >>>= 8*headerBytes - (bitIndex + 2);
int hlcFlag = getHlcFlag(hlcFlagLookup, (7 + numberOfBitsToRepresent(hlcBits))/8);
bits |= ((long)hlcFlag) << bitIndex;
writeLeastSignificantBytes(bits, headerBytes, out);
writeLeastSignificantBytes(hlcBits, getHlcBytes(hlcBytesLookup, hlcFlag), out);
// raw flags follow when they could not be encoded in the header bits: one byte for 0, two bytes for 1
if (encodedFlagBits < 2)
{
int flags = txn.flags();
if (encodedFlagBits == 0) out.put((byte)flags);
else out.putShort((short)flags);
}
// epoch deltas too large for the header follow as one byte or as a four byte int
if (extraEpochDeltaBytes > 0)
{
if (extraEpochDeltaBytes <= 0xff) out.put((byte)extraEpochDeltaBytes);
else out.putInt(extraEpochDeltaBytes);
}
}
VIntCoding.writeUnsignedVInt32(unmanagedPendingCommitCount, out);
VIntCoding.writeUnsignedVInt32(cfk.unmanagedCount() - unmanagedPendingCommitCount, out);
// only the two counts are written, not each entry's pending state, so the entries must be ordered with
// all COMMIT entries first followed by all APPLY entries; the invariant inside the loop enforces this
Unmanaged.Pending pending = unmanagedPendingCommitCount == 0 ? Unmanaged.Pending.APPLY : Unmanaged.Pending.COMMIT;
{
int bytesPerNodeId = (bitsPerNodeId + 7) / 8;
for (int i = 0 ; i < cfk.unmanagedCount() ; ++i)
{
Unmanaged unmanaged = cfk.getUnmanaged(i);
Invariants.require(unmanaged.pending == pending);
writeTxnId(unmanaged.txnId, out, nodeIds, nodeIdCount, bytesPerNodeId);
writeTimestamp(unmanaged.waitingUntil, out, nodeIds, nodeIdCount, bytesPerNodeId);
if (--unmanagedPendingCommitCount == 0) pending = Unmanaged.Pending.APPLY;
}
}
// trailing bit-packed stream: for each command in order, its executeAt, then its missing ids, then its ballot
if ((executeAtCount | missingIdCount | ballotCount) > 0)
{
int bitsPerCommandId = numberOfBitsToRepresent(commandCount);
int bitsPerMissingId = 1 + bitsPerCommandId;
int bitsPerExecuteAt = bitsPerExecuteAtEpoch + bitsPerExecuteAtHlc + bitsPerExecuteAtFlags + bitsPerNodeId;
// NOTE(review): the sizing pass above computed ballotBits before this widening from 15 to 16, and no
// equivalent widening is visible for bitsPerExecuteAtFlags even though its width is also stored as
// min(15, bits) - confirm both against the decoder
if (bitsPerBallotFlags == 15) bitsPerBallotFlags = 16;
int bitsPerBallot = bitsPerBallotEpoch + bitsPerBallotHlc + bitsPerBallotFlags + bitsPerNodeId;
Invariants.require(bitsPerExecuteAtEpoch < 64);
Invariants.require(bitsPerExecuteAtHlc <= 64);
Invariants.require(bitsPerExecuteAtFlags <= 16);
// field widths are packed into a short: epoch width from bit 10, (hlc width - 1) from bit 4, flags width in the low 4 bits
if (0 != (globalFlags & HAS_EXECUTE_AT_HEADER_BIT)) // we encode both 15 and 16 bits for flag length as 15 to fit in a short
out.putShort((short) ((bitsPerExecuteAtEpoch << 10) | ((bitsPerExecuteAtHlc-1) << 4) | (Math.min(15, bitsPerExecuteAtFlags))));
if (0 != (globalFlags & HAS_BALLOT_HEADER_BIT)) // we encode both 15 and 16 bits for flag length as 15 to fit in a short
out.putShort((short) ((bitsPerBallotEpoch << 10) | ((bitsPerBallotHlc-1) << 4) | (Math.min(15, bitsPerBallotFlags))));
// 'buffer' holds bits not yet written out and 'bufferCount' how many of them there are, kept modulo 64;
// flushBits is defined elsewhere and presumably writes to 'out' whenever 64 bits have accumulated
long buffer = 0L;
int bufferCount = 0;
Ballot prevBallot = null;
for (int i = 0 ; i < commandCount ; ++i)
{
TxnInfo txn = cfk.get(i);
if (txn.executeAt != txn)
{
Timestamp executeAt = txn.executeAt;
int nodeIdx = Arrays.binarySearch(nodeIds, 0, nodeIdCount, executeAt.node.id);
if (bitsPerExecuteAt <= 64)
{
// everything fits in one long: pack epoch delta, hlc delta, flags and node index
// from the low bits upwards and flush them together
Invariants.require(executeAt.epoch() >= txn.epoch());
long executeAtBits = executeAt.epoch() - txn.epoch();
int offset = bitsPerExecuteAtEpoch;
executeAtBits |= (executeAt.hlc() - txn.hlc()) << offset ;
offset += bitsPerExecuteAtHlc;
executeAtBits |= ((long)executeAt.flags()) << offset;
offset += bitsPerExecuteAtFlags;
executeAtBits |= ((long)nodeIdx) << offset;
buffer = flushBits(buffer, bufferCount, executeAtBits, bitsPerExecuteAt, out);
bufferCount = (bufferCount + bitsPerExecuteAt) & 63;
}
else
{
// too wide for one long: flush the same fields one at a time, in the same order
buffer = flushBits(buffer, bufferCount, executeAt.epoch() - txn.epoch(), bitsPerExecuteAtEpoch, out);
bufferCount = (bufferCount + bitsPerExecuteAtEpoch) & 63;
buffer = flushBits(buffer, bufferCount, executeAt.hlc() - txn.hlc(), bitsPerExecuteAtHlc, out);
bufferCount = (bufferCount + bitsPerExecuteAtHlc) & 63;
buffer = flushBits(buffer, bufferCount, executeAt.flags(), bitsPerExecuteAtFlags, out);
bufferCount = (bufferCount + bitsPerExecuteAtFlags) & 63;
buffer = flushBits(buffer, bufferCount, nodeIdx, bitsPerNodeId, out);
bufferCount = (bufferCount + bitsPerNodeId) & 63;
}
}
if (txn.getClass() == TxnInfoExtra.class)
{
TxnInfoExtra extra = (TxnInfoExtra) txn;
TxnId[] missing = extra.missing;
if (missing.length > 0)
{
// each missing id is written as its index within cfk; the extra top bit is set only on
// the last entry, marking the end of this command's list
int j = 0;
while (j < missing.length - 1)
{
int missingId = cfk.indexOf(missing[j++]);
buffer = flushBits(buffer, bufferCount, missingId, bitsPerMissingId, out);
bufferCount = (bufferCount + bitsPerMissingId) & 63;
}
int missingId = cfk.indexOf(missing[missing.length - 1]);
missingId |= 1 << bitsPerCommandId;
buffer = flushBits(buffer, bufferCount, missingId, bitsPerMissingId, out);
bufferCount = (bufferCount + bitsPerMissingId) & 63;
}
Ballot ballot = extra.ballot;
if (ballot != Ballot.ZERO)
{
int nodeIdx = Arrays.binarySearch(nodeIds, 0, nodeIdCount, ballot.node.id);
if (prevBallot == null)
{
// the first ballot is written in full; bufferCount is left alone for the two 64-bit
// words because adding 64 leaves it unchanged modulo 64
buffer = flushBits(buffer, bufferCount, ballot.msb, 64, out);
buffer = flushBits(buffer, bufferCount, ballot.lsb, 64, out);
buffer = flushBits(buffer, bufferCount, nodeIdx, bitsPerNodeId, out);
bufferCount = (bufferCount + bitsPerNodeId) & 63;
}
else if (bitsPerBallot <= 64)
{
// later ballots are zig-zag deltas from the previous ballot, packed into one long when they fit
long ballotBits = encodeZigZag64(ballot.epoch() - prevBallot.epoch());
int offset = bitsPerBallotEpoch;
ballotBits |= encodeZigZag64(ballot.hlc() - prevBallot.hlc()) << offset ;
offset += bitsPerBallotHlc;
ballotBits |= ((long)ballot.flags()) << offset;
offset += bitsPerBallotFlags;
ballotBits |= ((long)nodeIdx) << offset;
buffer = flushBits(buffer, bufferCount, ballotBits, bitsPerBallot, out);
bufferCount = (bufferCount + bitsPerBallot) & 63;
}
else
{
// too wide for one long: flush the same fields one at a time, in the same order
buffer = flushBits(buffer, bufferCount, encodeZigZag64(ballot.epoch() - prevBallot.epoch()), bitsPerBallotEpoch, out);
bufferCount = (bufferCount + bitsPerBallotEpoch) & 63;
buffer = flushBits(buffer, bufferCount, encodeZigZag64(ballot.hlc() - prevBallot.hlc()), bitsPerBallotHlc, out);
bufferCount = (bufferCount + bitsPerBallotHlc) & 63;
buffer = flushBits(buffer, bufferCount, ballot.flags(), bitsPerBallotFlags, out);
bufferCount = (bufferCount + bitsPerBallotFlags) & 63;
buffer = flushBits(buffer, bufferCount, nodeIdx, bitsPerNodeId, out);
bufferCount = (bufferCount + bitsPerNodeId) & 63;
}
prevBallot = ballot;
}
}
}
// write out whatever bits remain, rounded up to whole bytes
writeMostSignificantBytes(buffer, (bufferCount + 7)/8, out);
}
// the computed size must match what was written exactly
Invariants.require(!out.hasRemaining());
out.flip();
return out;
}
finally
{
// hand the node id scratch array back to the cache, whether or not serialisation succeeded
cachedInts().forceDiscard(nodeIds);
}
}