private ProgressEvent createDeliveryStreamAndUpdateProgress()

in deliverystream/src/main/java/com/amazonaws/kinesisfirehose/deliverystream/CreateHandler.java [72:122]


    private ProgressEvent<ResourceModel, CallbackContext> createDeliveryStreamAndUpdateProgress(final FirehoseAPIWrapper firehoseAPIWrapper,
                                                                                                final ResourceModel model,
                                                                                                final CallbackContext callbackContext,
                                                                                                final Logger logger) {
        val deliveryStreamStatus = callbackContext.getDeliveryStreamStatus();

        if (callbackContext.getStabilizationRetriesRemaining() == 0) {
            throw new RuntimeException(TIMED_OUT_MESSAGE);
        }

        if (deliveryStreamStatus == null) {
            if (model.getTags() != null) {
                logger.log(String.format("%d resource Tags would be applied for create on the delivery stream name %s", model.getTags().size(), model.getDeliveryStreamName()));
            }
            if (model.getDeliveryStreamEncryptionConfigurationInput() != null) {
                logger.log(String.format("Delivery Stream Encryption would be enabled on the delivery stream name %s", model.getDeliveryStreamName()));
            }
            try {
                return createDeliveryStream(firehoseAPIWrapper, model);
            } catch (final Exception e) {
                logger.log(String.format("createDeliveryStream failed with exception %s", e.getMessage()));
                return ProgressEvent.defaultFailureHandler(e, ExceptionMapper.mapToHandlerErrorCode(e, HandlerType.CREATE));
            }
        } else {
            // If for some reason during the stabilization phase, a call like getDeliveryStreamStatus fails, catch the exception, and
            // retry stabilizing if more attempts are remaining.
            String currentDeliveryStreamStatus = "";
            try {
                currentDeliveryStreamStatus = getDeliveryStreamStatus(firehoseAPIWrapper,model.getDeliveryStreamName());
            } catch (final Exception e) {
                logger.log(String.format("Error getting Delivery Stream Status. Exception %s", e.getMessage()));
            }

            if (currentDeliveryStreamStatus.equals(DeliveryStreamStatus.ACTIVE.toString())) {
                return ProgressEvent.defaultSuccessHandler(model);
            } else if (currentDeliveryStreamStatus.equals(DeliveryStreamStatus.CREATING_FAILED.toString())) {
                // Creating an InvalidArgumentException instead of InvalidKMSException since that would be too specific of a cause
                // for CREATING_FAILED status.
                Exception exp = InvalidArgumentException.builder()
                    .message(String.format(CREATE_DELIVERY_STREAM_ERROR_MSG_FORMAT,currentDeliveryStreamStatus)).build();
                return ProgressEvent.defaultFailureHandler(exp, ExceptionMapper.mapToHandlerErrorCode(exp, HandlerType.CREATE));
            } else {
                return ProgressEvent.defaultInProgressHandler(CallbackContext.builder()
                                .deliveryStreamStatus(currentDeliveryStreamStatus)
                                .stabilizationRetriesRemaining(callbackContext.getStabilizationRetriesRemaining() - 1)
                                .build(),
                        (int) Duration.ofSeconds(CALLBACK_DELAY_IN_SECONDS).getSeconds(),
                        model);
            }
        }
    }