void AspiredVersionsManager::ProcessAspiredVersionsRequest()

in tensorflow_serving/core/aspired_versions_manager.cc [253:333]


// Reconciles the versions of `servable_name` currently held by
// `basic_manager_` with the newly requested `versions`:
//   * Every managed version that is absent from the request has its
//     `is_aspired` flag cleared and CancelLoadServableRetry() called on it.
//   * Every requested version that is not currently aspired is handed over to
//     `basic_manager_` with `is_aspired` set to true.
//   * If `enable_reload_servables_with_error_` is set, a requested version
//     that is currently aspired but in state kError is first removed via
//     StopManagingServable() and then handed over again.
//
// Failures reported by `basic_manager_` are DCHECKed and logged; nothing is
// returned to the caller.
void AspiredVersionsManager::ProcessAspiredVersionsRequest(
    const StringPiece servable_name,
    std::vector<ServableData<std::unique_ptr<Loader>>> versions) {
  VLOG(1) << "Processing aspired versions request: " << servable_name << ": "
          << ServableVersionsDebugString(versions);

  const std::set<int64_t> next_aspired_versions = GetVersionNumbers(versions);

  // Walk all managed servables named `servable_name` and
  // 1. collect the version numbers that are currently aspired (tracking
  //    separately those that are aspired but in the error state), and
  // 2. clear the aspired flag of every managed version that is not part of
  //    the new request.
  std::set<int64_t> current_aspired_versions;
  std::set<int64_t> current_aspired_versions_with_error;
  const std::vector<ServableStateSnapshot<Aspired>> state_snapshots =
      basic_manager_->GetManagedServableStateSnapshots<Aspired>(
          std::string(servable_name));
  for (const ServableStateSnapshot<Aspired>& state_snapshot : state_snapshots) {
    if (state_snapshot.additional_state->is_aspired) {
      current_aspired_versions.insert(state_snapshot.id.version);
      if (state_snapshot.state == LoaderHarness::State::kError) {
        current_aspired_versions_with_error.insert(state_snapshot.id.version);
      }
    }
    // This managed version is not part of the newly aspired versions.
    if (next_aspired_versions.count(state_snapshot.id.version) == 0) {
      VLOG(1) << "Setting is_aspired=false for " << state_snapshot.id;
      basic_manager_->GetAdditionalServableState<Aspired>(state_snapshot.id)
          ->is_aspired = false;
      basic_manager_->CancelLoadServableRetry(state_snapshot.id);
    }
  }

  // Set difference (next aspired - current aspired): the version numbers that
  // need to be added to the harness map.
  // NOTE(review): a version that is still managed but no longer aspired would
  // also end up in `additions`; presumably such requests are filtered out
  // before this function is called -- confirm against the caller.
  std::set<int64_t> additions;
  std::set_difference(
      next_aspired_versions.begin(), next_aspired_versions.end(),
      current_aspired_versions.begin(), current_aspired_versions.end(),
      std::inserter(additions, additions.begin()));

  // Go through the requested versions and hand over the ones that need to be
  // added (or re-added after an error) to 'basic_manager_'.
  for (auto& version : versions) {
    // Copy the id up front: `version` is moved into 'basic_manager_' below and
    // the id is still needed afterwards for error logging, so it must not
    // alias the moved-from object.
    const auto version_id = version.id();

    bool should_add = additions.count(version_id.version) > 0;

    const bool reload_errored_version =
        enable_reload_servables_with_error_ &&
        current_aspired_versions_with_error.count(version_id.version) > 0;
    if (reload_errored_version) {
      // The errored harness has to be dropped before the same version can be
      // handed over again.
      ServableId id;
      id.name = std::string(servable_name);
      id.version = version_id.version;
      const Status manage_status = basic_manager_->StopManagingServable(id);
      DCHECK(manage_status.ok()) << manage_status.error_message();
      if (!manage_status.ok()) {
        LOG(ERROR) << "Internal error: Unable to clear errored servable "
                   << version_id.DebugString() << " from 'basic_manager_': "
                   << manage_status.error_message();
      }
      should_add = true;
    }

    if (!should_add) {
      continue;
    }

    // This aspired version is not (or no longer) present in the map.
    const Status manage_status =
        basic_manager_->ManageServableWithAdditionalState(
            std::move(version), std::unique_ptr<Aspired>(new Aspired{true}));
    DCHECK(manage_status.ok()) << manage_status.error_message();
    if (!manage_status.ok()) {
      LOG(ERROR) << "Internal error: Unable to transfer servable "
                 << version_id.DebugString()
                 << " to 'basic_manager_': " << manage_status.error_message();
    }
  }
}