in pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java [75:232]
/**
 * Registers a new source under {@code tenant/namespace/sourceName}.
 *
 * <p>The request is processed in the following order:
 * <ol>
 *   <li>reject the call if the worker service is unavailable or a required argument is null;</li>
 *   <li>check that the caller is authorized for the namespace;</li>
 *   <li>check through the broker admin client that the tenant and namespace exist;</li>
 *   <li>reject the call if a component with the same name is already registered;</li>
 *   <li>obtain the package (from {@code sourcePkgUrl} or the uploaded stream) and validate the request;</li>
 *   <li>run the runtime factory admission checks;</li>
 *   <li>optionally cache the caller's authentication data, resolve the package location and submit
 *       the resulting metadata via {@code updateRequest}.</li>
 * </ol>
 *
 * @param tenant              tenant of the source; must not be null
 * @param namespace           namespace of the source; must not be null
 * @param sourceName          name of the source; must not be null
 * @param uploadedInputStream uploaded package content; may be null when a package URL is given or
 *                            the source is builtin
 * @param fileDetail          form-data details of the uploaded package; may be null
 * @param sourcePkgUrl        URL of the package; when blank the uploaded stream is used instead
 * @param sourceConfig        configuration of the source; must not be null
 * @param authParams          authentication parameters of the caller
 * @throws RestException with {@code BAD_REQUEST} for missing/invalid arguments, a missing tenant or
 *                       namespace, an already existing source or a failed admission check; with
 *                       {@code UNAUTHORIZED} when the broker admin call is not authorized; with
 *                       {@code INTERNAL_SERVER_ERROR} for other broker admin failures, failures
 *                       while caching auth data or while processing the package
 */
