in src/main/java/com/amazonaws/services/kinesis/aggregators/StreamAggregatorUtils.java [142:194]
public static String getRedshiftCopyCommand(
final AmazonDynamoDB dynamoClient, String redshiftTableName,
String dynamoTable) throws Exception {
LOG.info("Generating Redshift Copy Command");
StringBuffer sb = new StringBuffer();
// generate the create table statement
sb.append(String.format("CREATE TABLE %S(\n", redshiftTableName));
int i = 0;
List<String> tableStructure = DynamoUtils.getDictionaryEntry(
dynamoClient, dynamoTable);
String columnSpec = null;
String dataType = null;
for (String s : tableStructure) {
i++;
switch (s) {
case StreamAggregator.LAST_WRITE_SEQ:
dataType = "BIGINT";
break;
case StreamAggregator.LAST_WRITE_TIME:
dataType = "TIMESTAMP";
break;
case StreamAggregator.EVENT_COUNT:
dataType = "INT";
break;
default:
if (s.contains("-SUM") || s.contains("-MIN")
|| s.contains("-MAX")) {
dataType = "INT";
} else {
dataType = "VARCHAR(1000)";
}
break;
}
;
columnSpec = s + " " + dataType;
if (i == tableStructure.size()) {
sb.append(columnSpec);
} else {
sb.append(columnSpec + ",");
}
}
sb.append(");\n\n");
// generate the copy command
sb.append(String
.format("copy %s from 'dynamodb://%s' credentials 'aws_access_key_id=<Your-Access-Key-ID>;aws_secret_access_key=<Your-Secret-Access-Key>' readratio 50 timeformat 'yyyy-MM-dd hh:mi:ss';",
redshiftTableName, dynamoTable, rsTimeformat));
return sb.toString();
}