in pkg/apiserver/customresource_handler.go [599:976]
func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crdInfo, error) {
storageMap := r.customStorage.Load().(crdStorageMap)
if ret, ok := storageMap[uid]; ok {
return ret, nil
}
r.customStorageLock.Lock()
defer r.customStorageLock.Unlock()
// Get the up-to-date CRD when we have the lock, to avoid racing with updateCustomResourceDefinition.
// If updateCustomResourceDefinition sees an update and happens later, the storage will be deleted and
// we will re-create the updated storage on demand. If updateCustomResourceDefinition happens before,
// we make sure that we observe the same up-to-date CRD.
crd, err := r.crdLister.Get(name)
if err != nil {
return nil, err
}
storageMap = r.customStorage.Load().(crdStorageMap)
if ret, ok := storageMap[crd.UID]; ok {
return ret, nil
}
storageVersion, err := apiextensionshelpers.GetCRDStorageVersion(crd)
if err != nil {
return nil, err
}
// Scope/Storages per version.
requestScopes := map[string]*handlers.RequestScope{}
storages := map[string]customresource.CustomResourceStorage{}
statusScopes := map[string]*handlers.RequestScope{}
scaleScopes := map[string]*handlers.RequestScope{}
deprecated := map[string]bool{}
warnings := map[string][]string{}
equivalentResourceRegistry := runtime.NewEquivalentResourceRegistry()
structuralSchemas := map[string]*structuralschema.Structural{}
for _, v := range crd.Spec.Versions {
val, err := apiextensionshelpers.GetSchemaForVersion(crd, v.Name)
if err != nil {
utilruntime.HandleError(err)
return nil, fmt.Errorf("the server could not properly serve the CR schema")
}
if val == nil {
continue
}
internalValidation := &apiextensionsinternal.CustomResourceValidation{}
if err := apiextensionsv1.Convert_v1_CustomResourceValidation_To_apiextensions_CustomResourceValidation(val, internalValidation, nil); err != nil {
return nil, fmt.Errorf("failed converting CRD validation to internal version: %v", err)
}
s, err := structuralschema.NewStructural(internalValidation.OpenAPIV3Schema)
if crd.Spec.PreserveUnknownFields == false && err != nil {
// This should never happen. If it does, it is a programming error.
utilruntime.HandleError(fmt.Errorf("failed to convert schema to structural: %v", err))
return nil, fmt.Errorf("the server could not properly serve the CR schema") // validation should avoid this
}
if crd.Spec.PreserveUnknownFields == false {
// we don't own s completely, e.g. defaults are not deep-copied. So better make a copy here.
s = s.DeepCopy()
if err := structuraldefaulting.PruneDefaults(s); err != nil {
// This should never happen. If it does, it is a programming error.
utilruntime.HandleError(fmt.Errorf("failed to prune defaults: %v", err))
return nil, fmt.Errorf("the server could not properly serve the CR schema") // validation should avoid this
}
}
structuralSchemas[v.Name] = s
}
openAPIModels, err := buildOpenAPIModelsForApply(r.staticOpenAPISpec, crd)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error building openapi models for %s: %v", crd.Name, err))
openAPIModels = nil
}
var typeConverter fieldmanager.TypeConverter = fieldmanager.DeducedTypeConverter{}
if openAPIModels != nil {
typeConverter, err = fieldmanager.NewTypeConverter(openAPIModels, crd.Spec.PreserveUnknownFields)
if err != nil {
return nil, err
}
}
safeConverter, unsafeConverter, err := r.converterFactory.NewConverter(crd)
if err != nil {
return nil, err
}
// Create replicasPathInCustomResource
replicasPathInCustomResource := fieldmanager.ResourcePathMappings{}
for _, v := range crd.Spec.Versions {
subresources, err := apiextensionshelpers.GetSubresourcesForVersion(crd, v.Name)
if err != nil {
utilruntime.HandleError(err)
return nil, fmt.Errorf("the server could not properly serve the CR subresources")
}
if subresources == nil || subresources.Scale == nil {
replicasPathInCustomResource[schema.GroupVersion{Group: crd.Spec.Group, Version: v.Name}.String()] = nil
continue
}
path := fieldpath.Path{}
splitReplicasPath := strings.Split(strings.TrimPrefix(subresources.Scale.SpecReplicasPath, "."), ".")
for _, element := range splitReplicasPath {
s := element
path = append(path, fieldpath.PathElement{FieldName: &s})
}
replicasPathInCustomResource[schema.GroupVersion{Group: crd.Spec.Group, Version: v.Name}.String()] = path
}
for _, v := range crd.Spec.Versions {
// In addition to Unstructured objects (Custom Resources), we also may sometimes need to
// decode unversioned Options objects, so we delegate to parameterScheme for such types.
parameterScheme := runtime.NewScheme()
parameterScheme.AddUnversionedTypes(schema.GroupVersion{Group: crd.Spec.Group, Version: v.Name},
&metav1.ListOptions{},
&metav1.GetOptions{},
&metav1.DeleteOptions{},
)
parameterCodec := runtime.NewParameterCodec(parameterScheme)
resource := schema.GroupVersionResource{Group: crd.Spec.Group, Version: v.Name, Resource: crd.Status.AcceptedNames.Plural}
kind := schema.GroupVersionKind{Group: crd.Spec.Group, Version: v.Name, Kind: crd.Status.AcceptedNames.Kind}
equivalentResourceRegistry.RegisterKindFor(resource, "", kind)
typer := newUnstructuredObjectTyper(parameterScheme)
creator := unstructuredCreator{}
validationSchema, err := apiextensionshelpers.GetSchemaForVersion(crd, v.Name)
if err != nil {
utilruntime.HandleError(err)
return nil, fmt.Errorf("the server could not properly serve the CR schema")
}
var internalValidationSchema *apiextensionsinternal.CustomResourceValidation
if validationSchema != nil {
internalValidationSchema = &apiextensionsinternal.CustomResourceValidation{}
if err := apiextensionsv1.Convert_v1_CustomResourceValidation_To_apiextensions_CustomResourceValidation(validationSchema, internalValidationSchema, nil); err != nil {
return nil, fmt.Errorf("failed to convert CRD validation to internal version: %v", err)
}
}
validator, _, err := apiservervalidation.NewSchemaValidator(internalValidationSchema)
if err != nil {
return nil, err
}
var statusSpec *apiextensionsinternal.CustomResourceSubresourceStatus
var statusValidator *validate.SchemaValidator
subresources, err := apiextensionshelpers.GetSubresourcesForVersion(crd, v.Name)
if err != nil {
utilruntime.HandleError(err)
return nil, fmt.Errorf("the server could not properly serve the CR subresources")
}
if subresources != nil && subresources.Status != nil {
equivalentResourceRegistry.RegisterKindFor(resource, "status", kind)
statusSpec = &apiextensionsinternal.CustomResourceSubresourceStatus{}
if err := apiextensionsv1.Convert_v1_CustomResourceSubresourceStatus_To_apiextensions_CustomResourceSubresourceStatus(subresources.Status, statusSpec, nil); err != nil {
return nil, fmt.Errorf("failed converting CRD status subresource to internal version: %v", err)
}
// for the status subresource, validate only against the status schema
if internalValidationSchema != nil && internalValidationSchema.OpenAPIV3Schema != nil && internalValidationSchema.OpenAPIV3Schema.Properties != nil {
if statusSchema, ok := internalValidationSchema.OpenAPIV3Schema.Properties["status"]; ok {
openapiSchema := &spec.Schema{}
if err := apiservervalidation.ConvertJSONSchemaPropsWithPostProcess(&statusSchema, openapiSchema, apiservervalidation.StripUnsupportedFormatsPostProcess); err != nil {
return nil, err
}
statusValidator = validate.NewSchemaValidator(openapiSchema, nil, "", strfmt.Default)
}
}
}
var scaleSpec *apiextensionsinternal.CustomResourceSubresourceScale
if subresources != nil && subresources.Scale != nil {
equivalentResourceRegistry.RegisterKindFor(resource, "scale", autoscalingv1.SchemeGroupVersion.WithKind("Scale"))
scaleSpec = &apiextensionsinternal.CustomResourceSubresourceScale{}
if err := apiextensionsv1.Convert_v1_CustomResourceSubresourceScale_To_apiextensions_CustomResourceSubresourceScale(subresources.Scale, scaleSpec, nil); err != nil {
return nil, fmt.Errorf("failed converting CRD status subresource to internal version: %v", err)
}
}
columns, err := getColumnsForVersion(crd, v.Name)
if err != nil {
utilruntime.HandleError(err)
return nil, fmt.Errorf("the server could not properly serve the CR columns")
}
table, err := tableconvertor.New(columns)
if err != nil {
klog.V(2).Infof("The CRD for %v has an invalid printer specification, falling back to default printing: %v", kind, err)
}
storages[v.Name] = customresource.NewStorage(
resource.GroupResource(),
kind,
schema.GroupVersionKind{Group: crd.Spec.Group, Version: v.Name, Kind: crd.Status.AcceptedNames.ListKind},
customresource.NewStrategy(
typer,
crd.Spec.Scope == apiextensionsv1.NamespaceScoped,
kind,
validator,
statusValidator,
structuralSchemas,
statusSpec,
scaleSpec,
),
crdConversionRESTOptionsGetter{
RESTOptionsGetter: r.restOptionsGetter,
converter: safeConverter,
decoderVersion: schema.GroupVersion{Group: crd.Spec.Group, Version: v.Name},
encoderVersion: schema.GroupVersion{Group: crd.Spec.Group, Version: storageVersion},
structuralSchemas: structuralSchemas,
structuralSchemaGK: kind.GroupKind(),
preserveUnknownFields: crd.Spec.PreserveUnknownFields,
},
crd.Status.AcceptedNames.Categories,
table,
replicasPathInCustomResource,
)
selfLinkPrefix := ""
switch crd.Spec.Scope {
case apiextensionsv1.ClusterScoped:
selfLinkPrefix = "/" + path.Join("apis", crd.Spec.Group, v.Name) + "/" + crd.Status.AcceptedNames.Plural + "/"
case apiextensionsv1.NamespaceScoped:
selfLinkPrefix = "/" + path.Join("apis", crd.Spec.Group, v.Name, "namespaces") + "/"
}
clusterScoped := crd.Spec.Scope == apiextensionsv1.ClusterScoped
// CRDs explicitly do not support protobuf, but some objects returned by the API server do
negotiatedSerializer := unstructuredNegotiatedSerializer{
typer: typer,
creator: creator,
converter: safeConverter,
structuralSchemas: structuralSchemas,
structuralSchemaGK: kind.GroupKind(),
preserveUnknownFields: crd.Spec.PreserveUnknownFields,
}
var standardSerializers []runtime.SerializerInfo
for _, s := range negotiatedSerializer.SupportedMediaTypes() {
if s.MediaType == runtime.ContentTypeProtobuf {
continue
}
standardSerializers = append(standardSerializers, s)
}
requestScopes[v.Name] = &handlers.RequestScope{
Namer: handlers.ContextBasedNaming{
SelfLinker: meta.NewAccessor(),
ClusterScoped: clusterScoped,
SelfLinkPathPrefix: selfLinkPrefix,
},
Serializer: negotiatedSerializer,
ParameterCodec: parameterCodec,
StandardSerializers: standardSerializers,
Creater: creator,
Convertor: safeConverter,
Defaulter: unstructuredDefaulter{parameterScheme, structuralSchemas, kind.GroupKind()},
Typer: typer,
UnsafeConvertor: unsafeConverter,
EquivalentResourceMapper: equivalentResourceRegistry,
Resource: schema.GroupVersionResource{Group: crd.Spec.Group, Version: v.Name, Resource: crd.Status.AcceptedNames.Plural},
Kind: kind,
// a handler for a specific group-version of a custom resource uses that version as the in-memory representation
HubGroupVersion: kind.GroupVersion(),
MetaGroupVersion: metav1.SchemeGroupVersion,
TableConvertor: storages[v.Name].CustomResource,
Authorizer: r.authorizer,
MaxRequestBodyBytes: r.maxRequestBodyBytes,
}
if utilfeature.DefaultFeatureGate.Enabled(features.ServerSideApply) {
resetFields := storages[v.Name].CustomResource.GetResetFields()
reqScope := *requestScopes[v.Name]
reqScope, err = scopeWithFieldManager(
typeConverter,
reqScope,
resetFields,
"",
)
if err != nil {
return nil, err
}
requestScopes[v.Name] = &reqScope
}
// override scaleSpec subresource values
// shallow copy
scaleScope := *requestScopes[v.Name]
scaleConverter := scale.NewScaleConverter()
scaleScope.Subresource = "scale"
scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme())
scaleScope.Kind = autoscalingv1.SchemeGroupVersion.WithKind("Scale")
scaleScope.Namer = handlers.ContextBasedNaming{
SelfLinker: meta.NewAccessor(),
ClusterScoped: clusterScoped,
SelfLinkPathPrefix: selfLinkPrefix,
SelfLinkPathSuffix: "/scale",
}
if utilfeature.DefaultFeatureGate.Enabled(features.ServerSideApply) && subresources != nil && subresources.Scale != nil {
scaleScope, err = scopeWithFieldManager(
typeConverter,
scaleScope,
nil,
"scale",
)
if err != nil {
return nil, err
}
}
scaleScopes[v.Name] = &scaleScope
// override status subresource values
// shallow copy
statusScope := *requestScopes[v.Name]
statusScope.Subresource = "status"
statusScope.Namer = handlers.ContextBasedNaming{
SelfLinker: meta.NewAccessor(),
ClusterScoped: clusterScoped,
SelfLinkPathPrefix: selfLinkPrefix,
SelfLinkPathSuffix: "/status",
}
if utilfeature.DefaultFeatureGate.Enabled(features.ServerSideApply) && subresources != nil && subresources.Status != nil {
resetFields := storages[v.Name].Status.GetResetFields()
statusScope, err = scopeWithFieldManager(
typeConverter,
statusScope,
resetFields,
"status",
)
if err != nil {
return nil, err
}
}
statusScopes[v.Name] = &statusScope
if v.Deprecated {
deprecated[v.Name] = true
if v.DeprecationWarning != nil {
warnings[v.Name] = append(warnings[v.Name], *v.DeprecationWarning)
} else {
warnings[v.Name] = append(warnings[v.Name], defaultDeprecationWarning(v.Name, crd.Spec))
}
}
}
ret := &crdInfo{
spec: &crd.Spec,
acceptedNames: &crd.Status.AcceptedNames,
storages: storages,
requestScopes: requestScopes,
scaleRequestScopes: scaleScopes,
statusRequestScopes: statusScopes,
deprecated: deprecated,
warnings: warnings,
storageVersion: storageVersion,
waitGroup: &utilwaitgroup.SafeWaitGroup{},
}
// Copy because we cannot write to storageMap without a race
// as it is used without locking elsewhere.
storageMap2 := storageMap.clone()
storageMap2[crd.UID] = ret
r.customStorage.Store(storageMap2)
return ret, nil
}