parquet_flask/aws/aws_ddb.py (215 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import decimal
import logging
import boto3
from boto3.dynamodb.conditions import Attr
from parquet_flask.aws.aws_cred import AwsCred
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Attribute type codes accepted by the key-type setters of AwsDdbProps.
# In DynamoDB these stand for String ('S'), Number ('N') and Binary ('B').
VALID_KEY_TYPE = ['S', 'N', 'B']
class AwsDdbProps:
    """
    Settings for one DynamoDB table: the table name, the hash / range key names and the attribute type of each key.

    The names start out as None and both key types start out as 'S'.
    Only the two key-type setters validate their input (against VALID_KEY_TYPE).
    """

    def __init__(self):
        self.__tbl_name = self.__hash_key = self.__range_key = None
        self.__hash_key_type = self.__range_key_type = 'S'

    @staticmethod
    def __checked_key_type(candidate):
        """
        :param candidate: str - key type to verify
        :return: str - the same value when it is one of VALID_KEY_TYPE
        :raises ValueError: when the value is not one of VALID_KEY_TYPE
        """
        if candidate in VALID_KEY_TYPE:
            return candidate
        raise ValueError('input is not valid type. {} vs. {}'.format(candidate, VALID_KEY_TYPE))

    @property
    def tbl_name(self):
        """
        :return: str - table name, or None if it has not been set
        """
        return self.__tbl_name

    @tbl_name.setter
    def tbl_name(self, val):
        """
        :param val: str - table name
        :return: None
        """
        self.__tbl_name = val

    @property
    def hash_key(self):
        """
        :return: str - hash key name, or None if it has not been set
        """
        return self.__hash_key

    @hash_key.setter
    def hash_key(self, val):
        """
        :param val: str - hash key name
        :return: None
        """
        self.__hash_key = val

    @property
    def range_key(self):
        """
        :return: str - range key name, or None if it has not been set
        """
        return self.__range_key

    @range_key.setter
    def range_key(self, val):
        """
        :param val: str - range key name
        :return: None
        """
        self.__range_key = val

    @property
    def hash_key_type(self):
        """
        :return: str - attribute type of the hash key ('S' unless changed)
        """
        return self.__hash_key_type

    @hash_key_type.setter
    def hash_key_type(self, val):
        """
        :param val: str - 'S', 'N', or 'B'
        :return: None
        :raises ValueError: when val is not one of VALID_KEY_TYPE
        """
        self.__hash_key_type = self.__checked_key_type(val)

    @property
    def range_key_type(self):
        """
        :return: str - attribute type of the range key ('S' unless changed)
        """
        return self.__range_key_type

    @range_key_type.setter
    def range_key_type(self, val):
        """
        :param val: str - 'S', 'N', or 'B'
        :return: None
        :raises ValueError: when val is not one of VALID_KEY_TYPE
        """
        self.__range_key_type = self.__checked_key_type(val)
class AwsDdb(AwsCred):
    """
    Wrapper around a boto3 DynamoDB client and resource for the single table described by an AwsDdbProps.

    The client and the resource are obtained through the inherited get_client / get_resource methods.
    Methods that return stored items convert decimal.Decimal values to int / float, except delete_one_item
    which returns the attributes exactly as boto3 delivers them.
    """

    def __init__(self, props=None):
        """
        :param props: AwsDdbProps - table settings. A fresh AwsDdbProps is created when omitted
        :return: None
        """
        super().__init__()
        # An AwsDdbProps() default in the signature is evaluated once and would be shared (and mutated)
        # across every AwsDdb instance, so the default is built per instance here.
        self.__props = AwsDdbProps() if props is None else props
        self._ddb_client = self.get_client('dynamodb')
        self._ddb_resource = self.get_resource('dynamodb')

    def __get_table(self):
        """
        :return: boto3 Table resource for the configured table name
        """
        return self._ddb_resource.Table(self.__props.tbl_name)

    def __build_key(self, hash_val, range_val=None):
        """
        Build the primary-key dictionary used by the single-item operations.

        :param hash_val: value of the hash key
        :param range_val: value of the range key. Ignored when it is None or when no range key is configured
        :return: dict - {hash_key: hash_val} plus {range_key: range_val} when applicable
        """
        query_key = {self.__props.hash_key: hash_val}
        if range_val is not None and self.__props.range_key is not None:
            query_key[self.__props.range_key] = range_val
        return query_key

    def has_table(self):
        """
        Check whether the configured table exists.

        :return: dict - the describe_table response if the table exists, None if DynamoDB reports it as not found
        :raises ValueError: when tbl_name is not set
        Any error other than ResourceNotFoundException (credentials, throttling, network, ...) is propagated
        instead of being reported as "table missing".
        """
        if self.__props.tbl_name is None:
            raise ValueError('missing tbl_name')
        try:
            return self._ddb_client.describe_table(TableName=self.__props.tbl_name)
        except self._ddb_client.exceptions.ResourceNotFoundException:
            return None

    def create_table(self, gsi_list=None):
        """
        Create the configured table with on-demand billing, using the hash key and the optional range key from props.

        ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.create_table
        :param gsi_list: list - [{'IndexName': 'string','KeySchema': [{'AttributeName': 'string','KeyType': 'HASH'|'RANGE'},]}]
                         Optional. Every index is created with ProjectionType 'ALL'; the dictionaries passed in
                         are copied, not modified.
        :return: dict - create table result
        :raises ValueError: when tbl_name or hash_key is not set
        NOTE(review): only the table's own keys are put in AttributeDefinitions. Key attributes that appear solely
        in gsi_list are not added here - confirm whether callers need that.
        """
        if self.__props.tbl_name is None:
            raise ValueError('missing tbl_name')
        if self.__props.hash_key is None:
            raise ValueError('missing hash_key')
        LOGGER.info('creating a table: {}'.format(self.__props.tbl_name))
        attribute_definitions = [
            {
                'AttributeName': self.__props.hash_key,
                'AttributeType': self.__props.hash_key_type,
            }
        ]
        key_schema = [
            {
                'AttributeName': self.__props.hash_key,
                'KeyType': 'HASH',
            }
        ]
        if self.__props.range_key is not None:
            attribute_definitions.append({
                'AttributeName': self.__props.range_key,
                'AttributeType': self.__props.range_key_type,
            })
            key_schema.append({
                'AttributeName': self.__props.range_key,
                'KeyType': 'RANGE',
            })
        # Work on copies so that neither a shared default nor the caller's dictionaries are altered.
        gsi_with_projection = [
            {**each, 'Projection': {'ProjectionType': 'ALL'}} for each in (gsi_list or [])
        ]
        create_tbl_params = {
            'TableName': self.__props.tbl_name,
            'AttributeDefinitions': attribute_definitions,
            'KeySchema': key_schema,
            'BillingMode': 'PAY_PER_REQUEST',  # TODO setting it to on-demand. might need to re-visit later
            'SSESpecification': {'Enabled': False}  # TODO had to disable it since it does not support 'AES256' yet.
        }
        if gsi_with_projection:
            create_tbl_params['GlobalSecondaryIndexes'] = gsi_with_projection
        return self._ddb_client.create_table(**create_tbl_params)

    def _replace_decimals(self, obj):
        """
        Recursively convert decimal.Decimal values into int (whole numbers) or float (everything else).
        Lists and dictionaries are updated in place and returned; other values are returned unchanged.

        Ref:
        https://stackoverflow.com/a/46738251 in the comments
        https://github.com/boto/boto3/issues/369#issuecomment-157205696
        :param obj: list, dict, decimal.Decimal or any other value
        :return: the converted value
        """
        if isinstance(obj, list):
            for i, each in enumerate(obj):
                obj[i] = self._replace_decimals(each)
            return obj
        if isinstance(obj, dict):
            # only values of existing keys are reassigned, so the dictionary size never changes while iterating
            for k, v in obj.items():
                obj[k] = self._replace_decimals(v)
            return obj
        if isinstance(obj, decimal.Decimal):
            return int(obj) if obj % 1 == 0 else float(obj)
        return obj

    def get_one_item(self, hash_val, range_val=None):
        """
        retrieving a single item based on its key

        :param hash_val: value of the hash key
        :param range_val: value of the range key. Used only when a range key is configured
        :return: dict - the item with decimals converted, or None when the response has no 'Item'
        """
        LOGGER.info('retrieving one item from DDB using they key')
        item_result = self.__get_table().get_item(Key=self.__build_key(hash_val, range_val))
        if 'Item' not in item_result:
            return None
        return self._replace_decimals(item_result['Item'])

    def delete_one_item(self, hash_val, range_val=None):
        """
        Delete a single item based on its key and return the attributes it had (ReturnValues='ALL_OLD').

        Sample Response:
        {'Attributes': {...}}
        {'RequestId': '70PUK7HSNQI6VLHRM1Q7VPESJ3VV4KQNSO5AEMVJF66Q9ASUAAJG', 'HTTPStatusCode': 200, 'HTTPHeaders': {'server': 'Server', 'date': 'Mon, 08 Mar 2021 18:04:35 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '2', 'connection': 'keep-alive', 'x-amzn-requestid': '70PUK7HSNQI6VLHRM1Q7VPESJ3VV4KQNSO5AEMVJF66Q9ASUAAJG', 'x-amz-crc32': '2745614147'}, 'RetryAttempts': 0}
        :param hash_val: value of the hash key
        :param range_val: value of the range key. Used only when a range key is configured
        :return: dict - the deleted attributes as returned by boto3 (decimals are NOT converted),
                 or None when the response has no 'Attributes'
        """
        LOGGER.info('deleting one item from DDB using they key')
        item_result = self.__get_table().delete_item(
            Key=self.__build_key(hash_val, range_val),
            ReturnValues='ALL_OLD',
        )
        if 'Attributes' not in item_result:
            LOGGER.warning('cannot retrieved deleted attributes.')
            return None
        return item_result['Attributes']

    def add_one_item(self, item_dict, hash_val, range_val=None, replace=False):
        """
        Put one item, guarded by a condition on its key attributes.

        :param item_dict: dict - item attributes. The key attribute(s) are written into this dictionary in place
        :param hash_val: value of the hash key
        :param range_val: value of the range key. Used only when a range key is configured
        :param replace: bool - True: the stored key attributes must equal the given key values
                        (i.e. overwrite an existing item).
                        Anything else: the stored key attributes must differ from the given key values
                        (intended to reject the write when the item already exists).
        :return: None
        The conditions are combined with '&'. Python's 'and' does not build a combined condition: it evaluates
        to its right-hand operand only, which silently dropped the hash-key condition.
        """
        LOGGER.info('adding one item from DDB using they key')
        hash_key = self.__props.hash_key
        range_key = self.__props.range_key
        has_range = range_val is not None and range_key is not None
        item_dict[hash_key] = hash_val
        if has_range:
            item_dict[range_key] = range_val
        if replace is True:
            condition = Attr(hash_key).eq(hash_val)
            if has_range:
                condition = condition & Attr(range_key).eq(range_val)
        else:
            condition = Attr(hash_key).ne(hash_val)
            if has_range:
                condition = condition & Attr(range_key).ne(range_val)
        self.__get_table().put_item(
            Item=item_dict,
            ReturnValues='ALL_OLD',
            ConditionExpression=condition,
        )
        # Sample response:
        # {'ResponseMetadata': {'RequestId': '...', 'HTTPStatusCode': 200, 'HTTPHeaders': {...}, 'RetryAttempts': 0}}
        # TODO check result
        return

    def scan_tbl(self, conditions_dict):
        """
        Scan the whole table with the given filter, following pagination until no LastEvaluatedKey is returned.

        :param conditions_dict: dict - passed to scan as ScanFilter
        :return: list - all matching items with decimals converted
        """
        LOGGER.info('scanning items from DDB using they key')
        current_tbl = self.__get_table()
        scan_params = {
            'Limit': 100,  # same page size for every request, including the first one
            'ScanFilter': conditions_dict,
            'Select': 'ALL_ATTRIBUTES',
        }
        all_results = []
        while True:
            item_result = current_tbl.scan(**scan_params)
            all_results.extend(item_result['Items'])
            last_key = item_result.get('LastEvaluatedKey')
            if last_key is None:  # no more pages
                break
            scan_params['ExclusiveStartKey'] = last_key
        return self._replace_decimals(all_results)

    def update_one_item(self, update_expression, expression_names, expression_vals, hash_val, range_val=None, retrieve_new_val=True):
        """
        Apply an update expression to a single item.

        Usage : increment or decrement
        ddb.update_one_item('SET #created_at_key = #created_at_key + :created_at_val', {'#created_at_key': 'created_at'}, {':created_at_val': -50}, '61725b56-3016-42c6-9006-c0b5d9017fee')
        :param update_expression: str - example: add #created_key = :created_val
        :param expression_names: dict - {'#created_key': 'created_at'}
        :param expression_vals: dict - {':created_val': 123}
        :param hash_val: str
        :param range_val: str - used only when a range key is configured
        :param retrieve_new_val: bool - True requests ReturnValues 'ALL_NEW', anything else 'ALL_OLD'
        :return: dict - returned attributes with decimals converted, or None when the response has no 'Attributes'
        """
        LOGGER.info('updating one item from DDB using they key')
        item_result = self.__get_table().update_item(
            Key=self.__build_key(hash_val, range_val),
            UpdateExpression=update_expression,
            ExpressionAttributeNames=expression_names,
            ExpressionAttributeValues=expression_vals,
            ReturnValues='ALL_NEW' if retrieve_new_val is True else 'ALL_OLD'
        )
        if 'Attributes' not in item_result:
            return None
        return self._replace_decimals(item_result['Attributes'])

    def __get_ddb_type(self, input_val):
        """
        Map a Python value to a DynamoDB data type descriptor.

        :param input_val: value to inspect
        :return: str - 'S' for str, 'BOOL' for bool, 'B' for bytes / bytearray, 'N' for anything else
        """
        if isinstance(input_val, str):
            return 'S'
        # bool must be tested before falling through to 'N' since bool is a subclass of int.
        # 'B' is DynamoDB's Binary descriptor, so booleans map to 'BOOL'.
        if isinstance(input_val, bool):
            return 'BOOL'
        if isinstance(input_val, (bytes, bytearray)):
            return 'B'
        return 'N'

    def get_from_index(self, index_name: str, hash_dict: dict):
        """
        Query a secondary index by the equality of its hash key.

        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.query
        :param index_name: str - name of a secondary index
        :param hash_dict: dictionary of {'name': 'value'} of a hash key. Only 1 is allowed
        :return: list - items returned by the query with decimals converted.
                 The query is issued once with Limit 1 and is not paginated.
        :raises ValueError: when hash_dict does not have exactly 1 entry
        """
        if len(hash_dict) != 1:
            raise ValueError('hash_dict must have exactly 1 entry. {}'.format(hash_dict))
        (hash_name, hash_val), = hash_dict.items()
        query_dict = {
            'IndexName': index_name,
            'Select': 'ALL_ATTRIBUTES',  # 'ALL_ATTRIBUTES'|'ALL_PROJECTED_ATTRIBUTES'|'SPECIFIC_ATTRIBUTES'|'COUNT'
            'Limit': 1,
            'ConsistentRead': False,
            # A partition (hash) key condition must be an equality; between() needs two bounds
            # and raised TypeError when called with a single value.
            'KeyConditionExpression': boto3.dynamodb.conditions.Key(hash_name).eq(hash_val),
        }
        item_result = self.__get_table().query(**query_dict)
        return [self._replace_decimals(k) for k in item_result['Items']]