in index.js [174:1610]
function handler(event, context) {
/** runtime functions * */
/*
* Function which performs all version upgrades over time - must be able to
* do a forward migration from any version to 'current' at all times!
*/
function upgradeConfig(s3Info, currentConfig, callback) {
// v 1.x to 2.x upgrade for multi-cluster loaders
if (semver.lt(currentConfig.version.S, pjson.version)) {
logger.debug(`Performing version upgrade from ${currentConfig.version.S} to ${pjson.version}`);
upgrade.upgradeAll(dynamoDB, s3Info, currentConfig, callback);
} else {
// no upgrade needed
callback(null, s3Info, currentConfig);
}
}
/* callback run when we find a configuration for load in Dynamo DB */
function foundConfig(s3Info, err, data) {
if (err) {
logger.error(err);
var msg = `Error getting Redshift Configuration for ${s3Info.prefix} from DynamoDB`;
logger.error(msg);
context.done(error, msg);
}
logger.info(`Found Redshift Load Configuration for ${s3Info.prefix}`);
var config = data.Item;
var thisBatchId = config.currentBatch.S;
// run all configuration upgrades required
upgradeConfig(s3Info, config, function (err, s3Info, useConfig) {
if (err) {
console.error(JSON.stringify(err));
context.done(error, JSON.stringify(err));
} else {
if (useConfig.filenameFilterRegex) {
var isFilterRegexMatch = true;
try {
isFilterRegexMatch = s3Info.key.match(useConfig.filenameFilterRegex.S);
} catch (e) {
// suppress this error - it may have been a malformed
// regex as well as just a non-match
// exceptions are treated as a file match here - we'd
// rather process a file and have the batch
// fail than erroneously ignore it
logger.error("Error on filename filter evaluation. File will be included for processing");
logger.error(e);
}
if (isFilterRegexMatch) {
checkFileProcessed(useConfig, thisBatchId, s3Info);
} else {
logger.info('Object ' + s3Info.key + ' excluded by filename filter \'' + useConfig.filenameFilterRegex.S + '\'');
// scan the current batch to decide if it needs to be
// flushed due to batch timeout
processPendingBatch(useConfig, thisBatchId, s3Info);
}
} else {
// no filter, so we'll load the data
checkFileProcessed(useConfig, thisBatchId, s3Info);
}
}
});
}
/*
* function to add a file to the pending batch set and then call the success
* callback
*/
function checkFileProcessed(config, thisBatchId, s3Info) {
var itemEntry = s3Info.bucket + '/' + s3Info.key;
// perform the idempotency check for the file before we put it
// into a manifest
var fileEntry = {
Key: {
loadFile: {
S: itemEntry
}
},
TableName: filesTable,
ExpressionAttributeNames: {
"#rcvDate": "receiveDateTime"
},
ExpressionAttributeValues: {
":rcvDate": {
S: common.readableTime(common.now())
},
":incr": {
N: "1"
}
},
UpdateExpression: "set #rcvDate = :rcvDate add timesReceived :incr",
ReturnValues: "ALL_NEW"
};
logger.debug("Checking whether File is already processed");
logger.debug(JSON.stringify(fileEntry));
// add the file to the processed list
dynamoDB.updateItem(fileEntry, function (err, data) {
var msg;
if (err) {
msg = "Error " + err.code + " for " + fileEntry;
logger.error(msg);
context.done(error, msg);
} else {
if (!data) {
msg = "Update failed to return data from Processed File Check";
logger.error(msg);
context.done(error, msg);
} else {
if (data.Attributes.batchId && data.Attributes.batchId.S) {
// there's already a pending batch link, so this is a
// full duplicate and we'll discard
logger.info("File " + itemEntry + " Already Processed");
context.done(null, null);
} else {
// update was successful, and either this is the first
// event and there was no batch ID
// specified, or the file is a reprocess but the batch
// ID attachment didn't work - proceed
// with adding the entry to the pending batch
addFileToPendingBatch(config, thisBatchId, s3Info, itemEntry);
}
}
}
});
};
/**
* Function run to add a file to the existing open batch. This will
* repeatedly try to write and if unsuccessful it will requery the batch ID
* on the configuration
*/
function addFileToPendingBatch(config, thisBatchId, s3Info, itemEntry) {
console.log("Adding Pending Batch Entry for " + itemEntry);
var proceed = false;
var asyncError;
var addFileRetryLimit = 100;
var tryNumber = 0;
var configReloads = 0;
async.whilst(
function (test_cb) {
// return OK if the proceed flag has been set, or if we've hit
// the retry count
test_cb(null, !proceed && tryNumber < addFileRetryLimit);
},
function (callback) {
tryNumber++;
// build the reference to the pending batch, with an atomic add
// of the current file
var now = common.now();
var item = {
Key: {
batchId: {
S: thisBatchId
},
s3Prefix: {
S: s3Info.prefix
}
},
TableName: batchTable,
UpdateExpression: "add writeDates :appendFileDate, size :size set #stat = :open, lastUpdate = :updateTime, entryMap = list_append(if_not_exists(entryMap, :emptyList), :entry)",
ExpressionAttributeNames: {
"#stat": 'status'
},
ExpressionAttributeValues: {
":entry": {
L: [{
M: {
"file": {S: itemEntry},
"size": {N: '' + s3Info.size}
}
}
]
},
":emptyList": {
L: []
},
":appendFileDate": {
NS: ['' + now]
},
":updateTime": {
N: '' + now
},
":open": {
S: open
},
":size": {
N: '' + s3Info.size
}
},
/*
* current batch can't be locked
*/
ConditionExpression: "#stat = :open or attribute_not_exists(#stat)"
};
logger.debug(JSON.stringify(item));
// add the file to the pending batch
dynamoDB.updateItem(item, function (err, data) {
if (err) {
let waitFor = Math.min(Math.pow(tryNumber, 2) * 10, maxRetryMS);
if (err.code === provisionedThroughputExceeded) {
logger.warn("Provisioned Throughput Exceeded on addition of " + s3Info.prefix + " to pending batch " + thisBatchId + ". Trying again in " + waitFor + " ms");
setTimeout(callback, waitFor);
} else if (err.code === conditionCheckFailed) {
// the batch I have a reference to was locked so
// reload the current batch ID from the config
var configReloadRequest = {
Key: {
s3Prefix: {
S: s3Info.prefix
}
},
TableName: configTable,
/*
* we need a consistent read here to ensure we
* get the latest batch ID
*/
ConsistentRead: true
};
dynamoDB.getItem(configReloadRequest, function (err, data) {
configReloads++;
if (err) {
if (err === provisionedThroughputExceeded) {
logger.warn("Provisioned Throughput Exceeded on reload of " + configTable + " due to locked batch write");
callback();
} else {
console.log(err);
callback(err);
}
} else {
if (data.Item.currentBatch.S === thisBatchId) {
// we've obtained the same batch ID back
// from the configuration as we have
// now, meaning it hasn't yet rotated
logger.warn("Batch " + thisBatchId + " still current after configuration reload attempt " + configReloads + ". Recycling in " + waitFor + " ms.");
// because the batch hasn't been
// reloaded on the configuration, we'll
// backoff here for a moment to let that
// happen
setTimeout(callback, waitFor);
} else {
// we've got an updated batch id, so use
// this in the next cycle of file add
thisBatchId = data.Item.currentBatch.S;
logger.warn("Obtained new Batch ID " + thisBatchId + " after configuration reload. Attempt " + configReloads);
/*
* callback immediately, as we should
* now have a valid and open batch to
* use
*/
callback();
}
}
});
} else {
asyncError = err;
proceed = true;
callback();
}
} else {
// no error - the file was added to the batch, so mark
// the operation as OK so async will not retry
proceed = true;
callback();
}
});
},
function (err) {
if (err) {
// throw presented errors
logger.error(JSON.stringify(err));
context.done(error, JSON.stringify(err));
} else {
if (asyncError) {
/*
* throw errors which were encountered during the async
* calls
*/
logger.error(JSON.stringify(asyncError));
context.done(error, JSON.stringify(asyncError));
} else {
if (!proceed) {
/*
* process what happened if the iterative request to
* write to the open pending batch timed out
*
* TODO Can we force a rotation of the current batch
* at this point?
*/
var e = "Unable to write "
+ itemEntry
+ " in "
+ addFileRetryLimit
+ " attempts. Failing further processing to Batch "
+ thisBatchId
+ " which may be stuck in '"
+ locked
+ "' state. If so, unlock the back using `node unlockBatch.js <batch ID>`, delete the processed file marker with `node processedFiles.js -d <filename>`, and then re-store the file in S3";
logger.error(e);
var msg = "Lambda Redshift Loader unable to write to Open Pending Batch";
if (config.failureTopicARN) {
sendSNS(config.failureTopicARN.S, msg, e, function () {
context.done(error, e);
}, function (err) {
logger.error(err);
context.done(error, "Unable to Send SNS Notification");
});
} else {
logger.error("Unable to send failure notifications");
logger.error(msg);
context.done(error, msg);
}
} else {
// the add of the file was successful,
// so we
linkProcessedFileToBatch(itemEntry, thisBatchId);
// which is async, so may fail but we'll
// still sweep
// the pending batch
processPendingBatch(config, thisBatchId, s3Info);
}
}
}
});
};
/**
* Function which will link the deduplication table entry for the file to
* the batch into which the file was finally added
*/
function linkProcessedFileToBatch(itemEntry, batchId) {
var updateProcessedFile = {
Key: {
loadFile: {
S: itemEntry
}
},
TableName: filesTable,
AttributeUpdates: {
batchId: {
Action: 'PUT',
Value: {
S: batchId
}
}
}
};
logger.debug("Linking file to current batch");
logger.debug(JSON.stringify(updateProcessedFile));
common.retryableUpdate(dynamoDB, updateProcessedFile, function (err, data) {
// because this is an async call which doesn't affect
// process flow, we'll just log the error and do nothing with the OK
// response
if (err) {
logger.error(err);
}
});
};
/**
* Function which links the manifest name used to load redshift onto the
* batch table entry
*/
function addManifestToBatch(config, thisBatchId, s3Info, manifestInfo) {
// build the reference to the pending batch, with an atomic
// add of the current file
var item = {
Key: {
batchId: {
S: thisBatchId
},
s3Prefix: {
S: s3Info.prefix
}
},
TableName: batchTable,
AttributeUpdates: {
manifestFile: {
Action: 'PUT',
Value: {
S: manifestInfo.manifestPath
}
},
lastUpdate: {
Action: 'PUT',
Value: {
N: '' + common.now()
}
}
}
};
logger.debug("Linking manifest file pointer to Batch");
logger.debug(JSON.stringify(item));
common.retryableUpdate(dynamoDB, item, function (err, data) {
if (err) {
logger.error(err);
} else {
logger.info("Linked Manifest " + manifestInfo.manifestName + " to Batch " + thisBatchId);
}
});
};
/**
* Function to process the current pending batch, and create a batch load
* process if required on the basis of size or timeout
*/
function processPendingBatch(config, thisBatchId, s3Info) {
// make the request for the current batch
var currentBatchRequest = {
Key: {
batchId: {
S: thisBatchId
},
s3Prefix: {
S: s3Info.prefix
}
},
TableName: batchTable,
ConsistentRead: true
};
logger.debug("Loading current Batch record from prefix config");
logger.debug(JSON.stringify(currentBatchRequest));
dynamoDB.getItem(currentBatchRequest, function (err, data) {
if (err) {
if (err === provisionedThroughputExceeded) {
logger.warn("Provisioned Throughput Exceeded on read of " + batchTable);
callback();
} else {
logger.error(JSON.stringify(err));
context.done(error, JSON.stringify(err));
}
} else if (!data || !data.Item) {
var msg = "No open pending Batch " + thisBatchId;
logger.error(msg);
context.done(null, msg);
} else {
// first step is to resolve the earliest writeDate as the batch
// creation date
var batchCreateDate;
data.Item.writeDates.NS.map(function (item) {
var t = parseInt(item);
if (!batchCreateDate || t < batchCreateDate) {
batchCreateDate = t;
}
});
var lastUpdateTime = data.Item.lastUpdate.N;
/*
* grab the pending entries from the locked
* batch. We have 2 copies - a batch that uses StringSet from pre 2.7.8, and a List from 2.7.9
*/
let pendingEntries = {};
let pendingEntryCount = 0;
if (data.Item.entryMap) {
pendingEntryCount += data.Item.entryMap.L.length;
pendingEntries["entryMap"] = data.Item.entryMap.L;
}
if (data.Item.entrySet) {
pendingEntryCount += data.Item.entries.SS.length;
pendingEntries["entrySet"] = data.Item.entries.SS;
}
var doProcessBatch = false;
if (pendingEntryCount >= parseInt(config.batchSize.N)) {
logger.info("Batch count " + config.batchSize.N + " reached");
doProcessBatch = true;
} else {
if (config.batchSize && config.batchSize.N) {
logger.debug("Current batch count of " + pendingEntryCount + " below batch limit of " + config.batchSize.N);
}
}
// check whether the current batch is bigger than the configured
// max count, size, or older than configured max age
if (config.batchTimeoutSecs && config.batchTimeoutSecs.N && pendingEntryCount > 0 && common.now() - batchCreateDate > parseInt(config.batchTimeoutSecs.N)) {
logger.info("Batch age " + config.batchTimeoutSecs.N + " seconds reached");
doProcessBatch = true;
} else {
if (config.batchTimeoutSecs && config.batchTimeoutSecs.N) {
logger.debug("Current batch age of " + (common.now() - batchCreateDate) + " seconds below batch timeout: "
+ (config.batchTimeoutSecs.N ? config.batchTimeoutSecs.N : "None Defined"));
}
}
if (config.batchSizeBytes && config.batchSizeBytes.N && pendingEntryCount > 0 && parseInt(config.batchSizeBytes.N) <= parseInt(data.Item.size.N)) {
logger.info("Batch size " + config.batchSizeBytes.N + " bytes reached");
doProcessBatch = true;
} else {
if (data.Item.size.N) {
logger.debug("Current batch size of " + data.Item.size.N + " below batch threshold or not configured");
}
}
if (doProcessBatch) {
// set the current batch to locked status
var updateCurrentBatchStatus = {
Key: {
batchId: {
S: thisBatchId
},
s3Prefix: {
S: s3Info.prefix
}
},
TableName: batchTable,
AttributeUpdates: {
status: {
Action: 'PUT',
Value: {
S: locked
}
},
lastUpdate: {
Action: 'PUT',
Value: {
N: '' + common.now()
}
}
},
/*
* the batch to be processed has to be 'open', otherwise
* we'll have multiple processes all handling a single
* batch
*/
Expected: {
status: {
AttributeValueList: [{
S: open
}],
ComparisonOperator: 'EQ'
}
},
/*
* add the ALL_NEW return values so we have the most up
* to date version of the entries string set
*/
ReturnValues: "ALL_NEW"
};
logger.debug("Attempting to lock Batch for processing");
logger.debug(JSON.stringify(updateCurrentBatchStatus));
common.retryableUpdate(dynamoDB, updateCurrentBatchStatus, function (err, data) {
if (err) {
if (err.code === conditionCheckFailed) {
// some other Lambda function has locked the
// batch - this is OK and we'll just exit
// quietly
logger.debug("Batch is ready to be processed, but another thread has locked it for loading");
context.done(null, null);
} else if (err.code === provisionedThroughputExceeded) {
logger.error("Provisioned Throughput Exceeded on " + batchTable + " while trying to lock Batch");
context.done(error, JSON.stringify(err));
} else {
logger.error("Unhandled exception while trying to lock Batch " + thisBatchId);
logger.error(JSON.stringify(err));
context.done(error, JSON.stringify(err));
}
} else {
if (!data || !data.Attributes) {
var e = "Unable to extract latest pending entries set from Locked batch";
logger.error(e);
context.done(error, e);
} else {
/*
* assign the loaded configuration a new batch
* ID
*/
var allocateNewBatchRequest = {
Key: {
s3Prefix: {
S: s3Info.prefix
}
},
TableName: configTable,
AttributeUpdates: {
currentBatch: {
Action: 'PUT',
Value: {
S: uuid.v4()
}
},
lastBatchRotation: {
Action: 'PUT',
Value: {
S: common.getFormattedDate()
}
}
}
};
logger.debug("Allocating new Batch ID for future processing");
logger.debug(JSON.stringify(allocateNewBatchRequest));
common.retryableUpdate(dynamoDB, allocateNewBatchRequest, function (err, data) {
if (err) {
logger.error("Error while allocating new Pending Batch ID");
logger.error(JSON.stringify(err));
context.done(error, JSON.stringify(err));
} else {
// OK - let's create the manifest file
createManifest(config, thisBatchId, s3Info, pendingEntries);
}
});
}
}
});
} else {
logger.info("No pending batch flush required");
context.done(null, null);
}
}
});
};
/**
* Function which will create the manifest for a given batch and entries
*/
function createManifest(config, thisBatchId, s3Info, batchEntries) {
logger.info("Creating Manifest for Batch " + thisBatchId);
var manifestInfo = common.createManifestInfo(config);
// create the manifest file for the file to be loaded
var manifestContents = {
entries: []
};
logger.debug("Building new COPY Manifest");
function addEntry(url, contentLength) {
manifestContents.entries.push({
/*
* fix url encoding for files with spaces. Space values come in from
* Lambda with '+' and plus values come in as %2B. Redshift wants
* the original S3 value
*/
url: 's3://' + url.replace(/\+/g, ' ').replace(/%2B/g, '+'),
mandatory: true,
meta: {
content_length: contentLength
}
});
}
// process the batch contents which are structured as a map listing filename and file size
if (batchEntries.entryMap) {
batchEntries.entryMap.map(function (batchEntry) {
addEntry(batchEntry.M.file.S, parseInt(batchEntry.M.size.N));
});
}
// process batch contents which are structured as a StringSet
if (batchEntries.entrySet) {
batchEntries.entrySet.map(function (batchEntry) {
addEntry(batchEntry, s3Info.size);
});
}
let s3PutParams = {
Bucket: manifestInfo.manifestBucket,
Key: manifestInfo.manifestPrefix,
Body: JSON.stringify(manifestContents)
};
logger.info("Writing manifest to " + manifestInfo.manifestBucket + "/" + manifestInfo.manifestPrefix);
/*
* save the manifest file to S3 and build the rest of the copy command
* in the callback letting us know that the manifest was created
* correctly
*/
s3.putObject(s3PutParams, loadRedshiftWithManifest.bind(undefined, config, thisBatchId, s3Info, manifestInfo));
};
/**
* Function run when the Redshift manifest write completes successfully
*/
function loadRedshiftWithManifest(config, thisBatchId, s3Info, manifestInfo, err, data) {
if (err) {
logger.error("Error on Manifest Creation");
logger.error(err);
failBatch(err, config, thisBatchId, s3Info, manifestInfo);
} else {
logger.info("Created Manifest " + manifestInfo.manifestPath + " Successfully");
// add the manifest file to the batch - this will NOT stop
// processing if it fails
addManifestToBatch(config, thisBatchId, s3Info, manifestInfo);
// convert the config.loadClusters list into a format that
// looks like a native dynamo entry
var clustersToLoad = [];
for (var i = 0; i < config.loadClusters.L.length; i++) {
clustersToLoad[clustersToLoad.length] = config.loadClusters.L[i].M;
}
logger.info("Loading " + clustersToLoad.length + " Clusters");
// run all the cluster loaders in parallel
async.map(clustersToLoad, function (item, callback) {
// call the load cluster function, passing it the continuation
// callback
loadCluster(config, thisBatchId, s3Info, manifestInfo, item, callback);
}, function (err, results) {
if (err) {
logger.error(err);
}
// go through all the results - if they were all
// OK, then close the batch OK - otherwise fail
var allOK = true;
var loadState = {};
for (var i = 0; i < results.length; i++) {
if (!results[i] || results[i].status === ERROR) {
allOK = false;
logger.error("Cluster Load Failure " + results[i].error + " on Cluster " + results[i].cluster);
}
// log the response state for each cluster
loadState[results[i].cluster] = {
status: results[i].status,
error: results[i].error
};
}
var loadStateRequest = {
Key: {
batchId: {
S: thisBatchId
},
s3Prefix: {
S: s3Info.prefix
}
},
TableName: batchTable,
AttributeUpdates: {
clusterLoadStatus: {
Action: 'PUT',
Value: {
S: JSON.stringify(loadState)
}
},
lastUpdate: {
Action: 'PUT',
Value: {
N: '' + common.now()
}
}
}
};
logger.debug("Linking Batch load state for cluster");
logger.debug(JSON.stringify(loadStateRequest));
common.retryableUpdate(dynamoDB, loadStateRequest, function (err, data) {
if (err) {
logger.error("Error while attaching per-Cluster Load State");
failBatch(err, config, thisBatchId, s3Info, manifestInfo);
} else {
if (allOK === true) {
// close the batch as OK
closeBatch(null, config, thisBatchId, s3Info, manifestInfo, loadState);
} else {
// close the batch as failure
failBatch(loadState, config, thisBatchId, s3Info, manifestInfo);
}
}
});
});
}
};
/**
* Function which will run a postgres command with retries
*/
function runPgCommand(clusterInfo, client, command, retries, retryableErrorTraps, retryBackoffBaseMs, callback) {
var completed = false;
var retryCount = 0;
var lastError;
async.until(function (test_cb) {
test_cb(null, completed || !retries || retryCount >= retries);
}, function (asyncCallback) {
logger.debug("Performing Database Command:");
logger.debug(command);
client.query(command, function (queryCommandErr, result) {
if (queryCommandErr) {
lastError = queryCommandErr;
// check all the included retryable error traps to see if
// this is a retryable error
var retryable = false;
if (retryableErrorTraps) {
retryableErrorTraps.map(function (retryableError) {
if (queryCommandErr.detail && queryCommandErr.detail.indexOf(retryableError) > -1) {
retryable = true;
}
});
}
// if the error is not retryable, then fail by calling the
// async callback with the specified error
if (!retryable) {
completed = true;
if (queryCommandErr && queryCommandErr.detail) {
logger.error(queryCommandErr.detail);
}
asyncCallback(queryCommandErr);
} else {
// incre ment the retry count
retryCount += 1;
logger.warn("Retryable Error detected. Try Attempt " + retryCount);
// exponential backoff
// if a backoff time is
// provided
if (retryBackoffBaseMs) {
setTimeout(function () {
// call the async callback
asyncCallback(null);
}, Math.pow(2, retryCount) * retryBackoffBaseMs);
}
}
} else {
completed = true;
asyncCallback(queryCommandErr);
}
});
}, function (afterQueryCompletedErr) {
// close the server connection
client.end((disconnectErr) => {
if (disconnectErr) {
logger.error("Error during server disconnect: " + disconnectErr.stack);
logger.error("Watch for database connection count increasing without limit!!!");
}
/*
* check the status of the query completion, but don't worry
* about disconnection errors here. we can't fix them, and
* hopefully the server will just close them effectively :/
*/
if (afterQueryCompletedErr) {
// callback as error
callback(null, {
status: ERROR,
error: afterQueryCompletedErr,
cluster: clusterInfo.clusterEndpoint.S
});
} else {
if (!completed) {
// we were unable to complete the command
callback(null, {
status: ERROR,
error: lastError,
cluster: clusterInfo.clusterEndpoint.S
});
} else {
// command ok
callback(null, {
status: OK,
error: null,
cluster: clusterInfo.clusterEndpoint.S
});
}
}
});
});
};
/**
* Function which loads a redshift cluster
*
*/
function loadCluster(config, thisBatchId, s3Info, manifestInfo, clusterInfo, callback) {
/* build the redshift copy command */
var copyCommand = '';
// set the statement timeout to 10 seconds less than the remaining
// execution time on the lambda function, or 60 seconds if we can't
// resolve the time remaining. fail the lambda function if we have less
// than 5 seconds remaining
var remainingMillis;
if (context) {
remainingMillis = context.getRemainingTimeInMillis();
if (remainingMillis < 10000) {
failBatch("Remaining duration of " + remainingMillis + ' insufficient to load cluster', config, thisBatchId, s3Info, manifestInfo);
} else {
copyCommand = 'set statement_timeout to ' + (remainingMillis - 10000) + ';\n';
}
} else {
copyCommand = 'set statement_timeout to 60000;\n';
}
// open a transaction so that all pre-sql, load, and post-sql commit at
// once
copyCommand += 'begin;\n';
// if the presql option is set, insert it into the copyCommand
if (clusterInfo.presql && clusterInfo.presql.S) {
copyCommand += clusterInfo.presql.S + (clusterInfo.presql.S.slice(-1) == ";" ? "" : ";") + '\n'
}
var copyOptions = "manifest ";
// add the truncate option if requested
if (clusterInfo.truncateTarget && clusterInfo.truncateTarget.BOOL) {
copyCommand += 'truncate table ' + clusterInfo.targetTable.S + ';\n';
}
var encryptedItems = {};
var useLambdaCredentialsToLoad = true;
const s3secretKeyMapEntry = "s3secretKey";
const passwordKeyMapEntry = "clusterPassword";
const symmetricKeyMapEntry = "symmetricKey";
if (config.secretKeyForS3) {
encryptedItems[s3secretKeyMapEntry] = Buffer.from(config.secretKeyForS3.S, 'base64');
useLambdaCredentialsToLoad = false;
}
logger.debug("Loading Cluster " + clusterInfo.clusterEndpoint.S + " with " + (useLambdaCredentialsToLoad === true ? "Lambda" : "configured") + " credentials");
// add the cluster password
encryptedItems[passwordKeyMapEntry] = Buffer.from(clusterInfo.connectPassword.S, 'base64');
// add the master encryption key to the list of items to be decrypted,
// if there is one
if (config.masterSymmetricKey) {
encryptedItems[symmetricKeyMapEntry] = Buffer.from(config.masterSymmetricKey.S, 'base64');
}
// decrypt the encrypted items
kmsCrypto.decryptMap(encryptedItems, function (err, decryptedConfigItems) {
if (err) {
callback(err, {
status: ERROR,
cluster: clusterInfo.clusterEndpoint.S
});
} else {
// create the credentials section
var credentials;
if (useLambdaCredentialsToLoad === true) {
credentials = 'aws_access_key_id=' + aws.config.credentials.accessKeyId + ';aws_secret_access_key=' + aws.config.credentials.secretAccessKey;
if (aws.config.credentials.sessionToken) {
credentials += ';token=' + aws.config.credentials.sessionToken;
}
} else {
credentials = 'aws_access_key_id=' + config.accessKeyForS3.S + ';aws_secret_access_key=' + decryptedConfigItems[s3secretKeyMapEntry].toString();
}
if (typeof clusterInfo.columnList === 'undefined') {
copyCommand = copyCommand + 'COPY ' + clusterInfo.targetTable.S + ' from \'s3://' + manifestInfo.manifestPath + '\'';
} else {
copyCommand = copyCommand + 'COPY ' + clusterInfo.targetTable.S + ' (' + clusterInfo.columnList.S + ') from \'s3://' + manifestInfo.manifestPath + '\'';
}
// add data formatting directives to copy
// options
if (config.dataFormat.S === 'CSV') {
// if removequotes or escape has been used in copy options, then we wont use the CSV formatter
if (!(config.copyOptions && (config.copyOptions.S.toUpperCase().indexOf('REMOVEQUOTES') > -1 || config.copyOptions.S.toUpperCase().indexOf('ESCAPE') > -1))) {
copyOptions = copyOptions + 'format csv ';
}
copyOptions = copyOptions + 'delimiter \'' + config.csvDelimiter.S + '\'\n';
// this will ignore the first line
if (config.ignoreCsvHeader && config.ignoreCsvHeader.BOOL) {
copyOptions = copyOptions + ' IGNOREHEADER 1 ' + '\n';
}
} else if (config.dataFormat.S === 'JSON' || config.dataFormat.S === 'AVRO') {
copyOptions = copyOptions + ' format ' + config.dataFormat.S;
if (!(config.jsonPath === undefined || config.jsonPath === null)) {
copyOptions = copyOptions + ' \'' + config.jsonPath.S + '\' \n';
} else {
copyOptions = copyOptions + ' \'auto\' \n';
}
} else if (config.dataFormat.S === 'PARQUET' || config.dataFormat.S === 'ORC') {
copyOptions = copyOptions + ' format as ' + config.dataFormat.S;
} else {
callback(null, {
status: ERROR,
error: 'Unsupported data format ' + config.dataFormat.S,
cluster: clusterInfo.clusterEndpoint.S
});
}
// add compression directives
if (config.compression) {
copyOptions = copyOptions + ' ' + config.compression.S + '\n';
}
// add copy options
if (config.copyOptions !== undefined) {
copyOptions = copyOptions + config.copyOptions.S + '\n';
}
// add the encryption option to the copy command, and the master
// symmetric key clause to the credentials
if (config.masterSymmetricKey) {
copyOptions = copyOptions + "encrypted\n";
if (decryptedConfigItems[symmetricKeyMapEntry]) {
credentials = credentials + ";master_symmetric_key=" + decryptedConfigItems[symmetricKeyMapEntry].toString();
} else {
// we didn't get a decrypted symmetric key back so fail
callback(null, {
status: ERROR,
error: "KMS did not return a Decrypted Master Symmetric Key Value from: " + config.masterSymmetricKey.S,
cluster: clusterInfo.clusterEndpoint.S
});
}
}
// build the final copy command
copyCommand = copyCommand + " with credentials as \'" + credentials + "\' " + copyOptions + ";\n";
// if the post-sql option is set, insert it into the copyCommand
if (clusterInfo.postsql && clusterInfo.postsql.S) {
copyCommand += clusterInfo.postsql.S + (clusterInfo.postsql.S.slice(-1) == ";" ? "" : ";") + '\n'
}
copyCommand += 'commit;';
logger.debug("Copy Command Assembly Complete");
logger.debug(copyCommand);
// build the connection string
var dbString = 'postgres://' + clusterInfo.connectUser.S + ":" + encodeURIComponent(decryptedConfigItems[passwordKeyMapEntry].toString()) + "@" + clusterInfo.clusterEndpoint.S + ":"
+ clusterInfo.clusterPort.N;
if (clusterInfo.clusterDB) {
dbString = dbString + '/' + clusterInfo.clusterDB.S;
}
if (clusterInfo.useSSL && clusterInfo.useSSL.BOOL === true) {
dbString = dbString + '?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory';
}
let overrideDbString = process.env['_OVERRIDE_DBSTRING'];
if (overrideDbString && overrideDbString !== null) {
dbString = overrideDbString;
logger.info("Using Override Database String: " + overrideDbString);
} else {
logger.info("Connecting to Database " + clusterInfo.clusterEndpoint.S + ":" + clusterInfo.clusterPort.N);
}
/*
* connect to database and run the copy command
*/
const pgClient = new Client({
connectionString: dbString
});
pgClient.connect((err) => {
if (err) {
logger.error(err);
callback(null, {
status: ERROR,
error: err,
cluster: clusterInfo.clusterEndpoint.S
});
} else {
/*
* run the copy command. We will allow 5 retries when
* the 'specified key does not exist' error is
* encountered, as this means an issue with eventual
* consistency in US Std. We will use an exponential
* backoff from 30ms with 5 retries - giving a max retry
* duration of ~ 1 second
*/
runPgCommand(clusterInfo, pgClient, copyCommand, 5, ["S3ServiceException:The specified key does not exist.,Status 404"], 30, callback);
}
});
}
});
};
/**
* Function which marks a batch as failed and sends notifications
* accordingly
*/
function failBatch(loadState, config, thisBatchId, s3Info, manifestInfo) {
logger.info("Failing Batch " + thisBatchId + " due to " + JSON.stringify(loadState));
if (config.failedManifestKey && manifestInfo) {
// copy the manifest to the failed location
manifestInfo.failedManifestPrefix = manifestInfo.manifestPrefix.replace(manifestInfo.manifestKey + '/', config.failedManifestKey.S + '/');
manifestInfo.failedManifestPath = manifestInfo.manifestBucket + '/' + manifestInfo.failedManifestPrefix;
var copySpec = {
Bucket: manifestInfo.manifestBucket,
Key: manifestInfo.failedManifestPrefix,
CopySource: manifestInfo.manifestPath,
Metadata: {
'x-amz-meta-load-date': common.readableTime(common.now())
}
};
logger.debug("Moving manifest file to failure manifest prefix");
logger.debug(JSON.stringify(copySpec));
s3.copyObject(copySpec, function (err, data) {
if (err) {
logger.error(err);
closeBatch(err, config, thisBatchId, s3Info, manifestInfo);
} else {
logger.info('Created new Failed Manifest ' + manifestInfo.failedManifestPath);
// update the batch entry showing the failed
// manifest location
var manifestModification = {
Key: {
batchId: {
S: thisBatchId
},
s3Prefix: {
S: s3Info.prefix
}
},
TableName: batchTable,
AttributeUpdates: {
manifestFile: {
Action: 'PUT',
Value: {
S: manifestInfo.failedManifestPath
}
},
lastUpdate: {
Action: 'PUT',
Value: {
N: '' + common.now()
}
}
}
};
logger.debug("Marking new failure manifest location on Batch entry");
logger.debug(JSON.stringify(manifestModification));
common.retryableUpdate(dynamoDB, manifestModification, function (err, data) {
if (err) {
console.log(err);
// add this new error to the original failed load
// state
closeBatch(loadState + " " + err, config, thisBatchId, s3Info, manifestInfo);
} else {
// close the batch with the original
// calling error
closeBatch(loadState, config, thisBatchId, s3Info, manifestInfo);
}
});
}
});
} else {
logger.info('Not requesting copy of Manifest to Failed S3 Location');
closeBatch(loadState, config, thisBatchId, s3Info, manifestInfo);
}
};
/**
* Function which closes the batch to mark it as done, including
* notifications
*/
function closeBatch(batchError, config, thisBatchId, s3Info, manifestInfo) {
var item = {
Key: {
batchId: {
S: thisBatchId
},
s3Prefix: {
S: s3Info.prefix
}
},
TableName: batchTable,
AttributeUpdates: {
status: {
Action: 'PUT',
Value: {
S: complete
}
},
lastUpdate: {
Action: 'PUT',
Value: {
N: '' + common.now()
}
}
}
};
// add the error message to the updates if we had one
if (batchError && batchError !== null) {
item.AttributeUpdates.errorMessage = {
Action: 'PUT',
Value: {
S: JSON.stringify(batchError)
}
};
item.AttributeUpdates.status = {
Action: 'PUT',
Value: {
S: error
}
};
}
logger.debug("Marking Batch entry as completed");
logger.debug(JSON.stringify(item));
// mark the batch as closed
common.retryableUpdate(dynamoDB, item, function (err, data) {
// ugh, the batch closure didn't finish - this is not a good
// place to be
if (err) {
logger.error("Batch closure failed. Batch will remain in load state and must be manually reset. Check whether database load completed before moving to final state.")
logger.error(JSON.stringify(err));
context.done(error, JSON.stringify(err));
} else {
// send notifications
notify(config, thisBatchId, s3Info, manifestInfo, batchError, function (err) {
if (err) {
logger.error(JSON.stringify(err));
context.done(error, JSON.stringify(err) + " " + JSON.stringify(batchError));
} else if (batchError) {
logger.error(JSON.stringify(batchError));
// allow for an environment variable to suppress failure
// end status if failure notifications were correctly
// sent
if (config.failureTopicARN && process.env[SUPPRESS_FAILURE_ON_OK_NOTIFICATION] === 'true') {
logger.info("Suppressing failed end state due to environment setting " + SUPPRESS_FAILURE_ON_OK_NOTIFICATION);
context.done(null, null);
} else {
context.done(error, JSON.stringify(batchError));
}
} else {
logger.info("Batch Load " + thisBatchId + " Complete");
context.done(null, null)
}
});
}
});
};
/** send an SNS message to a topic */
function sendSNS(topic, subj, msg, callback) {
var m = {
Message: JSON.stringify(msg),
Subject: subj,
TopicArn: topic
};
logger.debug(`Sending SNS Notification to ${topic}`);
logger.debug(JSON.stringify(m));
sns.publish(m, function (err, data) {
callback(err);
});
};
/** Send SNS notifications if configured for OK vs Failed status */
function notify(config, thisBatchId, s3Info, manifestInfo, batchError, callback) {
var statusMessage = batchError ? 'error' : 'ok';
var errorMessage = batchError ? JSON.stringify(batchError) : null;
var messageBody = {
error: errorMessage,
status: statusMessage,
batchId: thisBatchId,
s3Prefix: s3Info.prefix,
key: s3Info.key
};
if (manifestInfo) {
messageBody.originalManifest = manifestInfo.manifestPath;
messageBody.failedManifest = manifestInfo.failedManifestPath;
}
var sendNotifications = [];
if (batchError && batchError !== null) {
logger.error(JSON.stringify(batchError));
if (config.failureTopicARN) {
sendNotifications.push(sendSNS.bind(undefined, config.failureTopicARN.S, "Lambda Redshift Batch Load " + thisBatchId + " Failure", messageBody));
}
}
if (config.successTopicARN) {
sendNotifications.push(sendSNS.bind(undefined, config.successTopicARN.S, "Lambda Redshift Batch Load " + thisBatchId + " OK", messageBody));
}
async.waterfall(sendNotifications, function (err) {
callback(err);
});
}
/* end of runtime functions */
try {
logger.debug(JSON.stringify(event));
if (!event.Records) {
// filter out unsupported events
logger.error("Event type unsupported by Lambda Redshift Loader");
logger.info(JSON.stringify(event));
context.done(null, null);
} else {
if (event.Records.length > 1) {
context.done(error, "Unable to process multi-record events");
} else {
var r = event.Records[0];
// ensure that we can process this event based on a variety
// of criteria
var noProcessReason;
if (r.eventSource !== "aws:s3") {
noProcessReason = "Invalid Event Source " + r.eventSource;
}
if (!(r.eventName === "ObjectCreated:Copy" || r.eventName === "ObjectCreated:Put" || r.eventName === 'ObjectCreated:CompleteMultipartUpload')) {
noProcessReason = "Invalid Event Name " + r.eventName;
}
if (r.s3.s3SchemaVersion !== "1.0") {
noProcessReason = "Unknown S3 Schema Version " + r.s3.s3SchemaVersion;
}
if (noProcessReason) {
logger.error(noProcessReason);
context.done(error, noProcessReason);
} else {
// extract the s3 details from the event
var inputInfo = {
bucket: undefined,
key: undefined,
prefix: undefined,
size: undefined,
inputFilename: undefined
};
inputInfo.bucket = r.s3.bucket.name;
inputInfo.key = decodeURIComponent(r.s3.object.key);
inputInfo.size = r.s3.object.size;
// remove the bucket name from the key, if we have
// received it - this happens on object copy
inputInfo.key = inputInfo.key.replace(inputInfo.bucket + "/", "");
var keyComponents = inputInfo.key.split('/');
inputInfo.inputFilename = keyComponents[keyComponents.length - 1];
// remove the filename from the prefix value
var searchKey = inputInfo.key.replace(inputInfo.inputFilename, '').replace(/\/$/, '');
// transform hive style dynamic prefixes into static
// match prefixes and set the prefix in inputInfo
inputInfo.prefix = inputInfo.bucket + '/' + searchKey.transformHiveStylePrefix(suppressWildcardPrefixes);
// add the object size to inputInfo
inputInfo.size = r.s3.object.size;
resolveConfig(inputInfo.prefix, function (err, configData) {
/*
* we did get a configuration found by the resolveConfig
* method
*/
if (err) {
logger.error(JSON.stringify(err));
context.done(err, JSON.stringify(err));
} else {
// update the inputInfo prefix to match the
// resolved
// config entry
inputInfo.prefix = configData.Item.s3Prefix.S;
logger.debug(JSON.stringify(inputInfo));
// call the foundConfig method with the data
// item
foundConfig(inputInfo, null, configData);
}
}, function (err) {
// finish with no exception - where this file sits
// in the S3 structure is not configured for redshift
// loads, or there was an access issue that prevented us
// querying DDB
logger.error("No Configuration Found for " + inputInfo.prefix);
if (err) {
logger.error(err);
}
context.done(err, JSON.stringify(err));
});
}
}
}
} catch (e) {
logger.error("Unhandled Exception");
logger.error(JSON.stringify(e));
logger.error(JSON.stringify(event));
context.done(error, JSON.stringify(e));
}
}