private synchronized TopologyUpdate updateTopology()

in accord-core/src/main/java/accord/local/CommandStores.java [334:402]


    /**
     * Builds the {@link Snapshot} for {@code newTopology} and a supplier that, when invoked, starts the
     * per-shard work collected while building it and combines the results into a single {@link EpochReady}.
     *
     * <p>If {@code newTopology} is not newer than {@code prev.global}, {@code prev} is returned unchanged.
     * If neither the local ranges nor the local shards differ from {@code prev.local}, the existing shard
     * holders are reused as-is. Otherwise existing shards that intersect the removed ranges have their
     * current ranges narrowed in place, and new shard holders are created for the added ranges.
     *
     * @param node        passed through to the sync, bootstrap and {@code shouldBootstrap} calls
     * @param prev        the snapshot being replaced
     * @param newTopology the full (non-subset) topology of the new epoch
     * @param startSync   when true and the epoch is greater than 1, every existing shard with non-empty
     *                    ranges contributes a sync for the new epoch
     * @return the new snapshot paired with the supplier that starts the collected work
     */
    private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Topology newTopology, boolean startSync)
    {
        checkArgument(!newTopology.isSubset(), "Use full topology for CommandStores.updateTopology");

        long epoch = newTopology.epoch();
        // nothing to do for an epoch we have already reached (or passed)
        if (prev.global.epoch() >= epoch)
            return new TopologyUpdate(prev, () -> done(epoch));

        Topology localTopology = newTopology.forNode(supplier.time.id()).trim();
        Ranges gained = localTopology.ranges().difference(prev.local.ranges());
        Ranges lost = prev.local.ranges().difference(localTopology.ranges());

        // Our replicated ranges may be unchanged while the membership of our shards has changed.
        // Only when both are unchanged can we keep the existing shard holders and finish immediately;
        // otherwise we fall through so that we still witness the transactions of the majority of the
        // prior epoch, by fetching deps and replicating them to CommandsForKey/historicalRangeCommands.
        boolean rangesUnchanged = gained.isEmpty() && lost.isEmpty();
        if (rangesUnchanged && localTopology.shards().equals(prev.local.shards()))
            return new TopologyUpdate(new Snapshot(prev.shards, localTopology, newTopology), () -> done(epoch));

        List<Supplier<EpochReady>> pending = new ArrayList<>();
        List<ShardHolder> holders = new ArrayList<>(prev.shards.length + gained.size());
        for (ShardHolder existing : prev.shards)
        {
            Ranges held = existing.ranges().currentRanges();
            if (lost.intersects(held))
            {
                // narrow this shard to what it still owns, and interrupt bootstraps against the reduced ranges
                RangesForEpoch narrowed = existing.ranges().withRanges(epoch, held.difference(lost));
                existing.ranges.current = narrowed;
                pending.add(existing.store.interruptBootstraps(epoch, narrowed.currentRanges()));
            }

            // TODO (desired): only sync affected shards
            // re-read after the possible narrowing above; may be empty when ranges are lost or
            // consolidated across epochs, in which case there is nothing to sync
            Ranges remaining = existing.ranges().currentRanges();
            boolean syncRequested = startSync && epoch > 1;
            if (syncRequested && !remaining.isEmpty())
                pending.add(existing.store.sync(node, remaining, epoch));

            holders.add(existing);
        }

        if (!gained.isEmpty())
        {
            // TODO (required): shards must rebalance
            for (Ranges slice : shardDistributor.split(gained))
            {
                RangesForEpochHolder sliceRanges = new RangesForEpochHolder();
                ShardHolder created = new ShardHolder(supplier.create(nextId++, sliceRanges), sliceRanges);
                sliceRanges.current = new RangesForEpoch(epoch, slice, created.store);

                // split the slice into ranges that must be bootstrapped (true) and ranges that are only initialised (false)
                Map<Boolean, Ranges> byBootstrap = slice.partitioningBy(range -> shouldBootstrap(node, prev.global, localTopology, range));
                if (byBootstrap.containsKey(true))
                    pending.add(created.store.bootstrapper(node, byBootstrap.get(true), localTopology.epoch()));
                if (byBootstrap.containsKey(false))
                    pending.add(() -> created.store.initialise(epoch, byBootstrap.get(false)));

                holders.add(created);
            }
        }

        Snapshot next = new Snapshot(holders.toArray(new ShardHolder[0]), localTopology, newTopology);
        if (pending.isEmpty())
            return new TopologyUpdate(next, () -> done(epoch));

        Supplier<EpochReady> startAll = () -> {
            // invoke every collected supplier, in the order it was registered
            List<EpochReady> started = new ArrayList<>(pending.size());
            for (Supplier<EpochReady> update : pending)
                started.add(update.get());

            // each phase of the combined result is the reduction of that phase across all started updates
            return new EpochReady(epoch,
                AsyncChains.reduce(started.stream().map(ready -> ready.metadata).collect(toList()), (a, b) -> null).beginAsResult(),
                AsyncChains.reduce(started.stream().map(ready -> ready.coordination).collect(toList()), (a, b) -> null).beginAsResult(),
                AsyncChains.reduce(started.stream().map(ready -> ready.data).collect(toList()), (a, b) -> null).beginAsResult(),
                AsyncChains.reduce(started.stream().map(ready -> ready.reads).collect(toList()), (a, b) -> null).beginAsResult()
            );
        };
        return new TopologyUpdate(next, startAll);
    }