in flsim/reducers/dp_round_reducer.py [0:0]
def reduce(self) -> Tuple[nn.Module, float]:
    """Reduce the round's updates, adding differential-privacy noise if enabled.

    When ``self.privacy_on`` is falsy this delegates entirely to the parent
    class's ``reduce()``. Otherwise it:

    1. runs ``self._reduce_all(OperationType.SUM)``;
    2. on the master worker only, checks that the summed weights match
       ``self.num_users_per_round`` (logging an error if they do not) and
       asks ``self.privacy_engine`` to add noise to ``self.reduced_module``;
    3. broadcasts ``self.sum_weights`` and the module's state dict to all
       workers;
    4. stores the value of ``self.privacy_engine.get_privacy_spent()`` in
       ``self._privacy_budget`` and logs it.

    Returns:
        A tuple ``(self.reduced_module, total_weight)`` where
        ``total_weight`` is ``self.sum_weights`` converted to a Python float.
    """
    if not self.privacy_on:
        return super().reduce()

    # only sum in the rank 0 reducer (no broadcast yet)
    self._reduce_all(OperationType.SUM)
    self.logger.debug(f"Sum of weights after aggregation: {self.sum_weights}")

    if FLDistributedUtils.is_master_worker():
        total_weights = float(self.sum_weights.item())
        # A mismatch is only reported, not raised: the reduction continues
        # and noise is still added below.
        if abs(total_weights - self.num_users_per_round) > 1e-5:
            self.logger.error(
                f"total weights {total_weights} is not equal to "
                f"number of users {self.num_users_per_round}. "
                "Please make sure reduction_type=AVERAGE."
            )
        # The final amount of noise added must be equal to
        # (max_norm * noise_multiplier) / users_per_round, similar to
        # Google's user-level DP https://arxiv.org/pdf/1710.06963.pdf.
        # Note that in the _generate_noise() function, the noise_multiplier
        # is already multiplied.
        self.privacy_engine.add_noise(
            self.reduced_module, self.clipping_value / total_weights
        )

    # broadcast the new noisy model to all workers.
    state_dict = FLModelParamUtils.get_state_dict(
        self.reduced_module,
        # pyre-fixme[16]: `DPRoundReducer` has no attribute `cfg`.
        only_federated_params=self.cfg.only_federated_params,
    )
    # sum_weights is broadcast together with the model tensors so that the
    # value returned below is read after the broadcast on every worker.
    FLDistributedUtils.distributed_operation(
        chain([self.sum_weights], state_dict.values()), OperationType.BROADCAST
    )
    self.logger.debug(
        f"Sum of client weights after reduction on worker: {self.sum_weights}"
    )

    self._privacy_budget = self.privacy_engine.get_privacy_spent()
    self.logger.info(f"User Privacy Budget: {self._privacy_budget}")
    return self.reduced_module, float(self.sum_weights.item())