private static boolean validateRuntimeProperties()

in src/main/java/com/amazonaws/kda/flink/starterkit/SessionProcessor.java [156:210]


	/**
	 * Validates the runtime properties supplied to the application and logs the
	 * outcome.
	 *
	 * <p>
	 * The properties are considered valid only when all of the following hold:
	 * <ul>
	 * <li>{@code session_time_out_in_minutes} parses as a {@code long} and is
	 * greater than zero;</li>
	 * <li>{@code SessionUtil.checkIfStreamExist} returns {@code true} for
	 * {@code region} / {@code input_stream_name};</li>
	 * <li>{@code SessionUtil.checkIfBucketExist} returns {@code true} for
	 * {@code region} / {@code s3_output_path};</li>
	 * <li>{@code stream_init_position} exactly (case-sensitively) equals the name
	 * of one of the {@code StreamPosition} constants;</li>
	 * <li>when {@code stream_init_position} is {@code AT_TIMESTAMP},
	 * {@code stream_initial_timestamp} is present and accepted by
	 * {@code SessionUtil.validateDate}.</li>
	 * </ul>
	 *
	 * <p>
	 * Every parameter is written to the log before validation, and each failed
	 * check is reported with its own error message.
	 *
	 * @param paramTool the runtime parameters to validate
	 * @return {@code true} if every check passes, {@code false} otherwise
	 *         (including when {@code session_time_out_in_minutes} is missing or
	 *         not a number)
	 */
	private static boolean validateRuntimeProperties(ParameterTool paramTool) {

		boolean propertiesValid = false;

		try {
			log.info("Printing runtime Properties to CloudWatch");
			paramTool.toMap().forEach((key, value) -> log.info("parameter: " + key + ", value: " + value));

			final String region = paramTool.get("region");
			final String s3OutputPath = paramTool.get("s3_output_path");
			final String inputStreamName = paramTool.get("input_stream_name");
			final String streamInitPosition = paramTool.get("stream_init_position");

			boolean bucketExist = SessionUtil.checkIfBucketExist(region, s3OutputPath);
			// Long.parseLong throws NumberFormatException for a null or non-numeric
			// value; that case is handled by the catch block below.
			long sessionTimeout = Long.parseLong(paramTool.get("session_time_out_in_minutes"));
			// A zero or negative session timeout is not a usable value.
			boolean sessionTimeoutValid = sessionTimeout > 0L;
			boolean streamExist = SessionUtil.checkIfStreamExist(region, inputStreamName);

			// Check if stream_init_position is valid. The match is exact
			// (case-sensitive); a missing (null) value never matches.
			boolean streamInitPositionValid = Arrays.stream(StreamPosition.values())
					.anyMatch(position -> position.name().equals(streamInitPosition));

			// True when no initial timestamp is needed, or when it is needed,
			// present and valid.
			boolean initialTimestampNAOrValidIfPresent = false;
			if (streamInitPositionValid) {
				if (StreamPosition.AT_TIMESTAMP.name().equals(streamInitPosition)) {
					String streamInitialTimestamp = paramTool.get("stream_initial_timestamp");
					if (streamInitialTimestamp == null) {
						log.error(
								"stream_init_position is set to 'AT_TIMESTAMP' but 'stream_initial_timestamp' is not provided");
					} else if (SessionUtil.validateDate(streamInitialTimestamp)) {
						initialTimestampNAOrValidIfPresent = true;
					} else {
						log.error("Value for property 'stream_initial_timestamp' is invalid: "
								+ streamInitialTimestamp);
					}
				} else {
					initialTimestampNAOrValidIfPresent = true;
				}
			}

			// Check if all conditions are met
			if (sessionTimeoutValid && streamExist && bucketExist && streamInitPositionValid
					&& initialTimestampNAOrValidIfPresent) {
				propertiesValid = true;
				log.info("Runtime properties are valid.");
				log.info("KDA Flink Application will consume data from: " + streamInitPosition);
				if (StreamPosition.AT_TIMESTAMP.name().equals(streamInitPosition)) {
					log.info("The 'STREAM_INITIAL_TIMESTAMP' is set to: "
							+ paramTool.get("stream_initial_timestamp"));
				}
			} else {
				log.error("Runtime properties are not valid.");
				if (!sessionTimeoutValid) {
					log.error("Value for property 'session_time_out_in_minutes' must be greater than zero but was: "
							+ sessionTimeout);
				}
				if (!streamExist) {
					log.error("The specified Kinesis stream: " + inputStreamName + " does not exist.");
				}
				if (!bucketExist) {
					log.error("The specified s3 bucket: " + s3OutputPath + " does not exist.");
				}
				if (!streamInitPositionValid) {
					log.error("Value for property 'stream_init_position' is invalid: " + streamInitPosition);
				}
			}
		} catch (NumberFormatException e) {
			// Route the failure through the logger (with its stack trace) rather
			// than printing to stderr.
			log.error("Value for property 'session_time_out_in_minutes' is invalid", e);
		}
		return propertiesValid;
	}