in c3r-sdk-core/src/main/java/com/amazonaws/c3r/action/RowMarshaller.java [230:282]
/**
 * Loads every row from the input reader into the SQL staging table, observing each
 * value against the matching column insights along the way.
 *
 * <p>Rows are inserted in batches of {@code ROW_BATCH_SIZE} and the transaction is
 * committed only every {@code INSERTS_PER_COMMIT} inserts, since committing less often
 * is significantly faster for bulk loads. Auto-commit is disabled for the duration of
 * the load and restored in a {@code finally} block, so a failed load cannot leave the
 * shared connection stuck in manual-commit mode.
 *
 * @throws C3rRuntimeException if the SQL database cannot be accessed
 */
void loadInput() {
    try {
        log.debug("Loading data from {}.", inputReader.getSourceName());
        long commitFuel = INSERTS_PER_COMMIT;
        final long startTime = System.currentTimeMillis();
        final RowWriter<T> sqlRowWriter = new SqlRowWriter<>(columnInsights, nonceHeader, sqlTable);
        List<Row<T>> batchedRows = new ArrayList<>();
        // For bulk operations, we want to explicitly commit the transaction less often for performance.
        sqlTable.getConnection().setAutoCommit(false);
        try {
            while (inputReader.hasNext()) {
                final Row<T> sourceRow = inputReader.next();
                sourceRow.forEach((column, value) -> {
                    // Observe and validate current row
                    if (sourceMappedColumnInsights.containsKey(column)) {
                        for (var columnInsight : sourceMappedColumnInsights.get(column)) {
                            checkForInvalidNullDuplicates(columnInsight, value);
                            columnInsight.observe(value);
                        }
                    }
                });
                batchedRows.add(sourceRow);
                // If batch size or end of input is met, write to SQL and reset batch.
                if (batchedRows.size() == ROW_BATCH_SIZE || !inputReader.hasNext()) {
                    writeInputBatchToSql(sqlRowWriter, batchedRows);
                    commitFuel -= batchedRows.size();
                    batchedRows = new ArrayList<>();
                    if (commitFuel <= 0) {
                        sqlTable.getConnection().commit();
                        commitFuel = INSERTS_PER_COMMIT;
                    }
                }
                if (inputReader.getReadRowCount() % LOG_ROW_UPDATE_FREQUENCY == 0) {
                    log.info("{} rows loaded.", inputReader.getReadRowCount());
                }
            }
            // Flush whatever remains of the final partially-fueled commit window.
            sqlTable.getConnection().commit();
        } finally {
            // We've completed (or aborted) our bulk insert, so turn autocommit back on
            // so any future one-off commands execute immediately. Previously this only
            // ran on success, leaving the connection in manual-commit mode after a
            // failure part-way through the load.
            sqlTable.getConnection().setAutoCommit(true);
        }
        final long endTime = System.currentTimeMillis();
        log.debug("Done loading {} rows in {} seconds.", inputReader.getReadRowCount(),
                TimeUnit.MILLISECONDS.toSeconds(endTime - startTime));
        checkForInvalidDuplicates();
    } catch (SQLException e) {
        throw new C3rRuntimeException("Error accessing the SQL database.", e);
    }
}