in lambda/api/experiment_metrics.py [0:0]
def update_variant_metrics(self, metrics: list, timestamp=int(time())):
"""
Group by endpoint variants and metric type to increment dynamodb counts
"""
table = self.dynamodb.Table(self.metrics_table)
# Sort the list by endpoint_name and variant_name first to ensure groupby is efficient
metrics = sorted(
metrics, key=lambda m: (m["endpoint_name"], m["endpoint_variant"])
)
responses = []
for (endpoint_name, variant_name), vg in groupby(
metrics, lambda m: (m["endpoint_name"], m["endpoint_variant"])
):
# Get the total invocation and rewards
invocation_count = 0
conversion_count = 0
reward_sum = 0.0
for m in vg:
if m["type"] == "invocation":
invocation_count += 1
elif m["type"] == "conversion":
conversion_count += 1
reward_sum += m["reward"]
else:
raise Exception("Unsupported type {}".format(m["type"]))
logging.debug(
f"Update metrics for endpoint: {endpoint_name}, variant: {variant_name} invocations: {invocation_count}, conversions: {conversion_count}, rewards: {reward_sum}"
)
# Update variant in dynamo db with these counts
response = table.update_item(
Key={"endpoint_name": endpoint_name},
UpdateExpression="ADD variant_metrics.#variant.invocation_count :i, "
"variant_metrics.#variant.conversion_count :c, "
"variant_metrics.#variant.reward_sum :r "
"SET #created_at = if_not_exists(#created_at, :now), #updated_at = :now ",
ExpressionAttributeNames={
"#variant": variant_name,
"#created_at": "created_at",
"#updated_at": "updated_at",
},
ExpressionAttributeValues={
":i": int(invocation_count),
":c": int(conversion_count),
":r": Decimal(str(reward_sum)),
":now": timestamp,
},
ReturnValues="UPDATED_NEW",
)
# Return total counts per endpoint_name and endpoint_variant
logging.debug(response)
metrics = response["Attributes"]["variant_metrics"][variant_name]
new_counts = {
"endpoint_name": endpoint_name,
"endpoint_variant": variant_name,
"invocation_count": metrics.get("invocation_count", 0),
"conversion_count": metrics.get("conversion_count", 0),
"reward_sum": metrics.get("reward_sum", 0.0),
}
responses.append(new_counts)
# Put cloudwatch metrics against this timestamp
dt = datetime.fromtimestamp(timestamp)
if invocation_count > 0:
self.put_cloudwatch_metric(
"Invocations", endpoint_name, variant_name, invocation_count, dt
)
if conversion_count > 0:
self.put_cloudwatch_metric(
"Conversions", endpoint_name, variant_name, conversion_count, dt
)
self.put_cloudwatch_metric(
"Rewards", endpoint_name, variant_name, reward_sum, dt
)
return responses