in pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java [235:407]
public void updateFunction(final String tenant,
final String namespace,
final String functionName,
final InputStream uploadedInputStream,
final FormDataContentDisposition fileDetail,
final String functionPkgUrl,
final FunctionConfig functionConfig,
final AuthenticationParameters authParams,
UpdateOptionsImpl updateOptions) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
if (tenant == null) {
throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
}
if (namespace == null) {
throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
}
if (functionName == null) {
throw new RestException(Response.Status.BAD_REQUEST, "Function name is not provided");
}
if (functionConfig == null) {
throw new RestException(Response.Status.BAD_REQUEST, "Function config is not provided");
}
throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, functionName, "update",
authParams);
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s doesn't exist",
ComponentTypeUtils.toString(componentType), functionName));
}
Function.FunctionMetaData existingComponent = functionMetaDataManager
.getFunctionMetaData(tenant, namespace, functionName);
if (!InstanceUtils.calculateSubjectType(existingComponent.getFunctionDetails()).equals(componentType)) {
log.error("{}/{}/{} is not a {}", tenant, namespace, functionName,
ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist",
ComponentTypeUtils.toString(componentType), functionName));
}
FunctionConfig existingFunctionConfig = FunctionConfigUtils
.convertFromDetails(existingComponent.getFunctionDetails());
// The rest end points take precedence over whatever is there in function config
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
functionConfig.setName(functionName);
FunctionConfig mergedConfig;
try {
mergedConfig = FunctionConfigUtils.validateUpdate(existingFunctionConfig, functionConfig);
} catch (Exception e) {
throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
}
if (existingFunctionConfig.equals(mergedConfig) && isBlank(functionPkgUrl) && uploadedInputStream == null
&& (updateOptions == null || !updateOptions.isUpdateAuthData())) {
log.error("{}/{}/{} Update contains no changes", tenant, namespace, functionName);
throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
}
Function.FunctionDetails functionDetails;
File componentPackageFile = null;
try {
// validate parameters
try {
componentPackageFile = getPackageFile(
componentType,
functionPkgUrl,
existingComponent.getPackageLocation().getPackagePath(),
uploadedInputStream);
functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
mergedConfig, componentPackageFile);
if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)
&& !isFunctionCodeBuiltin(functionDetails)
&& (componentPackageFile == null || fileDetail == null)) {
throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType)
+ " Package is not provided");
}
} catch (Exception e) {
log.error("Invalid update {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType),
tenant, namespace, functionName, e);
throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
}
try {
worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
} catch (Exception e) {
log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory",
ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s",
ComponentTypeUtils.toString(componentType), functionName, e.getMessage()));
}
// merge from existing metadata
Function.FunctionMetaData.Builder functionMetaDataBuilder =
Function.FunctionMetaData.newBuilder().mergeFrom(existingComponent)
.setFunctionDetails(functionDetails);
// update auth data if need
if (worker().getWorkerConfig().isAuthenticationEnabled()) {
Function.FunctionDetails finalFunctionDetails = functionDetails;
worker().getFunctionRuntimeManager()
.getRuntimeFactory()
.getAuthProvider().ifPresent(functionAuthProvider -> {
if (authParams.getClientAuthenticationDataSource() != null && updateOptions
!= null && updateOptions.isUpdateAuthData()) {
// get existing auth data if it exists
Optional<FunctionAuthData> existingFunctionAuthData = Optional.empty();
if (functionMetaDataBuilder.hasFunctionAuthSpec()) {
existingFunctionAuthData = Optional.ofNullable(getFunctionAuthData(Optional
.ofNullable(functionMetaDataBuilder.getFunctionAuthSpec())));
}
try {
Optional<FunctionAuthData> newFunctionAuthData = functionAuthProvider
.updateAuthData(finalFunctionDetails, existingFunctionAuthData,
authParams.getClientAuthenticationDataSource());
if (newFunctionAuthData.isPresent()) {
functionMetaDataBuilder.setFunctionAuthSpec(
Function.FunctionAuthenticationSpec.newBuilder()
.setData(ByteString.copyFrom(
newFunctionAuthData.get().getData())).build());
} else {
functionMetaDataBuilder.clearFunctionAuthSpec();
}
} catch (Exception e) {
log.error("Error updating authentication data for {} {}/{}/{}", ComponentTypeUtils
.toString(componentType), tenant, namespace, functionName, e);
throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
String.format("Error caching authentication data for %s %s:- %s",
ComponentTypeUtils.toString(componentType), functionName,
e.getMessage()));
}
}
});
}
Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
if (isNotBlank(functionPkgUrl) || uploadedInputStream != null) {
Function.FunctionMetaData metaData = functionMetaDataBuilder.build();
metaData = FunctionMetaDataUtils.incrMetadataVersion(metaData, metaData);
try {
packageLocationMetaDataBuilder = getFunctionPackageLocation(metaData,
functionPkgUrl, fileDetail, componentPackageFile);
} catch (Exception e) {
log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType),
tenant, namespace, functionName, e);
throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
} else {
packageLocationMetaDataBuilder = Function.PackageLocationMetaData.newBuilder()
.mergeFrom(existingComponent.getPackageLocation());
}
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
updateRequest(existingComponent, functionMetaDataBuilder.build());
} finally {
if (componentPackageFile != null && componentPackageFile.exists()) {
if ((functionPkgUrl != null && !functionPkgUrl.startsWith(Utils.FILE)) || uploadedInputStream != null) {
componentPackageFile.delete();
}
}
}
}