in index.js [878:972]
/**
 * Continuation run once manifest creation has finished: loads every
 * configured cluster in parallel, stores the per-cluster outcome on the
 * batch record, and then closes or fails the batch.
 *
 * Flow:
 *  1. If manifest creation failed, fail the batch immediately.
 *  2. Otherwise attach the manifest to the batch and call loadCluster for
 *     each entry of config.loadClusters.L (using the entry's `.M` value).
 *  3. Collect each result's `status`/`error` keyed by its `cluster` and write
 *     the JSON-encoded map to the `clusterLoadStatus` attribute of the batch.
 *  4. Close the batch when every cluster reported success; otherwise fail it.
 *
 * @param {Object} config - batch configuration; `config.loadClusters.L` is
 *        iterated to find the clusters to load.
 * @param {string} thisBatchId - written as the `batchId` key of the update.
 * @param {Object} s3Info - `s3Info.prefix` is written as the `s3Prefix` key.
 * @param {Object} manifestInfo - `manifestInfo.manifestPath` is logged; the
 *        object is passed through to the helpers called here.
 * @param {*} err - error from manifest creation, or a falsy value on success.
 * @param {*} data - unused by this function.
 */
function loadRedshiftWithManifest(config, thisBatchId, s3Info, manifestInfo, err, data) {
    if (err) {
        logger.error("Error on Manifest Creation");
        logger.error(err);
        failBatch(err, config, thisBatchId, s3Info, manifestInfo);
        return;
    }

    logger.info("Created Manifest " + manifestInfo.manifestPath + " Successfully");

    // add the manifest file to the batch - this will NOT stop
    // processing if it fails
    addManifestToBatch(config, thisBatchId, s3Info, manifestInfo);

    // convert the config.loadClusters list into a format that
    // looks like a native dynamo entry
    var clustersToLoad = config.loadClusters.L.map(function (entry) {
        return entry.M;
    });

    logger.info("Loading " + clustersToLoad.length + " Clusters");

    // run all the cluster loaders in parallel
    async.map(clustersToLoad, function (item, callback) {
        // call the load cluster function, passing it the continuation
        // callback
        loadCluster(config, thisBatchId, s3Info, manifestInfo, item, callback);
    }, function (mapErr, results) {
        if (mapErr) {
            logger.error(mapErr);
        }

        // when async.map reports an error the results may be missing or
        // incomplete, so never assume an array is present
        var clusterResults = Array.isArray(results) ? results : [];

        // a reported error means at least one loader did not complete
        // normally, so the batch must not be closed as OK
        var allOK = !mapErr;
        var loadState = {};

        // go through all the results - if they were all
        // OK, then close the batch OK - otherwise fail
        for (var i = 0; i < clusterResults.length; i++) {
            var result = clusterResults[i];

            if (!result) {
                // no result object means there is no cluster name to key
                // the state by; mark the batch failed and move on
                allOK = false;
                logger.error("Cluster Load Failure: no result returned for Cluster at index " + i);
                continue;
            }

            if (result.status === ERROR) {
                allOK = false;
                logger.error("Cluster Load Failure " + result.error + " on Cluster " + result.cluster);
            }

            // log the response state for each cluster
            loadState[result.cluster] = {
                status: result.status,
                error: result.error
            };
        }

        var loadStateRequest = {
            Key: {
                batchId: {
                    S: thisBatchId
                },
                s3Prefix: {
                    S: s3Info.prefix
                }
            },
            TableName: batchTable,
            AttributeUpdates: {
                clusterLoadStatus: {
                    Action: 'PUT',
                    Value: {
                        S: JSON.stringify(loadState)
                    }
                },
                lastUpdate: {
                    Action: 'PUT',
                    Value: {
                        N: '' + common.now()
                    }
                }
            }
        };

        logger.debug("Linking Batch load state for cluster");
        logger.debug(JSON.stringify(loadStateRequest));

        common.retryableUpdate(dynamoDB, loadStateRequest, function (updateErr, updateData) {
            if (updateErr) {
                logger.error("Error while attaching per-Cluster Load State");
                failBatch(updateErr, config, thisBatchId, s3Info, manifestInfo);
            } else if (allOK === true) {
                // close the batch as OK
                closeBatch(null, config, thisBatchId, s3Info, manifestInfo, loadState);
            } else {
                // close the batch as failure
                failBatch(loadState, config, thisBatchId, s3Info, manifestInfo);
            }
        });
    });
}