absl::StatusOr CallFederatedIntrinsic()

in tensorflow_federated/cc/core/impl/executors/federating_executor.cc [365:475]


  // Executes the federated intrinsic `function` on `arg` and returns the
  // resulting value.
  //
  // Each `case` unpacks and validates the part of `arg` that the intrinsic
  // needs, issues the required calls on `child_`, and wraps the resulting
  // child value(s) as a server-placed or clients-placed `ExecutorValue`.
  //
  // Errors: every fallible step is wrapped in `TFF_TRY` (defined outside this
  // block; presumably it returns the failing status to the caller). In
  // addition this function itself returns
  //   * `InvalidArgumentError` when `federated_map` receives a value that is
  //     neither clients- nor server-placed, and
  //   * `InternalError` when `function` matches none of the cases below.
  absl::StatusOr<ExecutorValue> CallFederatedIntrinsic(
      FederatedIntrinsic function, ExecutorValue arg) {
    // Deliberately no `default:` label, so that compilers can still warn when
    // a new `FederatedIntrinsic` enumerator is added without a `case` here.
    switch (function) {
      case FederatedIntrinsic::VALUE_AT_CLIENTS: {
        // Embed `arg` once and hand the single embedded value to
        // `ClientsAllEqualValue`.
        return ClientsAllEqualValue(TFF_TRY(Embed(arg)));
      }
      case FederatedIntrinsic::VALUE_AT_SERVER: {
        return ExecutorValue::CreateServerPlaced(TFF_TRY(Embed(arg)));
      }
      case FederatedIntrinsic::EVAL_AT_SERVER: {
        // `arg` is the function to evaluate: call it once with no argument.
        auto embedded = TFF_TRY(Embed(arg));
        return ExecutorValue::CreateServerPlaced(ShareValueId(
            TFF_TRY(child_->CreateCall(embedded->ref(), absl::nullopt))));
      }
      case FederatedIntrinsic::EVAL_AT_CLIENTS: {
        // `arg` is the function to evaluate: issue one separate no-argument
        // call per client, so each client gets its own result value.
        auto embedded = TFF_TRY(Embed(arg));
        Clients client_values = NewClients();
        for (uint32_t i = 0; i < num_clients_; i++) {
          client_values->emplace_back(ShareValueId(
              TFF_TRY(child_->CreateCall(embedded->ref(), absl::nullopt))));
        }
        return ExecutorValue::CreateClientsPlaced(std::move(client_values));
      }
      case FederatedIntrinsic::AGGREGATE: {
        // Expected argument layout: (value, zero, accumulate, merge, report).
        TFF_TRY(CheckLenForUseAsArgument(arg, "federated_aggregate", 5));
        const auto& value = arg.structure()->at(0);
        // Validate `value` before embedding anything, so that a call with a
        // wrongly-placed `value` is rejected without doing any other work.
        TFF_TRY(value.CheckArgumentType(ExecutorValue::ValueType::CLIENTS,
                                        "`federated_aggregate`'s `value`"));
        const auto& zero = arg.structure()->at(1);
        auto zero_child_id = TFF_TRY(Embed(zero));
        const auto& accumulate = arg.structure()->at(2);
        auto accumulate_child_id = TFF_TRY(Embed(accumulate));
        // `merge` is unused (argument four).
        const auto& report = arg.structure()->at(4);
        auto report_child_id = TFF_TRY(Embed(report));
        // Left fold over the client values, starting from `zero`:
        //   current = accumulate(<current, client_value>)
        // `current_owner` holds the most recent accumulate result while
        // `current` refers to it (presumably this keeps the child value
        // alive until the next step has consumed it).
        absl::optional<OwnedValueId> current_owner = absl::nullopt;
        ValueId current = zero_child_id->ref();
        for (const auto& client_val_id : *value.clients()) {
          auto acc_arg =
              TFF_TRY(child_->CreateStruct({current, client_val_id->ref()}));
          current_owner =
              TFF_TRY(child_->CreateCall(accumulate_child_id->ref(), acc_arg));
          current = current_owner.value().ref();
        }
        // Apply `report` to the final accumulator (which is `zero` itself if
        // there were no client values).
        auto result =
            TFF_TRY(child_->CreateCall(report_child_id->ref(), current));
        return ExecutorValue::CreateServerPlaced(
            ShareValueId(std::move(result)));
      }
      case FederatedIntrinsic::BROADCAST: {
        TFF_TRY(arg.CheckArgumentType(ExecutorValue::ValueType::SERVER,
                                      "`federated_broadcast`"));
        return ClientsAllEqualValue(arg.server());
      }
      case FederatedIntrinsic::MAP: {
        // Expected argument layout: (fn, data). `data` may be clients- or
        // server-placed; the result has the same placement as `data`.
        TFF_TRY(CheckLenForUseAsArgument(arg, "federated_map", 2));
        const auto& fn = arg.structure()->at(0);
        auto child_fn = TFF_TRY(Embed(fn));
        ValueId child_fn_ref = child_fn->ref();
        const auto& data = arg.structure()->at(1);
        if (data.type() == ExecutorValue::ValueType::CLIENTS) {
          Clients results = NewClients();
          // One call of `fn` per client, in client order.
          for (uint32_t i = 0; i < num_clients_; i++) {
            auto client_arg = data.clients()->at(i)->ref();
            auto result = TFF_TRY(child_->CreateCall(child_fn_ref, client_arg));
            results->emplace_back(ShareValueId(std::move(result)));
          }
          return ExecutorValue::CreateClientsPlaced(std::move(results));
        } else if (data.type() == ExecutorValue::ValueType::SERVER) {
          auto res =
              TFF_TRY(child_->CreateCall(child_fn_ref, data.server()->ref()));
          return ExecutorValue::CreateServerPlaced(
              ShareValueId(std::move(res)));
        } else {
          return absl::InvalidArgumentError(
              "Attempted to map non-federated value.");
        }
      }
      case FederatedIntrinsic::SELECT: {
        // Expected argument layout: (keys, max_key, server_val, select_fn).
        TFF_TRY(CheckLenForUseAsArgument(arg, "federated_select", 4));
        const auto& keys = arg.structure()->at(0);
        // Argument two (`max_key`) is unused in this impl.
        const auto& server_val = arg.structure()->at(2);
        const auto& select_fn = arg.structure()->at(3);
        // Each argument's kind is checked immediately before its accessor
        // (`clients()`, `server()`, `unplaced()`) is used.
        TFF_TRY(keys.CheckArgumentType(ExecutorValue::ValueType::CLIENTS,
                                       "`federated_select`'s `keys`"));
        const Clients& keys_child_ids = keys.clients();
        TFF_TRY(
            server_val.CheckArgumentType(ExecutorValue::ValueType::SERVER,
                                         "`federated_select`'s `server_val`"));
        ValueId server_val_child_id = server_val.server()->ref();
        TFF_TRY(
            select_fn.CheckArgumentType(ExecutorValue::ValueType::UNPLACED,
                                        "`federated_select`'s `select_fn`"));
        ValueId select_fn_child_id = select_fn.unplaced()->ref();
        return CallFederatedSelect(keys_child_ids, server_val_child_id,
                                   select_fn_child_id);
      }
      case FederatedIntrinsic::ZIP_AT_CLIENTS: {
        // Build one zipped value per client index.
        Clients results = NewClients();
        for (uint32_t i = 0; i < num_clients_; i++) {
          results->push_back(TFF_TRY(ZipStructIntoClient(arg, i)));
        }
        return ExecutorValue::CreateClientsPlaced(std::move(results));
      }
      case FederatedIntrinsic::ZIP_AT_SERVER: {
        return ExecutorValue::CreateServerPlaced(
            TFF_TRY(ZipStructIntoServer(arg)));
      }
    }
    // Reached only when `function` holds a value that matches no `case`
    // above (e.g. an out-of-range value cast to the enum). Flowing off the
    // end of a non-void function is undefined behaviour, so fail explicitly.
    return absl::InternalError("Unrecognized federated intrinsic.");
  }