public void subscribe()

in amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java [563:654]


    public void subscribe(Subscriber<? super RecordsRetrieved> s) {
        synchronized (lockObject) {
            if (subscriber != null) {
                log.error(
                        "{}: A subscribe occurred while there was an active subscriber.  Sending error to current subscriber",
                        streamAndShardId);
                MultipleSubscriberException multipleSubscriberException = new MultipleSubscriberException();

                //
                // Notify current subscriber
                //
                subscriber.onError(multipleSubscriberException);
                subscriber = null;

                //
                // Notify attempted subscriber
                //
                s.onError(multipleSubscriberException);
                terminateExistingFlow();
                return;
            }
            terminateExistingFlow();

            subscriber = s;
            try {
                subscribeToShard(currentSequenceNumber);
            } catch (Throwable t) {
                errorOccurred(flow, t);
                return;
            }
            if (flow == null) {
                //
                // Failed to subscribe to a flow
                //
                errorOccurred(flow, new IllegalStateException("SubscribeToShard failed"));
                return;
            }
            subscriber.onSubscribe(new Subscription() {
                @Override
                public void request(long n) {
                    synchronized (lockObject) {
                        if (subscriber != s) {
                            log.warn(
                                    "{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. Last successful request details -- {}",
                                    streamAndShardId, n, lastSuccessfulRequestDetails);
                            return;
                        }
                        if (flow == null) {
                            //
                            // Flow has been terminated, so we can't make any requests on it anymore.
                            //
                            log.debug(
                                    "{}: (FanOutRecordsPublisher/Subscription#request) - Request called for a null flow.",
                                    streamAndShardId);
                            errorOccurred(flow, new IllegalStateException("Attempted to request on a null flow."));
                            return;
                        }
                        long previous = availableQueueSpace;
                        availableQueueSpace += n;
                        if (previous <= 0) {
                            flow.request(1);
                        }
                    }
                }

                @Override
                public void cancel() {
                    synchronized (lockObject) {
                        if (subscriber != s) {
                            log.warn(
                                    "{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. Last successful request details -- {}",
                                    streamAndShardId, lastSuccessfulRequestDetails);
                            return;
                        }
                        if (!hasValidSubscriber()) {
                            log.warn(
                                    "{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. Last successful request details -- {}",
                                    streamAndShardId, lastSuccessfulRequestDetails);
                        }
                        subscriber = null;
                        if (flow != null) {
                            log.debug(
                                    "{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}",
                                    streamAndShardId, flow.connectionStartedAt, flow.subscribeToShardId);
                            flow.cancel();
                            availableQueueSpace = 0;
                        }
                    }
                }
            });
        }
    }