in orders/src/on_events/main.py [0:0]
def update_order(order_id: str, status: str, products: Optional[List[dict]] = None) -> None:
    """
    Update the status of an order and, optionally, its list of products.

    The ``status`` attribute is always overwritten. When ``products`` is
    provided, the products currently stored on the order are narrowed down to
    the entries whose ``productId`` appears in ``products``, and that filtered
    list is written back in the same update call.

    Args:
        order_id: Value of the ``orderId`` key identifying the item to update.
        status: New value for the ``status`` attribute.
        products: Optional list of dicts, each carrying a ``productId`` key.
            ``None`` (the default) leaves the stored products untouched.

    Raises:
        KeyError: If ``products`` is provided and the ``get_item`` response has
            no ``Item``, the item has no ``products`` attribute, or an entry
            lacks ``productId``.
    """
    logger.info({
        "message": "Update status for order {} to {}".format(order_id, status),
        "orderId": order_id,
        "status": status
    })

    # "#s"/":s" are expression placeholders; the attribute name is aliased
    # because "status" is a DynamoDB reserved word.
    update_expression = "set #s = :s"
    attribute_names = {
        "#s": "status"
    }
    attribute_values = {
        ":s": status
    }

    if products is not None:
        # Build the lookup once so each membership test below is O(1).
        # Assumes productId values are hashable (e.g. strings).
        product_ids = {p["productId"] for p in products}

        item = table.get_item(
            Key={"orderId": order_id},
            AttributesToGet=["products"]
        )["Item"]

        # Keep the order's own product records (in their stored order) for the
        # requested IDs, so the attributes already saved on each product are
        # preserved instead of being replaced by the caller-supplied dicts.
        new_products = [
            product
            for product in item["products"]
            if product["productId"] in product_ids
        ]

        update_expression += ", #p = :p"
        attribute_names["#p"] = "products"
        attribute_values[":p"] = new_products

    table.update_item(
        Key={"orderId": order_id},
        UpdateExpression=update_expression,
        ExpressionAttributeNames=attribute_names,
        ExpressionAttributeValues=attribute_values
    )