in hollow/src/main/java/com/netflix/hollow/core/write/HollowMapTypeWriteState.java [343:461]
/**
 * Builds the per-shard delta payload for this map type.
 *
 * <p>For every ordinal set in {@code toCyclePopulated} but not in {@code fromCyclePopulated}
 * the serialized map record is read from {@code ordinalMap} and re-encoded into the
 * fixed-length arrays {@code mapPointersAndSizesArray} / {@code entryData} of the shard
 * selected by {@code ordinal & (shardCount - 1)}. For every ordinal set in
 * {@code fromCyclePopulated} but not in {@code toCyclePopulated} a removal is recorded.
 * Added and removed ordinals are written to {@code deltaAddedOrdinals} /
 * {@code deltaRemovedOrdinals} as VarInt gaps between consecutive shard-local ordinals
 * ({@code ordinal / shardCount}).
 *
 * <p>All result fields ({@code numMapsInDelta}, {@code numBucketsInDelta},
 * {@code mapPointersAndSizesArray}, {@code entryData}, {@code deltaAddedOrdinals},
 * {@code deltaRemovedOrdinals}) are replaced by freshly allocated arrays.
 *
 * @param fromCyclePopulated ordinals populated in the state the delta starts from
 * @param toCyclePopulated   ordinals populated in the state the delta leads to
 * @param isReverse          when {@code true} and {@code revNumShards} differs from
 *                           {@code numShards}, the {@code rev*} shard count and map-pointer
 *                           width are used instead of the current ones
 */
