in products/src/validate/main.py [0:0]
def validate_products(products: List[dict]) -> Set[Union[List[dict], str]]:
    """
    Check a list of products against the items stored in the DynamoDB table.

    Each product is looked up by its ``productId`` and passed to
    ``compare_product`` together with the deserialized stored item, or
    ``None`` when the table returned no item for that ID.

    Returns a 2-tuple ``(products, reasons)``:

    * ``products`` -- the first element of every non-``None`` result of
      ``compare_product``; an empty list when all products are valid.
    * ``reasons`` -- the second element of each of those results, joined
      with ``". "`` into a single string.

    NOTE(review): the ``Set[...]`` return annotation does not match the
    tuple that is actually returned -- confirm and fix alongside the
    module's typing imports.
    """
    # batch_get_item only supports up to 100 keys per call.
    max_batch_size = 100

    collected_products = []
    collected_reasons = []

    for start in range(0, len(products), max_batch_size):
        # Index the current slice by product ID; this also collapses repeated
        # IDs within the slice into a single lookup key.
        requested = {
            item["productId"]: item
            for item in products[start:start + max_batch_size]
        }

        request_items = {
            TABLE_NAME: {
                "Keys": [
                    {"productId": {"S": product_id}}
                    for product_id in requested
                ],
                "ProjectionExpression": "#productId, #name, #package, #price",
                "ExpressionAttributeNames": {
                    "#productId": "productId",
                    "#name": "name",
                    "#package": "package",
                    "#price": "price",
                },
            }
        }

        # Keep calling until nothing is left unprocessed: even with fewer
        # than 100 keys there is a 16MB response limit, so a single call may
        # return fewer items than requested.
        stored = {}
        while True:
            response = dynamodb.batch_get_item(RequestItems=request_items)

            for raw_item in response.get("Responses", {}).get(TABLE_NAME, []):
                stored[raw_item["productId"]["S"]] = {
                    attribute: type_deserializer.deserialize(value)
                    for attribute, value in raw_item.items()
                }

            if response.get("UnprocessedKeys", {}).get(TABLE_NAME, None) is None:
                break
            # Resubmit exactly what the service reported as unprocessed.
            request_items = response["UnprocessedKeys"]

        for product_id, product in requested.items():
            outcome = compare_product(product, stored.get(product_id))
            if outcome is None:
                continue
            collected_products.append(outcome[0])
            collected_reasons.append(outcome[1])

    return collected_products, ". ".join(collected_reasons)