public void calculateDelta()

in hollow/src/main/java/com/netflix/hollow/core/write/HollowMapTypeWriteState.java [343:461]


    /**
     * Computes the per-shard delta content for this map type and stores it in this
     * instance's delta fields ({@code numMapsInDelta}, {@code numBucketsInDelta},
     * {@code mapPointersAndSizesArray}, {@code entryData}, {@code deltaAddedOrdinals},
     * {@code deltaRemovedOrdinals}), all of which are re-allocated on every call.
     *
     * <p>An ordinal is treated as <em>added</em> when it is set in {@code toCyclePopulated}
     * but not in {@code fromCyclePopulated}, and as <em>removed</em> when it is set in
     * {@code fromCyclePopulated} but not in {@code toCyclePopulated}. For every added ordinal
     * the serialized record is read from {@code ordinalMap} (a VarInt size followed, per entry,
     * by VarInt key-ordinal delta, value ordinal and hashed bucket) and laid out as an
     * open-addressed (linear probing) bucket table in the shard's {@code entryData}.
     * Added and removed ordinals are written per shard as VarInt gaps between consecutive
     * shard-local ordinals ({@code ordinal / numShards}).
     *
     * @param fromCyclePopulated ordinals populated in the state the delta starts from
     * @param toCyclePopulated ordinals populated in the state the delta leads to
     * @param isReverse when {@code true} and {@code revNumShards} differs from {@code numShards},
     *                  {@code revNumShards} and {@code revBitsPerMapPointer} are used in place of
     *                  {@code numShards} and {@code bitsPerMapPointer}
     * @throws FieldPaths.FieldPathException if the hash key hasher cannot be created for a reason
     *                  other than {@code NOT_BINDABLE} (that case is logged and the hashed bucket
     *                  stored in the record is used instead)
     */
    public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated, boolean isReverse) {
        // These locals deliberately shadow the fields of the same name: every use below
        // must see the reverse-delta values when they apply.
        int numShards = this.numShards;
        int bitsPerMapPointer = this.bitsPerMapPointer;
        if (isReverse && this.numShards != this.revNumShards) {
            numShards = this.revNumShards;
            bitsPerMapPointer = this.revBitsPerMapPointer;
        }

        int bitsPerMapFixedLengthPortion = bitsPerMapSizeValue + bitsPerMapPointer;
        int bitsPerMapEntry = bitsPerKeyElement + bitsPerValueElement;
        // A key element with all bits set marks a bucket that has not been filled yet.
        long emptyBucketKey = (1L << bitsPerKeyElement) - 1;

        numMapsInDelta = new int[numShards];
        numBucketsInDelta = new long[numShards];
        mapPointersAndSizesArray = new FixedLengthElementArray[numShards];
        entryData = new FixedLengthElementArray[numShards];
        deltaAddedOrdinals = new ByteDataArray[numShards];
        deltaRemovedOrdinals = new ByteDataArray[numShards];

        ThreadSafeBitSet deltaAdditions = toCyclePopulated.andNot(fromCyclePopulated);

        // Resolved once and shared by both passes instead of once per added ordinal.
        ByteData data = ordinalMap.getByteData().getUnderlyingArray();

        // Shard selection is done by masking the ordinal with (numShards - 1).
        int shardMask = numShards - 1;

        // Pass 1: count maps and buckets per shard so the arrays can be sized exactly.
        int addedOrdinal = deltaAdditions.nextSetBit(0);
        while(addedOrdinal != -1) {
            int shardNumber = addedOrdinal & shardMask;
            long readPointer = ordinalMap.getPointerForData(addedOrdinal);
            int size = VarInt.readVInt(data, readPointer);

            numMapsInDelta[shardNumber]++;
            numBucketsInDelta[shardNumber] += HashCodes.hashTableSize(size);

            addedOrdinal = deltaAdditions.nextSetBit(addedOrdinal + 1);
        }

        for(int i=0;i<numShards;i++) {
            mapPointersAndSizesArray[i] = new FixedLengthElementArray(WastefulRecycler.DEFAULT_INSTANCE, (long)numMapsInDelta[i] * bitsPerMapFixedLengthPortion);
            entryData[i] = new FixedLengthElementArray(WastefulRecycler.DEFAULT_INSTANCE, numBucketsInDelta[i] * bitsPerMapEntry);
            deltaAddedOrdinals[i] = new ByteDataArray(WastefulRecycler.DEFAULT_INSTANCE);
            deltaRemovedOrdinals[i] = new ByteDataArray(WastefulRecycler.DEFAULT_INSTANCE);
        }

        int mapCounter[] = new int[numShards];
        long bucketCounter[] = new long[numShards];
        int previousRemovedOrdinal[] = new int[numShards];
        int previousAddedOrdinal[] = new int[numShards];

        HollowWriteStateEnginePrimaryKeyHasher primaryKeyHasher = null;

        if(getSchema().getHashKey() != null) {
            try {
                primaryKeyHasher = new HollowWriteStateEnginePrimaryKeyHasher(getSchema().getHashKey(), getStateEngine());
            } catch (FieldPaths.FieldPathException e) {
                if (e.error == NOT_BINDABLE) {
                    // Best effort: without a hasher the bucket stored in each record is used.
                    LOG.log(Level.WARNING, "Failed to create a key hasher for " + getSchema().getHashKey() +
                        " because a field could not be bound to a type in the state");
                } else {
                    throw e;
                }
            }
        }

        // Pass 2: populate the per-shard arrays and the added/removed ordinal streams.
        for(int ordinal=0;ordinal<=maxOrdinal;ordinal++) {
            int shardNumber = ordinal & shardMask;
            if(deltaAdditions.get(ordinal)) {
                FixedLengthElementArray shardEntryData = entryData[shardNumber];
                // Neither counter changes until this map has been fully written.
                long firstBucket = bucketCounter[shardNumber];
                long mapBitOffset = (long)bitsPerMapFixedLengthPortion * mapCounter[shardNumber];

                long readPointer = ordinalMap.getPointerForData(ordinal);

                int size = VarInt.readVInt(data, readPointer);
                readPointer += VarInt.sizeOfVInt(size);

                int numBuckets = HashCodes.hashTableSize(size);
                // Used to wrap bucket indices during probing.
                int bucketMask = numBuckets - 1;

                // The stored pointer is the END bucket position of this map within the shard.
                long endBucketPosition = firstBucket + numBuckets;

                mapPointersAndSizesArray[shardNumber].setElementValue(mapBitOffset, bitsPerMapPointer, endBucketPosition);
                mapPointersAndSizesArray[shardNumber].setElementValue(mapBitOffset + bitsPerMapPointer, bitsPerMapSizeValue, size);

                // Mark every bucket of this map as empty before inserting entries.
                for(int j=0;j<numBuckets;j++) {
                    shardEntryData.setElementValue((long)bitsPerMapEntry * (firstBucket + j), bitsPerKeyElement, emptyBucketKey);
                }

                // Key ordinals are stored as deltas from the previous key ordinal in the record.
                int keyElementOrdinal = 0;

                for(int j=0;j<size;j++) {
                    int keyElementOrdinalDelta = VarInt.readVInt(data, readPointer);
                    readPointer += VarInt.sizeOfVInt(keyElementOrdinalDelta);
                    int valueElementOrdinal = VarInt.readVInt(data, readPointer);
                    readPointer += VarInt.sizeOfVInt(valueElementOrdinal);
                    int hashedBucket = VarInt.readVInt(data, readPointer);
                    readPointer += VarInt.sizeOfVInt(hashedBucket);

                    keyElementOrdinal += keyElementOrdinalDelta;

                    // When a hash key hasher is available it overrides the bucket stored in the record.
                    if(primaryKeyHasher != null)
                        hashedBucket = primaryKeyHasher.getRecordHash(keyElementOrdinal) & bucketMask;

                    // Linear probing: advance (with wrap-around) until an empty bucket is found.
                    while(shardEntryData.getElementValue((long)bitsPerMapEntry * (firstBucket + hashedBucket), bitsPerKeyElement) != emptyBucketKey) {
                        hashedBucket++;
                        hashedBucket &= bucketMask;
                    }

                    long mapEntryBitOffset = (long)bitsPerMapEntry * (firstBucket + hashedBucket);
                    // Clear the empty marker before writing the key and value.
                    shardEntryData.clearElementValue(mapEntryBitOffset, bitsPerMapEntry);
                    shardEntryData.setElementValue(mapEntryBitOffset, bitsPerKeyElement, keyElementOrdinal);
                    shardEntryData.setElementValue(mapEntryBitOffset + bitsPerKeyElement, bitsPerValueElement, valueElementOrdinal);
                }

                bucketCounter[shardNumber] = endBucketPosition;
                mapCounter[shardNumber]++;

                // Added ordinals are written as gaps between consecutive shard-local ordinals.
                int shardOrdinal = ordinal / numShards;
                VarInt.writeVInt(deltaAddedOrdinals[shardNumber], shardOrdinal - previousAddedOrdinal[shardNumber]);
                previousAddedOrdinal[shardNumber] = shardOrdinal;
            } else if(fromCyclePopulated.get(ordinal) && !toCyclePopulated.get(ordinal)) {
                // Removed ordinals use the same gap encoding as added ones.
                int shardOrdinal = ordinal / numShards;
                VarInt.writeVInt(deltaRemovedOrdinals[shardNumber], shardOrdinal - previousRemovedOrdinal[shardNumber]);
                previousRemovedOrdinal[shardNumber] = shardOrdinal;
            }
        }
    }