public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated, boolean isReverse) {
    // Pick the shard layout: the "rev" layout only applies to reverse deltas whose shard count differs.
    final boolean useRevLayout = isReverse && this.numShards != this.revNumShards;
    final int shardCount = useRevLayout ? this.revNumShards : this.numShards;
    final int pointerBits = useRevLayout ? this.revBitsPerMapPointer : this.bitsPerMapPointer;

    final int fixedPortionBits = bitsPerMapSizeValue + pointerBits;
    final int entryBits = bitsPerKeyElement + bitsPerValueElement;

    numMapsInDelta = new int[shardCount];
    numBucketsInDelta = new long[shardCount];
    mapPointersAndSizesArray = new FixedLengthElementArray[shardCount];
    entryData = new FixedLengthElementArray[shardCount];
    deltaAddedOrdinals = new ByteDataArray[shardCount];
    deltaRemovedOrdinals = new ByteDataArray[shardCount];

    final ThreadSafeBitSet deltaAdditions = toCyclePopulated.andNot(fromCyclePopulated);

    // Used as a bit mask to pick a shard -- assumes the shard count is a power of two.
    final int shardMask = shardCount - 1;

    // Pass 1: count added maps and the hash buckets they need, per shard, to size the arrays.
    for (int added = deltaAdditions.nextSetBit(0); added != -1; added = deltaAdditions.nextSetBit(added + 1)) {
        final int shard = added & shardMask;
        numMapsInDelta[shard]++;
        long sizePointer = ordinalMap.getPointerForData(added);
        int mapSize = VarInt.readVInt(ordinalMap.getByteData().getUnderlyingArray(), sizePointer);
        numBucketsInDelta[shard] += HashCodes.hashTableSize(mapSize);
    }

    for (int shard = 0; shard < shardCount; shard++) {
        mapPointersAndSizesArray[shard] = new FixedLengthElementArray(WastefulRecycler.DEFAULT_INSTANCE, (long) numMapsInDelta[shard] * fixedPortionBits);
        entryData[shard] = new FixedLengthElementArray(WastefulRecycler.DEFAULT_INSTANCE, numBucketsInDelta[shard] * entryBits);
        deltaAddedOrdinals[shard] = new ByteDataArray(WastefulRecycler.DEFAULT_INSTANCE);
        deltaRemovedOrdinals[shard] = new ByteDataArray(WastefulRecycler.DEFAULT_INSTANCE);
    }

    final ByteData data = ordinalMap.getByteData().getUnderlyingArray();

    // Per-shard write cursors and the last shard-local ordinal emitted to each ordinal stream.
    final int[] mapsWritten = new int[shardCount];
    final long[] bucketsWritten = new long[shardCount];
    final int[] lastRemoved = new int[shardCount];
    final int[] lastAdded = new int[shardCount];

    // When the schema declares a hash key, buckets are derived from the key record's hash
    // rather than from the bucket stored in the serialized record.
    HollowWriteStateEnginePrimaryKeyHasher primaryKeyHasher = null;
    if (getSchema().getHashKey() != null) {
        try {
            primaryKeyHasher = new HollowWriteStateEnginePrimaryKeyHasher(getSchema().getHashKey(), getStateEngine());
        } catch (FieldPaths.FieldPathException e) {
            if (e.error != NOT_BINDABLE) {
                throw e;
            }
            // An unbindable field is tolerated: fall back to the stored buckets.
            LOG.log(Level.WARNING, "Failed to create a key hasher for " + getSchema().getHashKey() +
                    " because a field could not be bound to a type in the state");
        }
    }

    // A key slot with every bit set marks a bucket as empty.
    final long emptyKey = (1L << bitsPerKeyElement) - 1;

    // Pass 2: encode additions and record removals.
    for (int ordinal = 0; ordinal <= maxOrdinal; ordinal++) {
        final int shard = ordinal & shardMask;

        if (!deltaAdditions.get(ordinal)) {
            if (fromCyclePopulated.get(ordinal) && !toCyclePopulated.get(ordinal)) {
                final int removedShardOrdinal = ordinal / shardCount;
                VarInt.writeVInt(deltaRemovedOrdinals[shard], removedShardOrdinal - lastRemoved[shard]);
                lastRemoved[shard] = removedShardOrdinal;
            }
            continue;
        }

        // Serialized record layout: size, then per entry (key ordinal gap, value ordinal, bucket), all VarInts.
        long readPointer = ordinalMap.getPointerForData(ordinal);
        final int size = VarInt.readVInt(data, readPointer);
        readPointer += VarInt.sizeOfVInt(size);

        final int numBuckets = HashCodes.hashTableSize(size);
        final int bucketMask = numBuckets - 1;
        final long firstBucket = bucketsWritten[shard];
        final long endBucketPosition = firstBucket + numBuckets;

        final FixedLengthElementArray pointersAndSizes = mapPointersAndSizesArray[shard];
        final FixedLengthElementArray entries = entryData[shard];

        // Fixed-length portion of this map: [end bucket position | size].
        final long recordBitOffset = (long) fixedPortionBits * mapsWritten[shard];
        pointersAndSizes.setElementValue(recordBitOffset, pointerBits, endBucketPosition);
        pointersAndSizes.setElementValue(recordBitOffset + pointerBits, bitsPerMapSizeValue, size);

        // Start with every bucket of this map marked empty.
        for (long bucket = firstBucket; bucket < endBucketPosition; bucket++) {
            entries.setElementValue((long) entryBits * bucket, bitsPerKeyElement, emptyKey);
        }

        int keyElementOrdinal = 0;
        for (int entry = 0; entry < size; entry++) {
            final int keyOrdinalGap = VarInt.readVInt(data, readPointer);
            readPointer += VarInt.sizeOfVInt(keyOrdinalGap);
            final int valueElementOrdinal = VarInt.readVInt(data, readPointer);
            readPointer += VarInt.sizeOfVInt(valueElementOrdinal);
            int bucket = VarInt.readVInt(data, readPointer);
            readPointer += VarInt.sizeOfVInt(bucket);

            // Key ordinals are stored as gaps from the previous entry's key ordinal.
            keyElementOrdinal += keyOrdinalGap;

            if (primaryKeyHasher != null) {
                bucket = primaryKeyHasher.getRecordHash(keyElementOrdinal) & bucketMask;
            }

            // Step forward (wrapping within this map's buckets) until an empty bucket is found.
            while (entries.getElementValue((long) entryBits * (firstBucket + bucket), bitsPerKeyElement) != emptyKey) {
                bucket = (bucket + 1) & bucketMask;
            }

            final long entryBitOffset = (long) entryBits * (firstBucket + bucket);
            entries.clearElementValue(entryBitOffset, entryBits);
            entries.setElementValue(entryBitOffset, bitsPerKeyElement, keyElementOrdinal);
            entries.setElementValue(entryBitOffset + bitsPerKeyElement, bitsPerValueElement, valueElementOrdinal);
        }

        bucketsWritten[shard] = endBucketPosition;
        mapsWritten[shard]++;

        final int addedShardOrdinal = ordinal / shardCount;
        VarInt.writeVInt(deltaAddedOrdinals[shard], addedShardOrdinal - lastAdded[shard]);
        lastAdded[shard] = addedShardOrdinal;
    }
}