absl::StatusOr CallIntrinsicAggregate()

in tensorflow_federated/cc/core/impl/executors/composing_executor.cc [584:665]


  absl::StatusOr<ExecutorValue> CallIntrinsicAggregate(
      ExecutorValue&& arg) const {
    TFF_TRY(arg.CheckLenForUseAsArgument("federated_aggregate", 5));
    const auto& value = arg.structure()->at(0);
    if (value.type() != ExecutorValue::ValueType::CLIENTS) {
      return absl::InvalidArgumentError(
          "Cannot aggregate a value not placed at clients");
    }
    const auto& zero = arg.structure()->at(1);
    v0::Value zero_val;
    ParallelTasks tasks;
    TFF_TRY(MaterializeValue(zero, &zero_val, tasks));
    TFF_TRY(tasks.WaitAll());

    const auto& accumulate = arg.structure()->at(2);
    auto accumulate_val =
        TFF_TRY(accumulate.GetUnplacedFunctionProto("accumulate"));

    const auto& merge = arg.structure()->at(3);
    auto merge_val = TFF_TRY(merge.GetUnplacedFunctionProto("merge"));
    auto merge_id = TFF_TRY(merge.unplaced()->Embedded(*server_));

    const auto& report = arg.structure()->at(4);
    auto report_val = TFF_TRY(report.GetUnplacedFunctionProto("report"));
    auto report_id = TFF_TRY(report.unplaced()->Embedded(*server_));

    v0::Value null_report_val;
    *null_report_val.mutable_computation() = IdentityComp();
    v0::Value aggregate;
    aggregate.mutable_computation()->mutable_intrinsic()->mutable_uri()->assign(
        kFederatedAggregateUri.data(), kFederatedAggregateUri.size());

    // Initiate the aggregation in each child.
    std::vector<OwnedValueId> child_result_ids;
    child_result_ids.reserve(children_.size());
    for (uint32_t i = 0; i < children_.size(); i++) {
      const auto& child = children_[i].executor();
      ValueId child_val = value.clients()->at(i)->ref();
      std::vector<OwnedValueId> arg_owners;
      std::vector<ValueId> arg_ids;
      arg_ids.emplace_back(child_val);
      for (const v0::Value& arg_value :
           {zero_val, *accumulate_val, *merge_val, null_report_val}) {
        OwnedValueId child_id = TFF_TRY(child->CreateValue(arg_value));
        arg_ids.emplace_back(child_id.ref());
        arg_owners.emplace_back(std::move(child_id));
      }
      auto child_arg_id = TFF_TRY(child->CreateStruct(std::move(arg_ids)));
      auto child_aggregate_id = TFF_TRY(child->CreateValue(aggregate));
      auto child_result_id =
          TFF_TRY(child->CreateCall(child_aggregate_id, child_arg_id));
      child_result_ids.push_back(std::move(child_result_id));
    }

    // Materialize and merge the results from each child executor.
    // TODO(b/192457028): parallelize this so that we're materializing more than
    // one value at a time and so that we can begin merging as soon as any
    // result is available.
    absl::optional<OwnedValueId> current = absl::nullopt;
    for (uint32_t i = 0; i < children_.size(); i++) {
      const auto& child = children_[i].executor();
      v0::Value child_result = TFF_TRY(child->Materialize(child_result_ids[i]));
      if (!child_result.has_federated() ||
          child_result.federated().type().placement().value().uri() !=
              kServerUri) {
        return absl::InternalError(
            "Child executor returned non-server-placed value");
      }
      auto child_result_server_id =
          TFF_TRY(server_->CreateValue(child_result.federated().value(0)));
      if (current.has_value()) {
        auto merge_arg = TFF_TRY(
            server_->CreateStruct({current.value(), child_result_server_id}));
        current = TFF_TRY(server_->CreateCall(merge_id->ref(), merge_arg));
      } else {
        current = std::move(child_result_server_id);
      }
    }
    auto result =
        TFF_TRY(server_->CreateCall(report_id->ref(), current.value()));
    return ExecutorValue::CreateServerPlaced(ShareValueId(std::move(result)));
  }