private ProgressEvent updateDeliveryStreamAndUpdateProgress()

in deliverystream/src/main/java/com/amazonaws/kinesisfirehose/deliverystream/UpdateHandler.java [62:155]


    private ProgressEvent<ResourceModel, CallbackContext> updateDeliveryStreamAndUpdateProgress(final FirehoseAPIWrapper firehoseAPIWrapper,
                                                                                                final ResourceModel model,
                                                                                                final ResourceModel previousModel,
                                                                                                final CallbackContext callbackContext,
                                                                                                final Logger logger,
                                                                                                final List<Tag> previousResourceAndStackTags,
                                                                                                final List<Tag> currentResourceAndStackTags) {
        val deliveryStreamEncryptionStatus = callbackContext.getDeliveryStreamEncryptionStatus();
        if (callbackContext.getStabilizationRetriesRemaining() == 0) {
            throw new RuntimeException(TIMED_OUT_MESSAGE);
        }
        DescribeDeliveryStreamResponse describeDeliveryStreamResp;
        try {
            describeDeliveryStreamResp = firehoseAPIWrapper.describeDeliveryStream(model.getDeliveryStreamName());
        } catch (ResourceNotFoundException e) {
            logger.log(String.format("DescribeDeliveryStream failed with exception %s", e.getMessage()));
            return ProgressEvent.defaultFailureHandler(e, ExceptionMapper.mapToHandlerErrorCode(e, HandlerType.UPDATE));
        } catch (final Exception e) {
            logger.log(String.format("DescribeDeliveryStream failed with exception %s", e.getMessage()));
            // In case describe fails(either on the first call or on the callbacks) we would set the
            // previous values of callbackContext, return and mark handler status as in-progress for cfn to retry.
            return ProgressEvent.defaultInProgressHandler(CallbackContext.builder()
                    .deliveryStreamStatus(callbackContext.getDeliveryStreamStatus())
                    .deliveryStreamEncryptionStatus(callbackContext.getDeliveryStreamEncryptionStatus())
                    .stabilizationRetriesRemaining(callbackContext.getStabilizationRetriesRemaining() - 1)
                    .build(),
                (int) Duration.ofSeconds(CALLBACK_DELAY_IN_SECONDS).getSeconds(),
                model);
        }

        // In case of callbacks.
        if (deliveryStreamEncryptionStatus != null) {
            val currentDSEncryptionStatus = describeDeliveryStreamResp.deliveryStreamDescription().deliveryStreamEncryptionConfiguration().statusAsString();
            if (currentDSEncryptionStatus.equals(DeliveryStreamEncryptionStatus.ENABLED.toString())
                || currentDSEncryptionStatus.equals(DeliveryStreamEncryptionStatus.DISABLED.toString())) {
                return ProgressEvent.defaultSuccessHandler(model);
            }
            else if (currentDSEncryptionStatus.equals(DeliveryStreamEncryptionStatus.ENABLING_FAILED.toString())
                || currentDSEncryptionStatus.equals(DeliveryStreamEncryptionStatus.DISABLING_FAILED.toString())) {
                val errMsg = getErrorMessageFromEncryptionStatus(currentDSEncryptionStatus);
                Exception exp = InvalidArgumentException.builder()
                    .message(errMsg).build();
                return ProgressEvent.defaultFailureHandler(exp, ExceptionMapper.mapToHandlerErrorCode(exp, HandlerType.UPDATE));
            } else {
                return ProgressEvent.defaultInProgressHandler(CallbackContext.builder()
                        .deliveryStreamStatus(describeDeliveryStreamResp.deliveryStreamDescription().deliveryStreamStatusAsString())
                        .deliveryStreamEncryptionStatus(currentDSEncryptionStatus)
                        .stabilizationRetriesRemaining(callbackContext.getStabilizationRetriesRemaining() - 1)
                        .build(),
                    (int) Duration.ofSeconds(CALLBACK_DELAY_IN_SECONDS).getSeconds(),
                    model);
            }
        }

        try {
            updateDestination(firehoseAPIWrapper, model, describeDeliveryStreamResp);
        }catch (final Exception e) {
            logger.log(String.format("UpdateDeliveryStream failed with exception %s", e.getMessage()));
            return ProgressEvent.defaultFailureHandler(e, ExceptionMapper.mapToHandlerErrorCode(e, HandlerType.UPDATE));
        }

        EncryptionAction encryptionAction = getEncryptionActionToPerform(
            model, describeDeliveryStreamResp);
        try {
            updateEncryptionOnDeliveryStream(firehoseAPIWrapper,model, encryptionAction, logger);
        }catch (final Exception e) {
            logger.log(String.format("updateEncryptionOnDeliveryStream failed with exception %s", e.getMessage()));
            return ProgressEvent.defaultFailureHandler(e, ExceptionMapper.mapToHandlerErrorCode(e, HandlerType.UPDATE));
        }

        try {
            updateTagsOnDeliveryStream(firehoseAPIWrapper, model, previousModel, logger, previousResourceAndStackTags, currentResourceAndStackTags);
        } catch (final Exception e) {
            logger.log(String
                .format("updateTagsOnDeliveryStream failed with exception %s", e.getMessage()));
            return ProgressEvent.defaultFailureHandler(e, ExceptionMapper.mapToHandlerErrorCode(e,HandlerType.UPDATE));
        }

        // If no encryption action was performed, mark this as success as per existing flow, no need to callback.
        if (encryptionAction == EncryptionAction.DO_NOTHING) {
            logger.log(String
                .format("No Encryption action was performed. Marking the update handler as success."));
            return ProgressEvent.defaultSuccessHandler(model);
        }
        // If the delivery stream encryption was either Started or stopped, it is supposed to have a status.
        val describeResp = firehoseAPIWrapper.describeDeliveryStream(model.getDeliveryStreamName());
        return ProgressEvent.defaultInProgressHandler(CallbackContext.builder()
                .deliveryStreamStatus(describeResp.deliveryStreamDescription().deliveryStreamStatusAsString())
                .deliveryStreamEncryptionStatus(describeResp.deliveryStreamDescription().deliveryStreamEncryptionConfiguration().statusAsString())
                .stabilizationRetriesRemaining(NUMBER_OF_STATUS_POLL_RETRIES)
                .build(),
            (int) Duration.ofSeconds(CALLBACK_DELAY_IN_SECONDS).getSeconds(),
            model);
    }