in CloudWatchSink/src/main/java/com/amazonaws/services/kinesisanalytics/CustomSinkStreamingJob.java [102:130]
/**
 * Ensures that the given CloudWatch Logs log group and log stream exist, creating
 * whichever of them is missing.
 *
 * <p>A newly created log group also gets a retention policy of
 * {@code LOG_RETENTION_IN_DAYS} days. An existing log group is left untouched.
 *
 * <p>NOTE(review): the log group lookup uses the no-argument
 * {@code describeLogGroups()} call, which returns a single page of results. In an
 * account/region with more log groups than fit in one page, an existing group may
 * not be seen and {@code createLogGroup} would then fail. Consider a prefix-filtered,
 * paginated {@code DescribeLogGroupsRequest} if that is a realistic scenario.
 *
 * @param awsRegion     region the CloudWatch Logs client is built for
 * @param logGroupName  name of the log group to look up or create
 * @param logStreamName name of the log stream to look up or create inside the group
 */
private static void setupLogGroupAndStream(String awsRegion, String logGroupName, String logStreamName) {
    final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
    final AWSLogs awsLogsClient = AWSLogsClientBuilder.standard()
            .withCredentials(credentialsProvider)
            .withRegion(awsRegion)
            .build();
    try {
        // Create log group if it does not exist
        boolean logGroupExists = false;
        DescribeLogGroupsResult describeLogGroupsResult = awsLogsClient.describeLogGroups();
        for (LogGroup g : describeLogGroupsResult.getLogGroups()) {
            if (g.getLogGroupName().equals(logGroupName)) {
                logGroupExists = true;
                break;
            }
        }
        if (!logGroupExists) {
            awsLogsClient.createLogGroup(new CreateLogGroupRequest(logGroupName));
            awsLogsClient.putRetentionPolicy(new PutRetentionPolicyRequest()
                    .withLogGroupName(logGroupName)
                    .withRetentionInDays(LOG_RETENTION_IN_DAYS));
        }

        // Create log stream if it does not exist.
        // The query filters by name *prefix*, so it can return streams such as
        // "<name>-1" when "<name>" itself is absent. Only an exact name match counts,
        // and every result page is inspected until a match is found.
        boolean logStreamExists = false;
        String nextToken = null;
        do {
            DescribeLogStreamsRequest describeLogStreamsRequest = new DescribeLogStreamsRequest()
                    .withLogGroupName(logGroupName)
                    .withLogStreamNamePrefix(logStreamName)
                    .withNextToken(nextToken);
            DescribeLogStreamsResult describeLogStreamsResult =
                    awsLogsClient.describeLogStreams(describeLogStreamsRequest);
            for (LogStream s : describeLogStreamsResult.getLogStreams()) {
                if (s.getLogStreamName().equals(logStreamName)) {
                    logStreamExists = true;
                    break;
                }
            }
            nextToken = describeLogStreamsResult.getNextToken();
        } while (!logStreamExists && nextToken != null);

        if (!logStreamExists) {
            awsLogsClient.createLogStream(new CreateLogStreamRequest(logGroupName, logStreamName));
        }
    } finally {
        // The client is local to this method; release its underlying resources.
        awsLogsClient.shutdown();
    }
}