private void copyFilteredObjectState()

in hollow/src/main/java/com/netflix/hollow/tools/filter/FilteredHollowBlobWriter.java [200:308]


    private void copyFilteredObjectState(boolean delta, HollowBlobInput in, FilteredHollowBlobWriterStreamAndFilter[] streamAndFilters, HollowObjectSchema schema, int numShards) throws IOException {
        DataOutputStream[] os = streamsOnly(streamAndFilters);
        HollowObjectSchema[] filteredObjectSchemas = new HollowObjectSchema[os.length];

        for(int i=0;i<streamAndFilters.length;i++) {
            HollowObjectSchema filteredObjectSchema = getFilteredObjectSchema(schema, streamAndFilters[i].getConfig());
            filteredObjectSchemas[i] = filteredObjectSchema;
            filteredObjectSchema.writeTo(streamAndFilters[i].getStream());
            
            VarInt.writeVInt(streamAndFilters[i].getStream(), 1 + VarInt.sizeOfVInt(numShards));
            VarInt.writeVInt(streamAndFilters[i].getStream(), 0); /// forwards compatibility
            VarInt.writeVInt(streamAndFilters[i].getStream(), numShards);
        }

        if(numShards > 1)
            copyVInt(in, os);
        
        for(int shard=0;shard<numShards;shard++) {
            int maxShardOrdinal = copyVInt(in, os);
            int numRecordsToCopy = maxShardOrdinal + 1;
    
            if(delta) {
                GapEncodedVariableLengthIntegerReader.copyEncodedDeltaOrdinals(in, os);
                GapEncodedVariableLengthIntegerReader addedOrdinals = GapEncodedVariableLengthIntegerReader.readEncodedDeltaOrdinals(in, memoryRecycler);
                numRecordsToCopy = addedOrdinals.remainingElements();
                for(DataOutputStream stream : os)
                    addedOrdinals.writeTo(stream);
            }
    
            /// SETUP ///
            int bitsPerField[] = new int[schema.numFields()];
            for(int i=0;i<schema.numFields();i++)
                bitsPerField[i] = VarInt.readVInt(in);
    
            FixedLengthElementArray fixedLengthArraysPerStream[] = new FixedLengthElementArray[os.length];
            long bitsRequiredPerStream[] = new long[os.length];
            List<FixedLengthArrayWriter> fixedLengthArraysPerField[] = (List<FixedLengthArrayWriter>[])new List[schema.numFields()];
            for(int i=0;i<fixedLengthArraysPerField.length;i++)
                fixedLengthArraysPerField[i] = new ArrayList<FixedLengthArrayWriter>();
    
            for(int i=0;i<streamAndFilters.length;i++) {
                long bitsPerRecord = writeBitsPerField(schema, bitsPerField, filteredObjectSchemas[i], streamAndFilters[i].getStream());
    
                bitsRequiredPerStream[i] = bitsPerRecord * numRecordsToCopy;
                fixedLengthArraysPerStream[i] = new FixedLengthElementArray(memoryRecycler,  bitsRequiredPerStream[i]);
                FixedLengthArrayWriter filteredArrayWriter = new FixedLengthArrayWriter(fixedLengthArraysPerStream[i]);
    
                for(int j=0;j<schema.numFields();j++) {
                    if(filteredObjectSchemas[i].getPosition(schema.getFieldName(j)) != -1) {
                        fixedLengthArraysPerField[j].add(filteredArrayWriter);
                    }
                }
            }
            /// END SETUP ///
    
            /// read the unfiltered long array into memory
            FixedLengthElementArray unfilteredFixedLengthFields = FixedLengthElementArray.newFrom(in, memoryRecycler);
    
            /// populate the filtered arrays (each field just gets written to all FixedLengthArrayWriters assigned to its field index)
            long bitsPerRecord = 0;
            for(int fieldBits : bitsPerField)
                bitsPerRecord += fieldBits;
    
            long stopBit = bitsPerRecord * numRecordsToCopy;
            long bitCursor = 0;
            int fieldCursor = 0;
    
            while(bitCursor < stopBit) {
                if(!fixedLengthArraysPerField[fieldCursor].isEmpty()) {
                    long fieldValue = bitsPerField[fieldCursor] > 56 ?
                            unfilteredFixedLengthFields.getLargeElementValue(bitCursor, bitsPerField[fieldCursor])
                            : unfilteredFixedLengthFields.getElementValue(bitCursor, bitsPerField[fieldCursor]);
    
                            for(int i=0;i<fixedLengthArraysPerField[fieldCursor].size();i++)
                                fixedLengthArraysPerField[fieldCursor].get(i).writeField(fieldValue, bitsPerField[fieldCursor]);
                }
    
                bitCursor += bitsPerField[fieldCursor];
                if(++fieldCursor == schema.numFields())
                    fieldCursor = 0;
    
            }
    
            /// write the filtered arrays
            for(int i=0;i<os.length;i++) {
                long numLongsRequired = bitsRequiredPerStream[i] == 0 ? 0 : ((bitsRequiredPerStream[i] - 1) / 64) + 1;
                fixedLengthArraysPerStream[i].writeTo(os[i], numLongsRequired);
            }
    
            /// copy the var length arrays for populated fields
            for(int i=0;i<schema.numFields();i++) {
                List<DataOutputStream> streamsWithFieldList = new ArrayList<DataOutputStream>();
                for(int j=0;j<streamAndFilters.length;j++) {
                    ObjectFilterConfig objectTypeConfig = streamAndFilters[j].getConfig().getObjectTypeConfig(schema.getName());
                    if(objectTypeConfig.includesField(schema.getFieldName(i)))
                        streamsWithFieldList.add(streamAndFilters[j].getStream());
                }
    
                DataOutputStream streamsWithField[] = new DataOutputStream[streamsWithFieldList.size()];
                streamsWithField = streamsWithFieldList.toArray(streamsWithField);
    
                long numBytesInVarLengthData = IOUtils.copyVLong(in, streamsWithField);
                IOUtils.copyBytes(in, streamsWithField, numBytesInVarLengthData);
            }
        }

        if(!delta)
            copySnapshotPopulatedOrdinals(in, os);
    }