in deliverystream/src/main/java/com/amazonaws/kinesisfirehose/deliverystream/UpdateHandler.java [62:155]
/**
 * Applies the requested update to the delivery stream or, when re-invoked as a callback,
 * polls the stream until its encryption status settles.
 *
 * <p>A non-null encryption status in {@code callbackContext} is treated as "the update calls were
 * already issued by an earlier invocation": only the current encryption status is inspected and
 * no update call is repeated. Otherwise the destination, encryption and tags are updated in that
 * order; the first step that throws is logged and turned into a failure event.
 *
 * @param firehoseAPIWrapper wrapper used for the describe call and handed to the update helpers
 * @param model desired resource state; also returned inside every progress event
 * @param previousModel prior resource state, forwarded to the tag update
 * @param callbackContext carries the last observed statuses and the remaining poll budget
 * @param logger sink for diagnostic messages
 * @param previousResourceAndStackTags tags before the update, forwarded to the tag update
 * @param currentResourceAndStackTags tags after the update, forwarded to the tag update
 * @return a success, failure or in-progress event describing the state of the update
 * @throws RuntimeException with {@code TIMED_OUT_MESSAGE} when the context has no retries left
 */
private ProgressEvent<ResourceModel, CallbackContext> updateDeliveryStreamAndUpdateProgress(final FirehoseAPIWrapper firehoseAPIWrapper,
    final ResourceModel model,
    final ResourceModel previousModel,
    final CallbackContext callbackContext,
    final Logger logger,
    final List<Tag> previousResourceAndStackTags,
    final List<Tag> currentResourceAndStackTags) {
    final String deliveryStreamEncryptionStatus = callbackContext.getDeliveryStreamEncryptionStatus();
    if (callbackContext.getStabilizationRetriesRemaining() == 0) {
        throw new RuntimeException(TIMED_OUT_MESSAGE);
    }

    DescribeDeliveryStreamResponse describeDeliveryStreamResp;
    try {
        describeDeliveryStreamResp = firehoseAPIWrapper.describeDeliveryStream(model.getDeliveryStreamName());
    } catch (ResourceNotFoundException e) {
        // A missing stream cannot be fixed by retrying, so fail immediately.
        logger.log(String.format("DescribeDeliveryStream failed with exception %s", e.getMessage()));
        return ProgressEvent.defaultFailureHandler(e, ExceptionMapper.mapToHandlerErrorCode(e, HandlerType.UPDATE));
    } catch (final Exception e) {
        logger.log(String.format("DescribeDeliveryStream failed with exception %s", e.getMessage()));
        // In case describe fails (either on the first call or on the callbacks) we carry over the
        // previous values of callbackContext, consume one retry and stay in-progress for cfn to retry.
        return stabilizationInProgress(
            callbackContext.getDeliveryStreamStatus(),
            callbackContext.getDeliveryStreamEncryptionStatus(),
            callbackContext.getStabilizationRetriesRemaining() - 1,
            model);
    }

    // Callback path: the update calls were issued earlier, only the encryption status is polled.
    if (deliveryStreamEncryptionStatus != null) {
        final String currentDSEncryptionStatus =
            describeDeliveryStreamResp.deliveryStreamDescription().deliveryStreamEncryptionConfiguration().statusAsString();
        if (currentDSEncryptionStatus.equals(DeliveryStreamEncryptionStatus.ENABLED.toString())
            || currentDSEncryptionStatus.equals(DeliveryStreamEncryptionStatus.DISABLED.toString())) {
            return ProgressEvent.defaultSuccessHandler(model);
        }
        if (currentDSEncryptionStatus.equals(DeliveryStreamEncryptionStatus.ENABLING_FAILED.toString())
            || currentDSEncryptionStatus.equals(DeliveryStreamEncryptionStatus.DISABLING_FAILED.toString())) {
            final String errMsg = getErrorMessageFromEncryptionStatus(currentDSEncryptionStatus);
            Exception exp = InvalidArgumentException.builder()
                .message(errMsg).build();
            return ProgressEvent.defaultFailureHandler(exp, ExceptionMapper.mapToHandlerErrorCode(exp, HandlerType.UPDATE));
        }
        // Any other status is treated as still transitioning: poll again with one retry fewer.
        return stabilizationInProgress(
            describeDeliveryStreamResp.deliveryStreamDescription().deliveryStreamStatusAsString(),
            currentDSEncryptionStatus,
            callbackContext.getStabilizationRetriesRemaining() - 1,
            model);
    }

    // First invocation: destination, then encryption, then tags.
    try {
        updateDestination(firehoseAPIWrapper, model, describeDeliveryStreamResp);
    } catch (final Exception e) {
        logger.log(String.format("UpdateDeliveryStream failed with exception %s", e.getMessage()));
        return ProgressEvent.defaultFailureHandler(e, ExceptionMapper.mapToHandlerErrorCode(e, HandlerType.UPDATE));
    }

    EncryptionAction encryptionAction = getEncryptionActionToPerform(
        model, describeDeliveryStreamResp);
    try {
        updateEncryptionOnDeliveryStream(firehoseAPIWrapper, model, encryptionAction, logger);
    } catch (final Exception e) {
        logger.log(String.format("updateEncryptionOnDeliveryStream failed with exception %s", e.getMessage()));
        return ProgressEvent.defaultFailureHandler(e, ExceptionMapper.mapToHandlerErrorCode(e, HandlerType.UPDATE));
    }

    try {
        updateTagsOnDeliveryStream(firehoseAPIWrapper, model, previousModel, logger, previousResourceAndStackTags, currentResourceAndStackTags);
    } catch (final Exception e) {
        logger.log(String
            .format("updateTagsOnDeliveryStream failed with exception %s", e.getMessage()));
        return ProgressEvent.defaultFailureHandler(e, ExceptionMapper.mapToHandlerErrorCode(e, HandlerType.UPDATE));
    }

    // If no encryption action was performed, mark this as success as per existing flow, no need to callback.
    if (encryptionAction == EncryptionAction.DO_NOTHING) {
        logger.log("No Encryption action was performed. Marking the update handler as success.");
        return ProgressEvent.defaultSuccessHandler(model);
    }

    // If the delivery stream encryption was either started or stopped, it is supposed to have a status.
    // This describe call is guarded like every other service call in this method, so a failure here
    // is logged and reported as a mapped failure event instead of escaping as a raw exception.
    try {
        final DescribeDeliveryStreamResponse describeResp =
            firehoseAPIWrapper.describeDeliveryStream(model.getDeliveryStreamName());
        return stabilizationInProgress(
            describeResp.deliveryStreamDescription().deliveryStreamStatusAsString(),
            describeResp.deliveryStreamDescription().deliveryStreamEncryptionConfiguration().statusAsString(),
            NUMBER_OF_STATUS_POLL_RETRIES,
            model);
    } catch (final Exception e) {
        logger.log(String.format("DescribeDeliveryStream failed with exception %s", e.getMessage()));
        return ProgressEvent.defaultFailureHandler(e, ExceptionMapper.mapToHandlerErrorCode(e, HandlerType.UPDATE));
    }
}

/**
 * Builds the in-progress event used while waiting for the delivery stream to stabilize.
 *
 * @param deliveryStreamStatus stream status to store in the callback context
 * @param deliveryStreamEncryptionStatus encryption status to store in the callback context
 * @param retriesRemaining poll budget to store in the callback context
 * @param model resource model to return with the event
 * @return an in-progress event scheduled {@code CALLBACK_DELAY_IN_SECONDS} seconds from now
 */
private ProgressEvent<ResourceModel, CallbackContext> stabilizationInProgress(final String deliveryStreamStatus,
    final String deliveryStreamEncryptionStatus,
    final int retriesRemaining,
    final ResourceModel model) {
    return ProgressEvent.defaultInProgressHandler(CallbackContext.builder()
            .deliveryStreamStatus(deliveryStreamStatus)
            .deliveryStreamEncryptionStatus(deliveryStreamEncryptionStatus)
            .stabilizationRetriesRemaining(retriesRemaining)
            .build(),
        (int) Duration.ofSeconds(CALLBACK_DELAY_IN_SECONDS).getSeconds(),
        model);
}