pilot/pkg/config/file/store.go (399 lines of code) (raw):

/* Copyright Istio Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package file import ( "bufio" "bytes" "crypto/sha256" "encoding/json" "errors" "fmt" "io" "strings" "sync" ) import ( "github.com/hashicorp/go-multierror" yamlv3 "gopkg.in/yaml.v3" "istio.io/pkg/log" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" kubeJson "k8s.io/apimachinery/pkg/runtime/serializer/json" "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/client-go/tools/cache" ) import ( kubeyaml2 "github.com/apache/dubbo-go-pixiu/pilot/pkg/config/file/util/kubeyaml" "github.com/apache/dubbo-go-pixiu/pilot/pkg/config/memory" "github.com/apache/dubbo-go-pixiu/pilot/pkg/model" "github.com/apache/dubbo-go-pixiu/pkg/config" kube2 "github.com/apache/dubbo-go-pixiu/pkg/config/legacy/source/kube" "github.com/apache/dubbo-go-pixiu/pkg/config/resource" "github.com/apache/dubbo-go-pixiu/pkg/config/schema/collection" schemaresource "github.com/apache/dubbo-go-pixiu/pkg/config/schema/resource" "github.com/apache/dubbo-go-pixiu/pkg/kube" "github.com/apache/dubbo-go-pixiu/pkg/util/sets" ) var ( inMemoryKubeNameDiscriminator int64 scope = log.RegisterScope("file", "File client messages", 0) ) // KubeSource is an in-memory source implementation that can handle K8s style resources. type KubeSource struct { mu sync.Mutex name string schemas *collection.Schemas inner model.ConfigStore defaultNs resource.Namespace versionCtr int64 shas map[kubeResourceKey]resourceSha byFile map[string]map[kubeResourceKey]config.GroupVersionKind } func (s *KubeSource) Schemas() collection.Schemas { return *s.schemas } func (s *KubeSource) Get(typ config.GroupVersionKind, name, namespace string) *config.Config { return s.inner.Get(typ, name, namespace) } func (s *KubeSource) List(typ config.GroupVersionKind, namespace string) ([]config.Config, error) { return s.inner.List(typ, namespace) } func (s *KubeSource) Create(config config.Config) (revision string, err error) { return s.inner.Create(config) } func (s *KubeSource) Update(config config.Config) (newRevision string, err error) { return s.inner.Update(config) } func (s *KubeSource) UpdateStatus(config config.Config) (newRevision string, err error) { return s.inner.UpdateStatus(config) } func (s *KubeSource) Patch(orig config.Config, patchFn config.PatchFunc) (string, error) { return s.inner.Patch(orig, patchFn) } func (s *KubeSource) Delete(typ config.GroupVersionKind, name, namespace string, resourceVersion *string) error { return s.inner.Delete(typ, name, namespace, resourceVersion) } func (s *KubeSource) RegisterEventHandler(kind config.GroupVersionKind, handler model.EventHandler) { panic("implement me") } func (s *KubeSource) Run(stop <-chan struct{}) { } func (s *KubeSource) SetWatchErrorHandler(f func(r *cache.Reflector, err error)) error { panic("implement me") } func (s *KubeSource) HasSynced() bool { return true } type resourceSha [sha256.Size]byte type kubeResource struct { // resource *resource.Instance config *config.Config schema collection.Schema sha resourceSha } func (r *kubeResource) newKey() kubeResourceKey { return kubeResourceKey{ kind: r.schema.Resource().Kind(), fullName: r.fullName(), } } func (r *kubeResource) fullName() resource.FullName { return resource.NewFullName(resource.Namespace(r.config.Namespace), resource.LocalName(r.config.Name)) } type kubeResourceKey struct { fullName resource.FullName kind string } var _ model.ConfigStore = &KubeSource{} // NewKubeSource returns a new in-memory Source that works with Kubernetes resources. func NewKubeSource(schemas collection.Schemas) *KubeSource { name := fmt.Sprintf("kube-inmemory-%d", inMemoryKubeNameDiscriminator) inMemoryKubeNameDiscriminator++ return &KubeSource{ name: name, schemas: &schemas, inner: memory.MakeSkipValidation(schemas), shas: make(map[kubeResourceKey]resourceSha), byFile: make(map[string]map[kubeResourceKey]config.GroupVersionKind), } } // SetDefaultNamespace enables injecting a default namespace for resources where none is already specified func (s *KubeSource) SetDefaultNamespace(defaultNs resource.Namespace) { s.defaultNs = defaultNs } // Clear the contents of this source func (s *KubeSource) Clear() { s.versionCtr = 0 s.shas = make(map[kubeResourceKey]resourceSha) s.byFile = make(map[string]map[kubeResourceKey]config.GroupVersionKind) s.inner = memory.MakeSkipValidation(*s.schemas) } // ContentNames returns the names known to this source. func (s *KubeSource) ContentNames() map[string]struct{} { s.mu.Lock() defer s.mu.Unlock() result := sets.New() for n := range s.byFile { result.Insert(n) } return result } // ApplyContent applies the given yamltext to this source. The content is tracked with the given name. If ApplyContent // gets called multiple times with the same name, the contents applied by the previous incarnation will be overwritten // or removed, depending on the new content. // Returns an error if any were encountered, but that still may represent a partial success func (s *KubeSource) ApplyContent(name, yamlText string) error { s.mu.Lock() defer s.mu.Unlock() // We hold off on dealing with parseErr until the end, since partial success is possible resources, parseErrs := s.parseContent(s.schemas, name, yamlText) oldKeys := s.byFile[name] newKeys := make(map[kubeResourceKey]config.GroupVersionKind) for _, r := range resources { key := r.newKey() oldSha, found := s.shas[key] if !found || oldSha != r.sha { s.versionCtr++ r.config.ResourceVersion = fmt.Sprintf("v%d", s.versionCtr) scope.Debug("KubeSource.ApplyContent: Set: ", r.schema.Name(), r.fullName()) // apply is idempotent, but configstore is not, thus the odd logic here _, err := s.inner.Update(*r.config) if err != nil { _, err = s.inner.Create(*r.config) if err != nil { return fmt.Errorf("cannot store config %s/%s %s from reader: %s", r.schema.Resource().Version(), r.schema.Resource().Kind(), r.fullName(), err) } } s.shas[key] = r.sha } newKeys[key] = r.schema.Resource().GroupVersionKind() if oldKeys != nil { scope.Debug("KubeSource.ApplyContent: Delete: ", r.schema.Name(), key) delete(oldKeys, key) } } for k, col := range oldKeys { empty := "" err := s.inner.Delete(col, k.fullName.Name.String(), k.fullName.Namespace.String(), &empty) if err != nil { scope.Errorf("encountered unexpected error removing resource from filestore: %s", err) } } s.byFile[name] = newKeys if parseErrs != nil { return fmt.Errorf("errors parsing content %q: %v", name, parseErrs) } return nil } // RemoveContent removes the content for the given name func (s *KubeSource) RemoveContent(name string) { s.mu.Lock() defer s.mu.Unlock() keys := s.byFile[name] if keys != nil { for key, col := range keys { empty := "" err := s.inner.Delete(col, key.fullName.Name.String(), key.fullName.Namespace.String(), &empty) if err != nil { scope.Errorf("encountered unexpected error removing resource from filestore: %s", err) } delete(s.shas, key) } delete(s.byFile, name) } } func (s *KubeSource) parseContent(r *collection.Schemas, name, yamlText string) ([]kubeResource, error) { var resources []kubeResource var errs error reader := bufio.NewReader(strings.NewReader(yamlText)) decoder := kubeyaml2.NewYAMLReader(reader) chunkCount := -1 for { chunkCount++ doc, lineNum, err := decoder.Read() if err == io.EOF { break } if err != nil { e := fmt.Errorf("error reading documents in %s[%d]: %v", name, chunkCount, err) scope.Warnf("%v - skipping", e) scope.Debugf("Failed to parse yamlText chunk: %v", yamlText) errs = multierror.Append(errs, e) break } chunk := bytes.TrimSpace(doc) r, err := s.parseChunk(r, name, lineNum, chunk) if err != nil { var uerr *unknownSchemaError if errors.As(err, &uerr) { // Note the error to the debug log but continue scope.Debugf("skipping unknown yaml chunk %s: %s", name, uerr.Error()) } else { e := fmt.Errorf("error processing %s[%d]: %v", name, chunkCount, err) scope.Warnf("%v - skipping", e) scope.Debugf("Failed to parse yaml chunk: %v", string(chunk)) errs = multierror.Append(errs, e) } continue } resources = append(resources, r) } return resources, errs } // unknownSchemaError represents a schema was not found for a group+version+kind. type unknownSchemaError struct { group string version string kind string } func (e unknownSchemaError) Error() string { return fmt.Sprintf("failed finding schema for group/version/kind: %s/%s/%s", e.group, e.version, e.kind) } func (s *KubeSource) parseChunk(r *collection.Schemas, name string, lineNum int, yamlChunk []byte) (kubeResource, error) { // Convert to JSON jsonChunk, err := yaml.ToJSON(yamlChunk) if err != nil { return kubeResource{}, fmt.Errorf("failed converting YAML to JSON: %v", err) } // Peek at the beginning of the JSON to groupVersionKind, err := kubeJson.DefaultMetaFactory.Interpret(jsonChunk) if err != nil { return kubeResource{}, fmt.Errorf("failed interpreting jsonChunk: %v", err) } if groupVersionKind.Empty() { return kubeResource{}, fmt.Errorf("unable to parse resource with no group, version and kind") } schema, found := r.FindByGroupVersionKind(schemaresource.FromKubernetesGVK(groupVersionKind)) if !found { return kubeResource{}, &unknownSchemaError{ group: groupVersionKind.Group, version: groupVersionKind.Version, kind: groupVersionKind.Kind, } } // Cannot create new instance. This occurs because while newer types do not implement proto.Message, // this legacy code only supports proto.Messages. // Note: while NewInstance can be slightly modified to not return error here, the rest of the code // still requires a proto.Message so it won't work without completely refactoring galley/ _, e := schema.Resource().NewInstance() cannotHandleProto := e != nil if cannotHandleProto { return kubeResource{}, &unknownSchemaError{ group: groupVersionKind.Group, version: groupVersionKind.Version, kind: groupVersionKind.Kind, } } runtimeScheme := runtime.NewScheme() codecs := serializer.NewCodecFactory(runtimeScheme) deserializer := codecs.UniversalDeserializer() obj, err := kube.IstioScheme.New(schema.Resource().GroupVersionKind().Kubernetes()) if err != nil { return kubeResource{}, fmt.Errorf("failed to initialize interface for built-in type: %v", err) } _, _, err = deserializer.Decode(jsonChunk, nil, obj) if err != nil { return kubeResource{}, fmt.Errorf("failed parsing JSON for built-in type: %v", err) } objMeta, ok := obj.(metav1.Object) if !ok { return kubeResource{}, errors.New("failed to assert type of object metadata") } // If namespace is blank and we have a default set, fill in the default // (This mirrors the behavior if you kubectl apply a resource without a namespace defined) // Don't do this for cluster scoped resources if !schema.Resource().IsClusterScoped() { if objMeta.GetNamespace() == "" && s.defaultNs != "" { scope.Debugf("KubeSource.parseChunk: namespace not specified for %q, using %q", objMeta.GetName(), s.defaultNs) objMeta.SetNamespace(string(s.defaultNs)) } } else { // Clear the namespace if there is any specified. objMeta.SetNamespace("") } // Build flat map for analyzers if the line JSON object exists, if the YAML text is ill-formed, this will be nil fieldMap := make(map[string]int) // yamlv3.Node contains information like line number of the node, which will be used with its name to construct the field map yamlChunkNode := yamlv3.Node{} err = yamlv3.Unmarshal(yamlChunk, &yamlChunkNode) if err == nil && len(yamlChunkNode.Content) == 1 { // Get the Node that contains all the YAML chunk information yamlNode := yamlChunkNode.Content[0] BuildFieldPathMap(yamlNode, lineNum, "", fieldMap) } pos := kube2.Position{Filename: name, Line: lineNum} c, err := ToConfig(objMeta, schema, &pos, fieldMap) if err != nil { return kubeResource{}, err } return kubeResource{ schema: schema, sha: sha256.Sum256(yamlChunk), config: c, }, nil } const ( FieldMapKey = "istiofilefieldmap" ReferenceKey = "istiosource" ) // ToConfig converts the given object and proto to a config.Config func ToConfig(object metav1.Object, schema collection.Schema, source resource.Reference, fieldMap map[string]int) (*config.Config, error) { m, err := runtime.DefaultUnstructuredConverter.ToUnstructured(object) if err != nil { return nil, err } u := &unstructured.Unstructured{Object: m} if len(fieldMap) > 0 || source != nil { // TODO: populate annots := u.GetAnnotations() if annots == nil { annots = map[string]string{} } jsonfm, err := json.Marshal(fieldMap) if err != nil { return nil, err } annots[FieldMapKey] = string(jsonfm) jsonsource, err := json.Marshal(source) if err != nil { return nil, err } annots[ReferenceKey] = string(jsonsource) u.SetAnnotations(annots) } result := TranslateObject(u, "", schema) return result, nil } func TranslateObject(obj *unstructured.Unstructured, domainSuffix string, schema collection.Schema) *config.Config { mv2, err := schema.Resource().NewInstance() if err != nil { panic(err) } if spec, ok := obj.UnstructuredContent()["spec"]; ok { err = runtime.DefaultUnstructuredConverter.FromUnstructured(spec.(map[string]interface{}), mv2) } else { err = runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), mv2) } if err != nil { panic(err) } m := obj return &config.Config{ Meta: config.Meta{ GroupVersionKind: config.GroupVersionKind{ Group: m.GetObjectKind().GroupVersionKind().Group, Version: m.GetObjectKind().GroupVersionKind().Version, Kind: m.GetObjectKind().GroupVersionKind().Kind, }, UID: string(m.GetUID()), Name: m.GetName(), Namespace: m.GetNamespace(), Labels: m.GetLabels(), Annotations: m.GetAnnotations(), ResourceVersion: m.GetResourceVersion(), CreationTimestamp: m.GetCreationTimestamp().Time, OwnerReferences: m.GetOwnerReferences(), Generation: m.GetGeneration(), Domain: domainSuffix, }, Spec: mv2, } } // BuildFieldPathMap builds the flat map for each field of the YAML resource func BuildFieldPathMap(yamlNode *yamlv3.Node, startLineNum int, curPath string, fieldPathMap map[string]int) { // If no content in the node, terminate the DFS search if len(yamlNode.Content) == 0 { return } nodeContent := yamlNode.Content // Iterate content by a step of 2, because in the content array the value is in the key's next index position for i := 0; i < len(nodeContent)-1; i += 2 { // Two condition, i + 1 positions have no content, which means they have the format like "key: value", then build the map // Or i + 1 has contents, which means "key:\n value...", then perform one more DFS search keyNode := nodeContent[i] valueNode := nodeContent[i+1] pathKeyForMap := fmt.Sprintf("%s.%s", curPath, keyNode.Value) switch { case valueNode.Kind == yamlv3.ScalarNode: // Can build map because the value node has no content anymore // minus one because startLineNum starts at line 1, and yamlv3.Node.line also starts at line 1 fieldPathMap[fmt.Sprintf("{%s}", pathKeyForMap)] = valueNode.Line + startLineNum - 1 case valueNode.Kind == yamlv3.MappingNode: BuildFieldPathMap(valueNode, startLineNum, pathKeyForMap, fieldPathMap) case valueNode.Kind == yamlv3.SequenceNode: for j, node := range valueNode.Content { pathWithIndex := fmt.Sprintf("%s[%d]", pathKeyForMap, j) // Array with values or array with maps if node.Kind == yamlv3.ScalarNode { fieldPathMap[fmt.Sprintf("{%s}", pathWithIndex)] = node.Line + startLineNum - 1 } else { BuildFieldPathMap(node, startLineNum, pathWithIndex, fieldPathMap) } } } } }