in sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java [86:299]
public Mono<Void> run(CancellationToken cancellationToken) {
logger.info("Lease with token {}: processing task started with owner {}.",
this.lease.getLeaseToken(), this.lease.getOwner());
this.hasMoreResults = true;
this.checkpointer.setCancellationToken(cancellationToken);
return Flux.just(this)
.flatMap(value -> {
if (cancellationToken.isCancellationRequested()) {
return Flux.empty();
}
// If there are still changes need to be processed, fetch right away
// If there are no changes, wait pollDelay time then try again
if(this.hasMoreResults && this.resultException == null) {
return Flux.just(value);
}
Instant stopTimer = Instant.now().plus(this.settings.getFeedPollDelay());
return Mono.just(value)
.delayElement(Duration.ofMillis(100), CosmosSchedulers.COSMOS_PARALLEL)
.repeat( () -> {
Instant currentTime = Instant.now();
return !cancellationToken.isCancellationRequested() && currentTime.isBefore(stopTimer);
}).last();
})
.flatMap(value -> this.tryGetThroughputControlConfigForFeedRange(this.lease))
.flatMap(configValueHolder -> {
if (configValueHolder.v != null) {
this.options.setThroughputControlGroupName(configValueHolder.v.getGroupName());
}
return this.documentClient.createDocumentChangeFeedQuery(
this.settings.getCollectionSelfLink(),
this.options,
itemType);
})
.flatMap(documentFeedResponse -> {
if (cancellationToken.isCancellationRequested()) return Flux.error(new TaskCancelledException());
final String continuationToken = documentFeedResponse.getContinuationToken();
final ChangeFeedState continuationState = ChangeFeedState.fromString(continuationToken);
checkNotNull(continuationState, "Argument 'continuationState' must not be null.");
checkArgument(
continuationState
.getContinuation()
.getContinuationTokenCount() == 1,
"For ChangeFeedProcessor the continuation state should always have one range/continuation");
this.lastServerContinuationToken = continuationToken;
this.hasMoreResults = !ModelBridgeInternal.noChanges(documentFeedResponse);
if (documentFeedResponse.getResults() != null && documentFeedResponse.getResults().size() > 0) {
logger.info("Lease with token {}: processing {} feeds with owner {}.",
this.lease.getLeaseToken(), documentFeedResponse.getResults().size(), this.lease.getOwner());
return this.dispatchChanges(documentFeedResponse, continuationState)
.doOnError(throwable -> logger.debug(
"Lease with token {}: Exception was thrown from thread {}",
this.lease.getLeaseToken(),
Thread.currentThread().getId(),
throwable))
.doOnSuccess((Void) -> {
this.options = PartitionProcessorHelper.createForProcessingFromContinuation(continuationToken, this.changeFeedMode);
if (cancellationToken.isCancellationRequested()) throw new TaskCancelledException();
});
} else {
// still need to checkpoint with the new continuation token
return this.checkpointer.checkpointPartition(continuationState)
.doOnError(throwable -> {
logger.debug(
"Failed to checkpoint Lease with token {} from thread {}",
this.lease.getLeaseToken(),
Thread.currentThread().getId(),
throwable);
})
.flatMap(lease -> {
this.options = PartitionProcessorHelper.createForProcessingFromContinuation(continuationToken, this.changeFeedMode);
if (cancellationToken.isCancellationRequested()) {
return Mono.error(new TaskCancelledException());
}
return Mono.empty();
});
}
})
.doOnComplete(() -> {
if (this.options.getMaxItemCount() != this.settings.getMaxItemCount()) {
this.options.setMaxItemCount(this.settings.getMaxItemCount()); // Reset after successful execution.
}
})
.onErrorResume(throwable -> {
if (throwable instanceof CosmosException) {
// NOTE - the reason why it is safe to access the this.lastServerContinuationToken
// below in a tread-safe manner is because the CosmosException would never be thrown
// form the flatMap-section above (but only from the "source" (the flatMap-section
// calling createDocumentChangeFeedQuery - so if we ever land in this if-block
// we know it is a terminal event.
CosmosException clientException = (CosmosException) throwable;
logger.warn(
"Lease with token {}: CosmosException was thrown from thread {} for lease with owner {}",
this.lease.getLeaseToken(),
Thread.currentThread().getId(),
this.lease.getOwner(),
clientException);
StatusCodeErrorType docDbError =
ExceptionClassifier.classifyClientException(clientException);
switch (docDbError) {
case PARTITION_NOT_FOUND: {
this.resultException = new PartitionNotFoundException(
"Partition not found.",
this.lastServerContinuationToken);
}
break;
case PARTITION_SPLIT_OR_MERGE: {
this.resultException = new FeedRangeGoneException(
"Partition split or merge.",
this.lastServerContinuationToken);
}
break;
case UNDEFINED: {
this.resultException = new RuntimeException(clientException);
}
break;
case MAX_ITEM_COUNT_TOO_LARGE: {
if (this.options.getMaxItemCount() <= 1) {
logger.error(
"Cannot reduce maxItemCount further as it's already at {}",
this.options.getMaxItemCount(),
clientException);
this.resultException = new RuntimeException(clientException);
}
this.options.setMaxItemCount(this.options.getMaxItemCount() / 2);
logger.warn("Reducing maxItemCount, new value: {}", this.options.getMaxItemCount());
return Flux.empty();
}
case TRANSIENT_ERROR: {
// Retry on transient (429) errors
if (clientException.getRetryAfterDuration().toMillis() > 0) {
Instant stopTimer = Instant.now().plus(clientException.getRetryAfterDuration().toMillis(), MILLIS);
return Mono.just(clientException.getRetryAfterDuration().toMillis()) // set some seed value to be able to run
// the repeat loop
.delayElement(Duration.ofMillis(100), CosmosSchedulers.COSMOS_PARALLEL)
.repeat(() -> {
Instant currentTime = Instant.now();
return !cancellationToken.isCancellationRequested() && currentTime.isBefore(stopTimer);
}).flatMap(values -> Flux.empty());
}
}
break;
default: {
logger.error(
"Lease with token {}: Unrecognized Cosmos exception returned error code {}",
this.lease.getLeaseToken(),
docDbError,
clientException);
this.resultException = new RuntimeException(clientException);
}
}
} else if (throwable instanceof LeaseLostException) {
logger.info(
"Lease with token {}: LeaseLoseException was thrown from thread {} for lease with owner {}",
this.lease.getLeaseToken(),
Thread.currentThread().getId(),
this.lease.getOwner());
this.resultException = (LeaseLostException) throwable;
} else if (throwable instanceof TaskCancelledException) {
logger.debug(
"Lease with token {}: Task cancelled exception was thrown from thread {} for lease with owner {}",
this.lease.getLeaseToken(),
Thread.currentThread().getId(),
this.lease.getOwner(),
throwable);
this.resultException = (TaskCancelledException) throwable;
} else {
logger.warn(
"Lease with token {}: Unexpected exception was thrown from thread {} for lease with owner {}",
this.lease.getLeaseToken(),
Thread.currentThread().getId(),
this.lease.getOwner(),
throwable);
this.resultException = new RuntimeException(throwable);
}
return Flux.error(throwable);
})
.repeat(() -> {
if (cancellationToken.isCancellationRequested()) {
this.resultException = new TaskCancelledException();
return false;
}
return true;
})
.onErrorResume(throwable -> {
if (this.resultException == null) {
this.resultException = new RuntimeException(throwable);
}
return Flux.empty();
})
.then()
.doFinally( any -> {
logger.info(
"Lease with token {}: processing task exited with owner {}.",
this.lease.getLeaseToken(),
this.lease.getOwner());
});
}