in src/com/amazon/kinesis/streaming/agent/tailing/KinesisSender.java [78:174]
protected BufferSendResult<KinesisRecord> attemptSend(RecordBuffer<KinesisRecord> buffer) {
activePutRecordsCalls.incrementAndGet();
IMetricsScope metrics = agentContext.beginScope();
metrics.addDimension(Metrics.DESTINATION_DIMENSION, "KinesisStream:" + getDestination());
if (!Strings.isNullOrEmpty(agentContext.getInstanceTag())) {
metrics.addDimension(Metrics.INSTANCE_DIMENSION, agentContext.getInstanceTag());
}
try {
BufferSendResult<KinesisRecord> sendResult = null;
List<PutRecordsRequestEntry> requestRecords = new ArrayList<>();
for(KinesisRecord data : buffer) {
PutRecordsRequestEntry record = new PutRecordsRequestEntry();
record.setData(data.data());
record.setPartitionKey(data.partitionKey());
requestRecords.add(record);
}
PutRecordsRequest request = new PutRecordsRequest();
request.setStreamName(getDestination());
request.setRecords(requestRecords);
PutRecordsResult result = null;
Stopwatch timer = Stopwatch.createStarted();
totalPutRecordsCalls.incrementAndGet();
try {
logger.trace("{}: Sending buffer {} to kinesis stream {}...", flow.getId(), buffer, getDestination());
metrics.addCount(RECORDS_ATTEMPTED_METRIC, requestRecords.size());
result = agentContext.getKinesisClient().putRecords(request);
metrics.addCount(SERVICE_ERRORS_METRIC, 0);
} catch (AmazonServiceException e) {
metrics.addCount(SERVICE_ERRORS_METRIC, 1);
totalPutRecordsServiceErrors.incrementAndGet();
throw e;
} catch (Exception e) {
metrics.addCount(SERVICE_ERRORS_METRIC, 1);
totalPutRecordsOtherErrors.incrementAndGet();
throw e;
} finally {
totalPutRecordsLatency.addAndGet(timer.elapsed(TimeUnit.MILLISECONDS));
}
if(sendResult == null) {
List<Integer> sentRecords = new ArrayList<>(requestRecords.size());
Multiset<String> errors = HashMultiset.<String> create();
int index = 0;
long totalBytesSent = 0;
for (final PutRecordsResultEntry responseEntry : result.getRecords()) {
final PutRecordsRequestEntry record = requestRecords.get(index);
if (responseEntry.getErrorCode() == null) {
sentRecords.add(index);
totalBytesSent += record.getData().limit();
} else {
logger.trace("{}:{} Record {} returned error code {}: {}", flow.getId(), buffer, index, responseEntry.getErrorCode(),
responseEntry.getErrorMessage());
errors.add(responseEntry.getErrorCode());
}
++index;
}
if(sentRecords.size() == requestRecords.size()) {
sendResult = BufferSendResult.succeeded(buffer);
} else {
buffer = buffer.remove(sentRecords);
sendResult = BufferSendResult.succeeded_partially(buffer, requestRecords.size());
}
metrics.addData(BYTES_SENT_METRIC, totalBytesSent, StandardUnit.Bytes);
metrics.addCount(RECORDS_SENT_METRIC, sentRecords.size());
int failedRecordCount = requestRecords.size() - sentRecords.size();
metrics.addCount(RECORD_ERRORS_METRIC, failedRecordCount);
logger.debug("{}:{} Records sent to kinesis stream {}: {}. Failed records: {}",
flow.getId(),
buffer,
getDestination(),
sentRecords.size(),
failedRecordCount);
totalRecordsAttempted.addAndGet(requestRecords.size());
totalRecordsSent.addAndGet(sentRecords.size());
totalRecordsFailed.addAndGet(failedRecordCount);
if(logger.isDebugEnabled() && !errors.isEmpty()) {
synchronized(totalErrors) {
StringBuilder strErrors = new StringBuilder();
for(Multiset.Entry<String> err : errors.entrySet()) {
AtomicLong counter = totalErrors.get(err.getElement());
if (counter == null)
totalErrors.put(err.getElement(), counter = new AtomicLong());
counter.addAndGet(err.getCount());
if(strErrors.length() > 0)
strErrors.append(", ");
strErrors.append(err.getElement()).append(": ").append(err.getCount());
}
logger.debug("{}:{} Errors from kinesis stream {}: {}", flow.getId(), buffer, flow.getDestination(), strErrors.toString());
}
}
}
return sendResult;
} finally {
metrics.commit();
activePutRecordsCalls.decrementAndGet();
}
}