in src/main/java/com/amazonaws/kvstranscribestreaming/lambda/SendRunTaskRequestLambda.java [50:117]
public String handleRequest(SQSEvent event, Context context) {
try {
logger.info(LAMBDA_KEY_PREFIX + " received request : " + objectMapper.writeValueAsString(event));
} catch (JsonProcessingException e) {
logger.error(LAMBDA_KEY_PREFIX + " Error happened where serializing the event", e);
}
logger.info(LAMBDA_KEY_PREFIX + " received context: " + context.toString());
try {
event.getRecords().forEach(msg -> {
logger.info("Received streaming message : " + msg.getBody());
});
if (event.getRecords().size() != 1) {
logger.error("Invalid number of records present in the SQS message body");
throw new RuntimeException("Invalid number of records");
}
SQSEvent.SQSMessage sqsMessage = event.getRecords().get(0);
logger.info("SQS message body: {} ", sqsMessage.getBody());
Map<String, Object> eventBodyMap = objectMapper.readValue(sqsMessage.getBody(), Map.class);
Map<String, String> eventDetail = (Map) eventBodyMap.get("detail");
String streamingStatus = eventDetail.get("streamingStatus");
String transactionId = eventDetail.get("transactionId");
logger.info("Received STARTED event");
if (StreamingStatus.STARTED.name().equals(streamingStatus)) {
final StreamingStatusStartedDetail startedDetail = objectMapper.convertValue(eventDetail,
StreamingStatusStartedDetail.class);
logger.info("[{}] Streaming status {} , EventDetail: {}", transactionId, streamingStatus, startedDetail);
List<String> commandOverride = Arrays.asList("-e", sqsMessage.getBody());
List<KeyValuePair> environmentOverride = Arrays.asList(
new KeyValuePair().withName("IS_TRANSCRIBE_ENABLED").withValue(IS_TRANSCRIBE_ENABLED),
new KeyValuePair().withName("RECORDINGS_BUCKET_NAME").withValue(RECORDINGS_BUCKET_NAME),
new KeyValuePair().withName("AWS_REGION").withValue(AWS_REGION.getName()),
new KeyValuePair().withName("WEBSOCKET_MAPPING_TABLE_NAME").withValue(WEBSOCKET_MAPPING_TABLE_NAME),
new KeyValuePair().withName("TRANSCRIBE_API_GATEWAY_APIID").withValue(TRANSCRIBE_API_GATEWAY_APIID),
new KeyValuePair().withName("TRANSCRIBE_API_GATEWAY_STAGE").withValue(TRANSCRIBE_API_GATEWAY_STAGE)
);
ContainerOverride containerOverride = new ContainerOverride().withName(CONTAINER_NAME).withCommand(commandOverride).withEnvironment(environmentOverride);
RunTaskResult result = ecsClient.runTask(new RunTaskRequest()
.withCluster(CLUSTER_NAME)
.withTaskDefinition(TASK_DEFINITION)
.withLaunchType(LaunchType.EC2)
.withOverrides(new TaskOverride().withContainerOverrides(containerOverride)));
if(result.getFailures().isEmpty()) {
logger.info("{} Sending RunTask request succeeded. ", LAMBDA_KEY_PREFIX);
return "{ \"result\": \"Success\" }";
} else {
List<Failure> failures = result.getFailures();
logger.error("{} Sending RunTask request failed", LAMBDA_KEY_PREFIX);
for(Failure failure : failures) {
logger.error("Failure detail: {}", failure);
}
return "{ \"result\": \"Failed\" }";
}
}
} catch (Exception e) {
logger.error("{} Sending RunTask request failed with: ", LAMBDA_KEY_PREFIX, e);
return "{ \"result\": \"Failed\" }";
}
return "{ \"result\": \"Success\" }";
}