wstl1/tools/notebook/extensions/wstl/service/wstlserver/context.go (161 lines of code) (raw):

// Copyright 2020 Google LLC. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package wstlserver import ( "context" "encoding/json" "errors" "fmt" "strings" "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/transform" /* copybara-comment: transform */ "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/util/gcsutil" /* copybara-comment: gcsutil */ "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/util/ioutil" /* copybara-comment: ioutil */ "google.golang.org/grpc/codes" /* copybara-comment: codes */ "google.golang.org/grpc/status" /* copybara-comment: status */ dhpb "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/proto" /* copybara-comment: data_harmonization_go_proto */ hpb "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/proto" /* copybara-comment: harmonization_go_proto */ httppb "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/proto" /* copybara-comment: http_go_proto */ lpb "github.com/GoogleCloudPlatform/healthcare-data-harmonization/mapping_engine/proto" /* copybara-comment: library_go_proto */ wspb "github.com/GoogleCloudPlatform/healthcare-data-harmonization/tools/notebook/extensions/wstl/proto" /* copybara-comment: wstlservice_go_proto */ ) // Context is a single transformation context that loosely corresponds to a Jupyter notebook cell. type Context struct { // Whistle incremental transformer context. incrementalTransformer transform.Transformer // Google Cloud Storage client. storageClient gcsutil.StorageClient } // NewContext instantiates a transformation context. // TODO () Modify the funtion to only take in non-nil StorageClient. func NewContext(c gcsutil.StorageClient) (*Context, error) { if c == nil { return nil, fmt.Errorf("the argument to NewContext is nil") } return &Context{storageClient: c}, nil } // EvaluateIncrementalTransformation evaluates incremental updates to the whistle script and outputs // the result. func (c *Context) EvaluateIncrementalTransformation(request *wspb.IncrementalTransformRequest) ([]*wspb.TransformedRecords, error) { if request == nil { return nil, errors.New("empty request") } if request.GetWstl() == "" { return nil, errors.New("missing wstl script from session") } config := newHarmonizationConfig(request.GetWstl(), request.GetLibraryConfig(), request.GetCodeConfig(), request.GetUnitConfig()) trans, err := transform.NewTransformer(context.Background(), config, transform.TransformationConfig{}, transform.GCSClient(c.storageClient)) if err != nil { return nil, err } inputs := request.GetInput() if len(inputs) == 0 { inputs = append(inputs, &wspb.Location{Location: &wspb.Location_InlineJson{InlineJson: "{}"}}) } return executeTransformation(trans, inputs), nil } // TODO (): move to wstlserver level. func newHarmonizationConfig(wstl string, libraryConfigs []*wspb.Location, codeConfigs []*wspb.Location, unitConfig *wspb.Location) *dhpb.DataHarmonizationConfig { libConfig := []*lpb.LibraryConfig{} if len(libraryConfigs) > 0 { for _, libraryConfig := range libraryConfigs { switch l := libraryConfig.GetLocation().(type) { case *wspb.Location_LocalPath: names := ioutil.MustReadGlob(l.LocalPath, "library_config") for _, f := range names { if !strings.HasSuffix(f, ".wstl") { continue } tLibConfig := &lpb.LibraryConfig{} tLibConfig.UserLibraries = []*lpb.UserLibrary{ &lpb.UserLibrary{ Type: hpb.MappingType_MAPPING_LANGUAGE, Path: &httppb.Location{ Location: &httppb.Location_LocalPath{ LocalPath: f, }, }, }, } libConfig = append(libConfig, tLibConfig) } case *wspb.Location_GcsLocation: tLibConfig := &lpb.LibraryConfig{} tLibConfig.UserLibraries = []*lpb.UserLibrary{ &lpb.UserLibrary{ Type: hpb.MappingType_MAPPING_LANGUAGE, Path: &httppb.Location{ Location: &httppb.Location_GcsLocation{ GcsLocation: l.GcsLocation, }, }, }, } libConfig = append(libConfig, tLibConfig) } } } var codeConfigLocs []*httppb.Location if len(codeConfigs) > 0 { for _, cConfig := range codeConfigs { switch l := cConfig.GetLocation().(type) { case *wspb.Location_LocalPath: names := ioutil.MustReadGlob(l.LocalPath, "code_config") for _, f := range names { if !strings.HasSuffix(f, ".json") { continue } codeConfigLocs = append(codeConfigLocs, &httppb.Location{Location: &httppb.Location_LocalPath{LocalPath: f}}) } case *wspb.Location_GcsLocation: codeConfigLocs = append(codeConfigLocs, &httppb.Location{Location: &httppb.Location_GcsLocation{GcsLocation: l.GcsLocation}}) } } } uConfig := &hpb.UnitHarmonizationConfig{} if unitConfig != nil { switch l := unitConfig.GetLocation().(type) { case *wspb.Location_LocalPath: uConfig.UnitConversion = &httppb.Location{Location: &httppb.Location_LocalPath{LocalPath: l.LocalPath}} case *wspb.Location_GcsLocation: uConfig.UnitConversion = &httppb.Location{Location: &httppb.Location_GcsLocation{GcsLocation: l.GcsLocation}} } } return &dhpb.DataHarmonizationConfig{ StructureMappingConfig: &hpb.StructureMappingConfig{ Mapping: &hpb.StructureMappingConfig_MappingLanguageString{ MappingLanguageString: wstl, }, }, LibraryConfig: libConfig, HarmonizationConfig: &hpb.CodeHarmonizationConfig{CodeLookup: codeConfigLocs}, UnitHarmonizationConfig: uConfig, } } func executeTransformation(trans transform.Transformer, inputs []*wspb.Location) []*wspb.TransformedRecords { results := []*wspb.TransformedRecords{} for _, input := range inputs { tRecord := &wspb.TransformedRecords{} var source json.RawMessage switch l := input.GetLocation().(type) { case *wspb.Location_InlineJson: source = json.RawMessage(l.InlineJson) case *wspb.Location_GcsLocation: var err error if source, err = gcsutil.ReadFromGcs(context.Background(), l.GcsLocation); err != nil { tRecord.Record = &wspb.TransformedRecords_Error{ Error: status.New(codes.InvalidArgument, err.Error()).Proto(), } results = append(results, tRecord) continue } default: tRecord.Record = &wspb.TransformedRecords_Error{ Error: status.New(codes.InvalidArgument, "unsupported input type").Proto(), } results = append(results, tRecord) continue } output, err := trans.JSONtoJSON(source) if err != nil { tRecord.Record = &wspb.TransformedRecords_Error{ Error: status.New(codes.InvalidArgument, err.Error()).Proto(), } } else { tRecord.Record = &wspb.TransformedRecords_Output{ Output: string(output), } } results = append(results, tRecord) } return results }