phoenix5-hive/src/main/java/org/apache/phoenix/hive/PhoenixRecordUpdater.java [47:337]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class PhoenixRecordUpdater implements RecordUpdater {

    private static final Logger LOG = LoggerFactory.getLogger(PhoenixRecordUpdater.class);

    private final Connection conn;
    private final PreparedStatement pstmt;
    private final long batchSize;
    private long numRecords = 0;

    private Configuration config;
    private String tableName;
    private MetaDataClient metaDataClient;
    private boolean restoreWalMode;

    private long rowCountDelta = 0;

    private PhoenixSerializer phoenixSerializer;
    private ObjectInspector objInspector;
    private PreparedStatement pstmtForDelete;

    public PhoenixRecordUpdater(Path path, AcidOutputFormat.Options options) throws IOException {
        this.config = options.getConfiguration();
        tableName = config.get(PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME);

        Properties props = new Properties();

        try {
            // Disable WAL
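            // The flag is read from a table-scoped property: the lower-cased table name
            // followed by the PhoenixStorageHandlerConstants.DISABLE_WAL suffix. When set,
            // batch mode is also enabled on the Phoenix connection opened below.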
            String walConfigName = tableName.toLowerCase() + PhoenixStorageHandlerConstants
                    .DISABLE_WAL;
            boolean disableWal = config.getBoolean(walConfigName, false);
            if (disableWal) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(walConfigName + " is true. batch.mode will be set to true.");
                }

                props.setProperty(PhoenixStorageHandlerConstants.BATCH_MODE, "true");
            }

            this.conn = PhoenixConnectionUtil.getInputConnection(config, props);

            if (disableWal) {
                metaDataClient = new MetaDataClient((PhoenixConnection) conn);

                if (!PhoenixUtil.isDisabledWal(metaDataClient, tableName)) {
                    // execute alter table statement if disable_wal is not true.
                    try {
                        PhoenixUtil.alterTableForWalDisable(conn, tableName, true);
                    } catch (ConcurrentTableMutationException e) {
                        if (LOG.isWarnEnabled()) {
                            LOG.warn("Concurrent modification of disableWAL");
                        }
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug(tableName + "'s WAL disabled.");
                    }

                    // restore original value of disable_wal at the end.
                    restoreWalMode = true;
                }
            }

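            // Number of executed mutations between client-side commits; see write() and delete().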
            this.batchSize = PhoenixConfigurationUtil.getBatchSize(config);

            if (LOG.isDebugEnabled()) {
                LOG.debug("Batch-size : " + batchSize);
            }

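            // Build an UPSERT statement covering every column of the target table; each
            // write() call binds one serialized row to this single prepared statement.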
            String upsertQuery = QueryUtil.constructUpsertStatement(tableName, PhoenixUtil
                    .getColumnInfoList(conn, tableName));

            if (LOG.isDebugEnabled()) {
                LOG.debug("Upsert-query : " + upsertQuery);
            }
            this.pstmt = this.conn.prepareStatement(upsertQuery);
        } catch (SQLException e) {
            throw new IOException(e);
        }

        this.objInspector = options.getInspector();
        try {
            phoenixSerializer = new PhoenixSerializer(config, options.getTableProperties());
        } catch (SerDeException e) {
            throw new IOException(e);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.hive.ql.io.RecordUpdater#insert(long, java.lang.Object)
     */
    @Override
    public void insert(long currentTransaction, Object row) throws IOException {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Insert - currentTransaction : " + currentTransaction + ", row : " +
                    PhoenixStorageHandlerUtil.toString(row));
        }

        PhoenixResultWritable pResultWritable = (PhoenixResultWritable) phoenixSerializer
                .serialize(row, objInspector, DmlType.INSERT);

        if (LOG.isTraceEnabled()) {
            LOG.trace("Data : " + pResultWritable.getValueList());
        }

        write(pResultWritable);

        rowCountDelta++;
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.hive.ql.io.RecordUpdater#update(long, java.lang.Object)
     */
    @Override
    public void update(long currentTransaction, Object row) throws IOException {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Update - currentTransaction : " + currentTransaction + ", row : " +
                    PhoenixStorageHandlerUtil.toString(row));
        }

        PhoenixResultWritable pResultWritable = (PhoenixResultWritable) phoenixSerializer
                .serialize(row, objInspector, DmlType.UPDATE);

        if (LOG.isTraceEnabled()) {
            LOG.trace("Data : " + pResultWritable.getValueList());
        }

        write(pResultWritable);
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.hive.ql.io.RecordUpdater#delete(long, java.lang.Object)
     */
    @Override
    public void delete(long currentTransaction, Object row) throws IOException {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Delete - currentTransaction : " + currentTransaction + ", row : " +
                    PhoenixStorageHandlerUtil.toString(row));
        }

        PhoenixResultWritable pResultWritable = (PhoenixResultWritable) phoenixSerializer
                .serialize(row, objInspector, DmlType.DELETE);

        if (LOG.isTraceEnabled()) {
            LOG.trace("Data : " + pResultWritable.getValueList());
        }

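        // The DELETE statement is prepared lazily, on the first delete() call.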
        if (pstmtForDelete == null) {
            try {
                String deleteQuery = PhoenixUtil.constructDeleteStatement(conn, tableName);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Delete query : " + deleteQuery);
                }

                pstmtForDelete = conn.prepareStatement(deleteQuery);
            } catch (SQLException e) {
                throw new IOException(e);
            }
        }

        delete(pResultWritable);

        rowCountDelta--;
    }

    private void delete(PhoenixResultWritable pResultWritable) throws IOException {
        try {
            pResultWritable.delete(pstmtForDelete);
            numRecords++;
            pstmtForDelete.executeUpdate();

            if (numRecords % batchSize == 0) {
                LOG.debug("Commit called on a batch of size : " + batchSize);
                conn.commit();
            }
        } catch (SQLException e) {
            throw new IOException("Exception while deleting from table.", e);
        }
    }

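    // Binds one serialized row to the shared upsert statement and executes it; every
    // batchSize-th executed mutation (upserts and deletes combined) triggers a commit.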
    private void write(PhoenixResultWritable pResultWritable) throws IOException {
        try {
            pResultWritable.write(pstmt);
            numRecords++;
            pstmt.executeUpdate();

            if (numRecords % batchSize == 0) {
                LOG.debug("Commit called on a batch of size : " + batchSize);
                conn.commit();
            }
        } catch (SQLException e) {
            throw new IOException("Exception while writing to table.", e);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.hive.ql.io.RecordUpdater#flush()
     */
    @Override
    public void flush() throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Flush called");
        }

        try {
            conn.commit();

            if (LOG.isInfoEnabled()) {
                LOG.info("Written rows : " + numRecords);
            }
        } catch (SQLException e) {
            LOG.error("SQLException while performing the commit for the task.");
            throw new IOException(e);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.hive.ql.io.RecordUpdater#close(boolean)
     */
    @Override
    public void close(boolean abort) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("abort : " + abort);
        }

        try {
            conn.commit();

            if (LOG.isInfoEnabled()) {
                LOG.info("Written rows : " + numRecords);
            }
        } catch (SQLException e) {
            LOG.error("SQLException while performing the commit for the task.");
            throw new IOException(e);
        } finally {
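            // Best-effort cleanup: restore the original WAL setting if it was changed in the
            // constructor, optionally flush the table, then release the statements and connection.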
            try {
                if (restoreWalMode && PhoenixUtil.isDisabledWal(metaDataClient, tableName)) {
                    try {
                        PhoenixUtil.alterTableForWalDisable(conn, tableName, false);
                    } catch (ConcurrentTableMutationException e) {
                        if (LOG.isWarnEnabled()) {
                            LOG.warn("Concurrent modification of disableWAL");
                        }
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug(tableName + "'s WAL enabled.");
                    }
                }

                // flush when [table-name].auto.flush is true.
                String autoFlushConfigName = tableName.toLowerCase() +
                        PhoenixStorageHandlerConstants.AUTO_FLUSH;
                boolean autoFlush = config.getBoolean(autoFlushConfigName, false);
                if (autoFlush) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("autoFlush is " + autoFlush);
                    }

                    PhoenixUtil.flush(conn, tableName);
                }

                PhoenixUtil.closeResource(pstmt);
                PhoenixUtil.closeResource(pstmtForDelete);
                PhoenixUtil.closeResource(conn);
            } catch (SQLException ex) {
                LOG.error("SQLException while closing the connection for the task.");
                throw new IOException(ex);
            }
        }
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.hive.ql.io.RecordUpdater#getStats()
     */
    @Override
    public SerDeStats getStats() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("getStats called");
        }

        SerDeStats stats = new SerDeStats();
        stats.setRowCount(rowCountDelta);
        // Don't worry about setting raw data size diff. There is no reasonable way to calculate
        // that without finding the row we are updating or deleting, which would be a mess.
        return stats;
    }

    public long getBufferedRowCount() {
        return numRecords;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
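Note on the commit cadence above: write() and delete() share the single numRecords counter, so conn.commit() fires whenever the combined number of executed upserts and deletes reaches a multiple of batchSize, with any remainder committed in flush() or close(). A minimal, self-contained sketch of that cadence, using a hypothetical batch size of 3 (in the updater the real value comes from PhoenixConfigurationUtil.getBatchSize(config)):

// Illustration only: mirrors the modulo check used in write() and delete() above.
public class BatchCommitSketch {
    public static void main(String[] args) {
        long batchSize = 3;   // hypothetical; normally PhoenixConfigurationUtil.getBatchSize(config)
        long numRecords = 0;
        for (int i = 0; i < 7; i++) {
            numRecords++;                                 // one executed upsert or delete
            if (numRecords % batchSize == 0) {
                // in PhoenixRecordUpdater this is where conn.commit() is called
                System.out.println("commit after record " + numRecords);  // after records 3 and 6
            }
        }
    }
}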



phoenix5-hive4/src/main/java/org/apache/phoenix/hive/PhoenixRecordUpdater.java [47:337]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class PhoenixRecordUpdater implements RecordUpdater {

    private static final Logger LOG = LoggerFactory.getLogger(PhoenixRecordUpdater.class);

    private final Connection conn;
    private final PreparedStatement pstmt;
    private final long batchSize;
    private long numRecords = 0;

    private Configuration config;
    private String tableName;
    private MetaDataClient metaDataClient;
    private boolean restoreWalMode;

    private long rowCountDelta = 0;

    private PhoenixSerializer phoenixSerializer;
    private ObjectInspector objInspector;
    private PreparedStatement pstmtForDelete;

    public PhoenixRecordUpdater(Path path, AcidOutputFormat.Options options) throws IOException {
        this.config = options.getConfiguration();
        tableName = config.get(PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME);

        Properties props = new Properties();

        try {
            // Disable WAL
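            // The flag is read from a table-scoped property: the lower-cased table name
            // followed by the PhoenixStorageHandlerConstants.DISABLE_WAL suffix. When set,
            // batch mode is also enabled on the Phoenix connection opened below.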
            String walConfigName = tableName.toLowerCase() + PhoenixStorageHandlerConstants
                    .DISABLE_WAL;
            boolean disableWal = config.getBoolean(walConfigName, false);
            if (disableWal) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(walConfigName + " is true. batch.mode will be set to true.");
                }

                props.setProperty(PhoenixStorageHandlerConstants.BATCH_MODE, "true");
            }

            this.conn = PhoenixConnectionUtil.getInputConnection(config, props);

            if (disableWal) {
                metaDataClient = new MetaDataClient((PhoenixConnection) conn);

                if (!PhoenixUtil.isDisabledWal(metaDataClient, tableName)) {
                    // execute alter table statement if disable_wal is not true.
                    try {
                        PhoenixUtil.alterTableForWalDisable(conn, tableName, true);
                    } catch (ConcurrentTableMutationException e) {
                        if (LOG.isWarnEnabled()) {
                            LOG.warn("Concurrent modification of disableWAL");
                        }
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug(tableName + "'s WAL disabled.");
                    }

                    // restore original value of disable_wal at the end.
                    restoreWalMode = true;
                }
            }

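            // Number of executed mutations between client-side commits; see write() and delete().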
            this.batchSize = PhoenixConfigurationUtil.getBatchSize(config);

            if (LOG.isDebugEnabled()) {
                LOG.debug("Batch-size : " + batchSize);
            }

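            // Build an UPSERT statement covering every column of the target table; each
            // write() call binds one serialized row to this single prepared statement.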
            String upsertQuery = QueryUtil.constructUpsertStatement(tableName, PhoenixUtil
                    .getColumnInfoList(conn, tableName));

            if (LOG.isDebugEnabled()) {
                LOG.debug("Upsert-query : " + upsertQuery);
            }
            this.pstmt = this.conn.prepareStatement(upsertQuery);
        } catch (SQLException e) {
            throw new IOException(e);
        }

        this.objInspector = options.getInspector();
        try {
            phoenixSerializer = new PhoenixSerializer(config, options.getTableProperties());
        } catch (SerDeException e) {
            throw new IOException(e);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.hive.ql.io.RecordUpdater#insert(long, java.lang.Object)
     */
    @Override
    public void insert(long currentTransaction, Object row) throws IOException {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Insert - currentTransaction : " + currentTransaction + ", row : " +
                    PhoenixStorageHandlerUtil.toString(row));
        }

        PhoenixResultWritable pResultWritable = (PhoenixResultWritable) phoenixSerializer
                .serialize(row, objInspector, DmlType.INSERT);

        if (LOG.isTraceEnabled()) {
            LOG.trace("Data : " + pResultWritable.getValueList());
        }

        write(pResultWritable);

        rowCountDelta++;
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.hive.ql.io.RecordUpdater#update(long, java.lang.Object)
     */
    @Override
    public void update(long currentTransaction, Object row) throws IOException {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Update - currentTransaction : " + currentTransaction + ", row : " +
                    PhoenixStorageHandlerUtil.toString(row));
        }

        PhoenixResultWritable pResultWritable = (PhoenixResultWritable) phoenixSerializer
                .serialize(row, objInspector, DmlType.UPDATE);

        if (LOG.isTraceEnabled()) {
            LOG.trace("Data : " + pResultWritable.getValueList());
        }

        write(pResultWritable);
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.hive.ql.io.RecordUpdater#delete(long, java.lang.Object)
     */
    @Override
    public void delete(long currentTransaction, Object row) throws IOException {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Delete - currentTransaction : " + currentTransaction + ", row : " +
                    PhoenixStorageHandlerUtil.toString(row));
        }

        PhoenixResultWritable pResultWritable = (PhoenixResultWritable) phoenixSerializer
                .serialize(row, objInspector, DmlType.DELETE);

        if (LOG.isTraceEnabled()) {
            LOG.trace("Data : " + pResultWritable.getValueList());
        }

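        // The DELETE statement is prepared lazily, on the first delete() call.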
        if (pstmtForDelete == null) {
            try {
                String deleteQuery = PhoenixUtil.constructDeleteStatement(conn, tableName);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Delete query : " + deleteQuery);
                }

                pstmtForDelete = conn.prepareStatement(deleteQuery);
            } catch (SQLException e) {
                throw new IOException(e);
            }
        }

        delete(pResultWritable);

        rowCountDelta--;
    }

    private void delete(PhoenixResultWritable pResultWritable) throws IOException {
        try {
            pResultWritable.delete(pstmtForDelete);
            numRecords++;
            pstmtForDelete.executeUpdate();

            if (numRecords % batchSize == 0) {
                LOG.debug("Commit called on a batch of size : " + batchSize);
                conn.commit();
            }
        } catch (SQLException e) {
            throw new IOException("Exception while deleting from table.", e);
        }
    }

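    // Binds one serialized row to the shared upsert statement and executes it; every
    // batchSize-th executed mutation (upserts and deletes combined) triggers a commit.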
    private void write(PhoenixResultWritable pResultWritable) throws IOException {
        try {
            pResultWritable.write(pstmt);
            numRecords++;
            pstmt.executeUpdate();

            if (numRecords % batchSize == 0) {
                LOG.debug("Commit called on a batch of size : " + batchSize);
                conn.commit();
            }
        } catch (SQLException e) {
            throw new IOException("Exception while writing to table.", e);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.hive.ql.io.RecordUpdater#flush()
     */
    @Override
    public void flush() throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Flush called");
        }

        try {
            conn.commit();

            if (LOG.isInfoEnabled()) {
                LOG.info("Written rows : " + numRecords);
            }
        } catch (SQLException e) {
            LOG.error("SQLException while performing the commit for the task.");
            throw new IOException(e);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.hive.ql.io.RecordUpdater#close(boolean)
     */
    @Override
    public void close(boolean abort) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("abort : " + abort);
        }

        try {
            conn.commit();

            if (LOG.isInfoEnabled()) {
                LOG.info("Written rows : " + numRecords);
            }
        } catch (SQLException e) {
            LOG.error("SQLException while performing the commit for the task.");
            throw new IOException(e);
        } finally {
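            // Best-effort cleanup: restore the original WAL setting if it was changed in the
            // constructor, optionally flush the table, then release the statements and connection.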
            try {
                if (restoreWalMode && PhoenixUtil.isDisabledWal(metaDataClient, tableName)) {
                    try {
                        PhoenixUtil.alterTableForWalDisable(conn, tableName, false);
                    } catch (ConcurrentTableMutationException e) {
                        if (LOG.isWarnEnabled()) {
                            LOG.warn("Concurrent modification of disableWAL");
                        }
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug(tableName + "'s WAL enabled.");
                    }
                }

                // flush when [table-name].auto.flush is true.
                String autoFlushConfigName = tableName.toLowerCase() +
                        PhoenixStorageHandlerConstants.AUTO_FLUSH;
                boolean autoFlush = config.getBoolean(autoFlushConfigName, false);
                if (autoFlush) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("autoFlush is " + autoFlush);
                    }

                    PhoenixUtil.flush(conn, tableName);
                }

                PhoenixUtil.closeResource(pstmt);
                PhoenixUtil.closeResource(pstmtForDelete);
                PhoenixUtil.closeResource(conn);
            } catch (SQLException ex) {
                LOG.error("SQLException while closing the connection for the task.");
                throw new IOException(ex);
            }
        }
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.hive.ql.io.RecordUpdater#getStats()
     */
    @Override
    public SerDeStats getStats() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("getStats called");
        }

        SerDeStats stats = new SerDeStats();
        stats.setRowCount(rowCountDelta);
        // Don't worry about setting raw data size diff. There is no reasonable way to calculate
        // that without finding the row we are updating or deleting, which would be a mess.
        return stats;
    }

    public long getBufferedRowCount() {
        return numRecords;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
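For reference, the statistics returned by getStats() in both copies track only a net row-count delta: insert() increments it, delete() decrements it, and update() leaves it untouched, so a task that only rewrites existing rows reports a delta of zero. A small, self-contained sketch of that accounting, with hypothetical call counts:

// Illustration only: the net row-count delta reported through SerDeStats.
public class RowCountDeltaSketch {
    public static void main(String[] args) {
        long rowCountDelta = 0;
        int inserts = 5, deletes = 3;               // hypothetical call counts for one task
        rowCountDelta += inserts;                   // insert() adds one per row
        rowCountDelta -= deletes;                   // delete() subtracts one
        // any number of update() calls leaves rowCountDelta unchanged
        System.out.println("SerDeStats row count: " + rowCountDelta);  // prints 2
    }
}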



