def push_to_dynamo()

in mozetl/taar/taar_dynamo.py [0:0]


    def push_to_dynamo(self, data_tuple):  # noqa
        """
        Write the records in `data_tuple[2]` to the configured DynamoDB table.

        Each record is converted into a DynamoDB item holding the record's
        `client_id`, a `TTL` epoch timestamp 60 days in the future, and a
        `json_payload` containing the zlib-compressed, UTF-8 encoded JSON
        serialization of the whole record.

        The items are first written with a single batch writer.  If that
        fails, every item is retried individually so that the failing
        items can be isolated for debugging.  At most 50 errors are
        tracked: when `data_tuple[3]` already holds 50 or more entries the
        per-item retry is skipped.

        :param data_tuple: indexable container; index 2 is an iterable of
            dict records (each with a "client_id" key) and index 3 is the
            sized collection of errors accumulated so far.  It is passed
            to `self.hash_client_ids` before the items are built
            (presumably hashing the client IDs in place -- verify against
            that method).
        :return: a list of DynamoDB-formatted items (not the original
            records) that could not be written:

            * `[]` when the batch write succeeded,
            * `[]` when the batch write failed but the error accumulator
              is already full,
            * the items whose individual `put_item` call failed,
            * the entire item list when the fallback connection itself
              could not be set up.
        """
        max_tracked_errors = 50
        ttl_seconds = 60 * 60 * 24 * 60  # 60 days

        # Transform the data into something that DynamoDB will always
        # accept.  Every item expires 60 days from now.
        ttl = int(time.time()) + ttl_seconds

        self.hash_client_ids(data_tuple)

        item_list = [
            {
                "client_id": item["client_id"],
                "TTL": ttl,
                "json_payload": DynamoBinary(
                    zlib.compress(json.dumps(item, default=json_serial).encode("utf8"))
                ),
            }
            for item in data_tuple[2]
        ]

        # Obtain credentials from the singleton.  The result is unpacked as
        # keyword arguments into every boto3 resource created below.
        print("Using prod iam role: %s" % self._prod_iam_role)
        if self._prod_iam_role is not None:
            cred_args = credentials.getInstance(self._prod_iam_role)
        else:
            cred_args = {}

        conn = boto3.resource("dynamodb", region_name=self._region_name, **cred_args)
        table = conn.Table(self._table_name)

        try:
            with table.batch_writer(overwrite_by_pkeys=["client_id"]) as batch:
                for item in item_list:
                    batch.put_item(Item=item)
            return []
        except Exception:
            # Something went wrong with the batch write; fall through to
            # the best-effort per-item retry below.
            pass

        if len(data_tuple[3]) >= max_tracked_errors:
            # Too many errors already accumulated, just short circuit
            # and return.
            return []

        try:
            # Use a fresh connection for the retry, but with the same
            # credentials as the batch write so that the retry targets the
            # table under the same identity.
            conn = boto3.resource(
                "dynamodb", region_name=self._region_name, **cred_args
            )
            table = conn.Table(self._table_name)
        except Exception:
            # Something went wrong with the entire DynamoDB connection.
            # Just return the entire list of items.
            return item_list

        error_accum = []
        for item in item_list:
            try:
                table.put_item(Item=item)
            except Exception:
                error_accum.append(item)
        return error_accum