in pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java [76:238]
public void registerSink(final String tenant,
final String namespace,
final String sinkName,
final InputStream uploadedInputStream,
final FormDataContentDisposition fileDetail,
final String sinkPkgUrl,
final SinkConfig sinkConfig,
final AuthenticationParameters authParams) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
if (tenant == null) {
throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
}
if (namespace == null) {
throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
}
if (sinkName == null) {
throw new RestException(Response.Status.BAD_REQUEST, "Sink name is not provided");
}
if (sinkConfig == null) {
throw new RestException(Response.Status.BAD_REQUEST, "Sink config is not provided");
}
throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, sinkName, "register", authParams);
try {
// Check tenant exists
worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
String qualifiedNamespace = tenant + "/" + namespace;
List<String> namespaces = worker().getBrokerAdmin().namespaces().getNamespaces(tenant);
if (namespaces != null && !namespaces.contains(qualifiedNamespace)) {
String qualifiedNamespaceWithCluster = String.format("%s/%s/%s", tenant,
worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
if (namespaces != null && !namespaces.contains(qualifiedNamespaceWithCluster)) {
log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace, sinkName, namespace);
throw new RestException(Response.Status.BAD_REQUEST, "Namespace does not exist");
}
}
} catch (PulsarAdminException.NotAuthorizedException e) {
log.error("{}/{}/{} Client is not authorized to operate {} on tenant", tenant, namespace,
sinkName, ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
} catch (PulsarAdminException.NotFoundException e) {
log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, sinkName, tenant);
throw new RestException(Response.Status.BAD_REQUEST, "Tenant does not exist");
} catch (PulsarAdminException e) {
log.error("{}/{}/{} Issues getting tenant data", tenant, namespace, sinkName, e);
throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
if (functionMetaDataManager.containsFunction(tenant, namespace, sinkName)) {
log.error("{} {}/{}/{} already exists", ComponentTypeUtils.toString(componentType), tenant, namespace,
sinkName);
throw new RestException(Response.Status.BAD_REQUEST,
String.format("%s %s already exists", ComponentTypeUtils.toString(componentType), sinkName));
}
Function.FunctionDetails functionDetails;
File componentPackageFile = null;
try {
// validate parameters
try {
if (isNotBlank(sinkPkgUrl)) {
componentPackageFile = getPackageFile(componentType, sinkPkgUrl);
functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
sinkConfig, componentPackageFile);
} else {
if (uploadedInputStream != null) {
componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
}
functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
sinkConfig, componentPackageFile);
if (!isFunctionCodeBuiltin(functionDetails)
&& (componentPackageFile == null || fileDetail == null)) {
throw new IllegalArgumentException(
ComponentTypeUtils.toString(componentType) + " Package is not provided");
}
}
} catch (Exception e) {
log.error("Invalid register {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant,
namespace, sinkName, e);
throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
}
try {
worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
} catch (Exception e) {
log.error("{} {}/{}/{} cannot be admitted by the runtime factory",
ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName);
throw new RestException(Response.Status.BAD_REQUEST,
String.format("%s %s cannot be admitted:- %s", ComponentTypeUtils.toString(componentType),
sinkName, e.getMessage()));
}
// function state
Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(functionDetails)
.setCreateTime(System.currentTimeMillis())
.setVersion(0);
// cache auth if need
if (worker().getWorkerConfig().isAuthenticationEnabled()) {
Function.FunctionDetails finalFunctionDetails = functionDetails;
worker().getFunctionRuntimeManager()
.getRuntimeFactory()
.getAuthProvider().ifPresent(functionAuthProvider -> {
if (authParams.getClientAuthenticationDataSource() != null) {
try {
Optional<FunctionAuthData> functionAuthData = functionAuthProvider
.cacheAuthData(finalFunctionDetails,
authParams.getClientAuthenticationDataSource());
functionAuthData.ifPresent(authData -> functionMetaDataBuilder.setFunctionAuthSpec(
Function.FunctionAuthenticationSpec.newBuilder()
.setData(ByteString.copyFrom(authData.getData()))
.build()));
} catch (Exception e) {
log.error("Error caching authentication data for {} {}/{}/{}",
ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
String.format("Error caching authentication data for %s %s:- %s",
ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
}
}
});
}
Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
try {
packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
sinkPkgUrl, fileDetail, componentPackageFile);
} catch (Exception e) {
log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant,
namespace, sinkName, e);
throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
String transformFunction = sinkConfig.getTransformFunction();
if (isNotBlank(transformFunction)) {
setTransformFunctionPackageLocation(functionMetaDataBuilder, functionDetails, transformFunction);
}
updateRequest(null, functionMetaDataBuilder.build());
} finally {
if (componentPackageFile != null && componentPackageFile.exists()) {
if (sinkPkgUrl == null || !sinkPkgUrl.startsWith(Utils.FILE)) {
componentPackageFile.delete();
}
}
}
}