tools/mc2bq/pkg/schema/schema.go (253 lines of code) (raw):

// Copyright 2023 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package schema contains the latest BigQuery schemas and utilities for
// serializing Migration Center objects to schema-compatible JSON objects.
package schema

import (
	_ "embed"
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
	"sort"
	"strings"
	"time"

	"cloud.google.com/go/bigquery"
	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
	"google.golang.org/protobuf/types/known/timestamppb"

	"github.com/GoogleCloudPlatform/migrationcenter-utils/tools/mc2bq/pkg/messages"
)

// ExporterSchema is the collection of the individual table schemas that will
// be used during export. The `json` tag is the key used in the schema file;
// the `bq` tag names the BigQuery table the schema belongs to.
type ExporterSchema struct {
	AssetTable         bigquery.Schema `json:"asset_table" bq:"assets"`
	GroupTable         bigquery.Schema `json:"group_table" bq:"groups"`
	PreferenceSetTable bigquery.Schema `json:"preference_set_table" bq:"preference_sets"`
}

// Compile-time checks that *ExporterSchema implements both JSON interfaces.
var _ json.Marshaler = &ExporterSchema{}
var _ json.Unmarshaler = &ExporterSchema{}

// MarshalJSON implements the json.Marshaler interface.
func (s *ExporterSchema) MarshalJSON() ([]byte, error) {
	// We need to implement the Marshaler interface because bigquery.Schema
	// marshals with ToJSONFields() instead of implementing the Marshaler
	// interface itself.
	myType := reflect.TypeOf(s).Elem()
	myValue := reflect.ValueOf(s).Elem()
	res := map[string]json.RawMessage{}
	// Walk the struct fields by reflection: the `json` tag supplies the map
	// key and every field is a bigquery.Schema.
	for i := 0; i < myType.NumField(); i++ {
		key := myType.Field(i).Tag.Get("json")
		schema := myValue.Field(i).Interface().(bigquery.Schema)
		fields, err := schema.ToJSONFields()
		if err != nil {
			return nil, err
		}
		res[key] = fields
	}
	return json.Marshal(res)
}

// UnmarshalJSON implements the json.Unmarshaler interface.
func (s *ExporterSchema) UnmarshalJSON(buf []byte) error {
	// We implement the Unmarshaler interface because bigquery.Schema doesn't
	// simply unmarshal but it is loaded by using SchemaFromJSON()
	myType := reflect.TypeOf(s).Elem()
	myValue := reflect.ValueOf(s).Elem()
	raw := map[string]json.RawMessage{}
	err := json.Unmarshal(buf, &raw)
	if err != nil {
		return err
	}
	for i := 0; i < myType.NumField(); i++ {
		key := myType.Field(i).Tag.Get("json")
		rawSchema, ok := raw[key]
		if !ok {
			// schema older than this entity
			continue
		}
		loadedSchema, err := bigquery.SchemaFromJSON(rawSchema)
		if err != nil {
			return fmt.Errorf("unmarshal %s: %w", key, err)
		}
		myValue.Field(i).Set(reflect.ValueOf(loadedSchema))
	}
	// A usable schema must at least describe the assets table.
	if len(s.AssetTable) == 0 {
		return messages.NewError(messages.ErrorInvalidSchema)
	}
	return nil
}

//go:embed migrationcenter_v1_latest.schema.json
var rawEmbeddedSchema []byte

// EmbeddedSchema is the embedded schema distributed with the tool.
var EmbeddedSchema ExporterSchema

func init() {
	// The embedded schema ships with the binary; failing to parse it is a
	// build/programmer error, so panicking at startup is intentional.
	err := json.Unmarshal(rawEmbeddedSchema, &EmbeddedSchema)
	if err != nil {
		panic(err)
	}
}

