public void cancelledSubscription_doesNotCallCompleteMethodOfHandler()

in services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/SubscribeToShardIntegrationTest.java [217:282]


    public void cancelledSubscription_doesNotCallCompleteMethodOfHandler() {
        AtomicBoolean onCompleteSubsMethodsCalled = new AtomicBoolean(false);
        AtomicBoolean completeMethodOfHandlerCalled = new AtomicBoolean(false);
        AtomicBoolean errorOccurred = new AtomicBoolean(false);
        List<SubscribeToShardEventStream> events = new ArrayList<>();
        asyncClient.subscribeToShard(r -> r.consumerARN(consumerArn)
                                           .shardId(shardId)
                                           .startingPosition(s -> s.type(ShardIteratorType.LATEST)),
                                     new SubscribeToShardResponseHandler() {
                                         @Override
                                         public void responseReceived(SubscribeToShardResponse response) {
                                             verifyHttpMetadata(response);
                                         }

                                         @Override
                                         public void onEventStream(SdkPublisher<SubscribeToShardEventStream> publisher) {
                                             publisher.limit(3).subscribe(new Subscriber<SubscribeToShardEventStream>() {
                                                 private Subscription subscription;
                                                 @Override
                                                 public void onSubscribe(Subscription subscription) {
                                                     this.subscription = subscription;
                                                     subscription.request(10);
                                                 }

                                                 @Override
                                                 public void onNext(SubscribeToShardEventStream subscribeToShardEventStream) {
                                                     events.add(subscribeToShardEventStream);
                                                     //Cancel on first event.
                                                     subscription.cancel();
                                                 }

                                                 @Override
                                                 public void onError(Throwable throwable) {
                                                     errorOccurred.set(true);
                                                 }

                                                 @Override
                                                 public void onComplete() {
                                                     onCompleteSubsMethodsCalled.set(true);
                                                 }
                                             });
                                         }

                                         @Override
                                         public void exceptionOccurred(Throwable throwable) {
                                             errorOccurred.set(true);
                                         }

                                         @Override
                                         public void complete() {
                                             completeMethodOfHandlerCalled.set(true);
                                         }
                                     }).join();

        try {
            Thread.sleep(WAIT_TIME_FOR_SUBSCRIPTION_COMPLETION);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        assertThat(completeMethodOfHandlerCalled).isFalse();
        assertThat(onCompleteSubsMethodsCalled).isFalse();
        assertThat(errorOccurred).isFalse();
        assertThat(events.size()).isEqualTo(1);

    }