in src/main/java/com/amazon/kinesis/kafka/FirehoseSinkTask.java [75:117]
/**
 * Reads the task settings from {@code props}, builds the Firehose client from them and
 * finally calls {@code validateDeliveryStream()}.
 *
 * <p>Client targeting: when a Kinesis endpoint is configured it is applied, together with
 * {@code region}, through an {@code EndpointConfiguration}; otherwise, when a region is
 * configured, only the region is set on the builder. When neither is configured, nothing
 * is set explicitly on the builder.
 *
 * @param props task configuration, looked up with the {@code FirehoseSinkConnector} keys
 * @throws NumberFormatException if the batch size, batch size in bytes or role duration
 *         setting is missing or is not a valid integer; the message names the setting
 */
public void start(Map<String, String> props) {
    // Boolean.parseBoolean treats a missing (null) value as false, so no guard is needed.
    batch = Boolean.parseBoolean(props.get(FirehoseSinkConnector.BATCH));
    batchSize = parseIntSetting(props, FirehoseSinkConnector.BATCH_SIZE);
    batchSizeInBytes = parseIntSetting(props, FirehoseSinkConnector.BATCH_SIZE_IN_BYTES);
    deliveryStreamName = props.get(FirehoseSinkConnector.DELIVERY_STREAM);
    roleARN = props.get(FirehoseSinkConnector.ROLE_ARN);
    roleSessionName = props.get(FirehoseSinkConnector.ROLE_SESSION_NAME);
    roleExternalID = props.get(FirehoseSinkConnector.ROLE_EXTERNAL_ID);
    roleDurationSeconds = parseIntSetting(props, FirehoseSinkConnector.ROLE_DURATION_SECONDS);
    kinesisEndpoint = props.get(FirehoseSinkConnector.KINESIS_ENDPOINT);
    region = props.get(FirehoseSinkConnector.REGION);

    AmazonKinesisFirehoseClientBuilder builder = AmazonKinesisFirehoseClient.builder();
    // You cannot set region AND endpoint on the builder, so an explicit endpoint wins
    // and the region is passed along inside the endpoint configuration instead.
    if (!StringUtils.isNullOrEmpty(kinesisEndpoint)) {
        builder.withEndpointConfiguration(new EndpointConfiguration(kinesisEndpoint, region));
    } else if (!StringUtils.isNullOrEmpty(region)) {
        builder.withRegion(region);
    }
    builder.withCredentials(IAMUtility.createCredentials(
            region,
            roleARN,
            roleExternalID,
            roleSessionName,
            roleDurationSeconds));
    firehoseClient = builder.build();

    // Validate delivery stream
    validateDeliveryStream();
}

/**
 * Looks up {@code key} in {@code props} and parses it as an {@code int}, reporting the
 * offending setting by name when it is absent or malformed.
 *
 * @param props task configuration
 * @param key   name of the integer setting to read
 * @return the parsed value
 * @throws NumberFormatException if the setting is missing or is not a valid integer
 */
private static int parseIntSetting(Map<String, String> props, String key) {
    String raw = props.get(key);
    if (raw == null) {
        throw new NumberFormatException("Missing required integer setting '" + key + "'");
    }
    try {
        return Integer.parseInt(raw);
    } catch (NumberFormatException e) {
        // NumberFormatException has no (message, cause) constructor; attach the cause manually.
        NumberFormatException named = new NumberFormatException(
                "Setting '" + key + "' must be an integer but was '" + raw + "'");
        named.initCause(e);
        throw named;
    }
}