in hollow/src/main/java/com/netflix/hollow/tools/filter/FilteredHollowBlobWriter.java [200:308]
private void copyFilteredObjectState(boolean delta, HollowBlobInput in, FilteredHollowBlobWriterStreamAndFilter[] streamAndFilters, HollowObjectSchema schema, int numShards) throws IOException {
DataOutputStream[] os = streamsOnly(streamAndFilters);
HollowObjectSchema[] filteredObjectSchemas = new HollowObjectSchema[os.length];
for(int i=0;i<streamAndFilters.length;i++) {
HollowObjectSchema filteredObjectSchema = getFilteredObjectSchema(schema, streamAndFilters[i].getConfig());
filteredObjectSchemas[i] = filteredObjectSchema;
filteredObjectSchema.writeTo(streamAndFilters[i].getStream());
VarInt.writeVInt(streamAndFilters[i].getStream(), 1 + VarInt.sizeOfVInt(numShards));
VarInt.writeVInt(streamAndFilters[i].getStream(), 0); /// forwards compatibility
VarInt.writeVInt(streamAndFilters[i].getStream(), numShards);
}
if(numShards > 1)
copyVInt(in, os);
for(int shard=0;shard<numShards;shard++) {
int maxShardOrdinal = copyVInt(in, os);
int numRecordsToCopy = maxShardOrdinal + 1;
if(delta) {
GapEncodedVariableLengthIntegerReader.copyEncodedDeltaOrdinals(in, os);
GapEncodedVariableLengthIntegerReader addedOrdinals = GapEncodedVariableLengthIntegerReader.readEncodedDeltaOrdinals(in, memoryRecycler);
numRecordsToCopy = addedOrdinals.remainingElements();
for(DataOutputStream stream : os)
addedOrdinals.writeTo(stream);
}
/// SETUP ///
int bitsPerField[] = new int[schema.numFields()];
for(int i=0;i<schema.numFields();i++)
bitsPerField[i] = VarInt.readVInt(in);
FixedLengthElementArray fixedLengthArraysPerStream[] = new FixedLengthElementArray[os.length];
long bitsRequiredPerStream[] = new long[os.length];
List<FixedLengthArrayWriter> fixedLengthArraysPerField[] = (List<FixedLengthArrayWriter>[])new List[schema.numFields()];
for(int i=0;i<fixedLengthArraysPerField.length;i++)
fixedLengthArraysPerField[i] = new ArrayList<FixedLengthArrayWriter>();
for(int i=0;i<streamAndFilters.length;i++) {
long bitsPerRecord = writeBitsPerField(schema, bitsPerField, filteredObjectSchemas[i], streamAndFilters[i].getStream());
bitsRequiredPerStream[i] = bitsPerRecord * numRecordsToCopy;
fixedLengthArraysPerStream[i] = new FixedLengthElementArray(memoryRecycler, bitsRequiredPerStream[i]);
FixedLengthArrayWriter filteredArrayWriter = new FixedLengthArrayWriter(fixedLengthArraysPerStream[i]);
for(int j=0;j<schema.numFields();j++) {
if(filteredObjectSchemas[i].getPosition(schema.getFieldName(j)) != -1) {
fixedLengthArraysPerField[j].add(filteredArrayWriter);
}
}
}
/// END SETUP ///
/// read the unfiltered long array into memory
FixedLengthElementArray unfilteredFixedLengthFields = FixedLengthElementArray.newFrom(in, memoryRecycler);
/// populate the filtered arrays (each field just gets written to all FixedLengthArrayWriters assigned to its field index)
long bitsPerRecord = 0;
for(int fieldBits : bitsPerField)
bitsPerRecord += fieldBits;
long stopBit = bitsPerRecord * numRecordsToCopy;
long bitCursor = 0;
int fieldCursor = 0;
while(bitCursor < stopBit) {
if(!fixedLengthArraysPerField[fieldCursor].isEmpty()) {
long fieldValue = bitsPerField[fieldCursor] > 56 ?
unfilteredFixedLengthFields.getLargeElementValue(bitCursor, bitsPerField[fieldCursor])
: unfilteredFixedLengthFields.getElementValue(bitCursor, bitsPerField[fieldCursor]);
for(int i=0;i<fixedLengthArraysPerField[fieldCursor].size();i++)
fixedLengthArraysPerField[fieldCursor].get(i).writeField(fieldValue, bitsPerField[fieldCursor]);
}
bitCursor += bitsPerField[fieldCursor];
if(++fieldCursor == schema.numFields())
fieldCursor = 0;
}
/// write the filtered arrays
for(int i=0;i<os.length;i++) {
long numLongsRequired = bitsRequiredPerStream[i] == 0 ? 0 : ((bitsRequiredPerStream[i] - 1) / 64) + 1;
fixedLengthArraysPerStream[i].writeTo(os[i], numLongsRequired);
}
/// copy the var length arrays for populated fields
for(int i=0;i<schema.numFields();i++) {
List<DataOutputStream> streamsWithFieldList = new ArrayList<DataOutputStream>();
for(int j=0;j<streamAndFilters.length;j++) {
ObjectFilterConfig objectTypeConfig = streamAndFilters[j].getConfig().getObjectTypeConfig(schema.getName());
if(objectTypeConfig.includesField(schema.getFieldName(i)))
streamsWithFieldList.add(streamAndFilters[j].getStream());
}
DataOutputStream streamsWithField[] = new DataOutputStream[streamsWithFieldList.size()];
streamsWithField = streamsWithFieldList.toArray(streamsWithField);
long numBytesInVarLengthData = IOUtils.copyVLong(in, streamsWithField);
IOUtils.copyBytes(in, streamsWithField, numBytesInVarLengthData);
}
}
if(!delta)
copySnapshotPopulatedOrdinals(in, os);
}