public void registerSource(final String tenant,
                           final String namespace,
                           final String sourceName,
                           final InputStream uploadedInputStream,
                           final FormDataContentDisposition fileDetail,
                           final String sourcePkgUrl,
                           final SourceConfig sourceConfig,
                           final AuthenticationParameters authParams) {
    if (!isWorkerServiceAvailable()) {
        throwUnavailableException();
    }

    if (tenant == null) {
        throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
    }
    if (namespace == null) {
        throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
    }
    if (sourceName == null) {
        throw new RestException(Response.Status.BAD_REQUEST, "Source name is not provided");
    }
    if (sourceConfig == null) {
        throw new RestException(Response.Status.BAD_REQUEST, "Source config is not provided");
    }

    throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, sourceName, "register", authParams);

    try {
        // Check tenant exists
        worker().getBrokerAdmin().tenants().getTenantInfo(tenant);

        String qualifiedNamespace = tenant + "/" + namespace;
        List<String> namespaces = worker().getBrokerAdmin().namespaces().getNamespaces(tenant);
        if (namespaces != null && !namespaces.contains(qualifiedNamespace)) {
            // Fall back to the tenant/cluster/namespace form before declaring the namespace missing.
            String qualifiedNamespaceWithCluster = String.format("%s/%s/%s", tenant,
                    worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
            // namespaces is known to be non-null here because of the enclosing condition.
            if (!namespaces.contains(qualifiedNamespaceWithCluster)) {
                log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace, sourceName, namespace);
                throw new RestException(Response.Status.BAD_REQUEST, "Namespace does not exist");
            }
        }
    } catch (PulsarAdminException.NotAuthorizedException e) {
        log.error("{}/{}/{} Client is not authorized to operate {} on tenant", tenant, namespace,
                sourceName, ComponentTypeUtils.toString(componentType));
        throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
    } catch (PulsarAdminException.NotFoundException e) {
        log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, sourceName, tenant);
        throw new RestException(Response.Status.BAD_REQUEST, "Tenant does not exist");
    } catch (PulsarAdminException e) {
        log.error("{}/{}/{} Issues getting tenant data", tenant, namespace, sourceName, e);
        throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
    }

    FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
    if (functionMetaDataManager.containsFunction(tenant, namespace, sourceName)) {
        log.error("{} {}/{}/{} already exists", ComponentTypeUtils.toString(componentType), tenant, namespace,
                sourceName);
        throw new RestException(Response.Status.BAD_REQUEST,
                String.format("%s %s already exists", ComponentTypeUtils.toString(componentType), sourceName));
    }

    Function.FunctionDetails functionDetails = null;
    boolean isPkgUrlProvided = isNotBlank(sourcePkgUrl);
    File componentPackageFile = null;
    try {
        // validate parameters
        try {
            if (isPkgUrlProvided) {
                componentPackageFile = getPackageFile(componentType, sourcePkgUrl);
                functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
                        sourceConfig, componentPackageFile);
            } else {
                if (uploadedInputStream != null) {
                    componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
                }
                functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
                        sourceConfig, componentPackageFile);
                // A non-builtin source needs both the uploaded content and its form-data details.
                if (!isFunctionCodeBuiltin(functionDetails)
                        && (componentPackageFile == null || fileDetail == null)) {
                    throw new IllegalArgumentException(
                            ComponentTypeUtils.toString(componentType) + " Package is not provided");
                }
            }
        } catch (Exception e) {
            log.error("Invalid register {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant,
                    namespace, sourceName, e);
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }

        try {
            worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
        } catch (Exception e) {
            // Pass the exception as the trailing argument so its stack trace is logged as well.
            log.error("{} {}/{}/{} cannot be admitted by the runtime factory",
                    ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
            throw new RestException(Response.Status.BAD_REQUEST,
                    String.format("%s %s cannot be admitted:- %s", ComponentTypeUtils.toString(componentType),
                            sourceName, e.getMessage()));
        }

        // function state
        Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder()
                .setFunctionDetails(functionDetails)
                .setCreateTime(System.currentTimeMillis())
                .setVersion(0);

        // cache auth if need
        if (worker().getWorkerConfig().isAuthenticationEnabled()) {
            // Lambdas can only capture effectively final locals, hence the copy.
            Function.FunctionDetails finalFunctionDetails = functionDetails;
            worker().getFunctionRuntimeManager()
                    .getRuntimeFactory()
                    .getAuthProvider().ifPresent(functionAuthProvider -> {
                        if (authParams.getClientAuthenticationDataSource() != null) {
                            try {
                                Optional<FunctionAuthData> functionAuthData = functionAuthProvider
                                        .cacheAuthData(finalFunctionDetails,
                                                authParams.getClientAuthenticationDataSource());
                                functionAuthData.ifPresent(authData -> functionMetaDataBuilder.setFunctionAuthSpec(
                                        Function.FunctionAuthenticationSpec.newBuilder()
                                                .setData(ByteString.copyFrom(authData.getData()))
                                                .build()));
                            } catch (Exception e) {
                                log.error("Error caching authentication data for {} {}/{}/{}",
                                        ComponentTypeUtils.toString(componentType), tenant, namespace,
                                        sourceName, e);
                                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
                                        String.format("Error caching authentication data for %s %s:- %s",
                                                ComponentTypeUtils.toString(componentType), sourceName,
                                                e.getMessage()));
                            }
                        }
                    });
        }

        Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
        try {
            packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
                    sourcePkgUrl, fileDetail, componentPackageFile);
        } catch (Exception e) {
            log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant,
                    namespace, sourceName, e);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }

        functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
        updateRequest(null, functionMetaDataBuilder.build());
    } finally {
        // Remove the local package file unless the package URL starts with Utils.FILE
        // (presumably a caller-owned local file that must be kept -- confirm against Utils).
        if (componentPackageFile != null && componentPackageFile.exists()) {
            if (sourcePkgUrl == null || !sourcePkgUrl.startsWith(Utils.FILE)) {
                if (!componentPackageFile.delete()) {
                    log.warn("Failed to delete package file {}", componentPackageFile.getAbsolutePath());
                }
            }
        }
    }
}