void loadInput()

in c3r-sdk-core/src/main/java/com/amazonaws/c3r/action/RowMarshaller.java [230:282]


    void loadInput() {
        try {
            log.debug("Loading data from {}.", inputReader.getSourceName());
            long commitFuel = INSERTS_PER_COMMIT;
            final long startTime = System.currentTimeMillis();
            final RowWriter<T> sqlRowWriter = new SqlRowWriter<>(columnInsights, nonceHeader, sqlTable);
            List<Row<T>> batchedRows = new ArrayList<>();

            // For bulk operations, we want to explicitly commit the transaction less often for performance.
            sqlTable.getConnection().setAutoCommit(false);
            while (inputReader.hasNext()) {
                final Row<T> sourceRow = inputReader.next();
                sourceRow.forEach((column, value) -> {
                    // Observe and validate current row
                    if (sourceMappedColumnInsights.containsKey(column)) {
                        for (var columnInsight : sourceMappedColumnInsights.get(column)) {
                            checkForInvalidNullDuplicates(columnInsight, value);
                            columnInsight.observe(value);
                        }
                    }
                });

                batchedRows.add(sourceRow);

                // If batch size or end of input is met, write to SQL and reset batch.
                if (batchedRows.size() == ROW_BATCH_SIZE || !inputReader.hasNext()) {
                    writeInputBatchToSql(sqlRowWriter, batchedRows);
                    commitFuel = commitFuel - batchedRows.size();
                    batchedRows = new ArrayList<>();

                    if (commitFuel <= 0) {
                        sqlTable.getConnection().commit();
                        commitFuel = INSERTS_PER_COMMIT;
                    }
                }

                if (inputReader.getReadRowCount() % LOG_ROW_UPDATE_FREQUENCY == 0) {
                    log.info("{} rows loaded.", inputReader.getReadRowCount());
                }
            }
            sqlTable.getConnection().commit();
            // We've completed our bulk insert, so turn autocommit back on
            // so any future one-off commands execute immediately.
            sqlTable.getConnection().setAutoCommit(true);
            final long endTime = System.currentTimeMillis();
            log.debug("Done loading {} rows in {} seconds.", inputReader.getReadRowCount(),
                    TimeUnit.MILLISECONDS.toSeconds(endTime - startTime));

            checkForInvalidDuplicates();
        } catch (SQLException e) {
            throw new C3rRuntimeException("Error accessing the SQL database.", e);
        }
    }