in ddbtools/table.py [0:0]
def get_table_pricing_data(self, table_name: str) -> dict:
    """Return table data useful for determining pricing.

    Calls ``describe_table`` on ``self.dynamodb_client`` and condenses the
    response into a flat dict keyed by names from ``constants``.

    Args:
        table_name: Name of the table to describe.

    Returns:
        A dict containing:

        * ``constants.BILLING_MODE`` - the table's billing mode, or
          ``constants.PROVISIONED_BILLING`` when the response has no
          ``BillingModeSummary``.
        * ``constants.SIZE_IN_GB`` - table size as a ``Decimal``.
        * ``constants.PROVISIONED_RCUS`` / ``constants.PROVISIONED_WCUS`` -
          the table's provisioned read/write capacity units.
        * ``constants.TABLE_ARN`` - the table ARN.
        * ``constants.GSIS`` - present only when the response contains
          ``GlobalSecondaryIndexes``; a list with one dict (ARN, name,
          RCUs, WCUs, size in GB) per index whose status is ``ACTIVE``.
        * ``constants.REPLICAS`` - present only when at least one replica
          is ``ACTIVE``; a list of the active replicas' region names.
        * ``constants.TABLE_CLASS`` - the table class, or
          ``constants.STD_TABLE_CLASS`` when the response has no
          ``TableClassSummary``.

    Raises:
        Exception: If the ``describe_table`` call fails for any reason.
        KeyError: If the response lacks one of the fields read
            unconditionally below (e.g. ``TableArn``, ``TableSizeBytes``).
    """
    try:
        response = self.dynamodb_client.describe_table(TableName=table_name)
    except Exception as e:
        raise Exception(f"Failed to describe table {table_name}: {e}.") from None

    # Convert the divisor once; it is reused for the table and every GSI.
    gb_in_bytes = Decimal(constants.GB_IN_BYTES)

    def bytes_to_gb(size_in_bytes):
        # Divide in Decimal arithmetic. Dividing as floats first and then
        # wrapping the quotient in Decimal would bake the binary
        # floating-point rounding error into the result.
        return Decimal(size_in_bytes) / gb_in_bytes

    table_data = response['Table']

    billing_mode = (table_data['BillingModeSummary']['BillingMode']
                    if 'BillingModeSummary' in table_data
                    else constants.PROVISIONED_BILLING)

    throughput_data = table_data['ProvisionedThroughput']
    table_pricing_data = {
        constants.BILLING_MODE: billing_mode,
        constants.SIZE_IN_GB: bytes_to_gb(table_data['TableSizeBytes']),
        constants.PROVISIONED_RCUS: throughput_data['ReadCapacityUnits'],
        constants.PROVISIONED_WCUS: throughput_data['WriteCapacityUnits'],
        constants.TABLE_ARN: table_data['TableArn'],
    }

    if 'GlobalSecondaryIndexes' in table_data:
        gsi_pricing_data = []
        for gsi_data in table_data['GlobalSecondaryIndexes']:
            # Indexes that are not ACTIVE are left out of the result.
            if gsi_data['IndexStatus'] != 'ACTIVE':
                continue
            gsi_throughput = gsi_data['ProvisionedThroughput']
            gsi_pricing_data.append({
                constants.INDEX_ARN: gsi_data['IndexArn'],
                constants.INDEX_NAME: gsi_data['IndexName'],
                constants.PROVISIONED_RCUS: gsi_throughput['ReadCapacityUnits'],
                constants.PROVISIONED_WCUS: gsi_throughput['WriteCapacityUnits'],
                constants.SIZE_IN_GB: bytes_to_gb(gsi_data['IndexSizeBytes']),
            })
        # The key is set even when no index is ACTIVE (empty list).
        table_pricing_data[constants.GSIS] = gsi_pricing_data

    active_replica_regions = [replica['RegionName']
                              for replica in table_data.get('Replicas', [])
                              if replica['ReplicaStatus'] == 'ACTIVE']
    # Unlike GSIS, the REPLICAS key is only added when there is something
    # to report.
    if active_replica_regions:
        table_pricing_data[constants.REPLICAS] = active_replica_regions

    table_pricing_data[constants.TABLE_CLASS] = (
        table_data['TableClassSummary']['TableClass']
        if 'TableClassSummary' in table_data
        else constants.STD_TABLE_CLASS)

    return table_pricing_data