public void configureTableJobProperties()

in emr-dynamodb-hive/src/main/java/org/apache/hadoop/hive/dynamodb/DynamoDBStorageHandler.java [118:212]


  public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
    DynamoDBClient client =
        new DynamoDBClient(conf, tableDesc.getProperties().getProperty(DynamoDBConstants.REGION));

    try {
      String tableName = HiveDynamoDBUtil.getDynamoDBTableName(tableDesc.getProperties()
          .getProperty(DynamoDBConstants.TABLE_NAME), tableDesc.getTableName());
      TableDescription description = client.describeTable(tableName);
      Double averageItemSize = DynamoDBUtil.calculateAverageItemSize(description);
      log.info("Average item size: " + averageItemSize);

      String endpoint = conf.get(DynamoDBConstants.ENDPOINT);
      if (!Strings.isNullOrEmpty(tableDesc.getProperties().getProperty(DynamoDBConstants
          .ENDPOINT))) {
        endpoint = tableDesc.getProperties().getProperty(DynamoDBConstants.ENDPOINT);
      }

      if (!Strings.isNullOrEmpty(endpoint)) {
        jobProperties.put(DynamoDBConstants.ENDPOINT, endpoint);
      }

      if (!Strings.isNullOrEmpty(tableDesc.getProperties().getProperty(DynamoDBConstants.REGION))) {
        jobProperties.put(DynamoDBConstants.REGION,
            tableDesc.getProperties().getProperty(DynamoDBConstants.REGION));
      }

      jobProperties.put(DynamoDBConstants.OUTPUT_TABLE_NAME, tableName);
      jobProperties.put(DynamoDBConstants.INPUT_TABLE_NAME, tableName);
      jobProperties.put(DynamoDBConstants.TABLE_NAME, tableName);

      Map<String, String> hiveToDynamoDBSchemaMapping = HiveDynamoDBUtil
          .getHiveToDynamoDBMapping(tableDesc.getProperties().getProperty(DynamoDBConstants
              .DYNAMODB_COLUMN_MAPPING));

      // Column map can be null if only full backup is being used
      if (hiveToDynamoDBSchemaMapping != null) {
        jobProperties.put(DynamoDBConstants.DYNAMODB_COLUMN_MAPPING, HiveDynamoDBUtil
            .toJsonString(hiveToDynamoDBSchemaMapping));
      }

      Map<String, String> hiveToDynamoDBTypeMapping = HiveDynamoDBUtil
          .getHiveToDynamoDBMapping(tableDesc.getProperties().getProperty(DynamoDBConstants
              .DYNAMODB_TYPE_MAPPING));

      if (hiveToDynamoDBSchemaMapping != null) {
        jobProperties.put(DynamoDBConstants.DYNAMODB_TYPE_MAPPING, HiveDynamoDBUtil
            .toJsonString(hiveToDynamoDBTypeMapping));
      }

      boolean hiveToDynamoDBNullSerialization = Boolean.parseBoolean(
          tableDesc.getProperties().getProperty(DynamoDBConstants.DYNAMODB_NULL_SERIALIZATION));
      jobProperties.put(DynamoDBConstants.DYNAMODB_NULL_SERIALIZATION,
          Boolean.toString(hiveToDynamoDBNullSerialization));

      if (tableDesc.getProperties().getProperty(DynamoDBConstants.THROUGHPUT_READ_PERCENT)
          != null) {
        jobProperties.put(DynamoDBConstants.THROUGHPUT_READ_PERCENT, tableDesc.getProperties()
            .getProperty(DynamoDBConstants.THROUGHPUT_READ_PERCENT));
      }

      if (tableDesc.getProperties().getProperty(DynamoDBConstants.THROUGHPUT_WRITE_PERCENT)
          != null) {
        jobProperties.put(DynamoDBConstants.THROUGHPUT_WRITE_PERCENT, tableDesc.getProperties()
            .getProperty(DynamoDBConstants.THROUGHPUT_WRITE_PERCENT));
      }

      if (description.getBillingModeSummary() == null
          || description.getBillingModeSummary().getBillingMode()
          .equals(DynamoDBConstants.BILLING_MODE_PROVISIONED)) {
        useExplicitThroughputIfRequired(jobProperties, tableDesc);
      } else {
        // If not specified at the table level, set default value
        jobProperties.put(DynamoDBConstants.READ_THROUGHPUT, tableDesc.getProperties()
            .getProperty(DynamoDBConstants.READ_THROUGHPUT,
                DynamoDBConstants.DEFAULT_CAPACITY_FOR_ON_DEMAND.toString()));
        jobProperties.put(DynamoDBConstants.WRITE_THROUGHPUT, tableDesc.getProperties()
            .getProperty(DynamoDBConstants.WRITE_THROUGHPUT,
                DynamoDBConstants.DEFAULT_CAPACITY_FOR_ON_DEMAND.toString()));
      }

      jobProperties.put(DynamoDBConstants.ITEM_COUNT, description.getItemCount().toString());
      jobProperties.put(DynamoDBConstants.TABLE_SIZE_BYTES, description.getTableSizeBytes()
          .toString());
      jobProperties.put(DynamoDBConstants.AVG_ITEM_SIZE, averageItemSize.toString());

      log.info("Average item size: " + averageItemSize);
      log.info("Item count: " + description.getItemCount());
      log.info("Table size: " + description.getTableSizeBytes());
      log.info("Read throughput: " + jobProperties.get(DynamoDBConstants.READ_THROUGHPUT));
      log.info("Write throughput: " + jobProperties.get(DynamoDBConstants.WRITE_THROUGHPUT));

    } finally {
      client.close();
    }
  }