function loadRedshiftWithManifest()

in index.js [878:972]


    /**
     * Continuation run after the manifest file has been created: loads every
     * configured cluster from the manifest, records the per-cluster outcome
     * on the batch entry, and then closes or fails the batch.
     *
     * @param {Object} config - loader configuration; `config.loadClusters.L`
     *        is read as a list of `{ M: ... }` wrapped cluster entries
     * @param {string} thisBatchId - ID of the batch being loaded
     * @param {Object} s3Info - S3 details; `s3Info.prefix` keys the batch entry
     * @param {Object} manifestInfo - manifest details; `manifestPath` is logged
     * @param {*} err - error from manifest creation, if any
     * @param {*} data - result of manifest creation (unused here)
     */
    function loadRedshiftWithManifest(config, thisBatchId, s3Info, manifestInfo, err, data) {
        if (err) {
            logger.error("Error on Manifest Creation");
            logger.error(err);
            failBatch(err, config, thisBatchId, s3Info, manifestInfo);
            return;
        }

        logger.info("Created Manifest " + manifestInfo.manifestPath + " Successfully");

        // add the manifest file to the batch - this will NOT stop
        // processing if it fails
        addManifestToBatch(config, thisBatchId, s3Info, manifestInfo);

        // convert the config.loadClusters list into a format that
        // looks like a native dynamo entry
        var clustersToLoad = config.loadClusters.L.map(function (entry) {
            return entry.M;
        });

        logger.info("Loading " + clustersToLoad.length + " Clusters");

        // run all the cluster loaders in parallel
        async.map(clustersToLoad, function (item, callback) {
            // call the load cluster function, passing it the continuation
            // callback
            loadCluster(config, thisBatchId, s3Info, manifestInfo, item, callback);
        }, function (err, results) {
            if (err) {
                logger.error(err);
            }

            // results may be missing or have empty slots when a loader
            // reported an error, so never dereference an entry blindly
            var clusterResults = results || [];

            // go through all the results - if they were all
            // OK, then close the batch OK - otherwise fail. An error from
            // the parallel run itself also counts as a failure.
            var allOK = !err;
            var loadState = {};

            for (var i = 0; i < clusterResults.length; i++) {
                var result = clusterResults[i];

                if (!result) {
                    // nothing to record for this cluster, but the batch
                    // must not be closed as successful
                    allOK = false;
                    logger.error("Cluster Load Failure: no result returned for Cluster at index " + i);
                    continue;
                }

                if (result.status === ERROR) {
                    allOK = false;

                    logger.error("Cluster Load Failure " + result.error + " on Cluster " + result.cluster);
                }

                // log the response state for each cluster
                loadState[result.cluster] = {
                    status: result.status,
                    error: result.error
                };
            }

            // store the per-cluster state on the batch entry as a JSON string
            var loadStateRequest = {
                Key: {
                    batchId: {
                        S: thisBatchId
                    },
                    s3Prefix: {
                        S: s3Info.prefix
                    }
                },
                TableName: batchTable,
                AttributeUpdates: {
                    clusterLoadStatus: {
                        Action: 'PUT',
                        Value: {
                            S: JSON.stringify(loadState)
                        }
                    },
                    lastUpdate: {
                        Action: 'PUT',
                        Value: {
                            N: '' + common.now()
                        }
                    }
                }
            };

            logger.debug("Linking Batch load state for cluster");
            logger.debug(JSON.stringify(loadStateRequest));

            common.retryableUpdate(dynamoDB, loadStateRequest, function (err, data) {
                if (err) {
                    logger.error("Error while attaching per-Cluster Load State");
                    failBatch(err, config, thisBatchId, s3Info, manifestInfo);
                } else if (allOK === true) {
                    // close the batch as OK
                    closeBatch(null, config, thisBatchId, s3Info, manifestInfo, loadState);
                } else {
                    // close the batch as failure
                    failBatch(loadState, config, thisBatchId, s3Info, manifestInfo);
                }
            });
        });
    };