in phoenix5-hive4/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordWriter.java [104:156]
private void initialize(Configuration config, Properties properties) throws SQLException {
this.config = config;
tableName = config.get(PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME);
// Disable WAL
String walConfigName = tableName.toLowerCase() + PhoenixStorageHandlerConstants.DISABLE_WAL;
boolean disableWal = config.getBoolean(walConfigName, false);
if (disableWal) {
if (LOG.isDebugEnabled()) {
LOG.debug("Property " + walConfigName + " is true. batch.mode will be set true. ");
}
properties.setProperty(PhoenixStorageHandlerConstants.BATCH_MODE, "true");
}
this.conn = PhoenixConnectionUtil.getInputConnection(config, properties);
if (disableWal) {
metaDataClient = new MetaDataClient((PhoenixConnection) conn);
if (!PhoenixUtil.isDisabledWal(metaDataClient, tableName)) {
// execute alter tablel statement if disable_wal is not true.
try {
PhoenixUtil.alterTableForWalDisable(conn, tableName, true);
} catch (ConcurrentTableMutationException e) {
if (LOG.isWarnEnabled()) {
LOG.warn("Another mapper or task processing wal disable");
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(tableName + "s wal disabled.");
}
// restore original value of disable_wal at the end.
restoreWalMode = true;
}
}
this.batchSize = PhoenixConfigurationUtil.getBatchSize(config);
if (LOG.isDebugEnabled()) {
LOG.debug("Batch-size : " + batchSize);
}
String upsertQuery = QueryUtil.constructUpsertStatement(tableName, PhoenixUtil
.getColumnInfoList(conn, tableName));
if (LOG.isDebugEnabled()) {
LOG.debug("Upsert-query : " + upsertQuery);
}
this.pstmt = this.conn.prepareStatement(upsertQuery);
}