in emr-dynamodb-hive/src/main/java/org/apache/hadoop/hive/dynamodb/DynamoDBStorageHandler.java [118:212]
/**
 * Copies the DynamoDB-related settings of a Hive table into the given job properties.
 *
 * <p>The DynamoDB table is described once (through a {@code DynamoDBClient} created for the
 * table's configured region and closed before returning). A table-level endpoint takes
 * precedence over the endpoint from the cluster configuration. For tables without a billing-mode
 * summary, without a reported billing mode, or in provisioned mode, throughput handling is
 * delegated to {@code useExplicitThroughputIfRequired}. For any other billing mode, the
 * read/write throughput default to {@code DynamoDBConstants.DEFAULT_CAPACITY_FOR_ON_DEMAND}
 * unless they are set on the table.
 *
 * @param tableDesc Hive table descriptor whose properties hold the DynamoDB storage settings
 * @param jobProperties map that receives the job properties; modified in place
 */
public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
  java.util.Properties tableProperties = tableDesc.getProperties();
  DynamoDBClient client =
      new DynamoDBClient(conf, tableProperties.getProperty(DynamoDBConstants.REGION));
  try {
    String tableName = HiveDynamoDBUtil.getDynamoDBTableName(
        tableProperties.getProperty(DynamoDBConstants.TABLE_NAME), tableDesc.getTableName());
    TableDescription description = client.describeTable(tableName);
    Double averageItemSize = DynamoDBUtil.calculateAverageItemSize(description);

    // A non-empty table-level endpoint overrides the one from the cluster configuration.
    String endpoint = conf.get(DynamoDBConstants.ENDPOINT);
    String tableEndpoint = tableProperties.getProperty(DynamoDBConstants.ENDPOINT);
    if (!Strings.isNullOrEmpty(tableEndpoint)) {
      endpoint = tableEndpoint;
    }
    if (!Strings.isNullOrEmpty(endpoint)) {
      jobProperties.put(DynamoDBConstants.ENDPOINT, endpoint);
    }

    String region = tableProperties.getProperty(DynamoDBConstants.REGION);
    if (!Strings.isNullOrEmpty(region)) {
      jobProperties.put(DynamoDBConstants.REGION, region);
    }

    // The same DynamoDB table name is published as input, output and generic table name.
    jobProperties.put(DynamoDBConstants.OUTPUT_TABLE_NAME, tableName);
    jobProperties.put(DynamoDBConstants.INPUT_TABLE_NAME, tableName);
    jobProperties.put(DynamoDBConstants.TABLE_NAME, tableName);

    Map<String, String> hiveToDynamoDBSchemaMapping = HiveDynamoDBUtil.getHiveToDynamoDBMapping(
        tableProperties.getProperty(DynamoDBConstants.DYNAMODB_COLUMN_MAPPING));
    // Column map can be null if only full backup is being used
    if (hiveToDynamoDBSchemaMapping != null) {
      jobProperties.put(DynamoDBConstants.DYNAMODB_COLUMN_MAPPING,
          HiveDynamoDBUtil.toJsonString(hiveToDynamoDBSchemaMapping));
    }

    Map<String, String> hiveToDynamoDBTypeMapping = HiveDynamoDBUtil.getHiveToDynamoDBMapping(
        tableProperties.getProperty(DynamoDBConstants.DYNAMODB_TYPE_MAPPING));
    // Each mapping comes from its own table property and may be null independently, so the
    // type mapping must be guarded by its own null check (not the column mapping's).
    if (hiveToDynamoDBTypeMapping != null) {
      jobProperties.put(DynamoDBConstants.DYNAMODB_TYPE_MAPPING,
          HiveDynamoDBUtil.toJsonString(hiveToDynamoDBTypeMapping));
    }

    // Boolean.parseBoolean yields false for an unset property or anything but "true"
    // (case-insensitive), so an explicit "true"/"false" is always published.
    boolean hiveToDynamoDBNullSerialization = Boolean.parseBoolean(
        tableProperties.getProperty(DynamoDBConstants.DYNAMODB_NULL_SERIALIZATION));
    jobProperties.put(DynamoDBConstants.DYNAMODB_NULL_SERIALIZATION,
        Boolean.toString(hiveToDynamoDBNullSerialization));

    String readPercent = tableProperties.getProperty(DynamoDBConstants.THROUGHPUT_READ_PERCENT);
    if (readPercent != null) {
      jobProperties.put(DynamoDBConstants.THROUGHPUT_READ_PERCENT, readPercent);
    }
    String writePercent = tableProperties.getProperty(DynamoDBConstants.THROUGHPUT_WRITE_PERCENT);
    if (writePercent != null) {
      jobProperties.put(DynamoDBConstants.THROUGHPUT_WRITE_PERCENT, writePercent);
    }

    // A missing summary or a summary without a reported mode is treated as provisioned;
    // comparing constant-first avoids an NPE when the mode is null.
    if (description.getBillingModeSummary() == null
        || description.getBillingModeSummary().getBillingMode() == null
        || DynamoDBConstants.BILLING_MODE_PROVISIONED.equals(
            description.getBillingModeSummary().getBillingMode())) {
      useExplicitThroughputIfRequired(jobProperties, tableDesc);
    } else {
      // If not specified at the table level, set default value
      jobProperties.put(DynamoDBConstants.READ_THROUGHPUT,
          tableProperties.getProperty(DynamoDBConstants.READ_THROUGHPUT,
              DynamoDBConstants.DEFAULT_CAPACITY_FOR_ON_DEMAND.toString()));
      jobProperties.put(DynamoDBConstants.WRITE_THROUGHPUT,
          tableProperties.getProperty(DynamoDBConstants.WRITE_THROUGHPUT,
              DynamoDBConstants.DEFAULT_CAPACITY_FOR_ON_DEMAND.toString()));
    }

    jobProperties.put(DynamoDBConstants.ITEM_COUNT, description.getItemCount().toString());
    jobProperties.put(DynamoDBConstants.TABLE_SIZE_BYTES,
        description.getTableSizeBytes().toString());
    jobProperties.put(DynamoDBConstants.AVG_ITEM_SIZE, averageItemSize.toString());

    log.info("Average item size: " + averageItemSize);
    log.info("Item count: " + description.getItemCount());
    log.info("Table size: " + description.getTableSizeBytes());
    log.info("Read throughput: " + jobProperties.get(DynamoDBConstants.READ_THROUGHPUT));
    log.info("Write throughput: " + jobProperties.get(DynamoDBConstants.WRITE_THROUGHPUT));
  } finally {
    client.close();
  }
}