pkg/log/structure/merger/merged.go (689 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package merger
import (
"fmt"
"slices"
"strconv"
"strings"
"github.com/GoogleCloudPlatform/khi/pkg/log/structure/structuredata"
)
type arrayMergeResultElementReference struct {
From structuredata.StructureData
Index string
}
type arrayMergeResultSourceCache struct {
References []*arrayMergeResultElementReference
}
type strategicMergePatchKeys struct {
FieldKeys []string
StrategicMergeAttibutes []string
}
type strategicMergePatchArrayEleemnts struct {
Elements []structuredata.StructureData
Replace bool
DeletedFields []structuredata.StructureData
}
type deleteFromPrimitiveListDirective struct {
list []any
}
type setElementOrderDirective struct {
list []any
}
type StrategicMergedStructureData struct {
path string
prev structuredata.StructureData
patch structuredata.StructureData
mergeKeyResolver *MergeConfigResolver
deleteFromPrimitiveListDirective *deleteFromPrimitiveListDirective
setElementOrderDirective *setElementOrderDirective
arrayMergeResultSourceCache *arrayMergeResultSourceCache
}
func NewStrategicMergedStructureData(path string, prev structuredata.StructureData, patch structuredata.StructureData, mergeKeyResolver *MergeConfigResolver) *StrategicMergedStructureData {
return &StrategicMergedStructureData{
path: path,
prev: prev,
patch: patch,
mergeKeyResolver: mergeKeyResolver,
deleteFromPrimitiveListDirective: nil,
arrayMergeResultSourceCache: nil,
setElementOrderDirective: nil,
}
}
// Keys implements structuredata.StructureData.
func (d *StrategicMergedStructureData) Keys() ([]string, error) {
prevKeys, err := d.prev.Keys()
if err != nil {
return nil, err
}
patchKeys, err := d.patch.Keys()
if err != nil {
return nil, err
}
ty, err := d.Type()
if err != nil {
return nil, err
}
patchAttr, err := d.readScalarAsString(d.patch, "$patch")
if patchAttr == "delete" {
return []string{""}, nil
}
switch ty {
case structuredata.StructuredTypeMap:
keydiff := NewKeyDiff(prevKeys, patchKeys)
strategicPatchKeys := splitPatchKeysByFieldsOrMergeAttributes(keydiff.OnlyInPatch)
if err == nil {
if patchAttr == "replace" {
return append(keydiff.Both, strategicPatchKeys.FieldKeys...), nil
}
}
result := []string{}
result = append(result, prevKeys...)
result = append(result, strategicPatchKeys.FieldKeys...)
retainKeysAttr, err := d.readStringArray(d.patch, "$retainKeys")
if err == nil {
retainKeysInMap := map[string]struct{}{}
for _, key := range retainKeysAttr {
retainKeysInMap[key] = struct{}{}
}
retainResult := []string{}
for _, key := range result {
if _, in := retainKeysInMap[key]; in {
retainResult = append(retainResult, key)
}
}
return retainResult, nil
}
return result, nil
case structuredata.StructuredTypeArray:
if d.arrayMergeResultSourceCache == nil {
err := d.buildArrayMergeResultSourceCache()
if err != nil {
return nil, err
}
}
return stringSequence(len(d.arrayMergeResultSourceCache.References)), nil
default:
return []string{""}, nil
}
}
// Type implements structuredata.StructureData.
func (d *StrategicMergedStructureData) Type() (structuredata.StructuredDataFieldType, error) {
patchAttr, err := d.readScalarAsString(d.patch, "$patch")
if err == nil && patchAttr == "delete" {
return structuredata.StructuredTypeScalar, nil
}
return d.patch.Type()
}
// Value implements structuredata.StructureData.
func (d *StrategicMergedStructureData) Value(fieldName string) (any, error) {
ty, err := d.Type()
if err != nil {
return nil, err
}
switch ty {
case structuredata.StructuredTypeScalar:
patchPolicy, err := d.readScalarAsString(d.patch, "$patch")
if err == nil && patchPolicy == "delete" {
return nil, nil
}
return d.patch.Value(fieldName)
case structuredata.StructuredTypeMap:
if fieldName == "" {
return d, nil
}
patchPolicy, err := d.readScalarAsString(d.patch, "$patch")
if err == nil && patchPolicy == "replace" {
return d.patch.Value(fieldName)
}
retainKeysAttr, err := d.readStringArray(d.patch, "$retainKeys")
if err == nil {
kept := false
for _, key := range retainKeysAttr {
if key == fieldName {
kept = true
break
}
}
if !kept {
return fmt.Errorf("field not found:%s(patched by delete already)", fieldName), nil
}
}
prevFound := false
patchFound := false
var patchValueStructure structuredata.StructureData
var prevValueStructure structuredata.StructureData
patchValue, err := d.patch.Value(fieldName)
if err == nil {
var convertible bool
patchFound = true
patchValueStructure, convertible = patchValue.(structuredata.StructureData)
if !convertible {
return nil, fmt.Errorf("unreachable. Map children was not the structure data")
}
}
prevValue, err := d.prev.Value(fieldName)
if err == nil {
var convertible bool
prevValueStructure, convertible = prevValue.(structuredata.StructureData)
if !convertible {
return nil, fmt.Errorf("unreachable. Map children was not the structure data")
}
prevValueType, err := prevValueStructure.Type()
if err != nil {
return nil, err
}
// Ignore if the previous value is nil
if prevValueType == structuredata.StructuredTypeScalar {
prevActualValue, err := prevValueStructure.Value("")
if err != nil {
return nil, err
}
if prevActualValue != nil {
prevFound = true
} else if !patchFound {
return prevValueStructure, nil
}
} else {
prevFound = true
}
}
switch {
case prevFound && patchFound:
result := NewStrategicMergedStructureData(fmt.Sprintf("%s.%s", d.path, fieldName), prevValueStructure, patchValueStructure, d.mergeKeyResolver)
deleteFromPrimitiveList, err := d.readArray(d.patch, fmt.Sprintf("$deleteFromPrimitiveList/%s", fieldName))
if err == nil {
result.deleteFromPrimitiveListDirective = &deleteFromPrimitiveListDirective{
list: deleteFromPrimitiveList,
}
}
setOrderElementList, err := d.readArray(d.patch, fmt.Sprintf("$setElementOrder/%s", fieldName))
if err == nil {
result.setElementOrderDirective = &setElementOrderDirective{
list: setOrderElementList,
}
}
return result, nil
case prevFound:
result := NewStrategicMergedStructureData(fmt.Sprintf("%s.%s", d.path, fieldName), prevValueStructure, prevValueStructure, d.mergeKeyResolver)
deleteFromPrimitiveList, err := d.readArray(d.patch, fmt.Sprintf("$deleteFromPrimitiveList/%s", fieldName))
if err == nil {
result.deleteFromPrimitiveListDirective = &deleteFromPrimitiveListDirective{
list: deleteFromPrimitiveList,
}
}
setOrderElementList, err := d.readArray(d.patch, fmt.Sprintf("$setElementOrder/%s", fieldName))
if err == nil {
result.setElementOrderDirective = &setElementOrderDirective{
list: setOrderElementList,
}
}
return result, nil
case patchFound:
result := NewStrategicMergedStructureData(fmt.Sprintf("%s.%s", d.path, fieldName), patchValueStructure, patchValueStructure, d.mergeKeyResolver)
deleteFromPrimitiveList, err := d.readArray(d.patch, fmt.Sprintf("$deleteFromPrimitiveList/%s", fieldName))
if err == nil {
result.deleteFromPrimitiveListDirective = &deleteFromPrimitiveListDirective{
list: deleteFromPrimitiveList,
}
}
setOrderElementList, err := d.readArray(d.patch, fmt.Sprintf("$setElementOrder/%s", fieldName))
if err == nil {
result.setElementOrderDirective = &setElementOrderDirective{
list: setOrderElementList,
}
}
return result, nil
default:
return nil, fmt.Errorf("field not found:%s", fieldName)
}
case structuredata.StructuredTypeArray:
if fieldName == "" {
return d, nil
}
if d.arrayMergeResultSourceCache == nil {
err := d.buildArrayMergeResultSourceCache()
if err != nil {
return nil, err
}
}
index, err := strconv.Atoi(fieldName)
if err != nil {
return nil, err
}
if index < 0 || index >= len(d.arrayMergeResultSourceCache.References) {
return nil, fmt.Errorf("index out of range")
}
ref := d.arrayMergeResultSourceCache.References[index]
if ref.Index == "" {
return ref.From, nil
}
return ref.From.Value(ref.Index)
default:
return nil, fmt.Errorf("unsupported type to merge")
}
}
func (d *StrategicMergedStructureData) buildArrayMergeResultSourceCache() error {
d.arrayMergeResultSourceCache = &arrayMergeResultSourceCache{
References: make([]*arrayMergeResultElementReference, 0),
}
patchKeys, err := d.patch.Keys()
if err != nil {
return err
}
strategy := d.mergeKeyResolver.GetMergeArrayStrategy(d.path)
if strategy == MergeStrategyReplace {
for _, key := range patchKeys {
d.arrayMergeResultSourceCache.References = append(d.arrayMergeResultSourceCache.References, &arrayMergeResultElementReference{
From: d.patch,
Index: key,
})
}
} else {
mergeKey, err := d.mergeKeyResolver.GetMergeKey(d.path)
if err != nil {
return err
}
patchElements, err := d.splitPatchKeysByElementsOrMergeAttributesInArray(d.patch)
if err != nil {
return err
}
if patchElements.Replace {
for _, elem := range patchElements.Elements {
d.arrayMergeResultSourceCache.References = append(d.arrayMergeResultSourceCache.References, &arrayMergeResultElementReference{
From: elem,
Index: "",
})
}
return nil
}
prevFields := []structuredata.StructureData{}
prevKeys, err := d.prev.Keys()
if err != nil {
return err
}
if len(prevKeys) == 1 && prevKeys[0] == "" {
// The previous value is nil
} else {
for _, key := range prevKeys {
vany, err := d.prev.Value(key)
if err != nil {
return err
}
vst, convertible := vany.(structuredata.StructureData)
if !convertible {
return fmt.Errorf("expected vany to be convertible to structuredata, but didn't")
}
prevFields = append(prevFields, vst)
}
}
prevFieldsKeyOrder := []string{}
prevFieldsMap := map[string]structuredata.StructureData{}
for _, field := range prevFields {
keyValue, err := d.readScalar(field, mergeKey)
if err != nil {
return fmt.Errorf("merge key %s not found in prev field map", mergeKey)
}
hash, err := toHash(keyValue)
if err != nil {
return err
}
prevFieldsKeyOrder = append(prevFieldsKeyOrder, hash)
prevFieldsMap[hash] = field
}
patchFieldKeyOrder := []string{}
patchFieldMap := map[string]structuredata.StructureData{}
for _, field := range patchElements.Elements {
keyValue, err := d.readScalar(field, mergeKey)
if err != nil {
return fmt.Errorf("merge key %s not found in patch field map", mergeKey)
}
hash, err := toHash(keyValue)
if err != nil {
return err
}
patchFieldKeyOrder = append(patchFieldKeyOrder, hash)
patchFieldMap[hash] = field
}
deletedMap := map[string]struct{}{}
if d.deleteFromPrimitiveListDirective != nil {
for _, primitive := range d.deleteFromPrimitiveListDirective.list {
hash, err := toHash(primitive)
if err != nil {
return err
}
deletedMap[hash] = struct{}{}
}
}
for _, field := range patchElements.DeletedFields {
key, err := d.readScalarAsString(field, mergeKey)
if err != nil {
return err
}
deletedMap[key] = struct{}{}
}
keydiff := NewKeyDiffForArrayMerge(prevFieldsKeyOrder, patchFieldKeyOrder)
setElementOrderKeys := []string{}
if d.setElementOrderDirective != nil {
for _, orderKeyValue := range d.setElementOrderDirective.list {
if structuredata, convertible := orderKeyValue.(structuredata.StructureData); convertible {
orderKeyValue, err = d.readScalar(structuredata, mergeKey)
if err != nil {
return err
}
}
hash, err := toHash(orderKeyValue)
if err != nil {
return err
}
setElementOrderKeys = append(setElementOrderKeys, hash)
}
}
orderedKeys := reorderArrayKeysForMerge(prevFieldsKeyOrder, patchFieldKeyOrder, setElementOrderKeys)
bothKeys := map[string]struct{}{}
for _, key := range keydiff.Both {
bothKeys[key] = struct{}{}
}
for _, key := range orderedKeys {
if _, found := deletedMap[key]; found {
continue
}
if _, found := bothKeys[key]; found {
d.arrayMergeResultSourceCache.References = append(d.arrayMergeResultSourceCache.References, &arrayMergeResultElementReference{
From: NewStrategicMergedStructureData(d.path+"[]", prevFieldsMap[key], patchFieldMap[key], d.mergeKeyResolver),
Index: "",
})
continue
}
if _, found := patchFieldMap[key]; found {
d.arrayMergeResultSourceCache.References = append(d.arrayMergeResultSourceCache.References, &arrayMergeResultElementReference{
From: patchFieldMap[key],
Index: "",
})
continue
}
if _, found := prevFieldsMap[key]; found {
d.arrayMergeResultSourceCache.References = append(d.arrayMergeResultSourceCache.References, &arrayMergeResultElementReference{
From: prevFieldsMap[key],
Index: "",
})
continue
}
complemented, err := structuredata.DataFromYaml(fmt.Sprintf("%s: %s", mergeKey, key))
if err != nil {
return err
}
d.arrayMergeResultSourceCache.References = append(d.arrayMergeResultSourceCache.References, &arrayMergeResultElementReference{
From: complemented,
Index: "",
})
}
}
return nil
}
func (d *StrategicMergedStructureData) readScalar(st structuredata.StructureData, fieldName string) (any, error) {
if fieldName == "" {
return st.Value(fieldName)
}
attr, err := st.Value(fieldName)
if err != nil {
return "", err
}
attrSt, convertible := attr.(structuredata.StructureData)
if !convertible {
return "", fmt.Errorf("expected the data to be structure data but wasn't")
}
attrStAny, err := attrSt.Value("")
if err != nil {
return "", err
}
return attrStAny, nil
}
func (d *StrategicMergedStructureData) readScalarAsString(st structuredata.StructureData, fieldName string) (string, error) {
fieldAny, err := d.readScalar(st, fieldName)
if err != nil {
return "", err
}
fieldString, convertible := fieldAny.(string)
if !convertible {
return "", fmt.Errorf("field %s can't convert into string", fieldName)
}
return fieldString, nil
}
func (d *StrategicMergedStructureData) readArray(st structuredata.StructureData, fieldName string) ([]any, error) {
attr, err := st.Value(fieldName)
if err != nil {
return nil, err
}
attrSt, convertible := attr.(structuredata.StructureData)
if !convertible {
return nil, fmt.Errorf("expected the data to be structure data but wasn't")
}
ty, err := attrSt.Type()
if err != nil {
return nil, err
}
if ty != structuredata.StructuredTypeArray {
return nil, fmt.Errorf("expected an array but %s was given", ty)
}
result := []any{}
keys, err := attrSt.Keys()
if err != nil {
return nil, err
}
for _, key := range keys {
r, err := d.readScalar(attrSt, key)
if err != nil {
return nil, err
}
result = append(result, r)
}
return result, nil
}
func (d *StrategicMergedStructureData) readStringArray(st structuredata.StructureData, fieldName string) ([]string, error) {
source, err := d.readArray(st, fieldName)
if err != nil {
return nil, err
}
result := []string{}
for _, val := range source {
valInStr, convertible := val.(string)
if !convertible {
return nil, fmt.Errorf("the array element can't be converted to string")
}
result = append(result, valInStr)
}
return result, nil
}
var _ structuredata.StructureData = (*StrategicMergedStructureData)(nil)
func splitPatchKeysByFieldsOrMergeAttributes(patchKeys []string) *strategicMergePatchKeys {
fieldKeys := []string{}
attributes := []string{}
for _, key := range patchKeys {
if key == "$patch" || key == "$retainKeys" || strings.HasPrefix(key, "$setElementOrder/") || strings.HasPrefix(key, "$deleteFromPrimitiveList/") {
attributes = append(attributes, key)
} else {
fieldKeys = append(fieldKeys, key)
}
}
return &strategicMergePatchKeys{
FieldKeys: fieldKeys,
StrategicMergeAttibutes: attributes,
}
}
func (d *StrategicMergedStructureData) splitPatchKeysByElementsOrMergeAttributesInArray(arrayData structuredata.StructureData) (*strategicMergePatchArrayEleemnts, error) {
elements := []structuredata.StructureData{}
deletedFields := []structuredata.StructureData{}
patchByReplace := false
keys, err := arrayData.Keys()
if err != nil {
return nil, err
}
for _, key := range keys {
scalarOrMapAny, err := arrayData.Value(key)
if err != nil {
return nil, err
}
scalarOrMap, convertible := scalarOrMapAny.(structuredata.StructureData)
if !convertible {
return nil, fmt.Errorf("failed to convert to structuredata.StructureData")
}
ty, err := scalarOrMap.Type()
if err != nil {
return nil, err
}
if ty != structuredata.StructuredTypeMap {
elements = append(elements, scalarOrMap)
} else {
patchField, err := d.readScalarAsString(scalarOrMap, "$patch")
if err == nil {
if patchField == "replace" {
patchByReplace = true
} else if patchField == "delete" {
deletedFields = append(deletedFields, scalarOrMap)
}
continue
}
elements = append(elements, scalarOrMap)
}
}
return &strategicMergePatchArrayEleemnts{
Elements: elements,
Replace: patchByReplace,
DeletedFields: deletedFields,
}, nil
}
func removeSetElements(list []any, patch []any, remove []any) ([]any, error) {
retained := map[string]struct{}{}
for _, v := range list {
vhash, err := toHash(v)
if err != nil {
return nil, err
}
retained[vhash] = struct{}{}
}
for _, v := range patch {
vhash, err := toHash(v)
if err != nil {
return nil, err
}
retained[vhash] = struct{}{}
}
for _, v := range remove {
vhash, err := toHash(v)
if err != nil {
return nil, err
}
delete(retained, vhash)
}
result := []any{}
for _, v := range list {
vhash, err := toHash(v)
if err != nil {
return nil, err
}
if _, found := retained[vhash]; found {
result = append(result, v)
}
}
for _, v := range patch {
vhash, err := toHash(v)
if err != nil {
return nil, err
}
if _, found := retained[vhash]; found {
result = append(result, v)
}
}
return result, nil
}
func reorderArrayKeysForMerge(prev []string, patch []string, setElementOrderDirective []string) []string {
parallelList := []string{}
liveList := []string{}
liveOnlyList := []string{}
patchMap := toIndexMap(patch)
prevMap := toIndexMap(prev)
directiveMap := toIndexMap(setElementOrderDirective)
for _, directiveElem := range setElementOrderDirective {
if _, found := patchMap[directiveElem]; !found {
if _, found := prevMap[directiveElem]; !found {
// Assume the item was existing from the past
liveList = append(liveList, directiveElem)
}
}
}
for _, elem := range prev {
isLiveOnly := false
if _, found := directiveMap[elem]; !found {
if _, found := patchMap[elem]; !found {
isLiveOnly = true
}
}
if isLiveOnly {
liveOnlyList = append(liveOnlyList, elem)
} else {
liveList = append(liveList, elem)
}
}
parallelList = append(parallelList, liveList...)
parallelList = append(parallelList, patch...)
parallelList = append(parallelList, setElementOrderDirective...)
slices.Sort(parallelList)
parallelList = slices.Compact(parallelList)
liveMap := toIndexMap(liveList)
liveOnlyMap := toIndexMap(liveOnlyList)
slices.SortStableFunc(parallelList, func(a, b string) int {
if bothInMap(directiveMap, a, b) {
return directiveMap[a] - directiveMap[b]
}
if bothInMap(patchMap, a, b) {
return patchMap[a] - patchMap[b]
}
if bothInMap(liveMap, a, b) {
return liveMap[a] - liveMap[b]
}
if _, found := liveMap[a]; found {
return -1
}
return 1
})
slices.SortStableFunc(liveOnlyList, func(a, b string) int {
if bothInMap(directiveMap, a, b) {
return directiveMap[a] - directiveMap[b]
}
if bothInMap(liveOnlyMap, a, b) {
return liveOnlyMap[a] - liveOnlyMap[b]
}
if _, found := directiveMap[a]; found {
return -1
}
return 1
})
return append(liveOnlyList, parallelList...)
}
func toHash(v any) (string, error) {
if vStr, convertible := v.(string); convertible {
return vStr, nil
}
if vInt, convertible := v.(int); convertible {
return strconv.Itoa(vInt), nil
}
if vBool, convertible := v.(bool); convertible {
return strconv.FormatBool(vBool), nil
}
return "", fmt.Errorf("given type was not hashable")
}
func toIndexMap(arr []string) map[string]int {
result := map[string]int{}
for i, elem := range arr {
result[elem] = i
}
return result
}
func bothInMap[T any](mapSet map[string]T, a string, b string) bool {
_, foundA := mapSet[a]
if !foundA {
return false
}
_, foundB := mapSet[b]
return foundB
}
func stringSequence(length int) []string {
result := []string{}
for i := 0; i < length; i++ {
result = append(result, strconv.Itoa(i))
}
return result
}