in accord-core/src/main/java/accord/local/CommandStores.java [334:402]
private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Topology newTopology, boolean startSync)
{
    checkArgument(!newTopology.isSubset(), "Use full topology for CommandStores.updateTopology");
    long epoch = newTopology.epoch();
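    // a topology that is not newer than the one we already hold is a no-op: keep the previous snapshot and report the epoch as done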
    if (epoch <= prev.global.epoch())
        return new TopologyUpdate(prev, () -> done(epoch));
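    // compute this node's trimmed view of the new topology, and the ranges it has gained (added)
    // or lost (subtracted) relative to the previous local topology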
    Topology newLocalTopology = newTopology.forNode(supplier.time.id()).trim();
    Ranges added = newLocalTopology.ranges().difference(prev.local.ranges());
    Ranges subtracted = prev.local.ranges().difference(newLocalTopology.ranges());
    if (added.isEmpty() && subtracted.isEmpty())
    {
        Supplier<EpochReady> epochReady = () -> done(epoch);
        // even though we haven't changed our replication, we need to check whether the membership of our shards has changed
        if (newLocalTopology.shards().equals(prev.local.shards()))
            return new TopologyUpdate(new Snapshot(prev.shards, newLocalTopology, newTopology), epochReady);
        // if it has, we still need to make sure we have witnessed the transactions of a majority of the prior epoch,
        // which we do by fetching deps and replicating them to CommandsForKey/historicalRangeCommands
    }
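    // carry the existing shards forward (shrinking any that lost ranges) and collect the per-shard work
    // (interrupting bootstraps, syncing, bootstrapping, initialising) that makes the new epoch ready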
    List<Supplier<EpochReady>> bootstrapUpdates = new ArrayList<>();
    List<ShardHolder> result = new ArrayList<>(prev.shards.length + added.size());
    for (ShardHolder shard : prev.shards)
    {
        if (subtracted.intersects(shard.ranges().currentRanges()))
        {
            RangesForEpoch newRanges = shard.ranges().withRanges(newTopology.epoch(), shard.ranges().currentRanges().difference(subtracted));
            shard.ranges.current = newRanges;
            bootstrapUpdates.add(shard.store.interruptBootstraps(epoch, newRanges.currentRanges()));
        }
        // TODO (desired): only sync affected shards
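        // sync this shard's remaining ranges for the new epoch; skipped for the first epoch, when sync was not
        // requested, or when the shard no longer owns any ranges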
        Ranges ranges = shard.ranges().currentRanges();
        // ranges can be empty when ranges are lost or consolidated across epochs.
        if (epoch > 1 && startSync && !ranges.isEmpty())
            bootstrapUpdates.add(shard.store.sync(node, ranges, epoch));
        result.add(shard);
    }
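    // create new shards (each backed by a fresh CommandStore) for ranges this node has gained,
    // splitting them across command stores according to the shard distributor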
    if (!added.isEmpty())
    {
        // TODO (required): shards must rebalance
        for (Ranges add : shardDistributor.split(added))
        {
            RangesForEpochHolder rangesHolder = new RangesForEpochHolder();
            ShardHolder shardHolder = new ShardHolder(supplier.create(nextId++, rangesHolder), rangesHolder);
            rangesHolder.current = new RangesForEpoch(epoch, add, shardHolder.store);
            Map<Boolean, Ranges> partitioned = add.partitioningBy(range -> shouldBootstrap(node, prev.global, newLocalTopology, range));
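            // ranges that shouldBootstrap deems to need state from their prior owners are bootstrapped;
            // the rest are simply initialised locally for this epoch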
            if (partitioned.containsKey(true))
                bootstrapUpdates.add(shardHolder.store.bootstrapper(node, partitioned.get(true), newLocalTopology.epoch()));
            if (partitioned.containsKey(false))
                bootstrapUpdates.add(() -> shardHolder.store.initialise(epoch, partitioned.get(false)));
            result.add(shardHolder);
        }
    }
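    // fold the per-shard EpochReady suppliers into a single EpochReady whose stages (metadata, coordination,
    // data, reads) complete only once the corresponding stage has completed for every shard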
    Supplier<EpochReady> bootstrap = bootstrapUpdates.isEmpty() ? () -> done(epoch) : () -> {
        List<EpochReady> list = bootstrapUpdates.stream().map(Supplier::get).collect(toList());
        return new EpochReady(epoch,
            AsyncChains.reduce(list.stream().map(b -> b.metadata).collect(toList()), (a, b) -> null).beginAsResult(),
            AsyncChains.reduce(list.stream().map(b -> b.coordination).collect(toList()), (a, b) -> null).beginAsResult(),
            AsyncChains.reduce(list.stream().map(b -> b.data).collect(toList()), (a, b) -> null).beginAsResult(),
            AsyncChains.reduce(list.stream().map(b -> b.reads).collect(toList()), (a, b) -> null).beginAsResult()
        );
    };
    return new TopologyUpdate(new Snapshot(result.toArray(new ShardHolder[0]), newLocalTopology, newTopology), bootstrap);
}