in tools/mc2bq/pkg/schema/schema.go [200:291]
// normalizeMessageField looks up the proto field named col.Name on obj and
// converts its value into the shape used for the BigQuery column col.
//
// The result depends on the kind of field:
//   - Non-repeated fields are passed straight to convertProtoValueToBQType.
//   - List fields become a []any whose elements are converted one by one
//     against a copy of col with Repeated cleared.
//   - Map fields become a []map[string]any of {"key": ..., "value": ...}
//     entries sorted by key. The column must carry exactly two sub-fields,
//     the second named "value", and the proto map key must be a string.
//
// Empty lists and empty maps yield (nil, nil). An error is returned when obj
// has no field called col.Name, when the map column/key checks fail, or when
// converting any individual value fails; conversion errors are wrapped with
// the column name (plus the list index or map key where applicable).
func normalizeMessageField(obj protoreflect.Message, col *bigquery.FieldSchema) (any, error) {
	fd := obj.Descriptor().Fields().ByName(protoreflect.Name(col.Name))
	if fd == nil {
		return nil, fmt.Errorf("field %q not found", col.Name)
	}
	raw := obj.Get(fd)

	switch {
	case fd.Cardinality() != protoreflect.Repeated:
		converted, err := convertProtoValueToBQType(raw, fd, col)
		if err != nil {
			return nil, wrapWithSerializeError(col.Name, err)
		}
		return converted, nil

	case fd.IsList():
		items := raw.List()
		count := items.Len()
		if count == 0 {
			return nil, nil
		}
		// Each element is serialized on its own, so the per-element schema
		// is the column schema without the repeated flag.
		elemSchema := *col
		elemSchema.Repeated = false
		out := make([]any, count)
		for idx := range out {
			converted, err := convertProtoValueToBQType(items.Get(idx), fd, &elemSchema)
			if err != nil {
				return nil, wrapWithSerializeError(fmt.Sprintf("%s[%d]", col.Name, idx), err)
			}
			out[idx] = converted
		}
		return out, nil

	case fd.IsMap():
		// BigQuery has no map type, so a map is emitted as an array of
		// struct{key: string, value: T} entries. The short-circuit order
		// matters: the length check guards the col.Schema[1] access.
		if len(col.Schema) != 2 ||
			col.Schema[1].Name != "value" ||
			fd.MapKey().Kind() != protoreflect.StringKind {
			return nil, wrapWithSerializeError(col.Name, errors.New("schema for dynamic map is invalid"))
		}

		protoMap := raw.Map()
		valueSchema := col.Schema[1]
		valueDesc := fd.MapValue()
		entries := make([]map[string]any, 0, protoMap.Len())

		// Range offers no error return, so the first failure is stashed here
		// and iteration is stopped by returning false.
		var rangeErr error
		protoMap.Range(func(mk protoreflect.MapKey, mv protoreflect.Value) bool {
			k := mk.String()
			converted, err := convertProtoValueToBQType(mv, valueDesc, valueSchema)
			if err != nil {
				rangeErr = wrapWithSerializeError(fmt.Sprintf("%s[%q]", col.Name, k), err)
				return false
			}
			entries = append(entries, map[string]any{
				"key":   k,
				"value": converted,
			})
			return true
		})
		if rangeErr != nil {
			return nil, rangeErr
		}
		if len(entries) == 0 {
			return nil, nil
		}

		// Order entries by key so repeated exports produce the same list,
		// which keeps diffs between exports readable.
		sort.SliceStable(entries, func(a, b int) bool {
			left := entries[a]["key"].(string)
			right := entries[b]["key"].(string)
			return strings.Compare(left, right) < 0
		})
		return entries, nil
	}

	// A repeated field is always either a list or a map.
	panic("unreachable code")
}