in src/group.h [380:476]
/// Runs one maintenance step for `group` while holding `group.mutex`.
///
/// In order, this:
///   1. stores `sortOrder` on the group;
///   2. installs a finished resync result (new sync id and member list), or
///      retries a resync that previously reported an error;
///   3. when a ping is due, evaluates the previous ping (broker silence and
///      sync-id mismatch both drop the membership and trigger a resync) and
///      sends a new ping to the broker;
///   4. either cancels every pending all-reduce (membership changed) or
///      expires the ones that have been pending longer than the RPC timeout.
///
/// @param group               Group state to update; locked for the whole call.
/// @param sortOrder           Value copied into `group.sortOrder`.
/// @param timeoutMilliseconds Forwarded to the ping call and used to derive the
///                            ping interval (at most 4 s, otherwise half of this
///                            value) and the broker-silence threshold (1 s plus
///                            this value).
/// @return true if the membership was replaced or cleared during this call.
bool update(GroupInfo& group, uint32_t sortOrder, uint32_t timeoutMilliseconds) {
  const auto now = std::chrono::steady_clock::now();

  std::lock_guard guard(group.mutex);
  group.sortOrder = sortOrder;

  // A resync result is ready to be installed only if a resync was in flight
  // and it has delivered an update.
  bool updated = group.isResyncing && group.haveUpdate;

  // Issues a fresh ping to the broker and records when it was sent. Assigning
  // to pingFuture replaces whatever earlier ping was stored there.
  auto sendPing = [&]() {
    group.lastPing = now;
    group.hasPinged = true;
    group.pingFuture = callImpl<uint32_t>(
        *rpc, group.brokerName, "BrokerService::ping", group.name, rpc->getName(), timeoutMilliseconds);
  };

  // Forgets the current membership, requests a resync and flags the change
  // so the caller (and the all-reduce cleanup below) sees it.
  auto dropMembership = [&]() {
    group.members.clear();
    group.syncId = 0;
    resync(group);
    updated = true;
  };

  if (updated) {
    group.wantsResync = false;
    group.isResyncing = false;
    group.haveUpdate = false;
    group.syncId = group.newSyncId;
    group.members = std::move(group.newMembers);
    sendPing();
  } else if (group.wantsResync && !group.isResyncing && group.resyncError) {
    log.verbose("Error during resync, trying again");
    group.resyncError = false;
    resync(group);
  }

  const auto pingInterval =
      std::min(std::chrono::milliseconds(4000), std::chrono::milliseconds(timeoutMilliseconds) / 2);

  // NOTE(review): when `updated` is true a ping was already sent just above,
  // and this block sends another one at its end — confirm that is intended.
  if (!group.hasPinged || updated || now - group.lastPing >= pingInterval) {
    if (group.hasPinged && !group.pingFuture) {
      // A ping has been sent but pingFuture tests false — presumably no reply
      // has arrived yet (TODO confirm the future's bool semantics).
      const auto silence = now - group.lastPingResponse;
      if (silence >= std::chrono::seconds(8)) {
        log.verbose("Broker has not responded in %g seconds!\n", seconds(silence));
      }
      if (group.brokerConnectionIsActive &&
          silence >= std::chrono::seconds(1) + std::chrono::milliseconds(timeoutMilliseconds)) {
        group.brokerConnectionIsActive = false;
        dropMembership();
      }
    } else {
      // Either this is the very first ping or the previous one tests true.
      group.brokerConnectionIsActive = true;
      group.lastPingResponse = now;
    }

    // The ping result is compared against our sync id; a mismatch means our
    // view of the group can no longer be trusted.
    if (group.pingFuture && *group.pingFuture != group.syncId) {
      dropMembership();
    }
    sendPing();
  }

  auto& pending = group.activeAllReductions;

  if (updated) {
    // Membership changed: fail every all-reduce whose flags are still zero,
    // then forget all of them.
    for (auto& weakOp : pending) {
      if (auto op = weakOp.lock(); op && op->flags.load(std::memory_order_relaxed) == 0) {
        op->setException(rpc::Error("AllReduce operation cancelled due to a group change"));
      }
    }
    pending.clear();
    return true;
  }

  // Membership unchanged: prune dead or flagged operations and time out the
  // ones that have been pending for at least the RPC timeout.
  const auto reductionTimeout = rpc->getTimeout();
  const auto checkTime = std::chrono::steady_clock::now();
  bool anyTimedOut = false;

  auto shouldDrop = [&](auto& weakOp) {
    auto op = weakOp.lock();
    if (!op || op->flags.load(std::memory_order_relaxed)) {
      return true;
    }
    if (checkTime >= op->timestamp + reductionTimeout) {
      anyTimedOut = true;
      op->setException(rpc::Error("AllReduce operation timed out"));
      return true;
    }
    return false;
  };
  pending.erase(std::remove_if(pending.begin(), pending.end(), shouldDrop), pending.end());

  if (anyTimedOut) {
    resync(group);
  }
  return false;
}