in hollow/src/main/java/com/netflix/hollow/tools/combine/HollowCombiner.java [233:363]
public void combine() {
SimultaneousExecutor executor = new SimultaneousExecutor(getClass(), "combine");
final int numThreads = executor.getCorePoolSize();
createOrdinalRemappers();
createHashOrderIndependentOrdinalMaps();
final Set<String> processedTypes = new HashSet<>();
final Set<PrimaryKey> processedPrimaryKeys = new HashSet<>();
final Set<PrimaryKey> selectedPrimaryKeys = new HashSet<>();
while(processedTypes.size() < output.getOrderedTypeStates().size()){
/// find the next primary keys
for(PrimaryKey key : primaryKeys) {
if (!processedPrimaryKeys.contains(key) && !ignoredTypes.contains(key.getType())) {
if(!isAnySelectedPrimaryKeyADependencyOf(key.getType(), selectedPrimaryKeys)) {
selectedPrimaryKeys.add(key);
}
}
}
final Set<String> typesToProcessThisIteration = new HashSet<>();
final Map<String, HollowPrimaryKeyIndex[]> primaryKeyIndexes = new HashMap<>();
final HollowCombinerExcludePrimaryKeysCopyDirector primaryKeyCopyDirector = new HollowCombinerExcludePrimaryKeysCopyDirector(copyDirector);
for(HollowSchema schema : output.getSchemas()) {
if(!processedTypes.contains(schema.getName()) && !ignoredTypes.contains(schema.getName())) {
if(selectedPrimaryKeys.isEmpty() || isAnySelectedPrimaryKeyDependentOn(schema.getName(), selectedPrimaryKeys)) {
for(PrimaryKey pk : selectedPrimaryKeys) {
if(pk.getType().equals(schema.getName())) {
HollowPrimaryKeyIndex[] indexes = new HollowPrimaryKeyIndex[inputs.length];
for(int i=0;i<indexes.length;i++) {
if(inputs[i].getTypeState(pk.getType()) != null)
indexes[i] = new HollowPrimaryKeyIndex(inputs[i], pk);
}
for(int i=0;i<indexes.length;i++) {
HollowTypeReadState typeState = inputs[i].getTypeState(pk.getType());
if(typeState != null) {
BitSet populatedOrdinals = typeState.getListener(PopulatedOrdinalListener.class).getPopulatedOrdinals();
int ordinal = populatedOrdinals.nextSetBit(0);
while(ordinal != -1) {
if(primaryKeyCopyDirector.shouldCopy(typeState, ordinal)) {
Object[] recordKey = indexes[i].getRecordKey(ordinal);
for(int j=i+1;j<indexes.length;j++) {
primaryKeyCopyDirector.excludeKey(indexes[j], recordKey);
}
}
ordinal = populatedOrdinals.nextSetBit(ordinal + 1);
}
}
}
primaryKeyIndexes.put(pk.getType(), indexes);
}
}
typesToProcessThisIteration.add(schema.getName());
}
}
}
if(typesToProcessThisIteration.isEmpty())
break;
for(int i=0;i<numThreads;i++) {
final int threadNumber = i;
executor.execute(() -> {
for(int i1 =0; i1 <inputs.length; i1++) {
HollowCombinerCopyDirector copyDirector = selectedPrimaryKeys.isEmpty()
? HollowCombiner.this.copyDirector
: primaryKeyCopyDirector;
HollowReadStateEngine inputEngine = inputs[i1];
OrdinalRemapper ordinalRemapper = selectedPrimaryKeys.isEmpty()
? ordinalRemappers[i1]
: new HollowCombinerPrimaryKeyOrdinalRemapper(ordinalRemappers, primaryKeyIndexes, i1);
Map<String, HollowCombinerCopier> copierMap = new HashMap<>();
List<HollowCombinerCopier> copierList = new ArrayList<>();
for(String typeName : typesToProcessThisIteration) {
HollowTypeReadState readState = inputEngine.getTypeState(typeName);
HollowTypeWriteState writeState = output.getTypeState(typeName);
if (readState != null && writeState != null) {
HollowCombinerCopier copier = new HollowCombinerCopier(readState, writeState, ordinalRemapper);
copierList.add(copier);
copierMap.put(typeName, copier);
}
}
for(String typeName : processedTypes) {
HollowTypeReadState readState = inputEngine.getTypeState(typeName);
HollowTypeWriteState writeState = output.getTypeState(typeName);
if (readState != null && writeState != null) {
HollowCombinerCopier copier = new HollowCombinerCopier(readState, writeState, ordinalRemappers[i1]);
copierMap.put(typeName, copier);
}
}
copiersPerType.set(copierMap);
int currentOrdinal = threadNumber;
while(!copierList.isEmpty()) {
copyOrdinalForAllStates(currentOrdinal, copierList, ordinalRemapper, copyDirector);
currentOrdinal += numThreads;
}
}
});
}
try {
executor.awaitSuccessfulCompletionOfCurrentTasks();
} catch(Throwable th) {
throw new RuntimeException(th);
}
processedTypes.addAll(typesToProcessThisIteration);
processedPrimaryKeys.addAll(selectedPrimaryKeys);
selectedPrimaryKeys.clear();
}
executor.shutdown();
}