in firehose-sink/src/main/java/com/amazonaws/hbase/datasink/FirehoseDataSinkImpl.java [144:192]
/**
 * Drains up to 500 buffered records for the given delivery stream and sends them to
 * Kinesis Firehose in a single PutRecordBatch call (500 is the PutRecordBatch limit).
 * Records are removed from the shared per-stream queue only after Firehose confirms
 * delivery, so anything not acknowledged is retried on a later flush.
 *
 * @param streamName Firehose delivery stream whose buffered records should be flushed
 * @throws RuntimeException on non-retryable Firehose errors (missing stream,
 *         invalid request, invalid KMS key)
 */
private synchronized void flush(String streamName) {
    long startTime = System.currentTimeMillis();
    ConcurrentLinkedQueue<Record> recordsList = (ConcurrentLinkedQueue<Record>)streamListMap.get(streamName);
    LinkedList<Record> tmpList = new LinkedList<Record>();
    // Snapshot up to 500 records WITHOUT removing them from the shared queue;
    // removal is deferred until Firehose acknowledges each record below.
    // BUG FIX: the previous code called element() in a loop, which always returns
    // the queue head and therefore batched the same record up to 500 times.
    int counter = 0;
    for (Record record : recordsList) {
        tmpList.add(record);
        if (++counter >= 500) {
            break;
        }
    }
    if (tmpList.isEmpty()) {
        // Someone has emptied the buffer quicker than us — nothing to send.
        return;
    }
    PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
    putRecordBatchRequest.setDeliveryStreamName(streamName);
    putRecordBatchRequest.setRecords(tmpList);
    PutRecordBatchResult results = null;
    try {
        results = firehose.putRecordBatch(putRecordBatchRequest);
    } catch ( ResourceNotFoundException e ) {
        // Pass the exception itself so the logger records the stack trace/cause.
        LOG.error("Delivery stream not found: " + streamName, e);
        throw new RuntimeException(e);
    } catch ( InvalidArgumentException e) {
        LOG.error("Invalid record count > 500 or invalid record size", e);
        throw new RuntimeException(e);
    } catch (InvalidKMSResourceException e) {
        LOG.error("Invalid KMS Key", e);
        throw new RuntimeException(e);
    } catch ( ServiceUnavailableException e) {
        LOG.warn("Throttled ! retrying without delay.");
        // BUG FIX: previously fell through with results == null and NPE'd below.
        // Leave the records in the queue; a subsequent flush retries them.
        return;
    }
    // We just remove the records that are successful from the list. The rest will be retried.
    // NOTE(review): success is inferred from a non-null RecordId; per the Firehose API,
    // failed entries carry ErrorCode/ErrorMessage instead — confirm against SDK version.
    int successCount = 0;
    for ( PutRecordBatchResponseEntry resultentry: results.getRequestResponses() ){
        // Responses are positional: entry i corresponds to the i-th submitted record.
        Record item = tmpList.remove();
        if ( resultentry.getRecordId() != null) {
            successCount ++;
            recordsList.remove(item);
        }
    }
    addFlushstats(streamName,System.currentTimeMillis());
    if (LOG.isDebugEnabled()) {
        // Plain concatenation: the original mixed '+' with a '{}' placeholder,
        // which only renders correctly on some logging facades.
        LOG.debug("putRecordBatch success: " + successCount + " out of results: "
                + results.getRequestResponses().size() + " PutRecordBatch took "
                + (System.currentTimeMillis() - startTime) + " ms");
    }
}