protected AllocatedPersistentTask createTask()

in x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java [150:560]


    /**
     * Builds the {@link AllocatedPersistentTask} that drives replication for a single follower shard.
     * <p>
     * The returned task is an anonymous {@link ShardFollowNodeTask} whose overrides wire the abstract follow
     * operations to concrete transport calls: mapping/settings/alias changes are read from the remote (leader)
     * cluster and applied to the follower index via {@code followerClient}; translog operations are fetched from
     * the leader shard and bulk-applied locally; and a background job periodically renews the retention lease held
     * on the leader shard (re-adding it if it has gone missing).
     *
     * @param id             the id of the task
     * @param type           the type of the task
     * @param action         the action name of the task
     * @param parentTaskId   the id of the parent task
     * @param taskInProgress the persistent task state, carrying the {@link ShardFollowTask} parameters
     * @param headers        the headers associated with the task
     * @return the shard follow node task to allocate
     */
    protected AllocatedPersistentTask createTask(
        long id,
        String type,
        String action,
        TaskId parentTaskId,
        PersistentTasksCustomMetadata.PersistentTask<ShardFollowTask> taskInProgress,
        Map<String, String> headers
    ) {
        ShardFollowTask params = taskInProgress.getParams();
        // Client for the local (follower) cluster, carrying the headers recorded in the task params.
        // NOTE(review): presumably the security headers captured when the follow was initiated — confirm in wrapClient.
        Client followerClient = wrapClient(client, params.getHeaders(), clusterService.state());
        // Delayed-execution hook for the node task; skips scheduling if the thread pool is shutting down.
        BiConsumer<TimeValue, Runnable> scheduler = (delay, command) -> threadPool.scheduleUnlessShuttingDown(delay, ccrExecutor, command);

        final String recordedLeaderShardHistoryUUID = getLeaderShardHistoryUUID(params);
        return new ShardFollowNodeTask(
            id,
            type,
            action,
            getDescription(taskInProgress),
            parentTaskId,
            headers,
            params,
            scheduler,
            System::nanoTime
        ) {

            /**
             * Fetches the leader index metadata (at mapping version {@code minRequiredMappingVersion} or newer) from
             * the remote cluster, puts the leader's mapping on the follower index, and reports the mapping version
             * that was applied via {@code handler}.
             */
            @Override
            protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer<Exception> errorHandler) {
                final Index followerIndex = params.getFollowShardId().getIndex();
                final Index leaderIndex = params.getLeaderShardId().getIndex();
                // Once the task is stopped, don't block waiting for leader metadata any longer.
                final Supplier<TimeValue> timeout = () -> isStopped() ? TimeValue.MINUS_ONE : waitForMetadataTimeOut;
                final ActionListener<IndexMetadata> listener = ActionListener.wrap(indexMetadata -> {
                    if (indexMetadata.mapping() == null) {
                        // Leader index has no mapping yet: nothing to copy, just propagate the (initial) version.
                        assert indexMetadata.getMappingVersion() == 1;
                        handler.accept(indexMetadata.getMappingVersion());
                        return;
                    }
                    MappingMetadata mappingMetadata = indexMetadata.mapping();
                    PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetadata);
                    followerClient.admin()
                        .indices()
                        .putMapping(
                            putMappingRequest,
                            ActionListener.wrap(putMappingResponse -> handler.accept(indexMetadata.getMappingVersion()), errorHandler)
                        );
                }, errorHandler);
                try {
                    CcrRequests.getIndexMetadata(remoteClient(params), leaderIndex, minRequiredMappingVersion, 0L, timeout, listener);
                } catch (NoSuchRemoteClusterException e) {
                    // Remote cluster connection is gone; surface to the task's error handling (it may retry later).
                    errorHandler.accept(e);
                }
            }

            /**
             * Diffs the (filtered) leader index settings against the follower's and applies any changes: dynamic-only
             * changes are applied in place, otherwise the follower index is closed, updated, and reopened. On success
             * the leader's settings version is reported via {@code finalHandler}.
             */
            @Override
            protected void innerUpdateSettings(final LongConsumer finalHandler, final Consumer<Exception> errorHandler) {
                final Index leaderIndex = params.getLeaderShardId().getIndex();
                final Index followIndex = params.getFollowShardId().getIndex();

                CheckedConsumer<ClusterStateResponse, Exception> onResponse = clusterStateResponse -> {
                    final IndexMetadata leaderIMD = clusterStateResponse.getState().metadata().getProject().getIndexSafe(leaderIndex);
                    final IndexMetadata followerIMD = clusterService.state().metadata().getProject().getIndexSafe(followIndex);

                    // Compare only the settings CCR replicates (TransportResumeFollowAction.filter strips the rest).
                    final Settings existingSettings = TransportResumeFollowAction.filter(followerIMD.getSettings());
                    final Settings settings = TransportResumeFollowAction.filter(leaderIMD.getSettings());
                    if (existingSettings.equals(settings)) {
                        // If no settings have been changed then just propagate settings version to shard follow node task:
                        finalHandler.accept(leaderIMD.getSettingsVersion());
                    } else {
                        // Figure out which settings have been updated:
                        final Settings updatedSettings = settings.filter(s -> {
                            final Setting<?> indexSettings = indexScopedSettings.get(s);
                            // Skip unknown, private, and internal settings — they are not user-replicable.
                            if (indexSettings == null || indexSettings.isPrivateIndex() || indexSettings.isInternalIndex()) {
                                return false;
                            }
                            // Keep settings the follower is missing or whose value differs from the leader's.
                            return existingSettings.get(s) == null || existingSettings.get(s).equals(settings.get(s)) == false;
                        });
                        if (updatedSettings.isEmpty()) {
                            // Every difference was filtered out; just propagate the version.
                            finalHandler.accept(leaderIMD.getSettingsVersion());
                            return;
                        }
                        // Figure out whether the updated settings are all dynamic settings and
                        // if so just update the follower index's settings:
                        if (updatedSettings.keySet().stream().allMatch(indexScopedSettings::isDynamicSetting)) {
                            // If only dynamic settings have been updated then just update these settings in follower index:
                            final UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(followIndex.getName())
                                .masterNodeTimeout(TimeValue.MAX_VALUE)
                                .settings(updatedSettings);
                            followerClient.admin()
                                .indices()
                                .updateSettings(
                                    updateSettingsRequest,
                                    ActionListener.wrap(response -> finalHandler.accept(leaderIMD.getSettingsVersion()), errorHandler)
                                );
                        } else {
                            // If one or more setting are not dynamic then close follow index, update leader settings and
                            // then open leader index:
                            Runnable handler = () -> finalHandler.accept(leaderIMD.getSettingsVersion());
                            closeIndexUpdateSettingsAndOpenIndex(followIndex.getName(), updatedSettings, handler, errorHandler);
                        }
                    }
                };
                try {
                    remoteClient(params).execute(
                        ClusterStateAction.REMOTE_TYPE,
                        CcrRequests.metadataRequest(leaderIndex.getName()),
                        ActionListener.wrap(onResponse, errorHandler)
                    );
                } catch (NoSuchRemoteClusterException e) {
                    errorHandler.accept(e);
                }
            }

            @Override
            protected void innerUpdateAliases(final LongConsumer handler, final Consumer<Exception> errorHandler) {
                /*
                 * The strategy for updating the aliases is fairly simple. We look at the aliases that exist on the leader, and those that
                 * exist on the follower. We partition these aliases into three sets: the aliases that exist on both the leader and the
                 * follower, the aliases that are on the leader only, and the aliases that are on the follower only.
                 *
                 * For the aliases that are on the leader and the follower, we compare the aliases and add an action to overwrite the
                 * follower view of the alias if the aliases are different. If the aliases are the same, we skip the alias. Note that the
                 * meaning of equals here intentionally ignores the write index. There are two reasons for this. First, follower indices
                 * do not receive direct writes so conceptually the write index is not useful. Additionally, there is a larger challenge.
                 * Suppose that we did copy over the write index from the leader to the follower. On the leader, when the write index is
                 * swapped from one index to another, this is done atomically. However, to do this on the follower, we would have to step
                 * outside the shard follow tasks framework and have a separate framework for copying aliases over. This is because if we
                 * try to manage the aliases including the write aliases with the shard follow tasks, we do not have a way to move the write
                 * index atomically (since we have a single-index view here only) and therefore we can end up in situations where we would
                 * try to assign the write index to two indices. Further, trying to do this outside the shard follow tasks framework has
                 * problems too, since it could be that the new aliases arrive on the coordinator before the write index has even been
                 * created on the local cluster. So there are race conditions either way. All of this put together means that we will simply
                 * ignore the write index.
                 *
                 * For aliases that are on the leader but not the follower, we copy those aliases over to the follower.
                 *
                 * For aliases that are on the follower but not the leader, we remove those aliases from the follower.
                 */
                final var leaderIndex = params.getLeaderShardId().getIndex();
                final var followerIndex = params.getFollowShardId().getIndex();

                final CheckedConsumer<ClusterStateResponse, Exception> onResponse = clusterStateResponse -> {
                    final var leaderIndexMetadata = clusterStateResponse.getState().metadata().getProject().getIndexSafe(leaderIndex);
                    final var followerIndexMetadata = clusterService.state().metadata().getProject().getIndexSafe(followerIndex);

                    // partition the aliases into the three sets
                    final var aliasesOnLeaderNotOnFollower = new HashSet<String>();
                    final var aliasesInCommon = new HashSet<String>();
                    final var aliasesOnFollowerNotOnLeader = new HashSet<String>();

                    for (final var aliasName : leaderIndexMetadata.getAliases().keySet()) {
                        if (followerIndexMetadata.getAliases().containsKey(aliasName)) {
                            aliasesInCommon.add(aliasName);
                        } else {
                            aliasesOnLeaderNotOnFollower.add(aliasName);
                        }
                    }

                    for (final var aliasName : followerIndexMetadata.getAliases().keySet()) {
                        if (leaderIndexMetadata.getAliases().containsKey(aliasName)) {
                            // already classified by the first loop; just sanity-check
                            assert aliasesInCommon.contains(aliasName) : aliasName;
                        } else {
                            aliasesOnFollowerNotOnLeader.add(aliasName);
                        }
                    }

                    final var aliasActions = new ArrayList<IndicesAliasesRequest.AliasActions>();

                    // add the aliases the follower does not have
                    for (final var aliasName : aliasesOnLeaderNotOnFollower) {
                        final var alias = leaderIndexMetadata.getAliases().get(aliasName);
                        // we intentionally override that the alias is not a write alias as follower indices do not receive direct writes
                        aliasActions.add(
                            IndicesAliasesRequest.AliasActions.add()
                                .index(followerIndex.getName())
                                .alias(alias.alias())
                                .filter(alias.filter() == null ? null : alias.filter().toString())
                                .indexRouting(alias.indexRouting())
                                .searchRouting(alias.searchRouting())
                                .writeIndex(false)
                        );
                    }

                    // update the aliases that are different (ignoring write aliases)
                    for (final var aliasName : aliasesInCommon) {
                        final var leaderAliasMetadata = leaderIndexMetadata.getAliases().get(aliasName);
                        // we intentionally override that the alias is not a write alias as follower indices do not receive direct writes
                        final var leaderAliasMetadataWithoutWriteIndex = new AliasMetadata.Builder(aliasName).filter(
                            leaderAliasMetadata.filter()
                        )
                            .indexRouting(leaderAliasMetadata.indexRouting())
                            .searchRouting(leaderAliasMetadata.searchRouting())
                            .writeIndex(false)
                            .build();
                        final var followerAliasMetadata = followerIndexMetadata.getAliases().get(aliasName);
                        if (leaderAliasMetadataWithoutWriteIndex.equals(followerAliasMetadata)) {
                            // skip this alias, the leader and follower have the same modulo the write index
                            continue;
                        }
                        // we intentionally override that the alias is not a write alias as follower indices do not receive direct writes
                        aliasActions.add(
                            IndicesAliasesRequest.AliasActions.add()
                                .index(followerIndex.getName())
                                .alias(leaderAliasMetadata.alias())
                                .filter(leaderAliasMetadata.filter() == null ? null : leaderAliasMetadata.filter().toString())
                                .indexRouting(leaderAliasMetadata.indexRouting())
                                .searchRouting(leaderAliasMetadata.searchRouting())
                                .writeIndex(false)
                        );
                    }

                    // remove aliases that the leader no longer has
                    for (final var aliasName : aliasesOnFollowerNotOnLeader) {
                        aliasActions.add(IndicesAliasesRequest.AliasActions.remove().index(followerIndex.getName()).alias(aliasName));
                    }

                    if (aliasActions.isEmpty()) {
                        // nothing to change; just propagate the leader's aliases version
                        handler.accept(leaderIndexMetadata.getAliasesVersion());
                    } else {
                        // NOTE(review): args are presumably (masterNodeTimeout, ackTimeout) — unbounded master timeout,
                        // no wait for acks; confirm against the IndicesAliasesRequest constructor.
                        final var request = new IndicesAliasesRequest(TimeValue.MAX_VALUE, TimeValue.ZERO);
                        request.origin("ccr");
                        aliasActions.forEach(request::addAliasAction);
                        followerClient.admin()
                            .indices()
                            .aliases(
                                request,
                                ActionListener.wrap(r -> handler.accept(leaderIndexMetadata.getAliasesVersion()), errorHandler)
                            );
                    }
                };

                try {
                    remoteClient(params).execute(
                        ClusterStateAction.REMOTE_TYPE,
                        CcrRequests.metadataRequest(leaderIndex.getName()),
                        ActionListener.wrap(onResponse, errorHandler)
                    );
                } catch (final NoSuchRemoteClusterException e) {
                    errorHandler.accept(e);
                }
            }

            /**
             * Step 1 of applying non-dynamic settings: close the follower index, then chain into
             * {@link #updateSettingsAndOpenIndex}.
             */
            private void closeIndexUpdateSettingsAndOpenIndex(
                String followIndex,
                Settings updatedSettings,
                Runnable handler,
                Consumer<Exception> onFailure
            ) {
                CloseIndexRequest closeRequest = new CloseIndexRequest(followIndex).masterNodeTimeout(TimeValue.MAX_VALUE);
                CheckedConsumer<CloseIndexResponse, Exception> onResponse = response -> {
                    updateSettingsAndOpenIndex(followIndex, updatedSettings, handler, onFailure);
                };
                followerClient.admin().indices().close(closeRequest, ActionListener.wrap(onResponse, onFailure));
            }

            /**
             * Step 2: apply the updated settings on the (closed) follower index, then chain into {@link #openIndex}.
             */
            private void updateSettingsAndOpenIndex(
                String followIndex,
                Settings updatedSettings,
                Runnable handler,
                Consumer<Exception> onFailure
            ) {
                final UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(followIndex).masterNodeTimeout(
                    TimeValue.MAX_VALUE
                );
                updateSettingsRequest.settings(updatedSettings);
                CheckedConsumer<AcknowledgedResponse, Exception> onResponse = response -> openIndex(followIndex, handler, onFailure);
                followerClient.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(onResponse, onFailure));
            }

            /** Step 3: reopen the follower index and invoke {@code handler} on success. */
            private void openIndex(String followIndex, Runnable handler, Consumer<Exception> onFailure) {
                OpenIndexRequest openIndexRequest = new OpenIndexRequest(followIndex).masterNodeTimeout(TimeValue.MAX_VALUE);
                CheckedConsumer<OpenIndexResponse, Exception> onResponse = response -> handler.run();
                followerClient.admin().indices().open(openIndexRequest, ActionListener.wrap(onResponse, onFailure));
            }

            /**
             * Applies a batch of leader translog operations to the follower shard via
             * {@link BulkShardOperationsAction} on the local cluster.
             */
            @Override
            protected void innerSendBulkShardOperationsRequest(
                final String followerHistoryUUID,
                final List<Translog.Operation> operations,
                final long maxSeqNoOfUpdatesOrDeletes,
                final Consumer<BulkShardOperationsResponse> handler,
                final Consumer<Exception> errorHandler
            ) {

                final BulkShardOperationsRequest request = new BulkShardOperationsRequest(
                    params.getFollowShardId(),
                    followerHistoryUUID,
                    operations,
                    maxSeqNoOfUpdatesOrDeletes
                );
                followerClient.execute(BulkShardOperationsAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler));
            }

            /**
             * Reads up to {@code maxOperationCount} operations starting at seq no {@code from} from the leader shard
             * on the remote cluster, validating against the recorded leader shard history UUID.
             */
            @Override
            protected void innerSendShardChangesRequest(
                long from,
                int maxOperationCount,
                Consumer<ShardChangesAction.Response> handler,
                Consumer<Exception> errorHandler
            ) {
                ShardChangesAction.Request request = new ShardChangesAction.Request(
                    params.getLeaderShardId(),
                    recordedLeaderShardHistoryUUID
                );
                request.setFromSeqNo(from);
                request.setMaxOperationCount(maxOperationCount);
                request.setMaxBatchSize(params.getMaxReadRequestSize());
                request.setPollTimeout(params.getReadPollTimeout());
                try {
                    remoteClient(params).execute(
                        ShardChangesAction.REMOTE_TYPE,
                        request,
                        ActionListener.wrap(handler::accept, errorHandler)
                    );
                } catch (NoSuchRemoteClusterException e) {
                    errorHandler.accept(e);
                }
            }

            /**
             * Schedules a repeating job that renews the retention lease this follower holds on the leader shard,
             * re-adding the lease if renewal finds it missing. Returns the cancellable handle for the schedule.
             */
            @Override
            protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final LongSupplier followerGlobalCheckpoint) {
                final String retentionLeaseId = CcrRetentionLeases.retentionLeaseId(
                    clusterService.getClusterName().value(),
                    params.getFollowShardId().getIndex(),
                    params.getRemoteCluster(),
                    params.getLeaderShardId().getIndex()
                );

                /*
                 * We are going to attempt to renew the retention lease. If this fails it is either because the retention lease does not
                 * exist, or something else happened. If the retention lease does not exist, we will attempt to add the retention lease
                 * again. If that fails, it had better not be because the retention lease already exists. Either way, we will attempt to
                 * renew again on the next scheduled execution.
                 */
                final ActionListener<ActionResponse.Empty> listener = ActionListener.wrap(r -> {}, e -> {
                    /*
                     * We have to guard against the possibility that the shard follow node task has been stopped and the retention
                     * lease deliberately removed via the act of unfollowing. Note that the order of operations is important in
                     * TransportUnfollowAction. There, we first stop the shard follow node task, and then remove the retention
                     * leases on the leader. This means that if we end up here with the retention lease not existing because of an
                     * unfollow action, then we know that the unfollow action has already stopped the shard follow node task and
                     * there is no race condition with the unfollow action.
                     */
                    if (isCancelled() || isCompleted()) {
                        return;
                    }
                    final Throwable cause = ExceptionsHelper.unwrapCause(e);
                    logRetentionLeaseFailure(retentionLeaseId, cause);
                    // noinspection StatementWithEmptyBody
                    if (cause instanceof RetentionLeaseNotFoundException) {
                        // note that we do not need to mark as system context here as that is restored from the original renew
                        logger.trace(
                            "{} background adding retention lease [{}] while following",
                            params.getFollowShardId(),
                            retentionLeaseId
                        );
                        try {
                            final ActionListener<ActionResponse.Empty> wrappedListener = ActionListener.wrap(r -> {}, inner -> {
                                /*
                                 * If this fails that the retention lease already exists, something highly unusual is
                                 * going on. Log it, and renew again after another renew interval has passed.
                                 */
                                final Throwable innerCause = ExceptionsHelper.unwrapCause(inner);
                                logRetentionLeaseFailure(retentionLeaseId, innerCause);
                            });
                            // Retaining seq no is follower global checkpoint + 1 — presumably the first operation the
                            // follower still requires; confirm against CcrRetentionLeases.
                            CcrRetentionLeases.asyncAddRetentionLease(
                                params.getLeaderShardId(),
                                retentionLeaseId,
                                followerGlobalCheckpoint.getAsLong() + 1,
                                remoteClient(params),
                                wrappedListener
                            );
                        } catch (NoSuchRemoteClusterException rce) {
                            // we will attempt to renew again after another renew interval has passed
                            logRetentionLeaseFailure(retentionLeaseId, rce);
                        }
                    } else {
                        // if something else happened, we will attempt to renew again after another renew interval has passed
                    }
                });

                return threadPool.scheduleWithFixedDelay(() -> {
                    logger.trace(
                        "{} background renewing retention lease [{}] while following",
                        params.getFollowShardId(),
                        retentionLeaseId
                    );
                    CcrRetentionLeases.asyncRenewRetentionLease(
                        params.getLeaderShardId(),
                        retentionLeaseId,
                        followerGlobalCheckpoint.getAsLong() + 1,
                        remoteClient(params),
                        listener
                    );
                }, retentionLeaseRenewInterval, ccrExecutor);
            }

            /**
             * Logs a retention-lease management failure at WARN, except for
             * {@link RetentionLeaseInvalidRetainingSeqNoException} which is suppressed
             * (NOTE(review): presumably an expected/benign race — confirm).
             */
            private void logRetentionLeaseFailure(final String retentionLeaseId, final Throwable cause) {
                // Security failures should have been surfaced elsewhere, never swallowed into this log path.
                assert cause instanceof ElasticsearchSecurityException == false : cause;
                if (cause instanceof RetentionLeaseInvalidRetainingSeqNoException == false) {
                    logger.warn(
                        () -> format(
                            "%s background management of retention lease [%s] failed while following",
                            params.getFollowShardId(),
                            retentionLeaseId
                        ),
                        cause
                    );
                }
            }

        };
    }