in firehose-sink/src/main/java/com/amazonaws/hbase/datasink/FirehoseDataSinkImpl.java [106:142]
/**
 * Delivers a single record to the Kinesis Data Firehose stream mapped to {@code tablename}.
 *
 * <p>If batching is disabled, the record is sent immediately via a synchronous
 * {@code PutRecord} call. If batching is enabled, the record is appended to the
 * per-stream queue; when the queue has reached {@code Constants.MAX_BATCHED_RECORDS}
 * it is flushed before the new record is enqueued.
 *
 * @param buffer    record payload; must not exceed {@code Constants.MAX_RECORD_SIZE_BYTES}
 * @param tablename HBase table name, resolved to a delivery stream via {@code configUtil}
 * @throws InvalidRecordException if the record exceeds the Firehose per-record size limit
 * @throws IOException            on delivery failure
 * @throws InterruptedException   if the calling thread is interrupted while sending
 * @throws ExecutionException     if an async delivery task fails
 */
public void putRecord(final ByteBuffer buffer, String tablename) throws IOException, InterruptedException, ExecutionException, InvalidRecordException {
    Record record = (new Record()).withData(buffer);
    // Compute the size once instead of up to three times (original recomputed it
    // for the check, the exception message, and the debug log).
    final int recordSize = this.getRecordSize(record);
    if (recordSize > Constants.MAX_RECORD_SIZE_BYTES) {
        throw new InvalidRecordException("Record size more than 1000KB : " + recordSize + ", " + record.toString());
    }
    String streamName = configUtil.getFirehoseNameFromTableName(tablename);
    if (!configUtil.isBatchPutsEnabled()) {
        // Direct, synchronous delivery of a single record.
        long startTime = System.currentTimeMillis();
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setDeliveryStreamName(streamName);
        putRecordRequest.setRecord(record);
        PutRecordResult result = firehose.putRecord(putRecordRequest);
        if (LOG.isDebugEnabled()) {
            LOG.debug("PutRecord took " + Long.toString(System.currentTimeMillis() - startTime) + "ms" +
                    ", result: " + result.getRecordId() +
                    ", recordsize: " + Integer.toString(recordSize));
        }
    } else {
        // Atomically get-or-create the per-stream queue. The original
        // containsKey/get/put sequence was a check-then-act race: two threads
        // hitting a new stream concurrently could each create a queue, and the
        // records in the losing thread's queue would be silently dropped.
        ConcurrentLinkedQueue<Record> recordsList = (ConcurrentLinkedQueue<Record>)
                streamListMap.computeIfAbsent(streamName, k -> new ConcurrentLinkedQueue<Record>());
        // Flush before enqueueing so the batch never exceeds the configured cap.
        if (recordsList.size() >= Constants.MAX_BATCHED_RECORDS) {
            flush(streamName);
        }
        recordsList.add(record);
    }
}