phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java [100:164]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                    String.valueOf(DEFAULT_UPSERT_BATCH_SIZE)));
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Flushes the current batch of buffered upserts to Phoenix by committing
     * the underlying JDBC connection. Called from {@code write} once a full
     * batch has accumulated, and presumably also at task completion via
     * {@code commit()} — confirm against the rest of the class.
     *
     * @throws SQLException if the commit fails
     */
    void commitBatchUpdates() throws SQLException {
        conn.commit();
    }

    /**
     * Binds one Spark row to the prepared UPSERT statement, executes it, and
     * commits whenever a full batch has accumulated (only when
     * {@code batchSize > 0}; otherwise the commit is deferred to task end).
     *
     * @param internalRow the Spark internal row to write to Phoenix
     * @throws IOException wrapping any {@link SQLException} raised while
     *         binding or executing the statement
     */
    @Override
    public void write(InternalRow internalRow) throws IOException {
        try {
            Row row = SparkJdbcUtil.toRow(encoder, internalRow);
            StructField[] fields = schema.fields();
            for (int col = 0; col < fields.length; col++) {
                DataType dataType = fields[col].dataType();
                if (!internalRow.isNullAt(col)) {
                    SparkJdbcUtil.makeSetter(conn, PhoenixJdbcDialect$.MODULE$, dataType)
                            .apply(statement, row, col);
                } else {
                    // JDBC parameters are 1-based, hence col + 1.
                    statement.setNull(col + 1, SparkJdbcUtil.getJdbcType(dataType,
                            PhoenixJdbcDialect$.MODULE$).jdbcNullType());
                }
            }
            numRecords++;
            statement.execute();
            // Run batch wise commits only when the batch size is positive value.
            // Otherwise commit gets called at the end of task
            boolean batchIsFull = batchSize > 0 && numRecords % batchSize == 0;
            if (batchIsFull) {
                if (logger.isDebugEnabled()) {
                    logger.debug("commit called on a batch of size : " + batchSize);
                }
                commitBatchUpdates();
            }
        } catch (SQLException e) {
            throw new IOException("Exception while executing Phoenix prepared statement", e);
        }
    }

    /**
     * Commits any remaining buffered upserts and releases the JDBC resources.
     *
     * <p>Resource cleanup uses a nested try/finally so that {@code conn} is
     * closed even when {@code statement.close()} throws; the previous
     * sequential calls leaked the connection in that case.
     *
     * @return always {@code null}; no commit message is propagated to the driver
     */
    @Override
    public WriterCommitMessage commit() {
        try {
            conn.commit();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                try {
                    statement.close();
                } finally {
                    // Always attempted, even if statement.close() failed above.
                    conn.close();
                }
            } catch (SQLException ex) {
                throw new RuntimeException(ex);
            }
        }
        return null;
    }

    @Override
    public void abort() {
        try {
            // To rollback any ongoing transactions
            conn.rollback();
        } catch (SQLException ex) {
            throw new RuntimeException(ex);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixDataWriter.java [90:154]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                    String.valueOf(DEFAULT_UPSERT_BATCH_SIZE)));
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Flushes the current batch of buffered upserts to Phoenix by committing
     * the underlying JDBC connection. Called from {@code write} once a full
     * batch has accumulated, and presumably also at task completion via
     * {@code commit()} — confirm against the rest of the class.
     *
     * @throws SQLException if the commit fails
     */
    void commitBatchUpdates() throws SQLException {
        conn.commit();
    }

    /**
     * Binds one Spark row to the prepared UPSERT statement, executes it, and
     * commits whenever a full batch has accumulated (only when
     * {@code batchSize > 0}; otherwise the commit is deferred to task end).
     *
     * @param internalRow the Spark internal row to write to Phoenix
     * @throws IOException wrapping any {@link SQLException} raised while
     *         binding or executing the statement
     */
    @Override
    public void write(InternalRow internalRow) throws IOException {
        try {
            Row row = SparkJdbcUtil.toRow(encoder, internalRow);
            StructField[] fields = schema.fields();
            for (int col = 0; col < fields.length; col++) {
                DataType dataType = fields[col].dataType();
                if (!internalRow.isNullAt(col)) {
                    SparkJdbcUtil.makeSetter(conn, PhoenixJdbcDialect$.MODULE$, dataType)
                            .apply(statement, row, col);
                } else {
                    // JDBC parameters are 1-based, hence col + 1.
                    statement.setNull(col + 1, SparkJdbcUtil.getJdbcType(dataType,
                            PhoenixJdbcDialect$.MODULE$).jdbcNullType());
                }
            }
            numRecords++;
            statement.execute();
            // Run batch wise commits only when the batch size is positive value.
            // Otherwise commit gets called at the end of task
            boolean batchIsFull = batchSize > 0 && numRecords % batchSize == 0;
            if (batchIsFull) {
                if (logger.isDebugEnabled()) {
                    logger.debug("commit called on a batch of size : " + batchSize);
                }
                commitBatchUpdates();
            }
        } catch (SQLException e) {
            throw new IOException("Exception while executing Phoenix prepared statement", e);
        }
    }

    /**
     * Commits any remaining buffered upserts and releases the JDBC resources.
     *
     * <p>Resource cleanup uses a nested try/finally so that {@code conn} is
     * closed even when {@code statement.close()} throws; the previous
     * sequential calls leaked the connection in that case.
     *
     * @return always {@code null}; no commit message is propagated to the driver
     */
    @Override
    public WriterCommitMessage commit() {
        try {
            conn.commit();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                try {
                    statement.close();
                } finally {
                    // Always attempted, even if statement.close() failed above.
                    conn.close();
                }
            } catch (SQLException ex) {
                throw new RuntimeException(ex);
            }
        }
        return null;
    }

    @Override
    public void abort() {
        try {
            // To rollback any ongoing transactions
            conn.rollback();
        } catch (SQLException ex) {
            throw new RuntimeException(ex);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



