def update_variant_metrics()

in lambda/api/experiment_metrics.py [0:0]


    def update_variant_metrics(self, metrics: list, timestamp=int(time())):
        """
        Group by endpoint variants and metric type to increment dynamodb counts
        """
        table = self.dynamodb.Table(self.metrics_table)

        # Sort the list by endpoint_name and variant_name first to ensure groupby is efficient
        metrics = sorted(
            metrics, key=lambda m: (m["endpoint_name"], m["endpoint_variant"])
        )

        responses = []
        for (endpoint_name, variant_name), vg in groupby(
            metrics, lambda m: (m["endpoint_name"], m["endpoint_variant"])
        ):
            # Get the total invocation and rewards
            invocation_count = 0
            conversion_count = 0
            reward_sum = 0.0
            for m in vg:
                if m["type"] == "invocation":
                    invocation_count += 1
                elif m["type"] == "conversion":
                    conversion_count += 1
                    reward_sum += m["reward"]
                else:
                    raise Exception("Unsupported type {}".format(m["type"]))
            logging.debug(
                f"Update metrics for endpoint: {endpoint_name}, variant: {variant_name} invocations: {invocation_count}, conversions: {conversion_count}, rewards: {reward_sum}"
            )
            # Update variant in dynamo db with these counts
            response = table.update_item(
                Key={"endpoint_name": endpoint_name},
                UpdateExpression="ADD variant_metrics.#variant.invocation_count :i, "
                "variant_metrics.#variant.conversion_count :c, "
                "variant_metrics.#variant.reward_sum :r "
                "SET #created_at = if_not_exists(#created_at, :now), #updated_at = :now ",
                ExpressionAttributeNames={
                    "#variant": variant_name,
                    "#created_at": "created_at",
                    "#updated_at": "updated_at",
                },
                ExpressionAttributeValues={
                    ":i": int(invocation_count),
                    ":c": int(conversion_count),
                    ":r": Decimal(str(reward_sum)),
                    ":now": timestamp,
                },
                ReturnValues="UPDATED_NEW",
            )

            # Return total counts per endpoint_name and endpoint_variant
            logging.debug(response)
            metrics = response["Attributes"]["variant_metrics"][variant_name]
            new_counts = {
                "endpoint_name": endpoint_name,
                "endpoint_variant": variant_name,
                "invocation_count": metrics.get("invocation_count", 0),
                "conversion_count": metrics.get("conversion_count", 0),
                "reward_sum": metrics.get("reward_sum", 0.0),
            }
            responses.append(new_counts)

            # Put cloudwatch metrics against this timestamp
            dt = datetime.fromtimestamp(timestamp)
            if invocation_count > 0:
                self.put_cloudwatch_metric(
                    "Invocations", endpoint_name, variant_name, invocation_count, dt
                )
            if conversion_count > 0:
                self.put_cloudwatch_metric(
                    "Conversions", endpoint_name, variant_name, conversion_count, dt
                )
                self.put_cloudwatch_metric(
                    "Rewards", endpoint_name, variant_name, reward_sum, dt
                )

        return responses