in deliverystream/src/main/java/com/amazonaws/kinesisfirehose/deliverystream/CreateHandler.java [72:122]
private ProgressEvent<ResourceModel, CallbackContext> createDeliveryStreamAndUpdateProgress(final FirehoseAPIWrapper firehoseAPIWrapper,
final ResourceModel model,
final CallbackContext callbackContext,
final Logger logger) {
val deliveryStreamStatus = callbackContext.getDeliveryStreamStatus();
if (callbackContext.getStabilizationRetriesRemaining() == 0) {
throw new RuntimeException(TIMED_OUT_MESSAGE);
}
if (deliveryStreamStatus == null) {
if (model.getTags() != null) {
logger.log(String.format("%d resource Tags would be applied for create on the delivery stream name %s", model.getTags().size(), model.getDeliveryStreamName()));
}
if (model.getDeliveryStreamEncryptionConfigurationInput() != null) {
logger.log(String.format("Delivery Stream Encryption would be enabled on the delivery stream name %s", model.getDeliveryStreamName()));
}
try {
return createDeliveryStream(firehoseAPIWrapper, model);
} catch (final Exception e) {
logger.log(String.format("createDeliveryStream failed with exception %s", e.getMessage()));
return ProgressEvent.defaultFailureHandler(e, ExceptionMapper.mapToHandlerErrorCode(e, HandlerType.CREATE));
}
} else {
// If for some reason during the stabilization phase, a call like getDeliveryStreamStatus fails, catch the exception, and
// retry stabilizing if more attempts are remaining.
String currentDeliveryStreamStatus = "";
try {
currentDeliveryStreamStatus = getDeliveryStreamStatus(firehoseAPIWrapper,model.getDeliveryStreamName());
} catch (final Exception e) {
logger.log(String.format("Error getting Delivery Stream Status. Exception %s", e.getMessage()));
}
if (currentDeliveryStreamStatus.equals(DeliveryStreamStatus.ACTIVE.toString())) {
return ProgressEvent.defaultSuccessHandler(model);
} else if (currentDeliveryStreamStatus.equals(DeliveryStreamStatus.CREATING_FAILED.toString())) {
// Creating an InvalidArgumentException instead of InvalidKMSException since that would be too specific of a cause
// for CREATING_FAILED status.
Exception exp = InvalidArgumentException.builder()
.message(String.format(CREATE_DELIVERY_STREAM_ERROR_MSG_FORMAT,currentDeliveryStreamStatus)).build();
return ProgressEvent.defaultFailureHandler(exp, ExceptionMapper.mapToHandlerErrorCode(exp, HandlerType.CREATE));
} else {
return ProgressEvent.defaultInProgressHandler(CallbackContext.builder()
.deliveryStreamStatus(currentDeliveryStreamStatus)
.stabilizationRetriesRemaining(callbackContext.getStabilizationRetriesRemaining() - 1)
.build(),
(int) Duration.ofSeconds(CALLBACK_DELAY_IN_SECONDS).getSeconds(),
model);
}
}
}