protected BufferSendResult attemptSend()

in src/com/amazon/kinesis/streaming/agent/tailing/KinesisSender.java [78:174]


    protected BufferSendResult<KinesisRecord> attemptSend(RecordBuffer<KinesisRecord> buffer) {
        activePutRecordsCalls.incrementAndGet();
        IMetricsScope metrics = agentContext.beginScope();
        metrics.addDimension(Metrics.DESTINATION_DIMENSION, "KinesisStream:" + getDestination());
        if (!Strings.isNullOrEmpty(agentContext.getInstanceTag())) {
            metrics.addDimension(Metrics.INSTANCE_DIMENSION, agentContext.getInstanceTag());
        }
        try {
            BufferSendResult<KinesisRecord> sendResult = null;
            List<PutRecordsRequestEntry> requestRecords = new ArrayList<>();
            for(KinesisRecord data : buffer) {
                PutRecordsRequestEntry record = new PutRecordsRequestEntry();
                record.setData(data.data());
                record.setPartitionKey(data.partitionKey());
                requestRecords.add(record);
            }
            PutRecordsRequest request = new PutRecordsRequest();
            request.setStreamName(getDestination());
            request.setRecords(requestRecords);
            PutRecordsResult result = null;
            Stopwatch timer = Stopwatch.createStarted();
            totalPutRecordsCalls.incrementAndGet();
            try {
                logger.trace("{}: Sending buffer {} to kinesis stream {}...", flow.getId(), buffer, getDestination());
                metrics.addCount(RECORDS_ATTEMPTED_METRIC, requestRecords.size());
                result = agentContext.getKinesisClient().putRecords(request);
                metrics.addCount(SERVICE_ERRORS_METRIC, 0);
            } catch (AmazonServiceException e) {
                metrics.addCount(SERVICE_ERRORS_METRIC, 1);
                totalPutRecordsServiceErrors.incrementAndGet();
                throw e;
            } catch (Exception e) {
                metrics.addCount(SERVICE_ERRORS_METRIC, 1);
                totalPutRecordsOtherErrors.incrementAndGet();
                throw e;
            } finally {
                totalPutRecordsLatency.addAndGet(timer.elapsed(TimeUnit.MILLISECONDS));
            }
            if(sendResult == null) {
                List<Integer> sentRecords = new ArrayList<>(requestRecords.size());
                Multiset<String> errors = HashMultiset.<String> create();
                int index = 0;
                long totalBytesSent = 0;
                for (final PutRecordsResultEntry responseEntry : result.getRecords()) {
                	final PutRecordsRequestEntry record = requestRecords.get(index);
                    if (responseEntry.getErrorCode() == null) {
                        sentRecords.add(index);
                        totalBytesSent += record.getData().limit();
                    } else {
                        logger.trace("{}:{} Record {} returned error code {}: {}", flow.getId(), buffer, index, responseEntry.getErrorCode(),
                                responseEntry.getErrorMessage());
                        errors.add(responseEntry.getErrorCode());
                    }
                    ++index;
                }
                if(sentRecords.size() == requestRecords.size()) {
                    sendResult = BufferSendResult.succeeded(buffer);
                } else {
                    buffer = buffer.remove(sentRecords);
                    sendResult = BufferSendResult.succeeded_partially(buffer, requestRecords.size());
                }
                metrics.addData(BYTES_SENT_METRIC, totalBytesSent, StandardUnit.Bytes);
                metrics.addCount(RECORDS_SENT_METRIC, sentRecords.size());
                int failedRecordCount = requestRecords.size() - sentRecords.size();
                metrics.addCount(RECORD_ERRORS_METRIC, failedRecordCount);
                logger.debug("{}:{} Records sent to kinesis stream {}: {}. Failed records: {}",
                        flow.getId(),
                        buffer,
                        getDestination(),
                        sentRecords.size(),
                        failedRecordCount);
                totalRecordsAttempted.addAndGet(requestRecords.size());
                totalRecordsSent.addAndGet(sentRecords.size());
                totalRecordsFailed.addAndGet(failedRecordCount);

                if(logger.isDebugEnabled() && !errors.isEmpty()) {
                    synchronized(totalErrors) {
                        StringBuilder strErrors = new StringBuilder();
                        for(Multiset.Entry<String> err : errors.entrySet()) {
                            AtomicLong counter = totalErrors.get(err.getElement());
                            if (counter == null)
                                totalErrors.put(err.getElement(), counter = new AtomicLong());
                            counter.addAndGet(err.getCount());
                            if(strErrors.length() > 0)
                                strErrors.append(", ");
                            strErrors.append(err.getElement()).append(": ").append(err.getCount());
                        }
                        logger.debug("{}:{} Errors from kinesis stream {}: {}", flow.getId(), buffer, flow.getDestination(), strErrors.toString());
                    }
                }
            }
            return sendResult;
        } finally {
            metrics.commit();
            activePutRecordsCalls.decrementAndGet();
        }
    }