in pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java [241:417]
public void updateSink(final String tenant,
final String namespace,
final String sinkName,
final InputStream uploadedInputStream,
final FormDataContentDisposition fileDetail,
final String sinkPkgUrl,
final SinkConfig sinkConfig,
final AuthenticationParameters authParams,
UpdateOptionsImpl updateOptions) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
if (tenant == null) {
throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
}
if (namespace == null) {
throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
}
if (sinkName == null) {
throw new RestException(Response.Status.BAD_REQUEST, "Sink name is not provided");
}
if (sinkConfig == null) {
throw new RestException(Response.Status.BAD_REQUEST, "Sink config is not provided");
}
throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, sinkName, "update", authParams);
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
if (!functionMetaDataManager.containsFunction(tenant, namespace, sinkName)) {
throw new RestException(Response.Status.BAD_REQUEST,
String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), sinkName));
}
Function.FunctionMetaData existingComponent =
functionMetaDataManager.getFunctionMetaData(tenant, namespace, sinkName);
if (!InstanceUtils.calculateSubjectType(existingComponent.getFunctionDetails()).equals(componentType)) {
log.error("{}/{}/{} is not a {}", tenant, namespace, sinkName, ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.NOT_FOUND,
String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), sinkName));
}
SinkConfig existingSinkConfig = SinkConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
// The rest end points take precedence over whatever is there in functionconfig
sinkConfig.setTenant(tenant);
sinkConfig.setNamespace(namespace);
sinkConfig.setName(sinkName);
SinkConfig mergedConfig;
try {
mergedConfig = SinkConfigUtils.validateUpdate(existingSinkConfig, sinkConfig);
} catch (Exception e) {
throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
}
if (existingSinkConfig.equals(mergedConfig) && isBlank(sinkPkgUrl) && uploadedInputStream == null
&& (updateOptions == null || !updateOptions.isUpdateAuthData())) {
log.error("{}/{}/{} Update contains no changes", tenant, namespace, sinkName);
throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
}
Function.FunctionDetails functionDetails;
File componentPackageFile = null;
try {
// validate parameters
try {
componentPackageFile = getPackageFile(
componentType,
sinkPkgUrl,
existingComponent.getPackageLocation().getPackagePath(),
uploadedInputStream);
functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
mergedConfig, componentPackageFile);
if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)
&& !isFunctionCodeBuiltin(functionDetails)
&& (componentPackageFile == null || fileDetail == null)) {
throw new IllegalArgumentException(
ComponentTypeUtils.toString(componentType) + " Package is not provided");
}
} catch (Exception e) {
log.error("Invalid update {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant,
namespace, sinkName, e);
throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
}
try {
worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
} catch (Exception e) {
log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory",
ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName);
throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s",
ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
}
// merge from existing metadata
Function.FunctionMetaData.Builder functionMetaDataBuilder =
Function.FunctionMetaData.newBuilder().mergeFrom(existingComponent)
.setFunctionDetails(functionDetails);
// update auth data if need
if (worker().getWorkerConfig().isAuthenticationEnabled()) {
Function.FunctionDetails finalFunctionDetails = functionDetails;
worker().getFunctionRuntimeManager()
.getRuntimeFactory()
.getAuthProvider().ifPresent(functionAuthProvider -> {
if (authParams.getClientAuthenticationDataSource() != null && updateOptions != null
&& updateOptions.isUpdateAuthData()) {
// get existing auth data if it exists
Optional<FunctionAuthData> existingFunctionAuthData = Optional.empty();
if (functionMetaDataBuilder.hasFunctionAuthSpec()) {
existingFunctionAuthData = Optional.ofNullable(getFunctionAuthData(
Optional.ofNullable(functionMetaDataBuilder.getFunctionAuthSpec())));
}
try {
Optional<FunctionAuthData> newFunctionAuthData = functionAuthProvider
.updateAuthData(finalFunctionDetails, existingFunctionAuthData,
authParams.getClientAuthenticationDataSource());
if (newFunctionAuthData.isPresent()) {
functionMetaDataBuilder.setFunctionAuthSpec(
Function.FunctionAuthenticationSpec.newBuilder()
.setData(ByteString.copyFrom(newFunctionAuthData.get().getData()))
.build());
} else {
functionMetaDataBuilder.clearFunctionAuthSpec();
}
} catch (Exception e) {
log.error("Error updating authentication data for {} {}/{}/{}",
ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
String.format("Error caching authentication data for %s %s:- %s",
ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
}
}
});
}
Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
if (isNotBlank(sinkPkgUrl) || uploadedInputStream != null) {
Function.FunctionMetaData metaData = functionMetaDataBuilder.build();
metaData = FunctionMetaDataUtils.incrMetadataVersion(metaData, metaData);
try {
packageLocationMetaDataBuilder = getFunctionPackageLocation(metaData,
sinkPkgUrl, fileDetail, componentPackageFile);
} catch (Exception e) {
log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType),
tenant, namespace, sinkName, e);
throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
} else {
packageLocationMetaDataBuilder =
Function.PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation());
}
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
String transformFunction = mergedConfig.getTransformFunction();
if (isNotBlank(transformFunction)
&& !transformFunction.equals(existingSinkConfig.getTransformFunction())) {
setTransformFunctionPackageLocation(functionMetaDataBuilder, functionDetails, transformFunction);
}
updateRequest(existingComponent, functionMetaDataBuilder.build());
} finally {
if (componentPackageFile != null && componentPackageFile.exists()) {
if ((sinkPkgUrl != null && !sinkPkgUrl.startsWith(Utils.FILE)) || uploadedInputStream != null) {
componentPackageFile.delete();
}
}
}
}