in lca-ai-stack/source/kvs_transcribe_streaming/src/main/java/com/amazonaws/kvstranscribestreaming/SQSProcessor.java [67:97]
public static void main(String ... args) {
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(MAX_THREADS);
/* Print a status every 10 seconds */
ScheduledExecutorService statusThread = Executors.newSingleThreadScheduledExecutor();
statusThread.scheduleAtFixedRate(() -> {
String status = String.format("Active Threads: %d", threadPool.getActiveCount());
logger.info(status);
}, 1, 10, TimeUnit.SECONDS);
AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
String queueUrl = sqs.getQueueUrl(QUEUE_NAME).getQueueUrl();
int active = 0;
while(true) {
active = threadPool.getActiveCount();
if (active == MAX_THREADS) {
Thread.yield();
continue;
}
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl)
.withWaitTimeSeconds(20)
.withMaxNumberOfMessages(10);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (Message message : messages) {
threadPool.execute(new RecordingThread(message.getBody()));
sqs.deleteMessage(queueUrl, message.getReceiptHandle());
}
}
}