public void combine()

in hollow/src/main/java/com/netflix/hollow/tools/combine/HollowCombiner.java [233:363]


    public void combine() {
        SimultaneousExecutor executor = new SimultaneousExecutor(getClass(), "combine");
        final int numThreads = executor.getCorePoolSize();

        createOrdinalRemappers();
        createHashOrderIndependentOrdinalMaps();

        final Set<String> processedTypes = new HashSet<>();
        final Set<PrimaryKey> processedPrimaryKeys = new HashSet<>();
        final Set<PrimaryKey> selectedPrimaryKeys = new HashSet<>();

        while(processedTypes.size() < output.getOrderedTypeStates().size()){

            /// find the next primary keys
            for(PrimaryKey key : primaryKeys) {
                if (!processedPrimaryKeys.contains(key) && !ignoredTypes.contains(key.getType())) {
                    if(!isAnySelectedPrimaryKeyADependencyOf(key.getType(), selectedPrimaryKeys)) {
                        selectedPrimaryKeys.add(key);
                    }
                }
            }

            final Set<String> typesToProcessThisIteration = new HashSet<>();
            final Map<String, HollowPrimaryKeyIndex[]> primaryKeyIndexes = new HashMap<>();
            final HollowCombinerExcludePrimaryKeysCopyDirector primaryKeyCopyDirector = new HollowCombinerExcludePrimaryKeysCopyDirector(copyDirector);

            for(HollowSchema schema : output.getSchemas()) {
                if(!processedTypes.contains(schema.getName()) && !ignoredTypes.contains(schema.getName())) {
                    if(selectedPrimaryKeys.isEmpty() || isAnySelectedPrimaryKeyDependentOn(schema.getName(), selectedPrimaryKeys)) {
                        for(PrimaryKey pk : selectedPrimaryKeys) {
                            if(pk.getType().equals(schema.getName())) {
                                HollowPrimaryKeyIndex[] indexes = new HollowPrimaryKeyIndex[inputs.length];
                                for(int i=0;i<indexes.length;i++) {
                                    if(inputs[i].getTypeState(pk.getType()) != null)
                                        indexes[i] = new HollowPrimaryKeyIndex(inputs[i], pk);
                                }

                                for(int i=0;i<indexes.length;i++) {
                                    HollowTypeReadState typeState = inputs[i].getTypeState(pk.getType());
                                    if(typeState != null) {
                                        BitSet populatedOrdinals = typeState.getListener(PopulatedOrdinalListener.class).getPopulatedOrdinals();
    
                                        int ordinal = populatedOrdinals.nextSetBit(0);
                                        while(ordinal != -1) {
                                            if(primaryKeyCopyDirector.shouldCopy(typeState, ordinal)) {
                                                Object[] recordKey = indexes[i].getRecordKey(ordinal);
    
                                                for(int j=i+1;j<indexes.length;j++) {
                                                    primaryKeyCopyDirector.excludeKey(indexes[j], recordKey);
                                                }
                                            }
    
                                            ordinal = populatedOrdinals.nextSetBit(ordinal + 1);
                                        }
                                    }
                                }

                                primaryKeyIndexes.put(pk.getType(), indexes);
                            }
                        }

                        typesToProcessThisIteration.add(schema.getName());
                    }
                }
            }

            if(typesToProcessThisIteration.isEmpty())
                break;

            for(int i=0;i<numThreads;i++) {
                final int threadNumber = i;
                executor.execute(() -> {
                    for(int i1 =0; i1 <inputs.length; i1++) {
                        HollowCombinerCopyDirector copyDirector = selectedPrimaryKeys.isEmpty()
                                ? HollowCombiner.this.copyDirector
                                : primaryKeyCopyDirector;

                        HollowReadStateEngine inputEngine = inputs[i1];
                        OrdinalRemapper ordinalRemapper = selectedPrimaryKeys.isEmpty()
                                ? ordinalRemappers[i1]
                                : new HollowCombinerPrimaryKeyOrdinalRemapper(ordinalRemappers, primaryKeyIndexes, i1);

                        Map<String, HollowCombinerCopier> copierMap = new HashMap<>();
                        List<HollowCombinerCopier> copierList = new ArrayList<>();

                        for(String typeName : typesToProcessThisIteration) {
                            HollowTypeReadState readState = inputEngine.getTypeState(typeName);
                            HollowTypeWriteState writeState = output.getTypeState(typeName);
                            if (readState != null && writeState != null) {
                                HollowCombinerCopier copier = new HollowCombinerCopier(readState, writeState, ordinalRemapper);
                                copierList.add(copier);
                                copierMap.put(typeName, copier);
                            }
                        }

                        for(String typeName : processedTypes) {
                            HollowTypeReadState readState = inputEngine.getTypeState(typeName);
                            HollowTypeWriteState writeState = output.getTypeState(typeName);
                            if (readState != null && writeState != null) {
                                HollowCombinerCopier copier = new HollowCombinerCopier(readState, writeState, ordinalRemappers[i1]);
                                copierMap.put(typeName, copier);
                            }
                        }

                        copiersPerType.set(copierMap);

                        int currentOrdinal = threadNumber;

                        while(!copierList.isEmpty()) {
                            copyOrdinalForAllStates(currentOrdinal, copierList, ordinalRemapper, copyDirector);

                            currentOrdinal += numThreads;
                        }

                    }
                });
            }

            try {
                executor.awaitSuccessfulCompletionOfCurrentTasks();
            } catch(Throwable th) {
                throw new RuntimeException(th);
            }

            processedTypes.addAll(typesToProcessThisIteration);
            processedPrimaryKeys.addAll(selectedPrimaryKeys);
            selectedPrimaryKeys.clear();
        }

        executor.shutdown();
    }