public void updateSink()

in pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java [241:417]


    /**
     * Updates an already-registered sink.
     *
     * <p>The method proceeds in this order: checks that the worker service is available, validates the
     * required arguments, authorizes the caller for the namespace, loads the existing metadata, merges the
     * requested config into the existing one, validates the (optional) new package, runs the runtime
     * factory's admission checks, optionally refreshes the stored authentication data, resolves the package
     * location, and finally submits the new metadata through {@code updateRequest}.
     *
     * @param tenant              tenant of the sink; must not be {@code null}
     * @param namespace           namespace of the sink; must not be {@code null}
     * @param sinkName            name of the sink; must not be {@code null}
     * @param uploadedInputStream content of an uploaded package, or {@code null} when none was uploaded
     * @param fileDetail          form-data details of the uploaded package; may be {@code null}
     * @param sinkPkgUrl          URL of the package to use; may be blank
     * @param sinkConfig          requested configuration; must not be {@code null}. Its tenant, namespace and
     *                            name are overwritten with the values passed to this method
     * @param authParams          caller's authentication parameters, used for the authorization check and as
     *                            the source of client authentication data when auth data is refreshed
     * @param updateOptions       update options; may be {@code null}. Auth data is only refreshed when
     *                            {@code isUpdateAuthData()} is true
     * @throws RestException with {@code BAD_REQUEST} when a required argument is missing, the sink is not
     *                       registered, the merged config is invalid, the update contains no change, the
     *                       package/parameters fail validation or the admission checks fail; with
     *                       {@code NOT_FOUND} when the registered component is not of this component type;
     *                       with {@code INTERNAL_SERVER_ERROR} when updating the authentication data or
     *                       processing the package fails
     */
    public void updateSink(final String tenant,
                           final String namespace,
                           final String sinkName,
                           final InputStream uploadedInputStream,
                           final FormDataContentDisposition fileDetail,
                           final String sinkPkgUrl,
                           final SinkConfig sinkConfig,
                           final AuthenticationParameters authParams,
                           UpdateOptionsImpl updateOptions) {

        if (!isWorkerServiceAvailable()) {
            throwUnavailableException();
        }

        if (tenant == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
        }
        if (namespace == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
        }
        if (sinkName == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Sink name is not provided");
        }
        if (sinkConfig == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Sink config is not provided");
        }

        throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, sinkName, "update", authParams);

        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();

        if (!functionMetaDataManager.containsFunction(tenant, namespace, sinkName)) {
            throw new RestException(Response.Status.BAD_REQUEST,
                    String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), sinkName));
        }

        Function.FunctionMetaData existingComponent =
                functionMetaDataManager.getFunctionMetaData(tenant, namespace, sinkName);

        // A component registered under this name but with a different subject type is reported as missing.
        if (!InstanceUtils.calculateSubjectType(existingComponent.getFunctionDetails()).equals(componentType)) {
            log.error("{}/{}/{} is not a {}", tenant, namespace, sinkName, ComponentTypeUtils.toString(componentType));
            throw new RestException(Response.Status.NOT_FOUND,
                    String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), sinkName));
        }


        SinkConfig existingSinkConfig = SinkConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
        // The rest end points take precedence over whatever is there in functionconfig
        sinkConfig.setTenant(tenant);
        sinkConfig.setNamespace(namespace);
        sinkConfig.setName(sinkName);

        SinkConfig mergedConfig;
        try {
            mergedConfig = SinkConfigUtils.validateUpdate(existingSinkConfig, sinkConfig);
        } catch (Exception e) {
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }

        // Reject a request that changes neither the config, the package, nor the auth data.
        if (existingSinkConfig.equals(mergedConfig) && isBlank(sinkPkgUrl) && uploadedInputStream == null
                && (updateOptions == null || !updateOptions.isUpdateAuthData())) {
            log.error("{}/{}/{} Update contains no changes", tenant, namespace, sinkName);
            throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
        }

        Function.FunctionDetails functionDetails;
        File componentPackageFile = null;
        try {

            // validate parameters
            try {
                componentPackageFile = getPackageFile(
                        componentType,
                        sinkPkgUrl,
                        existingComponent.getPackageLocation().getPackagePath(),
                        uploadedInputStream);
                functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
                        mergedConfig, componentPackageFile);
                // The existing package path is a builtin one but the updated details are no longer builtin:
                // in that case a package file together with its upload details is required.
                if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)
                        && !isFunctionCodeBuiltin(functionDetails)
                        && (componentPackageFile == null || fileDetail == null)) {
                    throw new IllegalArgumentException(
                            ComponentTypeUtils.toString(componentType) + " Package is not provided");
                }
            } catch (Exception e) {
                log.error("Invalid update {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant,
                        namespace, sinkName, e);
                throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
            }

            try {
                worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
            } catch (Exception e) {
                // Pass the exception as the trailing argument so the cause is logged, like the other handlers.
                log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory",
                        ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
                throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s",
                        ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
            }

            // merge from existing metadata
            Function.FunctionMetaData.Builder functionMetaDataBuilder =
                    Function.FunctionMetaData.newBuilder().mergeFrom(existingComponent)
                            .setFunctionDetails(functionDetails);

            // update auth data if need
            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
                // Separate local so the lambda below captures a variable that is assigned exactly once.
                Function.FunctionDetails finalFunctionDetails = functionDetails;
                worker().getFunctionRuntimeManager()
                        .getRuntimeFactory()
                        .getAuthProvider().ifPresent(functionAuthProvider -> {
                    if (authParams.getClientAuthenticationDataSource() != null && updateOptions != null
                            && updateOptions.isUpdateAuthData()) {
                        // get existing auth data if it exists
                        Optional<FunctionAuthData> existingFunctionAuthData = Optional.empty();
                        if (functionMetaDataBuilder.hasFunctionAuthSpec()) {
                            existingFunctionAuthData = Optional.ofNullable(getFunctionAuthData(
                                    Optional.ofNullable(functionMetaDataBuilder.getFunctionAuthSpec())));
                        }

                        try {
                            Optional<FunctionAuthData> newFunctionAuthData = functionAuthProvider
                                    .updateAuthData(finalFunctionDetails, existingFunctionAuthData,
                                            authParams.getClientAuthenticationDataSource());

                            // An empty result from the provider clears any previously stored auth spec.
                            if (newFunctionAuthData.isPresent()) {
                                functionMetaDataBuilder.setFunctionAuthSpec(
                                        Function.FunctionAuthenticationSpec.newBuilder()
                                                .setData(ByteString.copyFrom(newFunctionAuthData.get().getData()))
                                                .build());
                            } else {
                                functionMetaDataBuilder.clearFunctionAuthSpec();
                            }
                        } catch (Exception e) {
                            log.error("Error updating authentication data for {} {}/{}/{}",
                                    ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
                            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
                                    String.format("Error caching authentication data for %s %s:- %s",
                                            ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
                        }
                    }
                });
            }

            // A new package (URL or upload) gets a freshly computed location; otherwise the existing
            // package location is carried over unchanged.
            Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
            if (isNotBlank(sinkPkgUrl) || uploadedInputStream != null) {
                Function.FunctionMetaData metaData = functionMetaDataBuilder.build();
                metaData = FunctionMetaDataUtils.incrMetadataVersion(metaData, metaData);
                try {
                    packageLocationMetaDataBuilder = getFunctionPackageLocation(metaData,
                            sinkPkgUrl, fileDetail, componentPackageFile);
                } catch (Exception e) {
                    log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType),
                            tenant, namespace, sinkName, e);
                    throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
                }
            } else {
                packageLocationMetaDataBuilder =
                        Function.PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation());
            }

            functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);

            // Only re-resolve the transform function package when the transform function actually changed.
            String transformFunction = mergedConfig.getTransformFunction();
            if (isNotBlank(transformFunction)
                    && !transformFunction.equals(existingSinkConfig.getTransformFunction())) {
                setTransformFunctionPackageLocation(functionMetaDataBuilder, functionDetails, transformFunction);
            }

            updateRequest(existingComponent, functionMetaDataBuilder.build());
        } finally {
            // Clean up the package file only when it came from an upload or from a package URL that does
            // not start with Utils.FILE; a URL starting with Utils.FILE is left untouched.
            if (componentPackageFile != null && componentPackageFile.exists()) {
                if ((sinkPkgUrl != null && !sinkPkgUrl.startsWith(Utils.FILE)) || uploadedInputStream != null) {
                    // Best-effort cleanup: report a failed delete instead of silently ignoring it.
                    if (!componentPackageFile.delete()) {
                        log.warn("Failed to delete package file {} for {} {}/{}/{}",
                                componentPackageFile.getAbsolutePath(),
                                ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName);
                    }
                }
            }
        }
    }