private synchronized void flush()

in firehose-sink/src/main/java/com/amazonaws/hbase/datasink/FirehoseDataSinkImpl.java [144:192]


	/**
	 * Sends up to one batch of buffered records for the given delivery stream
	 * to Firehose and removes the records that were accepted from the buffer.
	 *
	 * <p>Records are only removed from the buffer after Firehose has confirmed
	 * them (a response entry with a non-null record id). Records that were
	 * rejected, and all records of a throttled call, stay in the buffer so that
	 * a later flush retries them.
	 *
	 * @param streamName name of the delivery stream whose buffer is flushed
	 * @throws RuntimeException wrapping {@code ResourceNotFoundException},
	 *         {@code InvalidArgumentException} or {@code InvalidKMSResourceException}
	 *         raised by the {@code putRecordBatch} call
	 */
	private synchronized void flush(String streamName) {
		long startTime = System.currentTimeMillis();
		// Upper bound on records per putRecordBatch call (matches the limit named in the error message below).
		final int maxBatchSize = 500;

		ConcurrentLinkedQueue<Record> recordsList = (ConcurrentLinkedQueue<Record>)streamListMap.get(streamName);
		if (recordsList == null) {
			LOG.warn("No record buffer found for delivery stream " + streamName + ", nothing to flush.");
			return;
		}

		// Snapshot the first maxBatchSize DISTINCT queue entries. The queue's
		// iterator is weakly consistent, so concurrent producers are safe.
		// (Queue.element() only peeks at the head and must not be used here:
		// it would yield the same record on every iteration.)
		LinkedList<Record> tmpList = new LinkedList<Record>();
		for (Record record : recordsList) {
			if (tmpList.size() >= maxBatchSize) {
				break;
			}
			tmpList.add(record);
		}
		if (tmpList.isEmpty()) {
			// Nothing buffered; an empty batch request would be rejected.
			return;
		}

		PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
		putRecordBatchRequest.setDeliveryStreamName(streamName);
		putRecordBatchRequest.setRecords(tmpList);

		PutRecordBatchResult results = null;
		try {
			results = firehose.putRecordBatch(putRecordBatchRequest);
		} catch ( ResourceNotFoundException e ) {
			LOG.error("Delivery stream " + streamName + " not found", e);
			throw new RuntimeException(e);
		} catch ( InvalidArgumentException e) {
			LOG.error("Invalid record count > 500 or invalid record size", e);
			throw new RuntimeException(e);
		} catch (InvalidKMSResourceException e) {
			LOG.error("Invalid KMS Key", e);
			throw new RuntimeException(e);
		} catch ( ServiceUnavailableException e) {
			// Nothing was accepted; keep every record buffered for the next flush.
			LOG.warn("Throttled ! retrying without delay.");
			return;
		}

		// We just remove the records that are successful from the buffer. The rest will be retried.
		// Response entries are matched to the request records by position.
		int successCount = 0;
		for ( PutRecordBatchResponseEntry resultentry: results.getRequestResponses() ) {
			if (tmpList.isEmpty()) {
				// Defensive: more response entries than records sent.
				break;
			}
			Record item = tmpList.remove();
			if ( resultentry.getRecordId() != null) {
				successCount ++;
				recordsList.remove(item);
			}
		}

		addFlushstats(streamName,System.currentTimeMillis());

		if (LOG.isDebugEnabled()) {
			LOG.debug("putRecordBatch success: " +successCount+ " out of results:" +results.getRequestResponses().size() + " PutRecordBatch took {} ms ", Long.toString(System.currentTimeMillis() - startTime));
		}
	}