private synchronized void lookupValues()

in plugins/transforms/synchronizeaftermerge/src/main/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMerge.java [66:545]


  /**
   * Applies a single input row to the target table, driven by the value of its operation field.
   *
   * <p>A row whose operation equals {@code data.insertValue} is inserted directly. For any other
   * row the key values are collected, a lookup is executed when {@code meta.isPerformLookup()} is
   * set, and the row is then updated ({@code data.updateValue}), deleted ({@code
   * data.deleteValue}) or counted as skipped. When {@code meta.isTableNameInField()} is set, the
   * table name is read from the row and the prepared statements are cached per table in {@code
   * data.preparedStatements}.
   *
   * <p>After the statement has been issued, a per-table/per-operation counter in {@code
   * data.commitCounterMap} decides whether a commit (and, in batch mode, an {@code
   * executeBatch()}) is performed. In batch mode rows are held in {@code data.batchBuffer} and
   * only passed on with {@code putRow} once a commit has succeeded; skipped rows are passed on
   * immediately.
   *
   * <p>Database errors are routed to the error stream when the transform is doing error handling;
   * otherwise the transaction is rolled back and the error is rethrown.
   *
   * @param row the input row, laid out according to {@code data.inputRowMeta}
   * @throws HopException if the operation field is empty, if the dynamic table name is empty
   *     (reported as a {@code HopTransformException}), or if a database error occurs while the
   *     transform is not doing error handling
   */
  private synchronized void lookupValues(Object[] row) throws HopException {

    // get operation for the current
    // do we insert, update or delete ?
    String operation = data.inputRowMeta.getString(row, data.indexOfOperationOrderField);

    // Outcome flags, evaluated after the try/catch to decide where the row goes.
    boolean rowIsSafe = false;
    boolean sendToErrorRow = false;
    String errorMessage = null;
    int[] updateCounts = null;
    List<Exception> exceptionsList = null;
    boolean batchProblem = false;

    data.lookupFailure = false;
    boolean performInsert = false;
    boolean performUpdate = false;
    boolean performDelete = false;
    boolean lineSkipped = false;

    try {
      if (operation == null) {
        throw new HopException(
            BaseMessages.getString(
                PKG,
                "SynchronizeAfterMerge.Log.OperationFieldEmpty",
                meta.getOperationOrderField()));
      }

      if (meta.isTableNameInField()) {
        // get dynamic table name
        data.realTableName = data.inputRowMeta.getString(row, data.indexOfTableNameField);
        if (Utils.isEmpty(data.realTableName)) {
          throw new HopTransformException("The name of the table is not specified!");
        }
        data.realSchemaTable =
            data.db
                .getDatabaseMeta()
                .getQuotedSchemaTableCombination(this, data.realSchemaName, data.realTableName);
      }

      if (operation.equals(data.insertValue)) {
        // directly insert data into table
        /*
         *
         * INSERT ROW
         */

        if (isRowLevel()) {
          logRowlevel(
              BaseMessages.getString(PKG, "SynchronizeAfterMerge.InsertRow", Arrays.toString(row)));
        }

        // The values to insert are those in the update section
        //
        Object[] insertRowData = new Object[data.valuenrs.length];
        for (int i = 0; i < data.valuenrs.length; i++) {
          insertRowData[i] = row[data.valuenrs[i]];
        }

        if (meta.isTableNameInField()) {
          // Reuse the insert statement cached for this table, preparing it on first use.
          data.insertStatement = data.preparedStatements.get(data.realSchemaTable + CONST_INSERT);
          if (data.insertStatement == null) {
            String sql =
                data.db.getInsertStatement(
                    data.realSchemaName, data.realTableName, data.insertRowMeta);

            if (isDebug()) {
              logDebug("Preparation of the insert SQL statement: " + sql);
            }

            data.insertStatement = data.db.prepareSql(sql);
            data.preparedStatements.put(data.realSchemaTable + CONST_INSERT, data.insertStatement);
          }
        }

        // For PG & GP, we add a savepoint before the row.
        // Then revert to the savepoint afterwards... (not a transaction, so hopefully still fast)
        //
        if (data.specialErrorHandling && data.supportsSavepoints) {
          data.savepoint = data.db.setSavepoint();
        }

        // Set the values on the prepared statement...
        data.db.setValues(data.insertRowMeta, insertRowData, data.insertStatement);
        data.db.insertRow(data.insertStatement, data.batchMode);
        performInsert = true;
        // In batch mode the output counter is only incremented once the batch has been committed
        // (see the buffer flush at the end of this method).
        if (!data.batchMode) {
          incrementLinesOutput();
        }
        if (isRowLevel()) {
          logRowlevel("Written row: " + data.insertRowMeta.getString(insertRowData));
        }

      } else {

        // Collect the key values used as WHERE-clause parameters for lookup and update.
        // NOTE(review): the array is sized by keynrs.length, yet the loop below can store two
        // values per key when keynrs2[i] >= 0 — confirm this cannot overflow for two-field
        // conditions.
        Object[] lookupRow = new Object[data.keynrs.length];
        int lookupIndex = 0;
        for (int i = 0; i < meta.getKeyStream().length; i++) {
          if (data.keynrs[i] >= 0) {
            lookupRow[lookupIndex] = row[data.keynrs[i]];
            lookupIndex++;
          }
          if (data.keynrs2[i] >= 0) {
            lookupRow[lookupIndex] = row[data.keynrs2[i]];
            lookupIndex++;
          }
        }
        // Set to true when the lookup shows that at least one update field differs.
        boolean updateorDelete = false;
        if (meta.isPerformLookup()) {

          // LOOKUP

          if (meta.isTableNameInField()) {
            // Prepare Lookup statement
            data.lookupStatement = data.preparedStatements.get(data.realSchemaTable + CONST_LOOKUP);
            if (data.lookupStatement == null) {
              String sql = getLookupStatement(data.inputRowMeta);

              if (isDebug()) {
                logDebug("Preparation of the lookup SQL statement: " + sql);
              }

              data.lookupStatement = data.db.prepareSql(sql);
              data.preparedStatements.put(
                  data.realSchemaTable + CONST_LOOKUP, data.lookupStatement);
            }
          }

          data.db.setValues(data.lookupParameterRowMeta, lookupRow, data.lookupStatement);
          if (isRowLevel()) {
            logRowlevel(
                BaseMessages.getString(
                    PKG,
                    "SynchronizeAfterMerge.Log.ValuesSetForLookup",
                    data.lookupParameterRowMeta.getString(lookupRow)));
          }
          Object[] add = data.db.getLookup(data.lookupStatement);
          incrementLinesInput();

          if (add == null) {
            // nothing was found:

            // The list of key field names does not depend on the row, so build it only once.
            if (data.stringErrorKeyNotFound == null) {
              StringBuilder fieldNames = new StringBuilder();
              for (int i = 0; i < data.lookupParameterRowMeta.size(); i++) {
                if (i > 0) {
                  fieldNames.append(", ");
                }
                fieldNames.append(data.lookupParameterRowMeta.getValueMeta(i).getName());
              }
              data.stringFieldnames = fieldNames.toString();
            }
            // The description embeds the key values of the current row, so it has to be rebuilt
            // for every failed lookup; caching it would report the first failing row's keys for
            // all later failures.
            data.stringErrorKeyNotFound =
                BaseMessages.getString(PKG, "SynchronizeAfterMerge.Exception.KeyCouldNotFound")
                    + data.lookupParameterRowMeta.getString(lookupRow);
            data.lookupFailure = true;
            throw new HopDatabaseException(
                BaseMessages.getString(
                    PKG,
                    "SynchronizeAfterMerge.Exception.KeyCouldNotFound",
                    data.lookupParameterRowMeta.getString(lookupRow)));
          } else {
            if (isRowLevel()) {
              // 'row' is laid out according to the input row metadata, not the insert metadata.
              logRowlevel(
                  BaseMessages.getString(
                      PKG,
                      "SynchronizeAfterMerge.Log.FoundRowForUpdate",
                      data.inputRowMeta.getString(row)));
            }

            // Compare every field flagged for update against the value returned by the lookup;
            // a single difference is enough to require the update.
            for (int i = 0; i < data.valuenrs.length; i++) {
              if (meta.getUpdate()[i].booleanValue()) {
                IValueMeta valueMeta = data.inputRowMeta.getValueMeta(data.valuenrs[i]);
                IValueMeta retMeta = data.db.getReturnRowMeta().getValueMeta(i);

                Object rowvalue = row[data.valuenrs[i]];
                Object retvalue = add[i];

                if (valueMeta.compare(rowvalue, retMeta, retvalue) != 0) {
                  updateorDelete = true;
                }
              }
            }
          }
        } // end if perform lookup

        if (operation.equals(data.updateValue)) {
          // Without a lookup the update is always issued; with a lookup only when a value differs.
          if (!meta.isPerformLookup() || updateorDelete) {
            // UPDATE :

            if (meta.isTableNameInField()) {
              data.updateStatement =
                  data.preparedStatements.get(data.realSchemaTable + CONST_UPDATE);
              if (data.updateStatement == null) {
                String sql = getUpdateStatement(data.inputRowMeta);

                data.updateStatement = data.db.prepareSql(sql);
                data.preparedStatements.put(
                    data.realSchemaTable + CONST_UPDATE, data.updateStatement);
                if (isDebug()) {
                  logDebug("Preparation of the Update SQL statement : " + sql);
                }
              }
            }

            // Create the update row...
            Object[] updateRow = new Object[data.updateParameterRowMeta.size()];
            int j = 0;
            for (int i = 0; i < data.valuenrs.length; i++) {
              if (meta.getUpdate()[i].booleanValue()) {
                updateRow[j] = row[data.valuenrs[i]]; // the setters
                j++;
              }
            }

            // add the where clause parameters, they are exactly the same for lookup and update
            for (int i = 0; i < lookupRow.length; i++) {
              updateRow[j + i] = lookupRow[i];
            }

            // For PG & GP, we add a savepoint before the row.
            // Then revert to the savepoint afterwards... (not a transaction, so hopefully still
            // fast)
            //
            if (data.specialErrorHandling && data.supportsSavepoints) {
              data.savepoint = data.db.setSavepoint();
            }
            data.db.setValues(data.updateParameterRowMeta, updateRow, data.updateStatement);
            if (isRowLevel()) {
              logRowlevel(
                  BaseMessages.getString(
                      PKG,
                      "SynchronizeAfterMerge.Log.SetValuesForUpdate",
                      data.updateParameterRowMeta.getString(updateRow),
                      data.inputRowMeta.getString(row)));
            }
            data.db.insertRow(data.updateStatement, data.batchMode);
            performUpdate = true;
            incrementLinesUpdated();

          } else {
            // The lookup found no difference: nothing to update, count the row as skipped.
            incrementLinesSkipped();
            lineSkipped = true;
          }
        } else if (operation.equals(data.deleteValue)) {
          // DELETE

          if (meta.isTableNameInField()) {
            data.deleteStatement = data.preparedStatements.get(data.realSchemaTable + CONST_DELETE);

            if (data.deleteStatement == null) {
              String sql = getDeleteStatement(data.inputRowMeta);
              data.deleteStatement = data.db.prepareSql(sql);
              data.preparedStatements.put(
                  data.realSchemaTable + CONST_DELETE, data.deleteStatement);
              if (isDebug()) {
                logDebug("Preparation of the Delete SQL statement : " + sql);
              }
            }
          }

          // Collect the key values used as WHERE-clause parameters of the delete statement.
          Object[] deleteRow = new Object[data.deleteParameterRowMeta.size()];
          int deleteIndex = 0;

          for (int i = 0; i < meta.getKeyStream().length; i++) {
            if (data.keynrs[i] >= 0) {
              deleteRow[deleteIndex] = row[data.keynrs[i]];
              deleteIndex++;
            }
            if (data.keynrs2[i] >= 0) {
              deleteRow[deleteIndex] = row[data.keynrs2[i]];
              deleteIndex++;
            }
          }

          // For PG & GP, we add a savepoint before the row.
          // Then revert to the savepoint afterwards... (not a transaction, so hopefully still fast)
          //
          if (data.specialErrorHandling && data.supportsSavepoints) {
            data.savepoint = data.db.setSavepoint();
          }
          data.db.setValues(data.deleteParameterRowMeta, deleteRow, data.deleteStatement);
          if (isRowLevel()) {
            logRowlevel(
                BaseMessages.getString(
                    PKG,
                    "SynchronizeAfterMerge.Log.SetValuesForDelete",
                    data.deleteParameterRowMeta.getString(deleteRow),
                    data.inputRowMeta.getString(row)));
          }
          data.db.insertRow(data.deleteStatement, data.batchMode);
          performDelete = true;
          incrementLinesUpdated();
        } else {
          // The operation is neither insert, update nor delete: count the row as skipped.
          incrementLinesSkipped();
          lineSkipped = true;
        }
      } // endif operation insert

      // If we skip a line we need to empty the buffer and skip the line in question.
      // The skipped line is never added to the buffer!
      //
      if (performInsert
          || performUpdate
          || performDelete
          || (!data.batchBuffer.isEmpty() && lineSkipped)) {
        // Get a commit counter per prepared statement to keep track of separate tables, etc.
        //
        String tableName = data.realSchemaTable;
        if (performInsert) {
          tableName += CONST_INSERT;
        } else if (performUpdate) {
          tableName += CONST_UPDATE;
        }
        if (performDelete) {
          tableName += CONST_DELETE;
        }

        Integer commitCounter = data.commitCounterMap.get(tableName);
        if (commitCounter == null) {
          commitCounter = Integer.valueOf(0);
        }
        data.commitCounterMap.put(tableName, Integer.valueOf(commitCounter.intValue() + 1));

        // Release the savepoint if needed
        //
        if (data.specialErrorHandling && data.supportsSavepoints && data.releaseSavepoint) {
          data.db.releaseSavepoint(data.savepoint);
        }

        // Perform a commit if needed
        //
        // NOTE(review): this test uses the counter value read before the increment above, so a
        // commit is triggered by the row that follows the commitSize-th counted row — confirm
        // this is intended.
        if (commitCounter > 0 && (commitCounter % data.commitSize) == 0) {
          if (data.batchMode) {
            try {
              if (performInsert) {
                data.insertStatement.executeBatch();
                data.db.commit();
                data.insertStatement.clearBatch();
              } else if (performUpdate) {
                data.updateStatement.executeBatch();
                data.db.commit();
                data.updateStatement.clearBatch();
              } else if (performDelete) {
                data.deleteStatement.executeBatch();
                data.db.commit();
                data.deleteStatement.clearBatch();
              }
            } catch (SQLException ex) {
              throw Database.createHopDatabaseBatchException(
                  BaseMessages.getString(PKG, "SynchronizeAfterMerge.Error.UpdatingBatch"), ex);
            } catch (Exception ex) {
              throw new HopDatabaseException("Unexpected error inserting row", ex);
            }
          } else {
            // insertRow normal commit
            data.db.commit();
          }
          // Clear the batch/commit counter...
          //
          data.commitCounterMap.put(tableName, Integer.valueOf(0));
          rowIsSafe = true;
        } else {
          rowIsSafe = false;
        }
      }
    } catch (HopDatabaseBatchException be) {
      // A batch execution failed: remember the details so the buffered rows can be sorted into
      // good and bad ones after the try/catch.
      errorMessage = be.toString();
      batchProblem = true;
      sendToErrorRow = true;
      updateCounts = be.getUpdateCounts();
      exceptionsList = be.getExceptionsList();

      if (data.insertStatement != null) {
        data.db.clearBatch(data.insertStatement);
      }
      if (data.updateStatement != null) {
        data.db.clearBatch(data.updateStatement);
      }
      if (data.deleteStatement != null) {
        data.db.clearBatch(data.deleteStatement);
      }

      if (getTransformMeta().isDoingErrorHandling()) {
        data.db.commit(true);
      } else {
        data.db.rollback();
        StringBuilder msg =
            new StringBuilder(
                "Error batch inserting rows into table [" + data.realTableName + "].");
        msg.append(Const.CR);
        msg.append("Errors encountered (first 10):").append(Const.CR);
        for (int x = 0; x < be.getExceptionsList().size() && x < 10; x++) {
          Exception exception = be.getExceptionsList().get(x);
          if (exception.getMessage() != null) {
            msg.append(exception.getMessage()).append(Const.CR);
          }
        }
        throw new HopException(msg.toString(), be);
      }
    } catch (HopDatabaseException dbe) {
      if (getTransformMeta().isDoingErrorHandling()) {
        if (isRowLevel()) {
          logRowlevel("Written row to error handling : " + getInputRowMeta().getString(row));
        }

        // Only roll back to the savepoint when one was actually set and the failure is not a
        // lookup miss: a lookup miss happens before any savepoint is taken for the current row,
        // so data.savepoint would be null or belong to an earlier row.
        if (data.specialErrorHandling
            && data.supportsSavepoints
            && data.savepoint != null
            && !data.lookupFailure) {
          // do this when savepoint was set, and this is not lookup failure
          data.db.rollback(data.savepoint);
          if (data.releaseSavepoint) {
            data.db.releaseSavepoint(data.savepoint);
          }
        }
        sendToErrorRow = true;
        errorMessage = dbe.toString();
      } else {
        setErrors(getErrors() + 1);
        data.db.rollback();
        throw new HopException(
            "Error inserting row into table ["
                + data.realTableName
                + "] with values: "
                + data.inputRowMeta.getString(row),
            dbe);
      }
    }

    if (data.batchMode) {
      if (sendToErrorRow) {
        if (batchProblem) {
          // The current row is part of the failed batch, so it is buffered before the batch
          // result is processed.
          data.batchBuffer.add(row);
          processBatchException(errorMessage, updateCounts, exceptionsList);
        } else {
          // Simply add this row to the error row
          putError(data.inputRowMeta, row, 1L, errorMessage, null, "SUYNC002");
        }
      } else {
        if (!lineSkipped) {
          data.batchBuffer.add(row);
        }

        if (rowIsSafe) { // A commit was done and the rows are all safe (no error)
          for (int i = 0; i < data.batchBuffer.size(); i++) {
            Object[] rowb = data.batchBuffer.get(i);
            putRow(data.outputRowMeta, rowb);
            // Inserted rows are counted as output only now that they have been committed.
            if (data.inputRowMeta
                .getString(rowb, data.indexOfOperationOrderField)
                .equals(data.insertValue)) {
              incrementLinesOutput();
            }
          }
          // Clear the buffer
          data.batchBuffer.clear();
        }

        // Don't forget to pass this line to the following transforms
        //
        if (lineSkipped) {
          putRow(data.outputRowMeta, row);
        }
      }
    } else {
      if (sendToErrorRow) {
        if (data.lookupFailure) {
          // Lookup misses carry a dedicated description and the list of key field names.
          putError(
              data.inputRowMeta,
              row,
              1,
              data.stringErrorKeyNotFound,
              data.stringFieldnames,
              "SUYNC001");
        } else {
          putError(data.inputRowMeta, row, 1, errorMessage, null, "SUYNC001");
        }
      }
    }
  }