in plugins/transforms/dimensionlookup/src/main/java/org/apache/hop/pipeline/transforms/dimensionlookup/DimensionLookup.java [959:1334]
public Long dimInsert(
IRowMeta inputRowMeta,
Object[] row,
Long technicalKey,
boolean newEntry,
Long versionNr,
Date dateFrom,
Date dateTo)
throws HopException {
DLFields f = meta.getFields();
if (data.prepStatementInsert == null
&& data.prepStatementUpdate == null) { // first time: construct prepared statement
IRowMeta insertRowMeta = new RowMeta();
/*
* Construct the SQL statement...
*
* INSERT INTO d_customer(keyfield, versionfield, datefrom, dateto, key[], fieldlookup[], last_updated,
* last_inserted, last_version) VALUES (val_key ,val_version , val_datfrom, val_datto, keynrs[], fieldnrs[],
* last_updated, last_inserted, last_version)
*/
StringBuilder sql = new StringBuilder();
sql.append("INSERT INTO ").append(data.schemaTable).append("( ");
if (!isAutoIncrement()) {
// NO AUTOINCREMENT
sql.append(data.databaseMeta.quoteField(f.getReturns().getKeyField())).append(", ");
insertRowMeta.addValueMeta(
data.outputRowMeta.getValueMeta(inputRowMeta.size())); // the first return value
// after the input
} else {
if (data.databaseMeta.needsPlaceHolder()) {
sql.append("0, "); // placeholder on informix!
}
}
sql.append(data.databaseMeta.quoteField(f.getReturns().getVersionField()))
.append(", ")
.append(data.databaseMeta.quoteField(f.getDate().getFrom()))
.append(", ")
.append(data.databaseMeta.quoteField(f.getDate().getTo()));
insertRowMeta.addValueMeta(new ValueMetaInteger(f.getReturns().getVersionField()));
insertRowMeta.addValueMeta(new ValueMetaDate(f.getDate().getFrom()));
insertRowMeta.addValueMeta(new ValueMetaDate(f.getDate().getTo()));
for (int i = 0; i < f.getKeys().size(); i++) {
DLKey key = f.getKeys().get(i);
sql.append(", ").append(data.databaseMeta.quoteField(key.getLookup()));
insertRowMeta.addValueMeta(inputRowMeta.getValueMeta(data.keynrs[i]));
}
for (int i = 0; i < f.getFields().size(); i++) {
DLField field = f.getFields().get(i);
// Ignore last_version, last_updated etc, they are handled below (at the
// back of the row).
//
if (meta.isUpdate() && field.getUpdateType().isWithArgument()) {
sql.append(", ").append(data.databaseMeta.quoteField(field.getLookup()));
insertRowMeta.addValueMeta(inputRowMeta.getValueMeta(data.fieldnrs[i]));
}
}
// Finally, the special update fields...
//
for (int i = 0; i < f.getFields().size(); i++) {
DLField field = f.getFields().get(i);
IValueMeta valueMeta = null;
switch (field.getUpdateType()) {
case DATE_INSERTED_UPDATED, DATE_INSERTED:
valueMeta = new ValueMetaDate(field.getLookup());
break;
case LAST_VERSION:
valueMeta = new ValueMetaBoolean(field.getLookup());
break;
default:
break;
}
if (valueMeta != null) {
sql.append(", ").append(data.databaseMeta.quoteField(valueMeta.getName()));
insertRowMeta.addValueMeta(valueMeta);
}
}
sql.append(") VALUES (");
if (!isAutoIncrement()) {
sql.append("?, ");
}
sql.append("?, ?, ?");
for (int i = 0; i < data.keynrs.length; i++) {
sql.append(", ?");
}
for (DLField field : f.getFields()) {
// Ignore last_version, last_updated, etc. These are handled below...
//
if (meta.isUpdate() && field.getUpdateType().isWithArgument()) {
sql.append(", ?");
}
}
// The special update fields...
//
for (DLField field : f.getFields()) {
switch (field.getUpdateType()) {
case DATE_INSERTED_UPDATED, DATE_INSERTED, LAST_VERSION:
sql.append(", ?");
break;
default:
break;
}
}
sql.append(" )");
try {
if (technicalKey == null && data.databaseMeta.supportsAutoGeneratedKeys()) {
logDetailed("SQL w/ return keys=[" + sql + "]");
data.prepStatementInsert =
data.db
.getConnection()
.prepareStatement(
data.databaseMeta.stripCR(sql), Statement.RETURN_GENERATED_KEYS);
} else {
logDetailed("SQL=[" + sql + "]");
data.prepStatementInsert =
data.db.getConnection().prepareStatement(data.databaseMeta.stripCR(sql));
}
} catch (SQLException ex) {
throw new HopDatabaseException("Unable to prepare dimension insert :" + Const.CR + sql, ex);
}
/*
* UPDATE d_customer SET dateto = val_datnow, last_updated = <now> last_version = false WHERE keylookup[] =
* keynrs[] AND versionfield = val_version - 1
*/
IRowMeta updateRowMeta = new RowMeta();
StringBuilder sqlUpdate = new StringBuilder();
sqlUpdate.append(CONST_UPDATE).append(data.schemaTable).append(Const.CR);
// The end of the date range
//
sqlUpdate
.append("SET ")
.append(data.databaseMeta.quoteField(f.getDate().getTo()))
.append(" = ?")
.append(Const.CR);
updateRowMeta.addValueMeta(new ValueMetaDate(f.getDate().getTo()));
// The special update fields...
//
for (DLField field : f.getFields()) {
IValueMeta valueMeta = null;
switch (field.getUpdateType()) {
case DATE_INSERTED_UPDATED, DATE_UPDATED:
valueMeta = new ValueMetaDate(field.getLookup());
break;
case LAST_VERSION:
valueMeta = new ValueMetaBoolean(field.getLookup());
break;
default:
break;
}
if (valueMeta != null) {
sqlUpdate
.append(", ")
.append(data.databaseMeta.quoteField(valueMeta.getName()))
.append(" = ?")
.append(Const.CR);
updateRowMeta.addValueMeta(valueMeta);
}
}
sqlUpdate.append("WHERE ");
for (int i = 0; i < f.getKeys().size(); i++) {
DLKey key = f.getKeys().get(i);
if (i > 0) {
sqlUpdate.append(CONST_AND);
}
sqlUpdate
.append(data.databaseMeta.quoteField(key.getLookup()))
.append(" = ?")
.append(Const.CR);
updateRowMeta.addValueMeta(inputRowMeta.getValueMeta(data.keynrs[i]));
}
sqlUpdate
.append(CONST_AND)
.append(data.databaseMeta.quoteField(f.getReturns().getVersionField()))
.append(" = ? ");
updateRowMeta.addValueMeta(new ValueMetaInteger(f.getReturns().getVersionField()));
try {
logDetailed("Preparing update: " + Const.CR + sqlUpdate + Const.CR);
data.prepStatementUpdate =
data.db.getConnection().prepareStatement(data.databaseMeta.stripCR(sqlUpdate));
} catch (SQLException ex) {
throw new HopDatabaseException(
"Unable to prepare dimension update :" + Const.CR + sqlUpdate, ex);
}
data.insertRowMeta = insertRowMeta;
data.updateRowMeta = updateRowMeta;
}
Object[] insertRow = new Object[data.insertRowMeta.size()];
int insertIndex = 0;
if (!isAutoIncrement()) {
insertRow[insertIndex++] = technicalKey;
}
// Caller is responsible for setting proper version number depending
// on if newEntry == true
insertRow[insertIndex++] = versionNr;
switch (data.startDateAlternative) {
case NONE:
insertRow[insertIndex++] = dateFrom;
break;
case SYSTEM_DATE:
// use the time the transform execution begins as the date from (passed in as dateFrom).
// before, the current system time was used. this caused an exclusion of the row in the
// lookup portion of the transform that uses this 'valueDate' and not the current time.
// the result was multiple inserts for what should have been 1
insertRow[insertIndex++] = dateFrom;
break;
case PIPELINE_START:
insertRow[insertIndex++] = getPipeline().getExecutionStartDate();
break;
case NULL:
insertRow[insertIndex++] = null;
break;
case COLUMN_VALUE:
insertRow[insertIndex++] = inputRowMeta.getDate(row, data.startDateFieldIndex);
break;
default:
throw new HopTransformException(
BaseMessages.getString(
PKG,
"DimensionLookup.Exception.IllegalStartDateSelection",
data.startDateAlternative.getDescription()));
}
insertRow[insertIndex++] = dateTo;
for (int i = 0; i < data.keynrs.length; i++) {
insertRow[insertIndex++] = row[data.keynrs[i]];
}
for (int i = 0; i < data.fieldnrs.length; i++) {
if (data.fieldnrs[i] >= 0) {
// Ignore last_version, last_updated, etc. These are handled below...
//
insertRow[insertIndex++] = row[data.fieldnrs[i]];
}
}
// The special update fields...
//
for (DLField field : f.getFields()) {
switch (field.getUpdateType()) {
case DATE_INSERTED_UPDATED, DATE_INSERTED:
insertRow[insertIndex++] = new Date();
break;
case LAST_VERSION:
insertRow[insertIndex++] = Boolean.TRUE;
break; // Always the last version on insert.
default:
break;
}
}
if (isDebug()) {
logDebug(
"rins, size="
+ data.insertRowMeta.size()
+ ", values="
+ data.insertRowMeta.getString(insertRow));
}
// INSERT NEW VALUE!
data.db.setValues(data.insertRowMeta, insertRow, data.prepStatementInsert);
data.db.insertRow(data.prepStatementInsert);
if (isDebug()) {
logDebug("Row inserted!");
}
if (technicalKey == null && data.databaseMeta.supportsAutoGeneratedKeys()) {
try {
RowMetaAndData keys = data.db.getGeneratedKeys(data.prepStatementInsert);
if (keys.getRowMeta().size() > 0) {
technicalKey = keys.getRowMeta().getInteger(keys.getData(), 0);
} else {
throw new HopDatabaseException(
"Unable to retrieve value of auto-generated technical key : no value found!");
}
} catch (Exception e) {
throw new HopDatabaseException(
"Unable to retrieve value of auto-generated technical key : unexpected error: ", e);
}
}
if (!newEntry) { // we have to update the previous version in the dimension!
/*
* UPDATE d_customer SET dateto = val_datfrom , last_updated = <now> , last_version = false WHERE keylookup[] =
* keynrs[] AND versionfield = val_version - 1
*/
Object[] updateRow = new Object[data.updateRowMeta.size()];
int updateIndex = 0;
switch (data.startDateAlternative) {
case NONE:
updateRow[updateIndex++] = dateFrom;
break;
case SYSTEM_DATE:
updateRow[updateIndex++] = new Date();
break;
case PIPELINE_START:
updateRow[updateIndex++] = getPipeline().getExecutionStartDate();
break;
case NULL:
updateRow[updateIndex++] = null;
break;
case COLUMN_VALUE:
updateRow[updateIndex++] = inputRowMeta.getDate(row, data.startDateFieldIndex);
break;
default:
throw new HopTransformException(
BaseMessages.getString(
"DimensionLookup.Exception.IllegalStartDateSelection",
data.startDateAlternative.getDescription()));
}
// The special update fields...
//
for (DLField field : f.getFields()) {
switch (field.getUpdateType()) {
case DATE_INSERTED_UPDATED, DATE_UPDATED:
updateRow[updateIndex++] = new Date();
break;
case LAST_VERSION:
updateRow[updateIndex++] = Boolean.FALSE;
break; // Never the last version on this update
default:
break;
}
}
for (int i = 0; i < data.keynrs.length; i++) {
updateRow[updateIndex++] = row[data.keynrs[i]];
}
updateRow[updateIndex] = versionNr - 1;
if (isRowLevel()) {
logRowlevel("UPDATE using rupd=" + data.updateRowMeta.getString(updateRow));
}
// UPDATE VALUES
// set values for update
//
data.db.setValues(data.updateRowMeta, updateRow, data.prepStatementUpdate);
if (isDebug()) {
logDebug("Values set for update (" + data.updateRowMeta.size() + ")");
}
data.db.insertRow(data.prepStatementUpdate); // do the actual update
if (isDebug()) {
logDebug("Row updated!");
}
}
return technicalKey;
}