function writeToFirehose()

in index.js [541:594]


function writeToFirehose(firehoseBatch, streamName, deliveryStreamName, callback, retries) {
    if (retries === undefined) {
        retries = 0;
    }

    // write the batch to firehose with putRecordBatch
    var putRecordBatchParams = {
        DeliveryStreamName: deliveryStreamName.substring(0, 64),
        Records: firehoseBatch
    };

    if (debug) {
        console.log('Writing to firehose delivery stream (' + retries + ')');
        console.log(JSON.stringify(putRecordBatchParams));
    }

    var startTime = new Date().getTime();
    exports.firehose.putRecordBatch(putRecordBatchParams, function (err, data) {
        if (err) {
            console.log(JSON.stringify(err));
            callback(err);
        } else {
            if (data.FailedPutCount !== 0) {
                console.log("Failed to write " + data.FailedPutCount + "/" + firehoseBatch.length + " records. Retrying to write...");
                if (retries < MAX_RETRY_ON_FAILED_PUT) {
                    // extract the failed records
                    var failedBatch = [];
                    data.RequestResponses.map(function (item, index) {
                        if (item.hasOwnProperty('ErrorCode')) {
                            failedBatch.push(firehoseBatch[index]);
                        }
                    });

                    setTimeout(exports.writeToFirehose.bind(undefined, failedBatch, streamName, deliveryStreamName, function (err) {
                        if (err) {
                            callback(err);
                        } else {
                            callback();
                        }
                    }, retries + 1), RETRY_INTERVAL_MS);
                } else {
                    console.log('Maximum retries reached, giving up');
                    callback(data);
                }
            } else {
                if (debug) {
                    var elapsedMs = new Date().getTime() - startTime;
                    console.log("Successfully wrote " + firehoseBatch.length + " records to Firehose " + deliveryStreamName + " in " + elapsedMs + " ms");
                }
                callback();
            }
        }
    });
}