in amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java [563:654]
public void subscribe(Subscriber<? super RecordsRetrieved> s) {
synchronized (lockObject) {
if (subscriber != null) {
log.error(
"{}: A subscribe occurred while there was an active subscriber. Sending error to current subscriber",
streamAndShardId);
MultipleSubscriberException multipleSubscriberException = new MultipleSubscriberException();
//
// Notify current subscriber
//
subscriber.onError(multipleSubscriberException);
subscriber = null;
//
// Notify attempted subscriber
//
s.onError(multipleSubscriberException);
terminateExistingFlow();
return;
}
terminateExistingFlow();
subscriber = s;
try {
subscribeToShard(currentSequenceNumber);
} catch (Throwable t) {
errorOccurred(flow, t);
return;
}
if (flow == null) {
//
// Failed to subscribe to a flow
//
errorOccurred(flow, new IllegalStateException("SubscribeToShard failed"));
return;
}
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
synchronized (lockObject) {
if (subscriber != s) {
log.warn(
"{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. Last successful request details -- {}",
streamAndShardId, n, lastSuccessfulRequestDetails);
return;
}
if (flow == null) {
//
// Flow has been terminated, so we can't make any requests on it anymore.
//
log.debug(
"{}: (FanOutRecordsPublisher/Subscription#request) - Request called for a null flow.",
streamAndShardId);
errorOccurred(flow, new IllegalStateException("Attempted to request on a null flow."));
return;
}
long previous = availableQueueSpace;
availableQueueSpace += n;
if (previous <= 0) {
flow.request(1);
}
}
}
@Override
public void cancel() {
synchronized (lockObject) {
if (subscriber != s) {
log.warn(
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. Last successful request details -- {}",
streamAndShardId, lastSuccessfulRequestDetails);
return;
}
if (!hasValidSubscriber()) {
log.warn(
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. Last successful request details -- {}",
streamAndShardId, lastSuccessfulRequestDetails);
}
subscriber = null;
if (flow != null) {
log.debug(
"{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}",
streamAndShardId, flow.connectionStartedAt, flow.subscribeToShardId);
flow.cancel();
availableQueueSpace = 0;
}
}
}
});
}
}