wstl1/mapping_engine/transform/transform.go (286 lines of code) (raw):

// Copyright 2020 Google LLC. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package transform contains methods to transform json trees as specified by the config. package transform import ( "context" "encoding/json" "fmt" "io/ioutil" "google.golang.org/protobuf/encoding/prototext" /* copybara-comment: prototext */ "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/auth" /* copybara-comment: auth */ "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/cloudfunction" /* copybara-comment: cloudfunction */ "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/errors" /* copybara-comment: errors */ "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/fetch" /* copybara-comment: fetch */ "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/harmonization/harmonizecode" /* copybara-comment: harmonizecode */ "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/harmonization/harmonizeunit" /* copybara-comment: harmonizeunit */ "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/mapping" /* copybara-comment: mapping */ "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/postprocess" /* copybara-comment: postprocess */ "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/projector" /* copybara-comment: projector */ "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/types/register_all" /* copybara-comment: registerall */ "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/types" /* copybara-comment: types */ "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/util/gcsutil" /* copybara-comment: gcsutil */ "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/util/jsonutil" /* copybara-comment: jsonutil */ "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_language/transpiler" /* copybara-comment: transpiler */ dhpb "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/proto" /* copybara-comment: data_harmonization_go_proto */ hapb "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/proto" /* copybara-comment: harmonization_go_proto */ httppb "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/proto" /* copybara-comment: http_go_proto */ libpb "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/proto" /* copybara-comment: library_go_proto */ mappb "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/proto" /* copybara-comment: mapping_go_proto */ ) // Transformer defines an interface to perform transformations. type Transformer interface { // Transform transforms given JSONToken (parsed JSON) into a target JSONToken using the // config. Transform(jsonutil.JSONToken) (jsonutil.JSONToken, error) // JSONtoJSON transforms given raw JSON into a target raw JSON using the config. JSONtoJSON(json.RawMessage) (json.RawMessage, error) // ParseJSON parses given raw JSON into a JSONToken. ParseJSON(json.RawMessage) (jsonutil.JSONToken, error) // LoadProjectors registers all given projectors in the config. LoadProjectors([]*mappb.ProjectorDefinition) error // Registry returns the registry used in Transformer. Registry() *types.Registry // HasPostProcessProjector returns true iff a post process projector is set. HasPostProcessProjector() bool } // DefaultTransformer contains projectors initialized for a specific config, and receiver methods // to perform transformations. type DefaultTransformer struct { registry *types.Registry dataHarmonizationConfig *dhpb.DataHarmonizationConfig mappingConfig *mappb.MappingConfig transformationConfig TransformationConfig } // TransformationConfig contains metadata used during transformation. type TransformationConfig struct { LogTrace bool SkipBundling bool } // Options for initializing Data Harmonization transform library type Options struct { // CloudFunctions enables support for cloud functions within the transform library. CloudFunctions bool // FetchConfigs enables support for fetch configurations wtihin the transform library. FetchConfigs bool // Parallel enables support for parallelization within the transform library. Parallel bool // GCSClient is a client for downloading from GCS. If unset, the default third party GCS client will be used. GCSClient gcsutil.StorageClient } // Option is a setter function for Options. type Option func(*Options) // CloudFunctions initializes the CloudFunctions transform option. func CloudFunctions(enabledCloudFunctions bool) Option { return func(args *Options) { args.CloudFunctions = enabledCloudFunctions } } // FetchConfigs initializes the FetchConfigs transform option. func FetchConfigs(enableFetchConfig bool) Option { return func(args *Options) { args.FetchConfigs = enableFetchConfig } } // Parallel initializes the Parallel transform option. func Parallel(enableParallel bool) Option { return func(args *Options) { args.Parallel = enableParallel } } // GCSClient sets the GCSClient in the transform option. func GCSClient(c gcsutil.StorageClient) Option { return func(args *Options) { args.GCSClient = c } } // NewTransformer creates and initializes a transformer, and returns a new DefaultTransformer by // default. func NewTransformer(ctx context.Context, config *dhpb.DataHarmonizationConfig, tconfig TransformationConfig, setters ...Option) (Transformer, error) { return NewDefaultTransformer(ctx, config, tconfig, setters...) } // NewDefaultTransformer creates and initializes a default transformer. func NewDefaultTransformer(ctx context.Context, config *dhpb.DataHarmonizationConfig, tconfig TransformationConfig, setters ...Option) (*DefaultTransformer, error) { t := &DefaultTransformer{ registry: types.NewRegistry(), dataHarmonizationConfig: config, transformationConfig: tconfig, } if err := registerall.RegisterAll(t.registry); err != nil { return nil, err } options := &Options{} for _, setter := range setters { setter(options) } gcsutil.InitializeClient(options.GCSClient) if hc := config.GetHarmonizationConfig(); hc != nil { if err := harmonizecode.LoadCodeHarmonizationProjectors(t.registry, hc); err != nil { return nil, err } } if uc := config.GetUnitHarmonizationConfig(); uc != nil { if err := harmonizeunit.LoadUnitHarmonizationProjectors(t.registry, uc); err != nil { return nil, err } } mpc, err := t.LoadMappingConfig(config) if err != nil { return nil, err } t.mappingConfig = mpc if err := t.LoadProjectors(mpc.GetProjector()); err != nil { return nil, err } // Load the library configurations. for _, lc := range config.GetLibraryConfig() { if err := t.LoadProjectors(lc.Projector); err != nil { return nil, err } if options.CloudFunctions { if err := cloudfunction.LoadCloudFunctionProjectors(t.registry, lc.CloudFunction); err != nil { return nil, err } } else { if len(lc.CloudFunction) != 0 { return nil, &invalidCloudFunctionProjectorError{lc: lc} } } if err := auth.LoadServerConfigs(lc.Servers); err != nil { return nil, err } if options.FetchConfigs { if err := fetch.LoadFetchProjectors(context.Background(), t.registry, lc.HttpQuery); err != nil { return nil, err } } else { if len(lc.HttpQuery) != 0 { return nil, &invalidFetchProjectorError{lc: lc} } } for _, lib := range lc.GetUserLibraries() { mpc, err := loadMappingConfig(lib.GetPath(), lib.GetType()) if err != nil { return nil, err } // Custom library configs should only have projectors defined, all else is ignored. if err := t.LoadProjectors(mpc.GetProjector()); err != nil { return nil, err } } } return t, nil } type invalidCloudFunctionProjectorError struct { lc *libpb.LibraryConfig } type invalidFetchProjectorError struct { lc *libpb.LibraryConfig } func (e invalidCloudFunctionProjectorError) Error() string { var names []string for _, cf := range e.lc.CloudFunction { names = append(names, cf.Name) } return fmt.Sprintf("attempting to use disabled Cloud Function projectors feature with projectors: %v", names) } func (e invalidFetchProjectorError) Error() string { var names []string for _, cf := range e.lc.HttpQuery { names = append(names, cf.Name) } return fmt.Sprintf("attempting to use disabled Fetch projectors feature with projectors: %v", names) } // Project is a convenience function to call a single projector out of context. func (t *DefaultTransformer) Project(projector string, args ...jsonutil.JSONMetaNode) (res jsonutil.JSONToken, err error) { pctx := types.NewContext(t.registry) defer errors.Recover("Project", func(e error) { err = e }) proj, err := t.registry.FindProjector(projector) if err != nil { return nil, err } res, err = proj(args, pctx) return } // LoadMappingConfig loads the mapping config inline or from a GCS path. func (t *DefaultTransformer) LoadMappingConfig(config *dhpb.DataHarmonizationConfig) (*mappb.MappingConfig, error) { mpc := &mappb.MappingConfig{} if sm := config.GetStructureMappingConfig(); sm != nil { switch mapping := config.GetStructureMappingConfig().Mapping.(type) { case *hapb.StructureMappingConfig_MappingConfig: return mapping.MappingConfig, nil case *hapb.StructureMappingConfig_MappingPathConfig: return loadMappingConfig(mapping.MappingPathConfig.MappingConfigPath, mapping.MappingPathConfig.MappingType) case *hapb.StructureMappingConfig_MappingLanguageString: return transpiler.Transpile(mapping.MappingLanguageString) default: return nil, fmt.Errorf("unsupported structure mapping config type: %v", mapping) } } return mpc, nil } // LoadProjectors registers all given projectors. func (t *DefaultTransformer) LoadProjectors(projectors []*mappb.ProjectorDefinition) error { for _, pd := range projectors { p := projector.FromDef(pd, mapping.NewWhistler()) if err := t.registry.RegisterProjector(pd.Name, p); err != nil { return fmt.Errorf("error registering projector %s: %v", pd.Name, err) } } return nil } // Transform converts the json tree using the specified config. func (t *DefaultTransformer) Transform(in jsonutil.JSONToken) (res jsonutil.JSONToken, err error) { pctx := types.NewContext(t.registry) defer errors.Recover("Transform", func(e error) { err = e }) pctx.Variables.Push() inn, err := jsonutil.TokenToNode(in) if err != nil { return nil, fmt.Errorf("input was invalid: %v", err) } args := []jsonutil.JSONMetaNode{inn} e := mapping.NewWhistler() if err := e.ProcessMappings(t.mappingConfig.RootMapping, "root", args, pctx.Output, pctx); err != nil { return nil, err } result, err := postprocess.Process(pctx, t.mappingConfig, t.transformationConfig.SkipBundling, e) if err != nil { return nil, err } return result, nil } // JSONtoJSON converts the byte array (JSON format) using the specified config. func (t *DefaultTransformer) JSONtoJSON(in json.RawMessage) (json.RawMessage, error) { ji, err := t.ParseJSON(in) if err != nil { return nil, err } res, err := t.Transform(ji) if err != nil { return nil, err } return json.Marshal(res) } // ParseJSON parses the given JSON into a JSONToken. func (t *DefaultTransformer) ParseJSON(in json.RawMessage) (jsonutil.JSONToken, error) { mc, err := jsonutil.UnmarshalJSON(in) if err != nil { return nil, err } return mc, nil } // Registry returns the registry in DefaultTransformer. func (t *DefaultTransformer) Registry() *types.Registry { return t.registry } // RegisterProjector adds the given Projector to this transformer's registry. func (t *DefaultTransformer) RegisterProjector(name string, proj types.Projector) error { return t.registry.RegisterProjector(name, proj) } // HasPostProcessProjector returns true iff a post process projector is set. func (t *DefaultTransformer) HasPostProcessProjector() bool { return t.mappingConfig.GetPostProcessProjectorDefinition() != nil || t.mappingConfig.GetPostProcessProjectorName() != "" } // loadMappingConfig loads a mapping config from GCS. func loadMappingConfig(loc *httppb.Location, typ hapb.MappingType) (*mappb.MappingConfig, error) { var data []byte switch l := loc.Location.(type) { case *httppb.Location_GcsLocation: d, err := gcsutil.ReadFromGcs(context.Background(), l.GcsLocation) if err != nil { return nil, fmt.Errorf("failed to read mapping config from GCS, %v", err) } data = d case *httppb.Location_LocalPath: d, err := ioutil.ReadFile(l.LocalPath) if err != nil { return nil, fmt.Errorf("failed to read library file with error %v", err) } data = d case *httppb.Location_UrlPath: return nil, fmt.Errorf("loading mappings from remote path %s is unsupported", l.UrlPath) default: return nil, fmt.Errorf("location type %T is unsupported", l) } mpc := &mappb.MappingConfig{} switch typ { case hapb.MappingType_RAW_PROTO: if err := prototext.Unmarshal(data, mpc); err != nil { return nil, err } case hapb.MappingType_MAPPING_LANGUAGE: lmpc, err := transpiler.Transpile(string(data)) if err != nil { return nil, err } mpc = lmpc default: return nil, fmt.Errorf("invalid mapping config type %v", typ) } return mpc, nil }