function loadCluster()

in index.js [1076:1283]
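
Builds and runs the Redshift COPY for a single target cluster: it assembles the SQL (statement timeout, optional pre/post SQL, optional truncate, format and copy options), decrypts the stored credentials with KMS, and executes the whole load in one transaction over a pg Client connection.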


    function loadCluster(config, thisBatchId, s3Info, manifestInfo, clusterInfo, callback) {
        /* build the redshift copy command */
        var copyCommand = '';

        // set the statement timeout to 10 seconds less than the remaining
        // execution time on the lambda function, or 60 seconds if we can't
        // resolve the time remaining. fail the batch if less than 10 seconds
        // remain
        var remainingMillis;
        if (context) {
            remainingMillis = context.getRemainingTimeInMillis();

            if (remainingMillis < 10000) {
                failBatch("Remaining duration of " + remainingMillis + ' insufficient to load cluster', config, thisBatchId, s3Info, manifestInfo);
                // stop assembling the command - failBatch takes over the failure path
                return;
            } else {
                copyCommand = 'set statement_timeout to ' + (remainingMillis - 10000) + ';\n';
            }
        } else {
            copyCommand = 'set statement_timeout to 60000;\n';
        }

        // open a transaction so that all pre-sql, load, and post-sql commit at
        // once
        copyCommand += 'begin;\n';

        // if the presql option is set, insert it into the copyCommand
        if (clusterInfo.presql && clusterInfo.presql.S) {
            copyCommand += clusterInfo.presql.S + (clusterInfo.presql.S.slice(-1) === ";" ? "" : ";") + '\n';
        }

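        // all loads are manifest based - the S3 object registered for the batch
        // is a manifest file listing the data files to load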
        var copyOptions = "manifest ";

        // truncate the target table before loading, if requested
        if (clusterInfo.truncateTarget && clusterInfo.truncateTarget.BOOL) {
            copyCommand += 'truncate table ' + clusterInfo.targetTable.S + ';\n';
        }

        var encryptedItems = {};
        var useLambdaCredentialsToLoad = true;
        const s3secretKeyMapEntry = "s3secretKey";
        const passwordKeyMapEntry = "clusterPassword";
        const symmetricKeyMapEntry = "symmetricKey";

        if (config.secretKeyForS3) {
            encryptedItems[s3secretKeyMapEntry] = Buffer.from(config.secretKeyForS3.S, 'base64');
            useLambdaCredentialsToLoad = false;
        }

        logger.debug("Loading Cluster " + clusterInfo.clusterEndpoint.S + " with " + (useLambdaCredentialsToLoad === true ? "Lambda" : "configured") + " credentials");

        // add the cluster password
        encryptedItems[passwordKeyMapEntry] = Buffer.from(clusterInfo.connectPassword.S, 'base64');

        // add the master encryption key to the list of items to be decrypted,
        // if there is one
        if (config.masterSymmetricKey) {
            encryptedItems[symmetricKeyMapEntry] = Buffer.from(config.masterSymmetricKey.S, 'base64');
        }

        // decrypt the encrypted items
        kmsCrypto.decryptMap(encryptedItems, function (err, decryptedConfigItems) {
            if (err) {
                callback(err, {
                    status: ERROR,
                    cluster: clusterInfo.clusterEndpoint.S
                });
            } else {
                // create the credentials section
                var credentials;

                if (useLambdaCredentialsToLoad === true) {
                    credentials = 'aws_access_key_id=' + aws.config.credentials.accessKeyId + ';aws_secret_access_key=' + aws.config.credentials.secretAccessKey;

                    if (aws.config.credentials.sessionToken) {
                        credentials += ';token=' + aws.config.credentials.sessionToken;
                    }
                } else {
                    credentials = 'aws_access_key_id=' + config.accessKeyForS3.S + ';aws_secret_access_key=' + decryptedConfigItems[s3secretKeyMapEntry].toString();
                }
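                // the assembled clause looks like (values illustrative):
                // aws_access_key_id=AKIA...;aws_secret_access_key=...;token=...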

                if (typeof clusterInfo.columnList === 'undefined') {
                    copyCommand = copyCommand + 'COPY ' + clusterInfo.targetTable.S + ' from \'s3://' + manifestInfo.manifestPath + '\'';
                } else {
                    copyCommand = copyCommand + 'COPY ' + clusterInfo.targetTable.S + ' (' + clusterInfo.columnList.S + ') from \'s3://' + manifestInfo.manifestPath + '\'';
                }

                // add data formatting directives to copy
                // options
                if (config.dataFormat.S === 'CSV') {
                    // if REMOVEQUOTES or ESCAPE is used in the copy options, we
                    // won't use the CSV formatter (COPY disallows combining them)
                    if (!(config.copyOptions && (config.copyOptions.S.toUpperCase().indexOf('REMOVEQUOTES') > -1 || config.copyOptions.S.toUpperCase().indexOf('ESCAPE') > -1))) {
                        copyOptions = copyOptions + 'format csv ';
                    }

                    copyOptions = copyOptions + 'delimiter \'' + config.csvDelimiter.S + '\'\n';

                    // skip the header row if the file includes one
                    if (config.ignoreCsvHeader && config.ignoreCsvHeader.BOOL) {
                        copyOptions = copyOptions + ' IGNOREHEADER 1 \n';
                    }

                } else if (config.dataFormat.S === 'JSON' || config.dataFormat.S === 'AVRO') {
                    copyOptions = copyOptions + ' format ' + config.dataFormat.S;

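                    // use the configured JSONPaths file if one is set; otherwise
                    // 'auto' lets COPY match fields to column names directly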
                    if (config.jsonPath) {
                        copyOptions = copyOptions + ' \'' + config.jsonPath.S + '\' \n';
                    } else {
                        copyOptions = copyOptions + ' \'auto\' \n';
                    }
                } else if (config.dataFormat.S === 'PARQUET' || config.dataFormat.S === 'ORC') {
                    copyOptions = copyOptions + ' format as ' + config.dataFormat.S;
                } else {
                    callback(null, {
                        status: ERROR,
                        error: 'Unsupported data format ' + config.dataFormat.S,
                        cluster: clusterInfo.clusterEndpoint.S
                    });
                    // without this return, the load would still be attempted
                    return;
                }

                // add compression directives
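                // (the value is passed straight through to COPY, e.g. GZIP or LZOP)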
                if (config.compression) {
                    copyOptions = copyOptions + ' ' + config.compression.S + '\n';
                }

                // add copy options
                if (config.copyOptions !== undefined) {
                    copyOptions = copyOptions + config.copyOptions.S + '\n';
                }

                // add the encryption option to the copy command, and the master
                // symmetric key clause to the credentials
                if (config.masterSymmetricKey) {
                    copyOptions = copyOptions + "encrypted\n";

                    if (decryptedConfigItems[symmetricKeyMapEntry]) {
                        credentials = credentials + ";master_symmetric_key=" + decryptedConfigItems[symmetricKeyMapEntry].toString();
                    } else {
                        // we didn't get a decrypted symmetric key back, so fail
                        // rather than running COPY with incomplete credentials
                        callback(null, {
                            status: ERROR,
                            error: "KMS did not return a Decrypted Master Symmetric Key Value from: " + config.masterSymmetricKey.S,
                            cluster: clusterInfo.clusterEndpoint.S
                        });
                        return;
                    }
                }

                // build the final copy command
                copyCommand = copyCommand + " with credentials as '" + credentials + "' " + copyOptions + ";\n";

                // if the post-sql option is set, append it to the copyCommand
                if (clusterInfo.postsql && clusterInfo.postsql.S) {
                    copyCommand += clusterInfo.postsql.S + (clusterInfo.postsql.S.slice(-1) === ";" ? "" : ";") + '\n';
                }

                copyCommand += 'commit;';

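                // note: at this point the command embeds the credentials clause,
                // so this debug output will contain secrets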
                logger.debug("Copy Command Assembly Complete");
                logger.debug(copyCommand);

                // build the connection string
                var dbString = 'postgres://' + clusterInfo.connectUser.S + ":" + encodeURIComponent(decryptedConfigItems[passwordKeyMapEntry].toString()) + "@" + clusterInfo.clusterEndpoint.S + ":"
                    + clusterInfo.clusterPort.N;
                if (clusterInfo.clusterDB) {
                    dbString = dbString + '/' + clusterInfo.clusterDB.S;
                }
                if (clusterInfo.useSSL && clusterInfo.useSSL.BOOL === true) {
                    dbString = dbString + '?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory';
                }
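                // the resulting string looks like (values illustrative):
                // postgres://user:pw@cluster.abc123.us-east-1.redshift.amazonaws.com:5439/db?ssl=true...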

                let overrideDbString = process.env['_OVERRIDE_DBSTRING'];
                if (overrideDbString) {
                    dbString = overrideDbString;
                    logger.info("Using Override Database String: " + overrideDbString);
                } else {
                    logger.info("Connecting to Database " + clusterInfo.clusterEndpoint.S + ":" + clusterInfo.clusterPort.N);
                }

                /*
                 * connect to the database and run the copy command
                 */
                const pgClient = new Client({
                    connectionString: dbString
                });

                pgClient.connect((err) => {
                    if (err) {
                        logger.error(err);

                        callback(null, {
                            status: ERROR,
                            error: err,
                            cluster: clusterInfo.clusterEndpoint.S
                        });
                    } else {
                        /*
                         * run the copy command. We will allow 5 retries when the
                         * 'specified key does not exist' error is encountered, as
                         * this means an issue with eventual consistency in US Std.
                         * We will use an exponential backoff from 30ms with 5
                         * retries - giving a max retry duration of ~1 second
                         */
                        runPgCommand(clusterInfo, pgClient, copyCommand, 5, ["S3ServiceException:The specified key does not exist.,Status 404"], 30, callback);
                    }
                });
            }
        });
    }
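
For orientation, a simple CSV load with truncateTarget set might assemble to roughly the following SQL (table, bucket, delimiter, and timeout values are illustrative, and the credentials clause is redacted):

    set statement_timeout to 170000;
    begin;
    truncate table target_table;
    COPY target_table from 's3://mybucket/manifest/batch-0001.manifest' with credentials as '<redacted>' manifest format csv delimiter '|'
    ;
    commit;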