in plugins/transforms/synchronizeaftermerge/src/main/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMerge.java [66:545]
/**
 * Applies one input row to the target table as an INSERT, UPDATE or DELETE, chosen by the value
 * of the operation-order field in the row.
 *
 * <p>Prepared statements are created lazily and cached per (dynamic) table name in {@code
 * data.preparedStatements}. In batch mode rows are buffered in {@code data.batchBuffer} and only
 * forwarded to the next transform after the surrounding commit succeeded, so no row leaves this
 * transform before it is safely in the database. For databases flagged with {@code
 * data.specialErrorHandling} that support savepoints, a savepoint is set before each row so a
 * single failed statement can be rolled back without aborting the whole transaction.
 *
 * @param row the input row; must contain the operation field and, when {@code
 *     meta.isTableNameInField()}, the dynamic table name field
 * @throws HopException when the operation field is empty, the dynamic table name is missing, or a
 *     database error occurs while this transform is not doing error handling
 */
private synchronized void lookupValues(Object[] row) throws HopException {
  // get operation for the current row
  // do we insert, update or delete ?
  String operation = data.inputRowMeta.getString(row, data.indexOfOperationOrderField);

  boolean rowIsSafe = false;
  boolean sendToErrorRow = false;
  String errorMessage = null;
  int[] updateCounts = null;
  List<Exception> exceptionsList = null;
  boolean batchProblem = false;
  data.lookupFailure = false;
  boolean performInsert = false;
  boolean performUpdate = false;
  boolean performDelete = false;
  boolean lineSkipped = false;

  try {
    if (operation == null) {
      throw new HopException(
          BaseMessages.getString(
              PKG,
              "SynchronizeAfterMerge.Log.OperationFieldEmpty",
              meta.getOperationOrderField()));
    }

    if (meta.isTableNameInField()) {
      // Resolve the dynamic table name from the row and rebuild the quoted schema.table string.
      data.realTableName = data.inputRowMeta.getString(row, data.indexOfTableNameField);
      if (Utils.isEmpty(data.realTableName)) {
        throw new HopTransformException("The name of the table is not specified!");
      }
      data.realSchemaTable =
          data.db
              .getDatabaseMeta()
              .getQuotedSchemaTableCombination(this, data.realSchemaName, data.realTableName);
    }

    if (operation.equals(data.insertValue)) {
      // directly insert data into table
      /*
       *
       * INSERT ROW
       */
      if (isRowLevel()) {
        logRowlevel(
            BaseMessages.getString(PKG, "SynchronizeAfterMerge.InsertRow", Arrays.toString(row)));
      }

      // The values to insert are those in the update section
      //
      Object[] insertRowData = new Object[data.valuenrs.length];
      for (int i = 0; i < data.valuenrs.length; i++) {
        insertRowData[i] = row[data.valuenrs[i]];
      }

      if (meta.isTableNameInField()) {
        // Fetch (or lazily prepare and cache) the insert statement for this table.
        data.insertStatement = data.preparedStatements.get(data.realSchemaTable + CONST_INSERT);
        if (data.insertStatement == null) {
          String sql =
              data.db.getInsertStatement(
                  data.realSchemaName, data.realTableName, data.insertRowMeta);
          if (isDebug()) {
            logDebug("Preparation of the insert SQL statement: " + sql);
          }
          data.insertStatement = data.db.prepareSql(sql);
          data.preparedStatements.put(data.realSchemaTable + CONST_INSERT, data.insertStatement);
        }
      }

      // For PG & GP, we add a savepoint before the row.
      // Then revert to the savepoint afterwards... (not a transaction, so hopefully still fast)
      //
      if (data.specialErrorHandling && data.supportsSavepoints) {
        data.savepoint = data.db.setSavepoint();
      }

      // Set the values on the prepared statement...
      data.db.setValues(data.insertRowMeta, insertRowData, data.insertStatement);
      data.db.insertRow(data.insertStatement, data.batchMode);
      performInsert = true;
      if (!data.batchMode) {
        // In batch mode output lines are counted only after the batch commit succeeded.
        incrementLinesOutput();
      }
      if (isRowLevel()) {
        logRowlevel("Written row: " + data.insertRowMeta.getString(insertRowData));
      }
    } else {
      // UPDATE or DELETE: build the key/lookup row from the configured key stream fields.
      Object[] lookupRow = new Object[data.keynrs.length];
      int lookupIndex = 0;
      for (int i = 0; i < meta.getKeyStream().length; i++) {
        if (data.keynrs[i] >= 0) {
          lookupRow[lookupIndex] = row[data.keynrs[i]];
          lookupIndex++;
        }
        // Second stream field for conditions that take two values (keynrs2).
        if (data.keynrs2[i] >= 0) {
          lookupRow[lookupIndex] = row[data.keynrs2[i]];
          lookupIndex++;
        }
      }

      boolean updateorDelete = false;
      if (meta.isPerformLookup()) {

        // LOOKUP: check that the row exists and whether any updatable column actually changed.
        if (meta.isTableNameInField()) {
          // Prepare Lookup statement (cached per table)
          data.lookupStatement = data.preparedStatements.get(data.realSchemaTable + CONST_LOOKUP);
          if (data.lookupStatement == null) {
            String sql = getLookupStatement(data.inputRowMeta);
            if (isDebug()) {
              logDebug("Preparation of the lookup SQL statement: " + sql);
            }
            data.lookupStatement = data.db.prepareSql(sql);
            data.preparedStatements.put(
                data.realSchemaTable + CONST_LOOKUP, data.lookupStatement);
          }
        }

        data.db.setValues(data.lookupParameterRowMeta, lookupRow, data.lookupStatement);
        if (isRowLevel()) {
          logRowlevel(
              BaseMessages.getString(
                  PKG,
                  "SynchronizeAfterMerge.Log.ValuesSetForLookup",
                  data.lookupParameterRowMeta.getString(lookupRow)));
        }
        Object[] add = data.db.getLookup(data.lookupStatement);
        incrementLinesInput();

        if (add == null) {
          // nothing was found: build the (cached) error message and field name list once,
          // then fail this row as a lookup failure.
          if (data.stringErrorKeyNotFound == null) {
            data.stringErrorKeyNotFound =
                BaseMessages.getString(PKG, "SynchronizeAfterMerge.Exception.KeyCouldNotFound")
                    + data.lookupParameterRowMeta.getString(lookupRow);
            data.stringFieldnames = "";
            for (int i = 0; i < data.lookupParameterRowMeta.size(); i++) {
              if (i > 0) {
                data.stringFieldnames += ", ";
              }
              data.stringFieldnames += data.lookupParameterRowMeta.getValueMeta(i).getName();
            }
          }
          data.lookupFailure = true;
          throw new HopDatabaseException(
              BaseMessages.getString(
                  PKG,
                  "SynchronizeAfterMerge.Exception.KeyCouldNotFound",
                  data.lookupParameterRowMeta.getString(lookupRow)));
        } else {
          if (isRowLevel()) {
            logRowlevel(
                BaseMessages.getString(
                    PKG,
                    "SynchronizeAfterMerge.Log.FoundRowForUpdate",
                    data.insertRowMeta.getString(row)));
          }

          // Compare each updatable column against the DB value; any difference triggers
          // the actual update/delete below.
          for (int i = 0; i < data.valuenrs.length; i++) {
            if (meta.getUpdate()[i].booleanValue()) {
              IValueMeta valueMeta = data.inputRowMeta.getValueMeta(data.valuenrs[i]);
              IValueMeta retMeta = data.db.getReturnRowMeta().getValueMeta(i);

              Object rowvalue = row[data.valuenrs[i]];
              Object retvalue = add[i];

              if (valueMeta.compare(rowvalue, retMeta, retvalue) != 0) {
                updateorDelete = true;
              }
            }
          }
        }
      } // end if perform lookup

      if (operation.equals(data.updateValue)) {
        if (!meta.isPerformLookup() || updateorDelete) {
          // UPDATE :
          if (meta.isTableNameInField()) {
            data.updateStatement =
                data.preparedStatements.get(data.realSchemaTable + CONST_UPDATE);
            if (data.updateStatement == null) {
              String sql = getUpdateStatement(data.inputRowMeta);
              data.updateStatement = data.db.prepareSql(sql);
              data.preparedStatements.put(
                  data.realSchemaTable + CONST_UPDATE, data.updateStatement);
              if (isDebug()) {
                logDebug("Preparation of the Update SQL statement : " + sql);
              }
            }
          }

          // Create the update row: first the SET values, then the WHERE parameters.
          Object[] updateRow = new Object[data.updateParameterRowMeta.size()];
          int j = 0;
          for (int i = 0; i < data.valuenrs.length; i++) {
            if (meta.getUpdate()[i].booleanValue()) {
              updateRow[j] = row[data.valuenrs[i]]; // the setters
              j++;
            }
          }
          // add the where clause parameters, they are exactly the same for lookup and update
          for (int i = 0; i < lookupRow.length; i++) {
            updateRow[j + i] = lookupRow[i];
          }

          // For PG & GP, we add a savepoint before the row.
          // Then revert to the savepoint afterwards... (not a transaction, so hopefully still
          // fast)
          //
          if (data.specialErrorHandling && data.supportsSavepoints) {
            data.savepoint = data.db.setSavepoint();
          }
          data.db.setValues(data.updateParameterRowMeta, updateRow, data.updateStatement);
          if (isRowLevel()) {
            logRowlevel(
                BaseMessages.getString(
                    PKG,
                    "SynchronizeAfterMerge.Log.SetValuesForUpdate",
                    data.updateParameterRowMeta.getString(updateRow),
                    data.inputRowMeta.getString(row)));
          }
          data.db.insertRow(data.updateStatement, data.batchMode);
          performUpdate = true;
          incrementLinesUpdated();
        } else {
          // end if operation update: no column changed, skip the row.
          incrementLinesSkipped();
          lineSkipped = true;
        }
      } else if (operation.equals(data.deleteValue)) {
        // DELETE
        if (meta.isTableNameInField()) {
          data.deleteStatement = data.preparedStatements.get(data.realSchemaTable + CONST_DELETE);
          if (data.deleteStatement == null) {
            String sql = getDeleteStatement(data.inputRowMeta);
            data.deleteStatement = data.db.prepareSql(sql);
            data.preparedStatements.put(
                data.realSchemaTable + CONST_DELETE, data.deleteStatement);
            if (isDebug()) {
              logDebug("Preparation of the Delete SQL statement : " + sql);
            }
          }
        }

        // The delete parameters are the same key values used for the lookup.
        Object[] deleteRow = new Object[data.deleteParameterRowMeta.size()];
        int deleteIndex = 0;

        for (int i = 0; i < meta.getKeyStream().length; i++) {
          if (data.keynrs[i] >= 0) {
            deleteRow[deleteIndex] = row[data.keynrs[i]];
            deleteIndex++;
          }
          if (data.keynrs2[i] >= 0) {
            deleteRow[deleteIndex] = row[data.keynrs2[i]];
            deleteIndex++;
          }
        }

        // For PG & GP, we add a savepoint before the row.
        // Then revert to the savepoint afterwards... (not a transaction, so hopefully still fast)
        //
        if (data.specialErrorHandling && data.supportsSavepoints) {
          data.savepoint = data.db.setSavepoint();
        }
        data.db.setValues(data.deleteParameterRowMeta, deleteRow, data.deleteStatement);
        if (isRowLevel()) {
          logRowlevel(
              BaseMessages.getString(
                  PKG,
                  "SynchronizeAfterMerge.Log.SetValuesForDelete",
                  data.deleteParameterRowMeta.getString(deleteRow),
                  data.inputRowMeta.getString(row)));
        }
        data.db.insertRow(data.deleteStatement, data.batchMode);
        performDelete = true;
        incrementLinesUpdated();
      } else {
        // endif operation delete: unknown operation value, skip the row.
        incrementLinesSkipped();
        lineSkipped = true;
      }
    } // endif operation insert

    // If we skip a line we need to empty the buffer and skip the line in question.
    // The skipped line is never added to the buffer!
    //
    if (performInsert
        || performUpdate
        || performDelete
        || (!data.batchBuffer.isEmpty() && lineSkipped)) {
      // Get a commit counter per prepared statement to keep track of separate tables, etc.
      // The three perform* flags are mutually exclusive: at most one operation ran for this row.
      //
      String tableName = data.realSchemaTable;
      if (performInsert) {
        tableName += CONST_INSERT;
      } else if (performUpdate) {
        tableName += CONST_UPDATE;
      } else if (performDelete) {
        tableName += CONST_DELETE;
      }

      Integer commitCounter = data.commitCounterMap.get(tableName);
      if (commitCounter == null) {
        commitCounter = Integer.valueOf(0);
      }
      data.commitCounterMap.put(tableName, Integer.valueOf(commitCounter.intValue() + 1));

      // Release the savepoint if needed
      //
      if (data.specialErrorHandling && data.supportsSavepoints && data.releaseSavepoint) {
        data.db.releaseSavepoint(data.savepoint);
      }

      // Perform a commit if needed
      //
      if (commitCounter > 0 && (commitCounter % data.commitSize) == 0) {
        if (data.batchMode) {
          try {
            if (performInsert) {
              data.insertStatement.executeBatch();
              data.db.commit();
              data.insertStatement.clearBatch();
            } else if (performUpdate) {
              data.updateStatement.executeBatch();
              data.db.commit();
              data.updateStatement.clearBatch();
            } else if (performDelete) {
              data.deleteStatement.executeBatch();
              data.db.commit();
              data.deleteStatement.clearBatch();
            }
          } catch (SQLException ex) {
            throw Database.createHopDatabaseBatchException(
                BaseMessages.getString(PKG, "SynchronizeAfterMerge.Error.UpdatingBatch"), ex);
          } catch (Exception ex) {
            throw new HopDatabaseException("Unexpected error inserting row", ex);
          }
        } else {
          // insertRow normal commit
          data.db.commit();
        }
        // Clear the batch/commit counter...
        //
        data.commitCounterMap.put(tableName, Integer.valueOf(0));
        rowIsSafe = true;
      } else {
        rowIsSafe = false;
      }
    }

  } catch (HopDatabaseBatchException be) {
    errorMessage = be.toString();
    batchProblem = true;
    sendToErrorRow = true;
    updateCounts = be.getUpdateCounts();
    exceptionsList = be.getExceptionsList();

    // Drop any pending batched rows on all three statements before deciding how to proceed.
    if (data.insertStatement != null) {
      data.db.clearBatch(data.insertStatement);
    }
    if (data.updateStatement != null) {
      data.db.clearBatch(data.updateStatement);
    }
    if (data.deleteStatement != null) {
      data.db.clearBatch(data.deleteStatement);
    }

    if (getTransformMeta().isDoingErrorHandling()) {
      data.db.commit(true);
    } else {
      data.db.rollback();
      StringBuilder msg =
          new StringBuilder(
              "Error batch inserting rows into table [" + data.realTableName + "].");
      msg.append(Const.CR);
      msg.append("Errors encountered (first 10):").append(Const.CR);
      for (int x = 0; x < be.getExceptionsList().size() && x < 10; x++) {
        Exception exception = be.getExceptionsList().get(x);
        if (exception.getMessage() != null) {
          msg.append(exception.getMessage()).append(Const.CR);
        }
      }
      throw new HopException(msg.toString(), be);
    }
  } catch (HopDatabaseException dbe) {
    if (getTransformMeta().isDoingErrorHandling()) {
      if (isRowLevel()) {
        logRowlevel("Written row to error handling : " + getInputRowMeta().getString(row));
      }

      // NOTE(review): the inline comment below describes "savepoint was set AND not a lookup
      // failure" but the condition uses OR; kept as-is to preserve existing behavior — verify.
      if (data.specialErrorHandling
          && data.supportsSavepoints
          && (data.savepoint != null || !data.lookupFailure)) {
        // do this when savepoint was set, and this is not lookup failure
        data.db.rollback(data.savepoint);
        if (data.releaseSavepoint) {
          data.db.releaseSavepoint(data.savepoint);
        }
      }
      sendToErrorRow = true;
      errorMessage = dbe.toString();
    } else {
      setErrors(getErrors() + 1);
      data.db.rollback();
      throw new HopException(
          "Error inserting row into table ["
              + data.realTableName
              + "] with values: "
              + data.inputRowMeta.getString(row),
          dbe);
    }
  }

  if (data.batchMode) {
    if (sendToErrorRow) {
      if (batchProblem) {
        data.batchBuffer.add(row);
        processBatchException(errorMessage, updateCounts, exceptionsList);
      } else {
        // Simply add this row to the error row
        putError(data.inputRowMeta, row, 1L, errorMessage, null, "SUYNC002");
      }
    } else {
      if (!lineSkipped) {
        data.batchBuffer.add(row);
      }

      if (rowIsSafe) { // A commit was done and the rows are all safe (no error)
        for (int i = 0; i < data.batchBuffer.size(); i++) {
          Object[] rowb = data.batchBuffer.get(i);
          putRow(data.outputRowMeta, rowb);
          // Insert rows were not counted at write time in batch mode; count them now.
          if (data.inputRowMeta
              .getString(rowb, data.indexOfOperationOrderField)
              .equals(data.insertValue)) {
            incrementLinesOutput();
          }
        }
        // Clear the buffer
        data.batchBuffer.clear();
      }

      // Don't forget to pass this line to the following transforms
      //
      if (lineSkipped) {
        putRow(data.outputRowMeta, row);
      }
    }
  } else {
    if (sendToErrorRow) {
      if (data.lookupFailure) {
        putError(
            data.inputRowMeta,
            row,
            1,
            data.stringErrorKeyNotFound,
            data.stringFieldnames,
            "SUYNC001");
      } else {
        putError(data.inputRowMeta, row, 1, errorMessage, null, "SUYNC001");
      }
    }
  }
}