in index.js [1076:1283]
function loadCluster(config, thisBatchId, s3Info, manifestInfo, clusterInfo, callback) {
/* build the redshift copy command */
var copyCommand = '';
// set the statement timeout to 10 seconds less than the remaining
// execution time on the lambda function, or 60 seconds if we can't
// resolve the time remaining. fail the batch if we have less than
// 10 seconds remaining
var remainingMillis;
if (context) {
remainingMillis = context.getRemainingTimeInMillis();
if (remainingMillis < 10000) {
failBatch("Remaining duration of " + remainingMillis + ' insufficient to load cluster', config, thisBatchId, s3Info, manifestInfo);
// stop here - without this return we would fall through and attempt the load anyway
return;
} else {
copyCommand = 'set statement_timeout to ' + (remainingMillis - 10000) + ';\n';
}
} else {
copyCommand = 'set statement_timeout to 60000;\n';
}
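// e.g. with 300000 ms remaining, this resolves to (illustrative):
// set statement_timeout to 290000;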
// open a transaction so that all pre-sql, load, and post-sql commit at
// once
copyCommand += 'begin;\n';
// if the presql option is set, run it inside the transaction before the load
if (clusterInfo.presql && clusterInfo.presql.S) {
copyCommand += clusterInfo.presql.S + (clusterInfo.presql.S.slice(-1) === ";" ? "" : ";") + '\n';
}
var copyOptions = "manifest ";
// truncate the target table before loading, if requested. note that
// Redshift's TRUNCATE commits the transaction in which it runs, so when
// this option is used the pre-sql and truncate commit ahead of the load
if (clusterInfo.truncateTarget && clusterInfo.truncateTarget.BOOL) {
copyCommand += 'truncate table ' + clusterInfo.targetTable.S + ';\n';
}
var encryptedItems = {};
var useLambdaCredentialsToLoad = true;
const s3secretKeyMapEntry = "s3secretKey";
const passwordKeyMapEntry = "clusterPassword";
const symmetricKeyMapEntry = "symmetricKey";
if (config.secretKeyForS3) {
encryptedItems[s3secretKeyMapEntry] = Buffer.from(config.secretKeyForS3.S, 'base64');
useLambdaCredentialsToLoad = false;
}
logger.debug("Loading Cluster " + clusterInfo.clusterEndpoint.S + " with " + (useLambdaCredentialsToLoad === true ? "Lambda" : "configured") + " credentials");
// add the cluster password
encryptedItems[passwordKeyMapEntry] = Buffer.from(clusterInfo.connectPassword.S, 'base64');
// add the master encryption key to the list of items to be decrypted,
// if there is one
if (config.masterSymmetricKey) {
encryptedItems[symmetricKeyMapEntry] = Buffer.from(config.masterSymmetricKey.S, 'base64');
}
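// encryptedItems now maps logical names to KMS ciphertext buffers, e.g.
// { clusterPassword: <Buffer>, s3secretKey: <Buffer>, symmetricKey: <Buffer> }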
// decrypt the encrypted items
kmsCrypto.decryptMap(encryptedItems, function (err, decryptedConfigItems) {
if (err) {
callback(err, {
status: ERROR,
cluster: clusterInfo.clusterEndpoint.S
});
} else {
// create the credentials section
var credentials;
if (useLambdaCredentialsToLoad === true) {
credentials = 'aws_access_key_id=' + aws.config.credentials.accessKeyId + ';aws_secret_access_key=' + aws.config.credentials.secretAccessKey;
if (aws.config.credentials.sessionToken) {
credentials += ';token=' + aws.config.credentials.sessionToken;
}
} else {
credentials = 'aws_access_key_id=' + config.accessKeyForS3.S + ';aws_secret_access_key=' + decryptedConfigItems[s3secretKeyMapEntry].toString();
}
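// the assembled credentials clause looks like (illustrative):
// aws_access_key_id=AKIA...;aws_secret_access_key=...[;token=...]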
if (typeof clusterInfo.columnList === 'undefined') {
copyCommand = copyCommand + 'COPY ' + clusterInfo.targetTable.S + ' from \'s3://' + manifestInfo.manifestPath + '\'';
} else {
copyCommand = copyCommand + 'COPY ' + clusterInfo.targetTable.S + ' (' + clusterInfo.columnList.S + ') from \'s3://' + manifestInfo.manifestPath + '\'';
}
// add data formatting directives to copy
// options
if (config.dataFormat.S === 'CSV') {
// if REMOVEQUOTES or ESCAPE appear in the copy options we won't add the
// csv format directive, as these options are incompatible with format csv
if (!(config.copyOptions && (config.copyOptions.S.toUpperCase().indexOf('REMOVEQUOTES') > -1 || config.copyOptions.S.toUpperCase().indexOf('ESCAPE') > -1))) {
copyOptions = copyOptions + 'format csv ';
}
copyOptions = copyOptions + 'delimiter \'' + config.csvDelimiter.S + '\'\n';
// ignore the first line of each input file (the header row)
if (config.ignoreCsvHeader && config.ignoreCsvHeader.BOOL) {
copyOptions = copyOptions + ' IGNOREHEADER 1 ' + '\n';
}
} else if (config.dataFormat.S === 'JSON' || config.dataFormat.S === 'AVRO') {
copyOptions = copyOptions + ' format ' + config.dataFormat.S;
if (!(config.jsonPath === undefined || config.jsonPath === null)) {
copyOptions = copyOptions + ' \'' + config.jsonPath.S + '\' \n';
} else {
copyOptions = copyOptions + ' \'auto\' \n';
}
} else if (config.dataFormat.S === 'PARQUET' || config.dataFormat.S === 'ORC') {
copyOptions = copyOptions + ' format as ' + config.dataFormat.S;
} else {
callback(null, {
status: ERROR,
error: 'Unsupported data format ' + config.dataFormat.S,
cluster: clusterInfo.clusterEndpoint.S
});
// stop here so we don't also attempt the load after reporting the error
return;
}
// add compression directives
if (config.compression) {
copyOptions = copyOptions + ' ' + config.compression.S + '\n';
}
// add copy options
if (config.copyOptions !== undefined) {
copyOptions = copyOptions + config.copyOptions.S + '\n';
}
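// at this point, for a CSV load, copyOptions looks something like (illustrative):
// manifest format csv delimiter ',' IGNOREHEADER 1 gzip <extra copy options>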
// add the encryption option to the copy command, and the master
// symmetric key clause to the credentials
if (config.masterSymmetricKey) {
copyOptions = copyOptions + "encrypted\n";
if (decryptedConfigItems[symmetricKeyMapEntry]) {
credentials = credentials + ";master_symmetric_key=" + decryptedConfigItems[symmetricKeyMapEntry].toString();
} else {
// we didn't get a decrypted symmetric key back, so fail rather than
// attempt an encrypted load without the key
callback(null, {
status: ERROR,
error: "KMS did not return a Decrypted Master Symmetric Key Value from: " + config.masterSymmetricKey.S,
cluster: clusterInfo.clusterEndpoint.S
});
return;
}
}
// build the final copy command
copyCommand = copyCommand + " with credentials as \'" + credentials + "\' " + copyOptions + ";\n";
// if the post-sql option is set, insert it into the copyCommand
if (clusterInfo.postsql && clusterInfo.postsql.S) {
copyCommand += clusterInfo.postsql.S + (clusterInfo.postsql.S.slice(-1) == ";" ? "" : ";") + '\n'
}
copyCommand += 'commit;';
logger.debug("Copy Command Assembly Complete");
logger.debug(copyCommand);
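// the fully assembled command looks something like (illustrative):
// set statement_timeout to 290000;
// begin;
// <pre-sql>;
// truncate table my_table;
// COPY my_table from 's3://bucket/manifest' with credentials as '...' manifest format csv delimiter ',' ...;
// <post-sql>;
// commit;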
// build the connection string
var dbString = 'postgres://' + clusterInfo.connectUser.S + ":" + encodeURIComponent(decryptedConfigItems[passwordKeyMapEntry].toString()) + "@" + clusterInfo.clusterEndpoint.S + ":"
+ clusterInfo.clusterPort.N;
if (clusterInfo.clusterDB) {
dbString = dbString + '/' + clusterInfo.clusterDB.S;
}
if (clusterInfo.useSSL && clusterInfo.useSSL.BOOL === true) {
dbString = dbString + '?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory';
}
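// the assembled connection string looks like (illustrative values):
// postgres://etl_user:p%40ssword@mycluster.abc123.us-east-1.redshift.amazonaws.com:5439/mydb?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory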
let overrideDbString = process.env['_OVERRIDE_DBSTRING'];
if (overrideDbString) {
dbString = overrideDbString;
logger.info("Using Override Database String: " + overrideDbString);
} else {
logger.info("Connecting to Database " + clusterInfo.clusterEndpoint.S + ":" + clusterInfo.clusterPort.N);
}
/*
* connect to database and run the copy command
*/
const pgClient = new Client({
connectionString: dbString
});
pgClient.connect((err) => {
if (err) {
logger.error(err);
callback(null, {
status: ERROR,
error: err,
cluster: clusterInfo.clusterEndpoint.S
});
} else {
/*
* run the copy command. we allow 5 retries when the 'specified key
* does not exist' error is encountered, as this indicates an eventual
* consistency issue in the S3 US Standard region. retries use an
* exponential backoff starting at 30ms, so 5 attempts wait roughly
* 30 + 60 + 120 + 240 + 480 = 930 ms - a max retry duration of ~1 second
*/
runPgCommand(clusterInfo, pgClient, copyCommand, 5, ["S3ServiceException:The specified key does not exist.,Status 404"], 30, callback);
}
});
}
});
};
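// illustrative call shape (field names follow the DynamoDB attribute format used
// above - .S for strings, .N for numbers, .BOOL for booleans; values are examples):
// loadCluster(config, batchId, s3Info, manifestInfo, {
//     clusterEndpoint: { S: 'mycluster.abc123.us-east-1.redshift.amazonaws.com' },
//     clusterPort: { N: '5439' },
//     clusterDB: { S: 'mydb' },
//     connectUser: { S: 'etl_user' },
//     connectPassword: { S: '<base64 KMS ciphertext>' },
//     targetTable: { S: 'public.target_table' },
//     useSSL: { BOOL: true }
// }, function (err, result) { ... });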