in plugins/misc/mail/src/main/java/org/apache/hop/mail/pipeline/transforms/mail/Mail.java [82:597]
public boolean processRow() throws HopException {
Object[] r = getRow(); // get row, set busy!
if (r == null) { // no more input to be expected...
setOutputDone();
return false;
}
if (first) {
first = false;
// Making sure the mail API finds the right resources
//
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
// get the RowMeta
data.previousRowMeta = getInputRowMeta().clone();
if (meta.isAddMessageToOutput()) {
if (Utils.isEmpty(variables.resolve(meta.getMessageOutputField()))) {
throw new HopException(BaseMessages.getString(PKG, "Mail.Log.OutputFieldEmpty"));
}
data.previousRowMeta.addValueMeta(
new ValueMetaString(variables.resolve(meta.getMessageOutputField())));
}
// Check is filename field is provided
if (Utils.isEmpty(meta.getDestination())) {
throw new HopException(BaseMessages.getString(PKG, "Mail.Log.DestinationFieldEmpty"));
}
// Check is replyname field is provided
if (Utils.isEmpty(meta.getReplyAddress())) {
throw new HopException(BaseMessages.getString(PKG, "Mail.Log.ReplyFieldEmpty"));
}
// Check is SMTP server is provided
if (StringUtils.isEmpty(meta.getConnectionName())) {
if (Utils.isEmpty(meta.getServer())) {
throw new HopException(BaseMessages.getString(PKG, "Mail.Log.ServerFieldEmpty"));
}
}
// Check Attached filenames when dynamic
if (meta.isFilenameDynamic() && Utils.isEmpty(meta.getDynamicFieldname())) {
throw new HopException(BaseMessages.getString(PKG, "Mail.Log.DynamicFilenameFielddEmpty"));
}
// Check Attached zipfilename when dynamic
if (meta.isZipFilenameDynamic() && Utils.isEmpty(meta.getDynamicZipFilename())) {
throw new HopException(
BaseMessages.getString(PKG, "Mail.Log.DynamicZipFilenameFieldEmpty"));
}
if (meta.isZipFiles() && Utils.isEmpty(meta.getZipFilename())) {
throw new HopException(BaseMessages.getString(PKG, "Mail.Log.ZipFilenameEmpty"));
}
// check authentication
if (StringUtils.isEmpty(meta.getConnectionName())) {
if (meta.isUsingAuthentication()) {
// check authentication user
if (Utils.isEmpty(meta.getAuthenticationUser())) {
throw new HopException(
BaseMessages.getString(PKG, "Mail.Log.AuthenticationUserFieldEmpty"));
}
// check authentication pass
if (Utils.isEmpty(meta.getAuthenticationPassword())) {
throw new HopException(
BaseMessages.getString(PKG, "Mail.Log.AuthenticationPasswordFieldEmpty"));
}
}
}
// cache the position of the destination field
if (data.indexOfDestination < 0) {
String realDestinationFieldname = meta.getDestination();
data.indexOfDestination = data.previousRowMeta.indexOfValue(realDestinationFieldname);
if (data.indexOfDestination < 0) {
throw new HopException(
BaseMessages.getString(
PKG, "Mail.Exception.CouldnotFindDestinationField", realDestinationFieldname));
}
}
// Cc
if (!Utils.isEmpty(meta.getDestinationCc()) && data.indexOfDestinationCc < 0) {
// cache the position of the Cc field
String realDestinationCcFieldname = meta.getDestinationCc();
data.indexOfDestinationCc = data.previousRowMeta.indexOfValue(realDestinationCcFieldname);
if (data.indexOfDestinationCc < 0) {
throw new HopException(
BaseMessages.getString(
PKG,
"Mail.Exception.CouldnotFindDestinationCcField",
realDestinationCcFieldname));
}
}
// BCc
if (!Utils.isEmpty(meta.getDestinationBCc()) && data.indexOfDestinationBCc < 0) {
// cache the position of the BCc field
String realDestinationBCcFieldname = meta.getDestinationBCc();
data.indexOfDestinationBCc = data.previousRowMeta.indexOfValue(realDestinationBCcFieldname);
if (data.indexOfDestinationBCc < 0) {
throw new HopException(
BaseMessages.getString(
PKG,
"Mail.Exception.CouldnotFindDestinationBCcField",
realDestinationBCcFieldname));
}
}
// Sender Name
if (!Utils.isEmpty(meta.getReplyName()) && data.indexOfSenderName < 0) {
// cache the position of the sender field
String realSenderName = meta.getReplyName();
data.indexOfSenderName = data.previousRowMeta.indexOfValue(realSenderName);
if (data.indexOfSenderName < 0) {
throw new HopException(
BaseMessages.getString(
PKG, "Mail.Exception.CouldnotFindReplyNameField", realSenderName));
}
}
// Sender address
// cache the position of the sender field
if (data.indexOfSenderAddress < 0) {
String realSenderAddress = meta.getReplyAddress();
data.indexOfSenderAddress = data.previousRowMeta.indexOfValue(realSenderAddress);
if (data.indexOfSenderAddress < 0) {
throw new HopException(
BaseMessages.getString(
PKG, "Mail.Exception.CouldnotFindReplyAddressField", realSenderAddress));
}
}
// Reply to
if (!Utils.isEmpty(meta.getReplyToAddresses()) && data.indexOfReplyToAddresses < 0) {
// cache the position of the reply to field
String realReplyToAddresses = meta.getReplyToAddresses();
data.indexOfReplyToAddresses = data.previousRowMeta.indexOfValue(realReplyToAddresses);
if (data.indexOfReplyToAddresses < 0) {
throw new HopException(
BaseMessages.getString(
PKG, "Mail.Exception.CouldnotFindReplyToAddressesField", realReplyToAddresses));
}
}
// Contact Person
if (!Utils.isEmpty(meta.getContactPerson()) && data.indexOfContactPerson < 0) {
// cache the position of the destination field
String realContactPerson = meta.getContactPerson();
data.indexOfContactPerson = data.previousRowMeta.indexOfValue(realContactPerson);
if (data.indexOfContactPerson < 0) {
throw new HopException(
BaseMessages.getString(
PKG, "Mail.Exception.CouldnotFindContactPersonField", realContactPerson));
}
}
// Contact Phone
if (!Utils.isEmpty(meta.getContactPhone()) && data.indexOfContactPhone < 0) {
// cache the position of the destination field
String realContactPhone = meta.getContactPhone();
data.indexOfContactPhone = data.previousRowMeta.indexOfValue(realContactPhone);
if (data.indexOfContactPhone < 0) {
throw new HopException(
BaseMessages.getString(
PKG, "Mail.Exception.CouldnotFindContactPhoneField", realContactPhone));
}
}
// cache the position of the Server field
if (StringUtils.isEmpty(meta.getConnectionName())) {
if (data.indexOfServer < 0) {
String realServer = meta.getServer();
data.indexOfServer = data.previousRowMeta.indexOfValue(realServer);
if (data.indexOfServer < 0) {
throw new HopException(
BaseMessages.getString(PKG, "Mail.Exception.CouldnotFindServerField", realServer));
}
}
}
// Port
if (StringUtils.isEmpty(meta.getConnectionName())) {
if (!Utils.isEmpty(meta.getPort()) && data.indexOfPort < 0) {
// cache the position of the port field
String realPort = meta.getPort();
data.indexOfPort = data.previousRowMeta.indexOfValue(realPort);
if (data.indexOfPort < 0) {
throw new HopException(
BaseMessages.getString(PKG, "Mail.Exception.CouldnotFindPortField", realPort));
}
}
}
// Authentication
if (StringUtils.isEmpty(meta.getConnectionName())) {
if (meta.isUsingAuthentication()) {
// cache the position of the Authentication user field
if (data.indexOfAuthenticationUser < 0) {
String realAuthenticationUser = meta.getAuthenticationUser();
data.indexOfAuthenticationUser =
data.previousRowMeta.indexOfValue(realAuthenticationUser);
if (data.indexOfAuthenticationUser < 0) {
throw new HopException(
BaseMessages.getString(
PKG,
"Mail.Exception.CouldnotFindAuthenticationUserField",
realAuthenticationUser));
}
}
// cache the position of the Authentication password field
if (data.indexOfAuthenticationPass < 0) {
String realAuthenticationPassword = meta.getAuthenticationPassword();
data.indexOfAuthenticationPass =
data.previousRowMeta.indexOfValue(realAuthenticationPassword);
if (data.indexOfAuthenticationPass < 0) {
throw new HopException(
BaseMessages.getString(
PKG,
"Mail.Exception.CouldnotFindAuthenticationPassField",
realAuthenticationPassword));
}
}
}
}
// Mail Subject
if (!Utils.isEmpty(meta.getSubject()) && data.indexOfSubject < 0) {
// cache the position of the subject field
String realSubject = meta.getSubject();
data.indexOfSubject = data.previousRowMeta.indexOfValue(realSubject);
if (data.indexOfSubject < 0) {
throw new HopException(
BaseMessages.getString(PKG, "Mail.Exception.CouldnotFindSubjectField", realSubject));
}
}
// Mail Comment
if (!Utils.isEmpty(meta.getComment()) && data.indexOfComment < 0) {
// cache the position of the comment field
String realComment = meta.getComment();
data.indexOfComment = data.previousRowMeta.indexOfValue(realComment);
if (data.indexOfComment < 0) {
throw new HopException(
BaseMessages.getString(PKG, "Mail.Exception.CouldnotFindCommentField", realComment));
}
}
if (meta.isAttachContentFromField()) {
// We are dealing with file content directly loaded from file
// and not physical file
String attachedContentField = meta.getAttachContentField();
if (Utils.isEmpty(attachedContentField)) {
// Empty Field
throw new HopException(
BaseMessages.getString(PKG, "Mail.Exception.AttachedContentFieldEmpty"));
}
data.indexOfAttachedContent = data.previousRowMeta.indexOfValue(attachedContentField);
if (data.indexOfAttachedContent < 0) {
throw new HopException(
BaseMessages.getString(
PKG, "Mail.Exception.CouldnotFindAttachedContentField", attachedContentField));
}
// Attached content filename
String attachedContentFileNameField = meta.getAttachContentFileNameField();
if (Utils.isEmpty(attachedContentFileNameField)) {
// Empty Field
throw new HopException(
BaseMessages.getString(PKG, "Mail.Exception.AttachedContentFileNameFieldEmpty"));
}
data.indexOfAttachedFilename =
data.previousRowMeta.indexOfValue(attachedContentFileNameField);
if (data.indexOfAttachedFilename < 0) {
throw new HopException(
BaseMessages.getString(
PKG,
"Mail.Exception.CouldnotFindAttachedContentFileNameField",
attachedContentFileNameField));
}
} else {
// Dynamic Zipfilename
if (meta.isZipFilenameDynamic() && data.indexOfDynamicZipFilename < 0) {
// cache the position of the attached source filename field
String realZipFilename = meta.getDynamicZipFilename();
data.indexOfDynamicZipFilename = data.previousRowMeta.indexOfValue(realZipFilename);
if (data.indexOfDynamicZipFilename < 0) {
throw new HopException(
BaseMessages.getString(
PKG, "Mail.Exception.CouldnotSourceAttachedZipFilenameField", realZipFilename));
}
}
data.zipFileLimit = Const.toLong(resolve(meta.getZiplimitsize()), 0);
if (data.zipFileLimit > 0) {
data.zipFileLimit = data.zipFileLimit * 1048576; // Mo
}
if (!meta.isZipFilenameDynamic()) {
data.ZipFilename = resolve(meta.getZipFilename());
}
// Attached files
if (meta.isFilenameDynamic()) {
// cache the position of the attached source filename field
if (data.indexOfSourceFilename < 0) {
String realSourceattachedFilename = meta.getDynamicFieldname();
data.indexOfSourceFilename =
data.previousRowMeta.indexOfValue(realSourceattachedFilename);
if (data.indexOfSourceFilename < 0) {
throw new HopException(
BaseMessages.getString(
PKG,
"Mail.Exception.CouldnotSourceAttachedFilenameField",
realSourceattachedFilename));
}
}
// cache the position of the attached wildcard field
if (!Utils.isEmpty(meta.getSourcewildcard()) && data.indexOfSourceWildcard < 0) {
String realSourceattachedWildcard = meta.getDynamicWildcard();
data.indexOfSourceWildcard =
data.previousRowMeta.indexOfValue(realSourceattachedWildcard);
if (data.indexOfSourceWildcard < 0) {
throw new HopException(
BaseMessages.getString(
PKG,
"Mail.Exception.CouldnotSourceAttachedWildcard",
realSourceattachedWildcard));
}
}
} else {
// static attached filenames
data.realSourceFileFoldername = resolve(meta.getSourcefilefoldername());
data.realSourceWildcard = resolve(meta.getSourcewildcard());
}
}
// check embedded images
if (meta.getEmbeddedImages() != null && meta.getEmbeddedImages().size() > 0) {
FileObject image = null;
data.embeddedMimePart = new HashSet<>();
try {
for (int i = 0; i < meta.getEmbeddedImages().size(); i++) {
String imageFile = resolve(meta.getEmbeddedImages().get(i).getEmbeddedimage());
String contentID = resolve(meta.getEmbeddedImages().get(i).getContentId());
image = HopVfs.getFileObject(imageFile, variables);
if (image.exists() && image.getType() == FileType.FILE) {
// Create part for the image
MimeBodyPart imagePart = new MimeBodyPart();
// Load the image
URLDataSource fds = new URLDataSource(image.getURL());
imagePart.setDataHandler(new DataHandler(fds));
// Setting the header
imagePart.setHeader("Content-ID", "<" + contentID + ">");
// keep this part for further user
data.embeddedMimePart.add(imagePart);
logBasic(BaseMessages.getString(PKG, "Mail.Log.ImageAdded", imageFile));
} else {
logError(BaseMessages.getString(PKG, "Mail.Log.WrongImage", imageFile));
}
}
} catch (Exception e) {
logError(BaseMessages.getString(PKG, "Mail.Error.AddingImage", e.getMessage()));
} finally {
if (image != null) {
try {
image.close();
} catch (Exception e) {
/* Ignore */
}
}
}
}
} // end if first
boolean sendToErrorRow = false;
String errorMessage = null;
try {
// get values
String maildestination = data.previousRowMeta.getString(r, data.indexOfDestination);
if (Utils.isEmpty(maildestination)) {
throw new HopException("Mail.Error.MailDestinationEmpty");
}
String maildestinationCc = null;
if (data.indexOfDestinationCc > -1) {
maildestinationCc = data.previousRowMeta.getString(r, data.indexOfDestinationCc);
}
String maildestinationBCc = null;
if (data.indexOfDestinationBCc > -1) {
maildestinationBCc = data.previousRowMeta.getString(r, data.indexOfDestinationBCc);
}
String mailsendername = null;
if (data.indexOfSenderName > -1) {
mailsendername = data.previousRowMeta.getString(r, data.indexOfSenderName);
}
String mailsenderaddress = data.previousRowMeta.getString(r, data.indexOfSenderAddress);
// reply addresses
String mailreplyToAddresses = null;
if (data.indexOfReplyToAddresses > -1) {
mailreplyToAddresses = data.previousRowMeta.getString(r, data.indexOfReplyToAddresses);
}
String contactperson = null;
if (data.indexOfContactPerson > -1) {
contactperson = data.previousRowMeta.getString(r, data.indexOfContactPerson);
}
String contactphone = null;
if (data.indexOfContactPhone > -1) {
contactphone = data.previousRowMeta.getString(r, data.indexOfContactPhone);
}
String servername = null;
int port = -1;
String authuser = null;
String authpass = null;
if (StringUtils.isEmpty(meta.getConnectionName())) {
servername = data.previousRowMeta.getString(r, data.indexOfServer);
if (Utils.isEmpty(servername)) {
throw new HopException("Mail.Error.MailServerEmpty");
}
if (data.indexOfPort > -1) {
port = Const.toInt("" + data.previousRowMeta.getInteger(r, data.indexOfPort), -1);
}
if (StringUtils.isEmpty(meta.getConnectionName())) {
if (data.indexOfAuthenticationUser > -1) {
authuser = data.previousRowMeta.getString(r, data.indexOfAuthenticationUser);
}
}
if (StringUtils.isEmpty(meta.getConnectionName())) {
if (data.indexOfAuthenticationPass > -1) {
authpass =
Utils.resolvePassword(
variables, data.previousRowMeta.getString(r, data.indexOfAuthenticationPass));
}
}
}
String subject = null;
if (data.indexOfSubject > -1) {
subject = data.previousRowMeta.getString(r, data.indexOfSubject);
}
String comment = null;
if (data.indexOfComment > -1) {
comment = data.previousRowMeta.getString(r, data.indexOfComment);
}
// send email...
String message =
sendMail(
r,
servername,
port,
mailsenderaddress,
mailsendername,
maildestination,
maildestinationCc,
maildestinationBCc,
contactperson,
contactphone,
authuser,
authpass,
subject,
comment,
mailreplyToAddresses);
if (meta.isAddMessageToOutput()) {
int index = data.previousRowMeta.indexOfValue(resolve(meta.getMessageOutputField()));
Object[] outputRowData = RowDataUtil.createResizedCopy(r, data.previousRowMeta.size());
outputRowData[index] = message;
putRow(data.previousRowMeta, outputRowData);
} else {
putRow(data.previousRowMeta, r);
}
if (isRowLevel()) {
logRowlevel(
BaseMessages.getString(
PKG,
"Mail.Log.LineNumber",
getLinesRead() + " : " + getInputRowMeta().getString(r)));
}
} catch (Exception e) {
if (getTransformMeta().isDoingErrorHandling()) {
sendToErrorRow = true;
errorMessage = e.toString();
} else {
throw new HopException(BaseMessages.getString(PKG, "Mail.Error.General"), e);
}
if (sendToErrorRow) {
// Simply add this row to the error row
putError(getInputRowMeta(), r, 1, errorMessage, null, "MAIL001");
}
}
return true;
}