in services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/SubscribeToShardIntegrationTest.java [151:214]
public void limitedSubscription_callCompleteMethodOfSubs_whenLimitsReached() {
    // Subscribes to a shard through a publisher capped with limit(3) and checks that the
    // subscriber's onComplete fires once the cap is hit, that the response handler's
    // complete() is NOT invoked, that no error is reported on either path, and that
    // exactly three events were delivered.
    AtomicBoolean onCompleteSubsMethodsCalled = new AtomicBoolean(false);
    AtomicBoolean completeMethodOfHandlerCalled = new AtomicBoolean(false);
    AtomicBoolean errorOccurred = new AtomicBoolean(false);
    List<SubscribeToShardEventStream> events = new ArrayList<>();

    asyncClient.subscribeToShard(r -> r.consumerARN(consumerArn)
                                       .shardId(shardId)
                                       .startingPosition(s -> s.type(ShardIteratorType.LATEST)),
        new SubscribeToShardResponseHandler() {
            @Override
            public void responseReceived(SubscribeToShardResponse response) {
                verifyHttpMetadata(response);
            }

            @Override
            public void onEventStream(SdkPublisher<SubscribeToShardEventStream> publisher) {
                publisher.limit(3).subscribe(new Subscriber<SubscribeToShardEventStream>() {
                    @Override
                    public void onSubscribe(Subscription subscription) {
                        // Deliberately request more than the limit(3) cap so the test
                        // observes the cap, not the demand, ending the stream.
                        subscription.request(10);
                    }

                    @Override
                    public void onNext(SubscribeToShardEventStream subscribeToShardEventStream) {
                        events.add(subscribeToShardEventStream);
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        errorOccurred.set(true);
                    }

                    @Override
                    public void onComplete() {
                        onCompleteSubsMethodsCalled.set(true);
                    }
                });
            }

            @Override
            public void exceptionOccurred(Throwable throwable) {
                errorOccurred.set(true);
            }

            @Override
            public void complete() {
                completeMethodOfHandlerCalled.set(true);
            }
        }).join();

    // Give any late handler callbacks a chance to arrive before asserting on the flags.
    try {
        Thread.sleep(WAIT_TIME_FOR_SUBSCRIPTION_COMPLETION);
    } catch (InterruptedException e) {
        // Restore the interrupt flag and fail: the wait was cut short, so the
        // assertions below would no longer be checking a settled state.
        Thread.currentThread().interrupt();
        throw new AssertionError("Interrupted while waiting for the subscription to complete", e);
    }

    assertThat(onCompleteSubsMethodsCalled).isTrue();
    assertThat(completeMethodOfHandlerCalled).isFalse();
    assertThat(errorOccurred).isFalse();
    assertThat(events).hasSize(3);
}