public void start()

in src/main/java/com/amazon/kinesis/kafka/FirehoseSinkTask.java [75:117]


	public void start(Map<String, String> props) {

		batch = Boolean.parseBoolean(props.get(FirehoseSinkConnector.BATCH));
		
		batchSize = Integer.parseInt(props.get(FirehoseSinkConnector.BATCH_SIZE));
		
		batchSizeInBytes = Integer.parseInt(props.get(FirehoseSinkConnector.BATCH_SIZE_IN_BYTES));
		
		deliveryStreamName = props.get(FirehoseSinkConnector.DELIVERY_STREAM);

		roleARN = props.get(FirehoseSinkConnector.ROLE_ARN);

		roleSessionName = props.get(FirehoseSinkConnector.ROLE_SESSION_NAME);

		roleExternalID = props.get(FirehoseSinkConnector.ROLE_EXTERNAL_ID);

		roleDurationSeconds = Integer.parseInt(props.get(FirehoseSinkConnector.ROLE_DURATION_SECONDS));

		kinesisEndpoint = props.get(FirehoseSinkConnector.KINESIS_ENDPOINT);
		
		region = props.get(FirehoseSinkConnector.REGION);

		AmazonKinesisFirehoseClientBuilder builder = AmazonKinesisFirehoseClient.builder();
		if (!StringUtils.isNullOrEmpty(kinesisEndpoint)) {
			EndpointConfiguration endpointConfig = new EndpointConfiguration(kinesisEndpoint, region);
			builder.withEndpointConfiguration(endpointConfig );
		}else if (!StringUtils.isNullOrEmpty(region)) {
			// you cannot set region AND endpoint.
			builder.withRegion(region);
		}
		builder.withCredentials(IAMUtility.createCredentials(
						region, 
						roleARN, 
						roleExternalID, 
						roleSessionName, 
						roleDurationSeconds));
		
		firehoseClient = builder.build();
		

		// Validate delivery stream
		validateDeliveryStream();
	}