in src/accumulator.cc [485:641]
/// Advances the accumulator's synchronization state by one step.
/// NOTE(review): presumably invoked periodically by the owner; confirm caller.
///
/// Steps, in order:
///  1. If shouldUpdateGroup_ is set, refresh group membership via groupService.
///  2. Under h->mutex, poll pending asynchronous results:
///     - the findLeader result;
///     - the next gradient-reduction result in ring order;
///     - the model-request result.
///  3. When the group's syncId differs from h->syncId, reset all per-sync state.
///     If the new syncId is nonzero, start a "findLeader" all-reduce that
///     selects the member with the largest (modelVersion, name) tuple.
///  4. Then handle incoming model state:
///     - commit newly received parameters and buffers;
///     - retry a model request that has timed out (10 s);
///     - force a resync if no model has arrived for 2 minutes while this
///       process is connected, not waiting, and not the leader.
///  5. Send model updates when the known member list is non-empty.
void update() {
  rpc::AutoGradMode ng(false);
  if (shouldUpdateGroup_) {
    groupService->update(*group, 0, 10 * 1000);
  }
  glock l(h->mutex);
  auto now = std::chrono::steady_clock::now();
  // Rate-limit the debug state dump (start and end of this function) to once every 5 seconds.
  bool logit = now - lastlog >= std::chrono::seconds(5);
  if (logit) {
    lastlog = now;
    log.debug("DEBUG syncUpdate()\n");
  }
  // Runs a pending result callback if it is ready (tryCall() returned true).
  // On success the handle passed in is reset and true is returned. For an
  // lvalue member this clears the member; for a temporary pointer it only
  // clears the local copy.
  auto result = [&](auto&& r) {
    if (r) {
      if (r->tryCall()) {
        r = {};
        return true;
      }
    }
    return false;
  };
  if (result(findLeaderResult)) {
    if (syncLeader == myName) {
      log.verbose("I am now the leader!\n");
    }
  }
  // Consume at most one gradient-reduction result per call, and only while no
  // gradients are pending. Slots are polled in ring order.
  // NOTE(review): assumes reductions are issued into the same ring via
  // nextGradientReductionIndex.
  if (!hasGradients_ && !isCopyingGradients) {
    auto& v = gradientReductions[nextGradientReductionResultIndex];
    // Only move to the next slot once this slot's result has actually been
    // consumed. Advancing past a reduction that is not ready yet would skip it
    // and process later reductions out of order.
    if (v && result(&v->result)) {
      if (nextGradientReductionResultIndex == gradientReductions.size() - 1) {
        nextGradientReductionResultIndex = 0;
      } else {
        ++nextGradientReductionResultIndex;
      }
    }
  }
  result(requestModelResult);
  if (h->syncId != group->syncId) {
    // The group was (re)synchronized: discard every in-flight operation and
    // per-sync flag belonging to the previous syncId.
    h->syncId = group->syncId;
    syncLeader.clear();
    findLeaderResult = {};
    for (auto& v : gradientReductions) {
      v = {};
    }
    nextGradientReductionIndex = 0;
    nextGradientReductionResultIndex = 0;
    requestModelResult = {};
    h->requestedModelUpdate.clear();
    h->haveNewParameters = false;
    hasNewUserState_ = false;
    wantsUserState_ = false;
    isFindingLeader = true;
    isWaitingForModel = false;
    hasGradients_ = false;
    members_.clear();
    if (h->syncId == 0) {
      log.verbose("Sync failure, could not join group\n");
    } else {
      log.verbose("Sync %#x success, finding leader\n", h->syncId);
      findLeaderResult = std::make_shared<ResultCallback>();
      AccumulatorFindLeaderType data;
      data.modelVersion = h->modelVersion;
      data.name = myName;
      findLeaderReduce = allReduceService->allReduce(
          group, "Accumulator::findLeader", std::move(data),
          // Reduction op: leave the entry with the larger (modelVersion, name)
          // tuple in `a`, so the final result identifies the leader.
          [&](AccumulatorFindLeaderType& a, AccumulatorFindLeaderType& b) {
            if (std::tie(a.modelVersion, a.name) < std::tie(b.modelVersion, b.name)) {
              std::swap(a, b);
            }
          },
          // Completion handler: it does not mutate state directly. It stores a
          // callback in the shared ResultCallback, which update() runs later
          // via result(findLeaderResult) while holding h->mutex.
          [findLeaderResult = this->findLeaderResult,
           this](AccumulatorFindLeaderType* v, [[maybe_unused]] rpc::Error* error) {
            if (v) {
              *findLeaderResult = [this, modelVersion = v->modelVersion,
                                   leader = std::string(v->name)]() mutable noexcept {
                isFindingLeader = false;
                if (syncLeader.empty()) {
                  log.verbose("Leader is %s\n", leader);
                } else if (syncLeader == leader) {
                  log.verbose("Leader is still %s\n", leader);
                } else {
                  log.verbose("Leader changed from %s to %s\n", syncLeader, leader);
                }
                syncLeader = std::move(leader);
                {
                  std::lock_guard l(group->mutex);
                  members_ = group->members;
                }
                log.verbose("Group has %d members\n", members_.size());
                // Fetch the model if the leader's version differs from ours
                // or we have never received one.
                if (modelVersion != h->modelVersion || !hasReceivedModel_) {
                  requestModel();
                }
              };
            } else {
              log.error("findLeader failed with error %s\n", error->what());
              *findLeaderResult = [this]() mutable {
                // Only resync if no newer sync has superseded this one.
                if (h->syncId == group->syncId) {
                  resync();
                }
              };
            }
          });
    }
  }
  if (h->haveNewParameters) {
    if (!isWaitingForModel && h->modelVersion != h->newModelVersion) {
      // Parameters arrived that we did not ask for and that do not match our version.
      h->haveNewParameters = false;
      log.debug(
          "ignoring unexpected new params due to modelVersion mismatch (got %d, expected %d)\n", h->newModelVersion,
          h->modelVersion);
    } else {
      log.debug("DEBUG new params\n");
      commitModelUpdate();
      isWaitingForModel = false;
    }
  } else if (isWaitingForModel && now - isWaitingForModelTimestamp >= std::chrono::seconds(10)) {
    log.debug("Timed out waiting for model, retrying\n");
    requestModel();
  } else if (
      !isWaitingForModel && connectedImpl() && syncLeader != myName &&
      now - lastReceivedModel >= std::chrono::minutes(2)) {
    // Last-resort recovery: a non-leader that has not received a model in
    // 2 minutes forces a fresh sync.
    log.verbose("EMERGENCY RESYNC TO GET MODEL UPDATE!\n");
    lastReceivedModel = now;
    resync();
  }
  if (h->haveNewBuffers) {
    commitBuffersUpdate();
  }
  if (!members_.empty()) {
    sendModelUpdates();
  }
  if (logit) {
    log.debug("DEBUG connected: %d\n", connectedImpl());
    log.debug("DEBUG isWaitingForModel: %d\n", isWaitingForModel);
    log.debug("DEBUG isFindingLeader: %d\n", isFindingLeader);
    log.debug("DEBUG isCopyingGradients: %d\n", isCopyingGradients);
    log.debug("DEBUG nextGradientReductionIndex: %d\n", nextGradientReductionIndex);
    log.debug("DEBUG nextGradientReductionResultIndex: %d\n", nextGradientReductionResultIndex);
  }
}