in tensorflow_serving/core/aspired_versions_manager.cc [253:333]
void AspiredVersionsManager::ProcessAspiredVersionsRequest(
const StringPiece servable_name,
std::vector<ServableData<std::unique_ptr<Loader>>> versions) {
VLOG(1) << "Processing aspired versions request: " << servable_name << ": "
<< ServableVersionsDebugString(versions);
const std::set<int64_t> next_aspired_versions = GetVersionNumbers(versions);
// We gather all the servables with the servable_name and
// 1. Add the current aspired version numbers to a set,
// 2. Set the aspired bool to false for all current servable harnesses which
// are not aspired.
std::set<int64_t> current_aspired_versions;
std::set<int64_t> current_aspired_versions_with_error;
const std::vector<ServableStateSnapshot<Aspired>> state_snapshots =
basic_manager_->GetManagedServableStateSnapshots<Aspired>(
string(servable_name));
for (const ServableStateSnapshot<Aspired>& state_snapshot : state_snapshots) {
if (state_snapshot.additional_state->is_aspired) {
current_aspired_versions.insert(state_snapshot.id.version);
if (state_snapshot.state == LoaderHarness::State::kError) {
current_aspired_versions_with_error.insert(state_snapshot.id.version);
}
}
// If this version is not part of the aspired versions.
if (next_aspired_versions.find(state_snapshot.id.version) ==
next_aspired_versions.end()) {
VLOG(1) << "Setting is_aspired=false for " << state_snapshot.id;
basic_manager_->GetAdditionalServableState<Aspired>(state_snapshot.id)
->is_aspired = false;
basic_manager_->CancelLoadServableRetry(state_snapshot.id);
}
}
// We do a set_difference (A - B), on the next aspired versions and the
// current aspired versions to find the version numbers which need to be
// added the harness map.
std::set<int64_t> additions;
std::set_difference(
next_aspired_versions.begin(), next_aspired_versions.end(),
current_aspired_versions.begin(), current_aspired_versions.end(),
std::inserter(additions, additions.begin()));
// We go through the aspired_servable_versions, pull out the versions which
// need to be added and add them to the harness map.
for (auto& version : versions) {
bool should_add = false;
const auto& version_id = version.id();
if (additions.find(version.id().version) != additions.end()) {
should_add = true;
}
if (enable_reload_servables_with_error_ &&
current_aspired_versions_with_error.find(version.id().version) !=
current_aspired_versions_with_error.end()) {
ServableId id;
id.name = std::string(servable_name);
id.version = version_id.version;
const Status manage_status = basic_manager_->StopManagingServable(id);
DCHECK(manage_status.ok()) << manage_status.error_message();
if (!manage_status.ok()) {
LOG(ERROR) << "Internal error: Unable to clear errored servable "
<< version_id.DebugString() << " from 'basic_manager_': "
<< manage_status.error_message();
}
should_add = true;
}
// if this aspired version is not already present in the map.
if (should_add) {
const Status manage_status =
basic_manager_->ManageServableWithAdditionalState(
std::move(version), std::unique_ptr<Aspired>(new Aspired{true}));
DCHECK(manage_status.ok()) << manage_status.error_message();
if (!manage_status.ok()) {
LOG(ERROR) << "Internal error: Unable to transfer servable "
<< version_id.DebugString()
<< " to 'basic_manager_': " << manage_status.error_message();
}
}
}
}