in accord-core/src/main/java/accord/local/CommandStores.java [508:584]
private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Topology newTopology, boolean startSync)
{
    Invariants.requireArgument(!newTopology.isSubset(), "Use full topology for CommandStores.updateTopology");

    long epoch = newTopology.epoch();
    if (epoch <= prev.global.epoch())
        return new TopologyUpdate(prev, () -> done(epoch));
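
    // Derive this node's local view of the new topology, and register any ranges that are new to the
    // global topology with DurableBefore as of this epoch.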
    Topology newLocalTopology = newTopology.forNode(supplier.time.id()).trim();
    Ranges addedGlobal = newTopology.ranges().without(prev.global.ranges());
    node.addNewRangesToDurableBefore(addedGlobal, epoch);

    Ranges added = newLocalTopology.ranges().without(prev.local.ranges());
    Ranges subtracted = prev.local.ranges().without(newLocalTopology.ranges());
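
    // Fast path: the set of ranges this node replicates is unchanged by the new epoch.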
    if (added.isEmpty() && subtracted.isEmpty())
    {
        Supplier<EpochReady> epochReady = () -> done(epoch);
        // even though we haven't changed our replication, we need to check if the membership of our shard has changed
        if (newLocalTopology.shards().equals(prev.local.shards()))
            return new TopologyUpdate(new Snapshot(prev.shards, newLocalTopology, newTopology), epochReady);
        // if it has, we still need to make sure we have witnessed the transactions of a majority of the prior epoch,
        // which we do by fetching deps and replicating them to CommandsForKey/historicalRangeCommands
    }

    List<Supplier<EpochReady>> bootstrapUpdates = new ArrayList<>();
    List<ShardHolder> result = new ArrayList<>(prev.shards.length + added.size());
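    // Carry forward each existing shard, shrinking (and unbootstrapping) any ranges this node no longer
    // replicates, and scheduling a sync with the prior epoch's replicas where required.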
    for (ShardHolder shard : prev.shards)
    {
        Ranges current = shard.ranges().currentRanges();
        Ranges removeRanges = subtracted.slice(current, Minimal);
        if (!removeRanges.isEmpty())
        {
            // TODO (required): this is updating a non-volatile field in the previous Snapshot; why modify it at all?
            //  Even with volatile, the visibility guarantee is weak, even under mutual exclusion.
            shard.ranges = shard.ranges().withRanges(newTopology.epoch(), current.without(subtracted));
            shard.store.epochUpdateHolder.remove(epoch, shard.ranges, removeRanges);
            bootstrapUpdates.add(shard.store.unbootstrap(epoch, removeRanges));
        }
        // TODO (desired): only sync affected shards
        Ranges ranges = shard.ranges().currentRanges();
        // ranges can be empty when ranges are lost or consolidated across epochs.
        if (epoch > 1 && startSync && requiresSync(ranges, prev.global, newTopology))
        {
            bootstrapUpdates.add(shard.store.sync(node, ranges, epoch));
        }
        result.add(shard);
    }
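
    // Create a new shard (and backing CommandStore) for each split of the ranges this node now replicates
    // but did not previously; ranges that shouldBootstrap selects are bootstrapped from prior replicas,
    // the rest are simply initialised.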
    if (!added.isEmpty())
    {
        for (Ranges addRanges : shardDistributor.split(added))
        {
            EpochUpdateHolder updateHolder = new EpochUpdateHolder();
            RangesForEpoch rangesForEpoch = new RangesForEpoch(epoch, addRanges);
            updateHolder.add(epoch, rangesForEpoch, addRanges);

            ShardHolder shard = new ShardHolder(supplier.create(nextId++, updateHolder));
            shard.ranges = rangesForEpoch;

            Map<Boolean, Ranges> partitioned = addRanges.partitioningBy(range -> shouldBootstrap(node, prev.global, newLocalTopology, range));
            if (partitioned.containsKey(false))
                bootstrapUpdates.add(shard.store.initialise(epoch, partitioned.get(false)));
            if (partitioned.containsKey(true))
                bootstrapUpdates.add(shard.store.bootstrapper(node, partitioned.get(true), newLocalTopology.epoch()));

            result.add(shard);
        }
    }
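
    // Combine the per-shard EpochReady suppliers into a single EpochReady for the epoch; each reduce
    // with (a, b) -> null discards the individual results, effectively just awaiting completion of all chains.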
    Supplier<EpochReady> bootstrap = bootstrapUpdates.isEmpty() ? () -> done(epoch) : () -> {
        List<EpochReady> list = bootstrapUpdates.stream().map(Supplier::get).collect(toList());
        return new EpochReady(epoch,
            AsyncChains.reduce(list.stream().map(b -> b.metadata).collect(toList()), (a, b) -> null).beginAsResult(),
            AsyncChains.reduce(list.stream().map(b -> b.coordinate).collect(toList()), (a, b) -> null).beginAsResult(),
            AsyncChains.reduce(list.stream().map(b -> b.data).collect(toList()), (a, b) -> null).beginAsResult(),
            AsyncChains.reduce(list.stream().map(b -> b.reads).collect(toList()), (a, b) -> null).beginAsResult()
        );
    };

    return new TopologyUpdate(new Snapshot(result.toArray(new ShardHolder[0]), newLocalTopology, newTopology), bootstrap);
}