in plugins/actions/mssqlbulkload/src/main/java/org/apache/hop/workflow/actions/mssqlbulkload/ActionMssqlBulkLoad.java [193:473]
public Result execute(Result previousResult, int nr) {
String takeFirstNbrLines = "";
String lineTerminatedby = "";
String fieldTerminatedby = "";
boolean useFieldSeparator = false;
String useCodepage = "";
String errorfileName = "";
Result result = previousResult;
result.setResult(false);
String vfsFilename = resolve(fileName);
FileObject fileObject = null;
// Let's check the filename ...
if (!Utils.isEmpty(vfsFilename)) {
try {
// User has specified a file, We can continue ...
//
// This is running over VFS but we need a normal file.
// As such, we're going to verify that it's a local file...
// We're also going to convert VFS FileObject to File
//
fileObject = HopVfs.getFileObject(vfsFilename);
if (!(fileObject instanceof LocalFile)) {
// MSSQL BUKL INSERT can only use local files, so that's what we limit ourselves to.
//
throw new HopException(
BaseMessages.getString(
PKG, "ActionMssqlBulkLoad.Error.OnlyLocalFileSupported", vfsFilename));
}
// Convert it to a regular platform specific file name
//
String realFilename = HopVfs.getFilename(fileObject);
// Here we go... back to the regular scheduled program...
//
File file = new File(realFilename);
if (file.exists() && file.canRead()) {
// User has specified an existing file, We can continue ...
if (isDetailed()) {
logDetailed(
BaseMessages.getString(PKG, "ActionMssqlBulkLoad.FileExists.Label", realFilename));
}
if (connection != null) {
DatabaseMeta dbMeta = DatabaseMeta.loadDatabase(getMetadataProvider(), connection);
// User has specified a connection, We can continue ...
String pluginId = dbMeta.getPluginId();
if (!("MSSQL".equals(pluginId) || "MSSQLNATIVE".equals(pluginId))) {
logError(
BaseMessages.getString(
PKG, "ActionMssqlBulkLoad.Error.DbNotMSSQL", dbMeta.getDatabaseName()));
return result;
}
try (Database db = new Database(this, this, dbMeta)) {
db.connect();
// Get schemaname
String realSchemaname = resolve(schemaName);
// Get tablename
String realTablename = resolve(tableName);
if (db.checkTableExists(realSchemaname, realTablename)) {
// The table existe, We can continue ...
if (isDetailed()) {
logDetailed(
BaseMessages.getString(
PKG, "ActionMssqlBulkLoad.TableExists.Label", realTablename));
}
// FIELDTERMINATOR
String fieldTerminator = getRealFieldTerminator();
if (Utils.isEmpty(fieldTerminator)
&& (dataFileType.equals("char") || dataFileType.equals("widechar"))) {
logError(
BaseMessages.getString(
PKG, "ActionMssqlBulkLoad.Error.FieldTerminatorMissing"));
return result;
} else {
if (dataFileType.equals("char") || dataFileType.equals("widechar")) {
useFieldSeparator = true;
fieldTerminatedby = "FIELDTERMINATOR='" + fieldTerminator + "'";
}
}
// Check Specific Code page
if (codePage.equals("Specific")) {
String realCodePage = resolve(codePage);
if (specificCodePage.length() < 0) {
logError(
BaseMessages.getString(
PKG, "ActionMssqlBulkLoad.Error.SpecificCodePageMissing"));
return result;
} else {
useCodepage = "CODEPAGE = '" + realCodePage + "'";
}
} else {
useCodepage = "CODEPAGE = '" + codePage + "'";
}
// Check Error file
String realErrorFile = resolve(errorFileName);
if (realErrorFile != null) {
File errorfile = new File(realErrorFile);
if (errorfile.exists() && !addDatetime) {
// The error file is created when the command is executed. An error occurs if
// the file already
// exists.
logError(
BaseMessages.getString(PKG, "ActionMssqlBulkLoad.Error.ErrorFileExists"));
return result;
}
if (addDatetime) {
// Add date time to filename...
SimpleDateFormat daf = new SimpleDateFormat();
Date now = new Date();
daf.applyPattern("yyyMMdd_HHmmss");
String d = daf.format(now);
errorfileName = "ERRORFILE ='" + realErrorFile + "_" + d + "'";
} else {
errorfileName = "ERRORFILE ='" + realErrorFile + "'";
}
}
// ROWTERMINATOR
String rowterminator = getRealLineterminated();
if (!Utils.isEmpty(rowterminator)) {
lineTerminatedby = "ROWTERMINATOR='" + rowterminator + "'";
}
// Start file at
if (startFile > 0) {
takeFirstNbrLines = "FIRSTROW=" + startFile;
}
// End file at
if (endFile > 0) {
takeFirstNbrLines = "LASTROW=" + endFile;
}
// Truncate table?
String sqlBulkLoad = "";
if (truncate) {
sqlBulkLoad = "TRUNCATE TABLE " + realTablename + ";";
}
// Build BULK Command
sqlBulkLoad =
sqlBulkLoad
+ "BULK INSERT "
+ realTablename
+ " FROM "
+ "'"
+ realFilename.replace('\\', '/')
+ "'";
sqlBulkLoad = sqlBulkLoad + " WITH (";
if (useFieldSeparator) {
sqlBulkLoad = sqlBulkLoad + fieldTerminatedby;
} else {
sqlBulkLoad = sqlBulkLoad + "DATAFILETYPE ='" + dataFileType + "'";
}
if (lineTerminatedby.length() > 0) {
sqlBulkLoad = sqlBulkLoad + "," + lineTerminatedby;
}
if (takeFirstNbrLines.length() > 0) {
sqlBulkLoad = sqlBulkLoad + "," + takeFirstNbrLines;
}
if (useCodepage.length() > 0) {
sqlBulkLoad = sqlBulkLoad + "," + useCodepage;
}
String realFormatFile = resolve(formatFileName);
if (realFormatFile != null) {
sqlBulkLoad = sqlBulkLoad + ", FORMATFILE='" + realFormatFile + "'";
}
if (fireTriggers) {
sqlBulkLoad = sqlBulkLoad + ",FIRE_TRIGGERS";
}
if (keepNulls) {
sqlBulkLoad = sqlBulkLoad + ",KEEPNULLS";
}
if (keepIdentity) {
sqlBulkLoad = sqlBulkLoad + ",KEEPIDENTITY";
}
if (checkConstraints) {
sqlBulkLoad = sqlBulkLoad + ",CHECK_CONSTRAINTS";
}
if (tabLock) {
sqlBulkLoad = sqlBulkLoad + ",TABLOCK";
}
if (orderBy != null) {
sqlBulkLoad = sqlBulkLoad + ",ORDER ( " + orderBy + " " + orderDirection + ")";
}
if (errorfileName.length() > 0) {
sqlBulkLoad = sqlBulkLoad + ", " + errorfileName;
}
if (maxErrors > 0) {
sqlBulkLoad = sqlBulkLoad + ", MAXERRORS=" + maxErrors;
}
if (batchSize > 0) {
sqlBulkLoad = sqlBulkLoad + ", BATCHSIZE=" + batchSize;
}
if (rowsPerBatch > 0) {
sqlBulkLoad = sqlBulkLoad + ", ROWS_PER_BATCH=" + rowsPerBatch;
}
// End of Bulk command
sqlBulkLoad = sqlBulkLoad + ")";
try {
// Run the SQL
db.execStatement(sqlBulkLoad);
// Everything is OK...we can disconnect now
db.disconnect();
if (isAddFileToResult()) {
// Add filename to output files
ResultFile resultFile =
new ResultFile(
ResultFile.FILE_TYPE_GENERAL,
HopVfs.getFileObject(realFilename),
parentWorkflow.getWorkflowName(),
toString());
result.getResultFiles().put(resultFile.getFile().toString(), resultFile);
}
result.setResult(true);
} catch (HopDatabaseException je) {
result.setNrErrors(1);
logError("An error occurred executing this action : " + je.getMessage(), je);
} catch (HopFileException e) {
logError("An error occurred executing this action : " + e.getMessage(), e);
result.setNrErrors(1);
}
} else {
// Of course, the table should have been created already before the bulk load
// operation
result.setNrErrors(1);
logError(
BaseMessages.getString(
PKG, "ActionMssqlBulkLoad.Error.TableNotExists", realTablename));
}
} catch (HopDatabaseException dbe) {
result.setNrErrors(1);
logError("An error occurred executing this entry: " + dbe.getMessage());
}
} else {
// No database connection is defined
result.setNrErrors(1);
logError(BaseMessages.getString(PKG, "ActionMssqlBulkLoad.Nodatabase.Label"));
}
} else {
// the file doesn't exist
result.setNrErrors(1);
logError(
BaseMessages.getString(PKG, "ActionMssqlBulkLoad.Error.FileNotExists", realFilename));
}
} catch (Exception e) {
// An unexpected error occurred
result.setNrErrors(1);
logError(BaseMessages.getString(PKG, "ActionMssqlBulkLoad.UnexpectedError.Label"), e);
} finally {
try {
if (fileObject != null) {
fileObject.close();
}
} catch (Exception e) {
// Ignore errors
}
}
} else {
// No file was specified
result.setNrErrors(1);
logError(BaseMessages.getString(PKG, "ActionMssqlBulkLoad.Nofilename.Label"));
}
return result;
}