// NewSerializer creates a type safe serializer for type T.
// It's the callers responsibility to make sure that the schema and type T match.
// root describes the root node string that will appear in errors.
func NewSerializer[T protoreflect.ProtoMessage](root string, schema bigquery.Schema) func(obj T) ([]byte, error) { return func(obj T) ([]byte, error) { return SerializeObjectToBigQuery(obj.ProtoReflect(), root, schema) } } // SerializeObjectToBigQuery serializes an object as a BigQuery compatible JSON. // A '\n' is appended at the end of the json data. // The function should never return an error in production, if it fails it's a bug // resulting from a mismatch between the API object and the BigQuery schema and both are generated // from the same protobuf. func SerializeObjectToBigQuery(obj protoreflect.Message, root string, schema bigquery.Schema) ([]byte, error) { serializedObj, err := normalizeToSchema( obj, &bigquery.FieldSchema{ Name: root, Schema: schema, Type: bigquery.RecordFieldType, }, ) if err != nil { return nil, err } res, err := json.Marshal(serializedObj) if err != nil { return nil, err } return append(res, '\n'), err } func fieldConversionError(kind protoreflect.Kind, bqtype bigquery.FieldType) error { return fmt.Errorf("convert proto kind %q to bigquery type %q", kind.String(), bqtype) } func convertProtoValueToBQType(value protoreflect.Value, fd protoreflect.FieldDescriptor, schema *bigquery.FieldSchema) (any, error) { bqtype := schema.Type kind := fd.Kind() switch kind { case protoreflect.BoolKind: switch bqtype { case bigquery.BooleanFieldType: return value.Bool(), nil } case protoreflect.Int32Kind, protoreflect.Sint32Kind, protoreflect.Sfixed32Kind, protoreflect.Int64Kind, protoreflect.Sint64Kind, protoreflect.Sfixed64Kind: switch bqtype { case bigquery.IntegerFieldType: return value.Int(), nil } case protoreflect.Uint32Kind, protoreflect.Fixed32Kind, protoreflect.Uint64Kind, protoreflect.Fixed64Kind: switch bqtype { case bigquery.IntegerFieldType: return value.Uint(), nil } case protoreflect.StringKind: switch bqtype { case bigquery.StringFieldType: return value.String(), nil } case protoreflect.FloatKind, protoreflect.DoubleKind: return 
value.Float(), nil case protoreflect.MessageKind: return normalizeToSchema(value.Message(), schema) case protoreflect.EnumKind: switch bqtype { case bigquery.StringFieldType: return protoimpl.X.EnumStringOf(fd.Enum(), value.Enum()), nil } } return nil, fieldConversionError(kind, bqtype) } func normalizeMessageField(obj protoreflect.Message, col *bigquery.FieldSchema) (any, error) { fieldDesc := obj.Descriptor().Fields().ByName(protoreflect.Name(col.Name)) if fieldDesc == nil { return nil, fmt.Errorf("field %q not found", col.Name) } value := obj.Get(fieldDesc) if fieldDesc.Cardinality() != protoreflect.Repeated { res, err := convertProtoValueToBQType(value, fieldDesc, col) if err != nil { return nil, wrapWithSerializeError(col.Name, err) } return res, nil } if fieldDesc.IsList() { lst := value.List() if lst.Len() == 0 { return nil, nil } tmpList := make([]any, lst.Len()) itemSchema := *col itemSchema.Repeated = false // we are serializing the item so it's not repeated for i := 0; i < len(tmpList); i++ { var err error item := lst.Get(i) tmpList[i], err = convertProtoValueToBQType(item, fieldDesc, &itemSchema) if err != nil { return nil, wrapWithSerializeError(fmt.Sprintf("%s[%d]", col.Name, i), err) } } return tmpList, nil } if fieldDesc.IsMap() { // Create a dynamic map. Because maps are not supported by BigQuery. // We need to convert it to an array in the form of [struct{key: string, value: T}, ...] 
if len(col.Schema) != 2 { return nil, wrapWithSerializeError(col.Name, errors.New("schema for dynamic map is invalid")) } if col.Schema[1].Name != "value" { return nil, wrapWithSerializeError(col.Name, errors.New("schema for dynamic map is invalid")) } if fieldDesc.MapKey().Kind() != protoreflect.StringKind { return nil, wrapWithSerializeError(col.Name, errors.New("schema for dynamic map is invalid")) } dict := value.Map() keySchema := col.Schema[1] result := []map[string]any{} var err error dict.Range(func(mk protoreflect.MapKey, v protoreflect.Value) bool { item := map[string]any{} key := mk.String() item["key"] = key item["value"], err = convertProtoValueToBQType(v, fieldDesc.MapValue(), keySchema) if err != nil { err = wrapWithSerializeError(fmt.Sprintf("%s[%q]", col.Name, key), err) return false } result = append(result, item) return true }) if err != nil { return nil, err } // Sort by key to make the output list stable, otherwise each export // might reorder the items making diffing harder sort.SliceStable(result, func(i, j int) bool { return strings.Compare(result[i]["key"].(string), result[j]["key"].(string)) < 0 }) if len(result) == 0 { return nil, nil } return result, nil } // Fields are either scalars, lists or maps panic("unreachable code") } // normalizeToSchema normalizes obj to according to the provided schema. // Specifically converts structs to maps, and maps to key-value lists. 
func normalizeToSchema(obj protoreflect.Message, schema *bigquery.FieldSchema) (any, error) { // short circuit for nil values if !obj.IsValid() { return nil, nil } if obj, ok := obj.Interface().(*timestamppb.Timestamp); ok { return time.Unix(obj.Seconds, int64(obj.Nanos)).Format(time.RFC3339), nil } result := map[string]any{} for _, col := range schema.Schema { res, err := normalizeMessageField(obj, col) if err != nil { return nil, err } if res == nil { continue } result[col.Name] = res } return result, nil } type serializeError struct { field string err error } func (err *serializeError) Error() string { return fmt.Sprintf("error serializing field %s: %v", err.field, err.err) } func (err *serializeError) Unwrap() error { return err.err } // wrapWithSerializeError wraps err with a serializeError. If the error is already a serializeError // it prepends the field to the original errors field. func wrapWithSerializeError(field string, err error) *serializeError { var serr *serializeError if errors.As(err, &serr) { serr.field = field + "." + serr.field return serr } return &serializeError{ field: field, err: err, } }