in mozetl/taar/taar_dynamo.py [0:0]
def push_to_dynamo(self, data_tuple):  # noqa
    """
    Connect to DynamoDB and push the records in ``data_tuple[2]``
    into the configured table.

    On a failed batch write, each item is retried individually and the
    items that still fail are returned so the caller can accumulate up
    to 50 of them for debugging.

    :param data_tuple: tuple whose index 2 holds the list of record
        dicts to write (each must have a ``client_id`` key) and whose
        index 3 holds the caller's accumulated list of failed items.
    :return: ``[]`` on success (or when the error accumulator is
        already full); the list of individually-failed items when the
        batch write failed; the entire ``item_list`` when DynamoDB is
        unreachable altogether.
    """
    # Set TTL to 60 days from now (DynamoDB TTL is epoch seconds).
    ttl = int(time.time()) + 60 * 60 * 24 * 60

    self.hash_client_ids(data_tuple)

    # Transform the data into something that DynamoDB will always
    # accept: compress the JSON payload and wrap it as a binary
    # attribute to stay small.
    item_list = [
        {
            "client_id": item["client_id"],
            "TTL": ttl,
            "json_payload": DynamoBinary(
                zlib.compress(json.dumps(item, default=json_serial).encode("utf8"))
            ),
        }
        for item in data_tuple[2]
    ]

    # Obtain credentials from the singleton when a prod IAM role is
    # configured; otherwise fall back to the default credential chain.
    print("Using prod iam role: %s" % self._prod_iam_role)
    if self._prod_iam_role is not None:
        cred_args = credentials.getInstance(self._prod_iam_role)
    else:
        cred_args = {}

    conn = boto3.resource("dynamodb", region_name=self._region_name, **cred_args)
    table = conn.Table(self._table_name)
    try:
        with table.batch_writer(overwrite_by_pkeys=["client_id"]) as batch:
            for item in item_list:
                batch.put_item(Item=item)
        return []
    except Exception:
        # Something went wrong with the batch write.
        # BUGFIX: use >= so an accumulator that somehow exceeded 50
        # entries also short-circuits (the original `== 50` would
        # keep appending past the cap).
        if len(data_tuple[3]) >= 50:
            # Too many errors already accumulated, just short circuit
            # and return.
            return []
        try:
            error_accum = []
            # Retry each item individually to isolate the bad records.
            # BUGFIX: pass **cred_args here too — the original fallback
            # dropped the assumed IAM role and retried with default
            # credentials, which could fail for the wrong reason.
            conn = boto3.resource(
                "dynamodb", region_name=self._region_name, **cred_args
            )
            table = conn.Table(self._table_name)
            for item in item_list:
                try:
                    table.put_item(Item=item)
                except Exception:
                    error_accum.append(item)
            return error_accum
        except Exception:
            # Something went wrong with the entire DynamoDB
            # connection. Just return the entire list of JSON items
            # so the caller can accumulate them for debugging.
            return item_list