in src/main/java/com/amazonaws/kda/flink/starterkit/SessionProcessor.java [156:210]
/**
 * Validates the runtime properties supplied to the application and logs the
 * outcome.
 *
 * <p>The properties are considered valid only when all of the following hold:
 * <ul>
 * <li>{@code session_time_out_in_minutes} parses as a {@code long} and is
 * greater than zero;</li>
 * <li>{@code SessionUtil.checkIfStreamExist} returns {@code true} for
 * {@code region} / {@code input_stream_name};</li>
 * <li>{@code SessionUtil.checkIfBucketExist} returns {@code true} for
 * {@code region} / {@code s3_output_path};</li>
 * <li>{@code stream_init_position} exactly matches the name of one of the
 * {@code StreamPosition} constants;</li>
 * <li>when the position is {@code AT_TIMESTAMP}, {@code stream_initial_timestamp}
 * is present and accepted by {@code SessionUtil.validateDate}.</li>
 * </ul>
 *
 * <p>Every property is written to the log before validation starts, and each
 * failed check logs its own error message.
 *
 * @param paramTool the runtime properties to validate
 * @return {@code true} if every check passes, {@code false} otherwise
 *         (including when a {@link NumberFormatException} is raised while
 *         validating)
 */
private static boolean validateRuntimeProperties(ParameterTool paramTool) {
    log.info("Printing runtime Properties to CloudWatch");
    paramTool.toMap().forEach((key, value) -> log.info("parameter: " + key + ", value: " + value));

    final String region = paramTool.get("region");
    final String inputStreamName = paramTool.get("input_stream_name");
    final String s3OutputPath = paramTool.get("s3_output_path");
    final String initPosition = paramTool.get("stream_init_position");

    boolean propertiesValid = false;
    try {
        boolean bucketExist = SessionUtil.checkIfBucketExist(region, s3OutputPath);
        // Long.parseLong throws NumberFormatException for null as well as for
        // malformed text, so a missing property is handled by the catch below.
        long sessionTimeout = Long.parseLong(paramTool.get("session_time_out_in_minutes"));
        boolean streamExist = SessionUtil.checkIfStreamExist(region, inputStreamName);

        // The position must match an enum constant name exactly (case-sensitive).
        boolean streamInitPositionValid = Arrays.stream(StreamPosition.values())
                .anyMatch(position -> position.name().equals(initPosition));
        boolean atTimestamp = streamInitPositionValid
                && StreamPosition.AT_TIMESTAMP.name().equals(initPosition);

        // The initial timestamp only matters for AT_TIMESTAMP; for every other
        // valid position it is treated as "not applicable" and therefore fine.
        boolean initialTimestampNAOrValidIfPresent = false;
        if (!streamInitPositionValid) {
            log.error("Value for property 'stream_init_position' is invalid: " + initPosition);
        } else if (!atTimestamp) {
            initialTimestampNAOrValidIfPresent = true;
        } else {
            String initialTimestamp = paramTool.get("stream_initial_timestamp");
            if (initialTimestamp == null) {
                log.error(
                        "stream_init_position is set to 'AT_TIMESTAMP' but 'stream_initial_timestamp' is not provided");
            } else if (SessionUtil.validateDate(initialTimestamp)) {
                initialTimestampNAOrValidIfPresent = true;
            } else {
                log.error("Value for property 'stream_initial_timestamp' is invalid: " + initialTimestamp);
            }
        }

        // A zero or negative session timeout is rejected.
        // NOTE(review): presumably this value becomes a session window gap,
        // which must be positive - confirm against the caller.
        boolean sessionTimeoutValid = sessionTimeout > 0L;

        if (sessionTimeoutValid && streamExist && bucketExist && streamInitPositionValid
                && initialTimestampNAOrValidIfPresent) {
            propertiesValid = true;
            log.info("Runtime properties are valid.");
            log.info("KDA Flink Application will consume data from: " + initPosition);
            if (atTimestamp) {
                log.info("The 'STREAM_INITIAL_TIMESTAMP' is set to: "
                        + paramTool.get("stream_initial_timestamp"));
            }
        } else {
            log.error("Runtime properties are not valid.");
            if (!sessionTimeoutValid) {
                log.error("Value for property 'session_time_out_in_minutes' must be greater than zero but was: "
                        + sessionTimeout);
            }
            if (!streamExist) {
                log.error("The specified Kinesis stream: " + inputStreamName + " does not exist.");
            }
            if (!bucketExist) {
                log.error("The specified s3 bucket: " + s3OutputPath + " does not exist.");
            }
        }
    } catch (NumberFormatException e) {
        // Route the stack trace through the logger instead of stderr.
        log.error("Value for property 'session_time_out_in_minutes' is invalid", e);
    }
    return propertiesValid;
}