func normalizeMessageField()

in tools/mc2bq/pkg/schema/schema.go [200:291]


// normalizeMessageField reads the field of obj whose proto name equals
// col.Name and converts it into a value for the BigQuery column described by
// col.
//
// Singular fields are converted directly with convertProtoValueToBQType.
//
// List fields become a []any with one converted element per item. Each item
// is converted against a copy of col with Repeated cleared. An empty list
// yields nil.
//
// Map fields are converted differently because BigQuery has no map type.
// They become a []map[string]any of {"key": <string>, "value": <converted>}
// records, sorted by key so that repeated exports are stable. This requires
// col.Schema to contain exactly the sub-fields "key" and "value", in that
// order, and the proto map key to be a string. An empty map yields nil.
//
// An error is returned in these cases:
//   - the field does not exist on obj;
//   - a map column's schema or key kind is unsupported;
//   - converting any value or element fails.
func normalizeMessageField(obj protoreflect.Message, col *bigquery.FieldSchema) (any, error) {
	fieldDesc := obj.Descriptor().Fields().ByName(protoreflect.Name(col.Name))
	if fieldDesc == nil {
		return nil, fmt.Errorf("field %q not found", col.Name)
	}
	value := obj.Get(fieldDesc)

	switch {
	case fieldDesc.Cardinality() != protoreflect.Repeated:
		res, err := convertProtoValueToBQType(value, fieldDesc, col)
		if err != nil {
			return nil, wrapWithSerializeError(col.Name, err)
		}
		return res, nil

	case fieldDesc.IsList():
		lst := value.List()
		if lst.Len() == 0 {
			return nil, nil
		}

		// The column is REPEATED, but each element is serialized on its own,
		// so convert items against a non-repeated copy of the column schema.
		itemSchema := *col
		itemSchema.Repeated = false

		items := make([]any, lst.Len())
		for i := range items {
			item, err := convertProtoValueToBQType(lst.Get(i), fieldDesc, &itemSchema)
			if err != nil {
				return nil, wrapWithSerializeError(fmt.Sprintf("%s[%d]", col.Name, i), err)
			}
			items[i] = item
		}
		return items, nil

	case fieldDesc.IsMap():
		// BigQuery does not support maps, so a map is exported as an array of
		// records in the form [struct{key: string, value: T}, ...]. The column
		// schema must describe exactly that record, because the rows built
		// below always use the field names "key" and "value".
		if len(col.Schema) != 2 || col.Schema[0].Name != "key" || col.Schema[1].Name != "value" {
			return nil, wrapWithSerializeError(col.Name, errors.New("schema for dynamic map is invalid"))
		}
		if kind := fieldDesc.MapKey().Kind(); kind != protoreflect.StringKind {
			return nil, wrapWithSerializeError(col.Name, fmt.Errorf("unsupported map key kind %v: only string keys are supported", kind))
		}

		dict := value.Map()
		valueSchema := col.Schema[1]
		valueDesc := fieldDesc.MapValue()
		result := make([]map[string]any, 0, dict.Len())

		// Range cannot return an error, so capture the first failure and stop iterating.
		var rangeErr error
		dict.Range(func(mk protoreflect.MapKey, v protoreflect.Value) bool {
			key := mk.String()
			converted, err := convertProtoValueToBQType(v, valueDesc, valueSchema)
			if err != nil {
				rangeErr = wrapWithSerializeError(fmt.Sprintf("%s[%q]", col.Name, key), err)
				return false
			}
			result = append(result, map[string]any{"key": key, "value": converted})
			return true
		})
		if rangeErr != nil {
			return nil, rangeErr
		}

		if len(result) == 0 {
			return nil, nil
		}

		// Proto map iteration order is unspecified. Sort by key so that each
		// export produces the same ordering, which keeps diffs meaningful.
		// Map keys are unique, so a non-stable sort is sufficient.
		sort.Slice(result, func(i, j int) bool {
			return strings.Compare(result[i]["key"].(string), result[j]["key"].(string)) < 0
		})
		return result, nil
	}

	// Repeated proto fields are always lists or maps. Report an error rather
	// than panicking if that invariant is ever violated.
	return nil, wrapWithSerializeError(col.Name, errors.New("repeated field is neither a list nor a map"))
}