public void putRecord()

in firehose-sink/src/main/java/com/amazonaws/hbase/datasink/FirehoseDataSinkImpl.java [106:142]


	public void putRecord(final ByteBuffer buffer, String tablename) throws IOException, InterruptedException, ExecutionException, InvalidRecordException {

		Record record = (new Record()).withData(buffer);

		if ( this.getRecordSize(record) > Constants.MAX_RECORD_SIZE_BYTES ) {
			throw new InvalidRecordException("Record size more than 1000KB : " + this.getRecordSize(record) + ", " + record.toString());
		}
		
		String streamName = configUtil.getFirehoseNameFromTableName(tablename);
		
		if (configUtil.isBatchPutsEnabled() == false) {
			long startTime = System.currentTimeMillis();
			PutRecordRequest putRecordRequest = new PutRecordRequest();
			putRecordRequest.setDeliveryStreamName(streamName);
			putRecordRequest.setRecord(record);
			PutRecordResult result = firehose.putRecord(putRecordRequest);
			if (LOG.isDebugEnabled()) {
				LOG.debug("PutRecord took " + Long.toString(System.currentTimeMillis() - startTime) +"ms" +
						", result: " + result.getRecordId() + 
						", recordsize: " + Integer.toString(getRecordSize(record)));
			}
		} else {
		
			if (streamListMap.containsKey(streamName) == true) { // Do we have a queue for this stream ?
				ConcurrentLinkedQueue<Record> recordsList = (ConcurrentLinkedQueue<Record>)streamListMap.get(streamName);
				if ( recordsList.size() >= Constants.MAX_BATCHED_RECORDS) {
					flush(streamName);
				}
				recordsList.add(record);
				
			} else { // If the first time we all pushing for this stream.
				ConcurrentLinkedQueue<Record> recordsList = new ConcurrentLinkedQueue<Record>();
				recordsList.add(record);
				streamListMap.put(streamName, recordsList);
			}
		}
	}