wstl1/mapping_engine/mapping/engine.go (454 lines of code) (raw):

// Copyright 2020 Google LLC. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package mapping contains methods and mechanisms for executing a mapping config. package mapping import ( "errors" "fmt" "strconv" "strings" "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/builtins" /* copybara-comment: builtins */ "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/types" /* copybara-comment: types */ "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/util/jsonutil" /* copybara-comment: jsonutil */ errs "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/errors" /* copybara-comment: errors */ mappb "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/proto" /* copybara-comment: mapping_go_proto */ ) // Engine defines an interface for mapping processing engine. type Engine interface { ProcessMappings(maps []*mappb.FieldMapping, projName string, args []jsonutil.JSONMetaNode, output *jsonutil.JSONToken, pctx *types.Context) error EvaluateValueSource(vs *mappb.ValueSource, args []jsonutil.JSONMetaNode, output jsonutil.JSONToken, pctx *types.Context) (jsonutil.JSONMetaNode, error) } func checkCondition(conditionVs *mappb.ValueSource, args []jsonutil.JSONMetaNode, output *jsonutil.JSONToken, pctx *types.Context, a jsonutil.JSONTokenAccessor) (bool, error) { cond, err := EvaluateValueSource(conditionVs, args, *output, pctx, a) if err != nil { return false, err } cp, ok := cond.(jsonutil.JSONMetaPrimitiveNode) if ok { cb, ok := cp.Value.(jsonutil.JSONBool) if ok { return bool(cb), nil } } t, err := jsonutil.NodeToToken(cond) if err != nil { return false, err } notNil, err := builtins.IsNotNil(t) return bool(notNil), err } func isNil(src jsonutil.JSONToken) bool { result, _ := builtins.IsNil(src) return bool(result) } func postProcessValue(value jsonutil.JSONToken) jsonutil.JSONToken { switch t := value.(type) { case jsonutil.JSONStr: // Only trim if the string is more than just whitespace. if trimmed := strings.TrimSpace(string(t)); len(trimmed) > 0 { return jsonutil.JSONStr(trimmed) } } return value } func addObject(src jsonutil.JSONToken, targetObject string, pctx *types.Context) { // Prevent nested arrays showing up in top level objects. There is no real use // case where arrays should show up in the top level objects list. We // especially want to do this here (as opposed to later), to avoid having a // situation where the list is in a bad state and something tries to read from // it during mapping. var objs jsonutil.JSONArr if sa, ok := src.(jsonutil.JSONArr); ok { objs = sa for _, obj := range objs { addObject(obj, targetObject, pctx) } } else { pctx.TopLevelObjects[targetObject] = append(pctx.TopLevelObjects[targetObject], src) } } // EvaluateValueSource "interprets" a single value source. The source is converted into a JSONToken // representation of its value (or an error). // More specifically, the value of each argument is either evaluated or fetched from the corresponding source/environment. // Then each set of argument is passed into the projector, the resulting value is stored into a JSONMetaNode. func EvaluateValueSource(vs *mappb.ValueSource, args []jsonutil.JSONMetaNode, output jsonutil.JSONToken, pctx *types.Context, a jsonutil.JSONTokenAccessor) (jsonutil.JSONMetaNode, error) { if vs == nil { return nil, errors.New("nil value source pointer") } nextArgs := make([]jsonutil.JSONMetaNode, 0, 1) var iterableIndicies []bool var anyArgsToIterate bool if vs.GetSource() != nil { arg, err := evaluateValueSourceSource(vs, args, output, pctx, a) if err != nil { return nil, err } nextArgs = append(nextArgs, arg) iterableIndicies = append(iterableIndicies, isArray(vs)) anyArgsToIterate = isArray(vs) for _, s := range vs.AdditionalArg { arg, err := EvaluateValueSource(s, args, output, pctx, a) if err != nil { return nil, errs.Wrap(errs.NewProtoLocation(s, vs), err) } nextArgs = append(nextArgs, arg) // Check if we need to enumerate each additional arg (based on whether it/it's projector is enumerated) shouldIterate := (isArray(s) && s.GetProjector() == "") || isSelectorArray(s.GetProjector()) iterableIndicies = append(iterableIndicies, shouldIterate) anyArgsToIterate = anyArgsToIterate || shouldIterate } } projName := strings.TrimSuffix(vs.Projector, "[]") proj, err := pctx.Registry.FindProjector(projName) if err != nil { return nil, fmt.Errorf("error finding projector: %v", err) } if anyArgsToIterate { if len(nextArgs) == 0 { return nil, errors.New("source was enumerated (ended with []) but source itself did not exist (?)") } // Zip the arguments together - enumerate any additional args that need it. zippedArgs, err := zip(nextArgs, iterableIndicies) if err != nil { return nil, fmt.Errorf("error zipping args: %v", err) } projVals := make([]jsonutil.JSONMetaNode, 0) for _, args := range zippedArgs { sources := []string{} for _, s := range args { if s != nil { sources = append(sources, s.ProvenanceString()) } else { sources = append(sources, "null") } } pv, err := proj(args, pctx) if err != nil { return nil, errs.Wrap(errs.Locationf("Iterated arguments %q", strings.Join(sources, ", ")), err) } pv = postProcessValue(pv) if isNil(pv) { continue } pvm, err := jsonutil.TokenToNodeWithProvenance(pv, "", jsonutil.Provenance{ Sources: args, Function: projName, }) if err != nil { return nil, err } projVals = append(projVals, pvm) } return jsonutil.JSONMetaArrayNode{ Items: projVals, JSONMeta: jsonutil.NewJSONMeta("", jsonutil.Provenance{ Sources: nextArgs, Function: projName, }), }, nil } pv, err := proj(nextArgs, pctx) if err != nil { return nil, err } if projName == "" && len(nextArgs) == 1 && nextArgs[0] != nil { return jsonutil.TokenToNodeWithProvenance(pv, nextArgs[0].Key(), nextArgs[0].Provenance()) } return jsonutil.TokenToNodeWithProvenance(pv, "", jsonutil.Provenance{ Sources: nextArgs, Function: projName, }) } func isArray(vs *mappb.ValueSource) bool { var selector string switch s := vs.Source.(type) { case *mappb.ValueSource_FromSource: selector = s.FromSource case *mappb.ValueSource_FromDestination: selector = s.FromDestination case *mappb.ValueSource_FromLocalVar: selector = s.FromLocalVar case *mappb.ValueSource_FromInput: selector = s.FromInput.Field case *mappb.ValueSource_ProjectedValue: selector = s.ProjectedValue.Projector default: return false } return isSelectorArray(selector) } func isSelectorArray(selector string) bool { return strings.HasSuffix(selector, "[]") } // zip allows synchronized iteration of some arrays along with non-arrays. // Given some values, and an equal number of iterable flags; For any index where an iterable flag // is true and the value is an array - the array is expanded. For example // zip(["foo", "bar", "baz", [1, 2, 3, 4]], nil or [false, false, false, false]) returns // [ // ["foo", "bar", "baz", [1, 2, 3, 4]], // ] // zip(["foo", "bar", "baz", [1, 2, 3]], [false, false, false, true]) returns // [ // ["foo", "bar", "baz", 1], // ["foo", "bar", "baz", 2], // ["foo", "bar", "baz", 3], // ] // zip([["one", "two", "three"], [a, b], [1, 2, 3]], [true, false, true]) returns // [ // ["one", [a, b], 1], // ["two", [a, b], 2], // ["three", [a, b], 3], // ] // zip(["foo", "bar", "baz", []], [false, false, false, true]) returns // [ ] (empty list) // zip(["foo", "bar", "baz", [1, 2, 3], []], [false, false, false, true, true]) returns // [ ] (empty list) func zip(values []jsonutil.JSONMetaNode, iterables []bool) ([][]jsonutil.JSONMetaNode, error) { if len(values) != len(iterables) { return nil, fmt.Errorf("this is an internal bug: number of values (%d) did not match number of iterable flags (%d)", len(values), len(iterables)) } // Validate that things flagged to be iterated are actually iterated. baseLen := 0 basePath := "" if iterables != nil { for i, a := range values { if !iterables[i] { continue } if a == nil { a = jsonutil.JSONMetaArrayNode{} values[i] = a } arr, ok := a.(jsonutil.JSONMetaArrayNode) if !ok { return nil, fmt.Errorf("can't iterate non-array %q (it was the %s argument in the function call)", a.ProvenanceString(), errs.SuffixNumber(i+1)) } if baseLen == 0 { baseLen = len(arr.Items) basePath = arr.ProvenanceString() } if len(arr.Items) != baseLen && len(arr.Items) > 0 { return nil, fmt.Errorf("can't zip/iterate arrays of different sizes together (%q had %d items, but %q had %d)", basePath, baseLen, arr.ProvenanceString(), len(arr.Items)) } } } if baseLen == 0 { return [][]jsonutil.JSONMetaNode{}, nil } // Zip together iterables and non-iterables. var res [][]jsonutil.JSONMetaNode for i := 0; i < baseLen; i++ { z := []jsonutil.JSONMetaNode{} for j, a := range values { if iterables != nil && iterables[j] { arr := values[j].(jsonutil.JSONMetaArrayNode) if len(arr.Items) > 0 { z = append(z, arr.Items[i]) } else { z = append(z, nil) } } else { z = append(z, a) } } res = append(res, z) } return res, nil } // evaluateValueSourceSource triages and extracts the specific value of a ValueSource.Source oneof // (which can be one of various constant types, or a reference to an input field, variable, or // output field). func evaluateValueSourceSource(vs *mappb.ValueSource, args []jsonutil.JSONMetaNode, output jsonutil.JSONToken, pctx *types.Context, a jsonutil.JSONTokenAccessor) (jsonutil.JSONMetaNode, error) { var metaNode jsonutil.JSONMetaNode var err error var location string switch s := vs.Source.(type) { // Constants: case *mappb.ValueSource_ConstString: location = "const string" metaNode, err = jsonutil.TokenToNodeWithProvenance(jsonutil.JSONStr(s.ConstString), fmt.Sprintf("%q", s.ConstString), jsonutil.Provenance{}) case *mappb.ValueSource_ConstInt: location = "const int" metaNode, err = jsonutil.TokenToNodeWithProvenance(jsonutil.JSONNum(s.ConstInt), fmt.Sprintf("%d", s.ConstInt), jsonutil.Provenance{}) case *mappb.ValueSource_ConstFloat: location = "const float" metaNode, err = jsonutil.TokenToNodeWithProvenance(jsonutil.JSONNum(s.ConstFloat), fmt.Sprintf("%f", s.ConstFloat), jsonutil.Provenance{}) case *mappb.ValueSource_ConstBool: location = "const bool" metaNode, err = jsonutil.TokenToNodeWithProvenance(jsonutil.JSONBool(s.ConstBool), fmt.Sprintf("%v", s.ConstBool), jsonutil.Provenance{}) // More complicated things: case *mappb.ValueSource_FromSource: location = fmt.Sprintf("From Source %q", s.FromSource) as, asErr := fromSourceToArgSource(s, args) if asErr != nil { return nil, asErr } metaNode, err = EvaluateArgSource(as, args, pctx) case *mappb.ValueSource_FromDestination: location = fmt.Sprintf("From Destination %q", s.FromDestination) token, lerr := EvaluateFromDestination(s, output, a) if lerr != nil { return nil, lerr } metaNode, err = jsonutil.TokenToNodeWithProvenance(token, fmt.Sprintf("%s's output field %s", pctx.Projector(), s.FromDestination), jsonutil.Provenance{}) case *mappb.ValueSource_FromLocalVar: location = fmt.Sprintf("From Var %q", s.FromLocalVar) // TODO(): Provenance support for vars. token, lerr := EvaluateFromVar(s, pctx, a) if lerr != nil { return nil, lerr } metaNode, err = jsonutil.TokenToNodeWithProvenance(token, fmt.Sprintf("%s's var %s", pctx.Projector(), s.FromLocalVar), jsonutil.Provenance{}) case *mappb.ValueSource_ProjectedValue: if s.ProjectedValue.Projector != "" { location = "Argument for " + s.ProjectedValue.Projector } else { location = "Nested expression" } metaNode, err = EvaluateValueSource(s.ProjectedValue, args, output, pctx, a) // TODO(): token Key = Gvid(); Parent = common ancestor of all args // No need to mutate parent though. case *mappb.ValueSource_FromArg: if s.FromArg < 0 || int(s.FromArg) > len(args) { return nil, fmt.Errorf("from_arg is out of range. Requested arg %d but projector only got %d", s.FromArg, len(args)) } location = fmt.Sprintf("from arg %d", s.FromArg) if s.FromArg == 0 { location += " (all args)" metaNode = jsonutil.JSONMetaArrayNode{Items: args} } else { metaNode = args[s.FromArg-1] } case *mappb.ValueSource_FromInput: metaNode, err = EvaluateArgSource(s.FromInput, args, pctx) location = fmt.Sprintf("input arg %d field %q", s.FromInput.Arg, s.FromInput.Field) default: return nil, fmt.Errorf("unknown value source %T", vs.Source) } if err != nil { return nil, errs.Wrap(errs.Locationf(location), err) } return metaNode, nil } func fromSourceToArgSource(source *mappb.ValueSource_FromSource, args []jsonutil.JSONMetaNode) (*mappb.ValueSource_InputSource, error) { if len(args) == 0 { return &mappb.ValueSource_InputSource{ Field: source.FromSource, }, nil } if len(args) == 1 { return &mappb.ValueSource_InputSource{ // Default to first/only arg Arg: 1, Field: source.FromSource, }, nil } segs, err := jsonutil.SegmentPath(source.FromSource) if err != nil { return nil, fmt.Errorf("error parsing source %q: %v", source.FromSource, err) } if len(segs) == 0 { return nil, fmt.Errorf("error parsing source - was empty but projector had %d args (so at least an argument index is required)", len(args)) } argIdx, err := strconv.Atoi(segs[0]) if err != nil { return &mappb.ValueSource_InputSource{ Field: source.FromSource, }, nil } return &mappb.ValueSource_InputSource{ Arg: int32(argIdx), Field: strings.ReplaceAll(strings.Join(segs[1:], "."), ".[", "["), }, nil } // EvaluateArgSource extracts a specified value from the arguments (or their subfields) or the // context. Given a FromSource pb message, the function will check the args (which should be passed // down from whatever projector this FromSource is used from), and/or the context, deciding on its // own whether the former or latter is appropriate based on the content of FromSource. func EvaluateArgSource(vs *mappb.ValueSource_InputSource, args []jsonutil.JSONMetaNode, pctx *types.Context) (jsonutil.JSONMetaNode, error) { // We need to find the argument (or subfield) or pctx referred to by this source. segs, err := jsonutil.SegmentPath(vs.Field) if err != nil { return nil, fmt.Errorf("error parsing source %s: %v", vs.Field, err) } if len(segs) == 0 && vs.Arg == 0 { return nil, errors.New("empty arg_source - needs to either have a valid argument index, a valid JSON path [without argument index refers to a input context value], or both") } var targetObj jsonutil.JSONMetaNode if vs.Arg < 0 || int(vs.Arg) > len(args) { return nil, fmt.Errorf("from_input.arg %d is out of range must be [0, %d]", vs.Arg, len(args)) } // Remove array indicator ([]) suffix. if len(segs) > 0 && segs[len(segs)-1] == "[]" { segs = segs[0 : len(segs)-1] } if vs.Arg == 0 { targetObj, err = getValueFromContext(args, segs, pctx) if err != nil { return nil, fmt.Errorf("error getting value %q from input context: %v", vs.Field, err) } } else { targetObj, err = jsonutil.GetNodeFieldSegmented(args[vs.Arg-1], segs) if err != nil { return nil, fmt.Errorf("error getting field %q from %q: %v", vs.Field, args[vs.Arg-1].ProvenanceString(), err) } } return targetObj, err } func getValueFromContext(args []jsonutil.JSONMetaNode, segs []string, pctx *types.Context) (jsonutil.JSONMetaNode, error) { var node jsonutil.JSONMetaNode var remSegs []string var err error if node, remSegs, err = ParentInfoFromArgs(args, segs); err != nil { return nil, err } return jsonutil.GetNodeFieldSegmented(node, remSegs) } // ParentInfoFromArgs finds the longest (prefix) subset of the given path segments that are // present in the argument ancestors. Returns this ancestor and the remaining path relative to it. func ParentInfoFromArgs(args []jsonutil.JSONMetaNode, segs []string) (jsonutil.JSONMetaNode, []string, error) { if len(args) == 0 { return nil, segs, nil } var bestCommonAnc jsonutil.JSONMetaNode var remSegs []string for _, arg := range args { if arg == nil { continue } root := arg for root.Parent() != nil { root = root.Parent() } // Make sure this isn't just some const string arg. if _, ok := root.(jsonutil.JSONMetaPrimitiveNode); ok { continue } p := arg.Path() argSegs, err := jsonutil.SegmentPath(p) if err != nil { return nil, segs, fmt.Errorf("argument %q does not have a valid path %s: %v", arg.ProvenanceString(), p, err) } commonAnc, rem, err := getCommonAncestor(root, argSegs, segs) // The best common node is the one closest to the leaf we want. if bestCommonAnc == nil || len(rem) < len(remSegs) { bestCommonAnc = commonAnc remSegs = rem } } return bestCommonAnc, remSegs, nil } // getCommonAncestor finds the last common node of two paths in a JSON tree. It takes one fully // qualified path (i.e. a path with all array indices filled in) and one partly qualified path (i.e. // a path missing array indices, up to the point where it diverges from the fully qualified path). // It then traverses the tree until it finds a place where the two paths diverge, and returns that // last common node both paths shared, along with the suffix of the second path that diverged from // the fully qualified path. // // e.x. Given a JSON object, called obj: // { // "commonA": { // "arrayA": [ // { // "commonB": { // "foo": 1337, // "bar": 9999 // } // } // ], // "baz": { // "num": 1234 // } // } // } // // getCommonAncestor(obj, // ["commonA", "arrayA", "[0]", "commonB", "foo"], // ["commonA", "arrayA", "commonB", "bar"]) // returns { "foo": 1337, "bar": 9999 }, ["bar"] // // getCommonAncestor(obj, // ["commonA", "arrayA", "[0]", "commonB", "foo"], // ["commonA", "baz", "num"] // ) // returns { "arrayA": [...], "baz": { "num": 1234 } }, ["baz", "num"] func getCommonAncestor(baseNode jsonutil.JSONMetaNode, fullyQualifiedPath []string, partlyQualifiedPath []string) (jsonutil.JSONMetaNode, []string, error) { commonAnc := baseNode var fullIndex, partialIndex int var err error for ; partialIndex < len(partlyQualifiedPath); partialIndex++ { // Advance cursor for full path while it has array indices that the other path doesn't. for ; fullIndex < len(fullyQualifiedPath) && jsonutil.IsIndex(fullyQualifiedPath[fullIndex]) && !jsonutil.IsIndex(partlyQualifiedPath[partialIndex]); fullIndex++ { commonAnc, err = jsonutil.GetNodeField(commonAnc, fullyQualifiedPath[fullIndex]) if err != nil { return nil, partlyQualifiedPath, fmt.Errorf("error traversing to ancestor %v: %v", fullyQualifiedPath[:fullIndex+1], err) } } // Check if the two paths diverge yet. if fullIndex >= len(fullyQualifiedPath) || partlyQualifiedPath[partialIndex] != fullyQualifiedPath[fullIndex] { break } // Advance cursor for both paths (they are still in sync). commonAnc, err = jsonutil.GetNodeField(commonAnc, fullyQualifiedPath[fullIndex]) if err != nil { return nil, partlyQualifiedPath, fmt.Errorf("error traversing to ancestor %v: %v", fullyQualifiedPath[:fullIndex+1], err) } fullIndex++ } return commonAnc, partlyQualifiedPath[partialIndex:], nil } // EvaluateFromDestination returns the field from the output specified by the given path. func EvaluateFromDestination(vs *mappb.ValueSource_FromDestination, output jsonutil.JSONToken, a jsonutil.JSONTokenAccessor) (jsonutil.JSONToken, error) { return readField(output, vs.FromDestination, a) } // EvaluateFromVar returns the context variable with the given name, or an error if it was // not found. func EvaluateFromVar(vs *mappb.ValueSource_FromLocalVar, pctx *types.Context, a jsonutil.JSONTokenAccessor) (jsonutil.JSONToken, error) { v, name, err := getVar(vs.FromLocalVar, pctx) if err != nil { return nil, err } return readField(v, strings.TrimPrefix(strings.TrimPrefix(vs.FromLocalVar, name), "."), a) } type undefinedVarError struct { name string } func (e undefinedVarError) Error() string { return fmt.Sprintf("attempted to access undefined var %s", e.name) } // getVar returns the context variable with the given path and the variable name // extracted from the path, or returns an undefinedVarError if the variable is // not defined in the context. func getVar(source string, pctx *types.Context) (jsonutil.JSONToken, string, error) { src := strings.TrimSuffix(strings.TrimSuffix(source, "!"), "[]") path, err := jsonutil.SegmentPath(src) if err != nil { return nil, "", fmt.Errorf("error parsing var accessor %q: %v", src, err) } if len(path) == 0 { return nil, "", fmt.Errorf("no valid var accessor specified (%q is not valid)", src) } name := path[0] var v *jsonutil.JSONToken if v, err = pctx.Variables.Get(name); err != nil { return nil, name, fmt.Errorf("error accessing var %s: %v", src, err) } if v == nil { return jsonutil.JSONToken(nil), name, nil } return *v, name, nil } func readField(src jsonutil.JSONToken, field string, a jsonutil.JSONTokenAccessor) (jsonutil.JSONToken, error) { return a.GetField(src, strings.TrimSuffix(field, "[]")) } func writeField(src jsonutil.JSONToken, field string, dest *jsonutil.JSONToken, forceOverwrite bool, srcIterate bool, a jsonutil.JSONTokenAccessor) error { return a.SetField(src, strings.TrimSuffix(field, "!"), dest, forceOverwrite || strings.HasSuffix(field, "!"), srcIterate) } func isSrcIteratable(vs *mappb.ValueSource) bool { if strings.HasSuffix(vs.Projector, "[]") { return true } if vs.Projector == "" { if vs.GetSource() != nil { return isArray(vs) } } return false }