protected BufferSendResult attemptSend()

in src/com/amazon/kinesis/streaming/agent/tailing/FirehoseSender.java [77:172]


    protected BufferSendResult<FirehoseRecord> attemptSend(RecordBuffer<FirehoseRecord> buffer) {
        activeBatchPutCalls.incrementAndGet();
        IMetricsScope metrics = agentContext.beginScope();
        metrics.addDimension(Metrics.DESTINATION_DIMENSION, "DeliveryStream:" + getDestination());
        if (!Strings.isNullOrEmpty(agentContext.getInstanceTag())) {
            metrics.addDimension(Metrics.INSTANCE_DIMENSION, agentContext.getInstanceTag());
        }
        try {
            BufferSendResult<FirehoseRecord> sendResult = null;
            List<Record> requestRecords = new ArrayList<>();
            for(FirehoseRecord data : buffer) {
                Record record = new Record();
                record.setData(data.data());
                requestRecords.add(record);
            }
            PutRecordBatchRequest request = new PutRecordBatchRequest();
            request.setRecords(requestRecords);
            request.setDeliveryStreamName(getDestination());
            PutRecordBatchResult result = null;
            Stopwatch timer = Stopwatch.createStarted();
            totalBatchPutCalls.incrementAndGet();
            try {
                logger.trace("{}: Sending buffer {} to firehose {}...", flow.getId(), buffer, getDestination());
                metrics.addCount(RECORDS_ATTEMPTED_METRIC, requestRecords.size());
                result = agentContext.getFirehoseClient().putRecordBatch(request);
                metrics.addCount(SERVICE_ERRORS_METRIC, 0);
            } catch (AmazonServiceException e) {
                metrics.addCount(SERVICE_ERRORS_METRIC, 1);
                totalBatchPutServiceErrors.incrementAndGet();
                throw e;
            } catch (Exception e) {
                metrics.addCount(SERVICE_ERRORS_METRIC, 1);
                totalBatchPutOtherErrors.incrementAndGet();
                throw e;
            } finally {
                totalBatchPutLatency.addAndGet(timer.elapsed(TimeUnit.MILLISECONDS));
            }
            if(sendResult == null) {
                List<Integer> sentRecords = new ArrayList<>(requestRecords.size());
                Multiset<String> errors = HashMultiset.<String> create();
                int index = 0;
                long totalBytesSent = 0;
                for (PutRecordBatchResponseEntry responseEntry : result.getRequestResponses()) {
                    Record record = requestRecords.get(index);
                    if (responseEntry.getErrorCode() == null) {
                        sentRecords.add(index);
                        totalBytesSent += record.getData().limit();
                    } else {
                        logger.trace("{}:{} Record {} returned error code {}: {}", flow.getId(), buffer, index, responseEntry.getErrorCode(),
                                responseEntry.getErrorMessage());
                        errors.add(responseEntry.getErrorCode());
                    }
                    ++index;
                }
                if(sentRecords.size() == requestRecords.size()) {
                    sendResult = BufferSendResult.succeeded(buffer);
                } else {
                    buffer = buffer.remove(sentRecords);
                    sendResult = BufferSendResult.succeeded_partially(buffer, requestRecords.size());
                }
                metrics.addData(BYTES_SENT_METRIC, totalBytesSent, StandardUnit.Bytes);
                metrics.addCount(RECORDS_SENT_METRIC, sentRecords.size());
                int failedRecordCount = requestRecords.size() - sentRecords.size();
                metrics.addCount(RECORD_ERRORS_METRIC, failedRecordCount);
                logger.debug("{}:{} Records sent firehose {}: {}. Failed records: {}",
                        flow.getId(),
                        buffer,
                        getDestination(),
                        sentRecords.size(),
                        failedRecordCount);
                totalRecordsAttempted.addAndGet(requestRecords.size());
                totalRecordsSent.addAndGet(sentRecords.size());
                totalRecordsFailed.addAndGet(failedRecordCount);

                if(logger.isDebugEnabled() && !errors.isEmpty()) {
                    synchronized(totalErrors) {
                        StringBuilder strErrors = new StringBuilder();
                        for(Multiset.Entry<String> err : errors.entrySet()) {
                            AtomicLong counter = totalErrors.get(err.getElement());
                            if (counter == null)
                                totalErrors.put(err.getElement(), counter = new AtomicLong());
                            counter.addAndGet(err.getCount());
                            if(strErrors.length() > 0)
                                strErrors.append(", ");
                            strErrors.append(err.getElement()).append(": ").append(err.getCount());
                        }
                        logger.debug("{}:{} Errors from firehose {}: {}", flow.getId(), buffer, flow.getDestination(), strErrors.toString());
                    }
                }
            }
            return sendResult;
        } finally {
            metrics.commit();
            activeBatchPutCalls.decrementAndGet();
        }
    }