func ConvertToAvroFromSchema()

in schema/mysql_avro_schema_converter.go [80:133]


// ConvertToAvroFromSchema renders tblSchema as a JSON-encoded Avro record
// schema.
//
// The record name is "<DBName>_<TableName>", with every "-" in either part
// replaced by "__"; the owner is the unmodified DBName. Each column whose
// upper-cased DataType has a non-empty entry in MySQLToAvroType becomes a
// field typed as the union [null, <mapped type>]; columns without such an
// entry are skipped. Columns whose Type is types.MySQLBoolean use the
// "BOOLEAN" mapping instead of the one looked up from their DataType.
//
// When formatType is "avro", three extra fields are appended after the
// columns: ref_key (long), row_key (bytes) and is_deleted ([null, boolean]).
//
// The result is whatever json.Marshal returns for the assembled schema.
func ConvertToAvroFromSchema(tblSchema *types.TableSchema, formatType string) ([]byte, error) {
	// Hyphens in database/table names are rewritten as double underscores
	// before being used in the record name.
	escape := func(s string) string {
		return strings.Replace(s, "-", "__", -1)
	}
	recordName := fmt.Sprintf("%s_%s", escape(tblSchema.DBName), escape(tblSchema.TableName))

	// Start from a non-nil empty slice so a table with no convertible
	// columns still carries an empty (not nil) field list.
	fields := []types.AvroField{}

	for _, col := range tblSchema.Columns {
		mapped := MySQLToAvroType[strings.ToUpper(col.DataType)]
		if mapped == "" {
			// No Avro counterpart for this MySQL data type: leave the column out.
			continue
		}
		if col.Type == types.MySQLBoolean {
			mapped = MySQLToAvroType["BOOLEAN"]
		}
		fields = append(fields, types.AvroField{
			Name:    col.Name,
			Type:    []types.AvroPrimitiveType{types.AvroNULL, mapped},
			Default: nil,
		})
	}

	if formatType == "avro" {
		// Extra bookkeeping fields emitted only for the "avro" format.
		fields = append(fields,
			types.AvroField{
				Name:    "ref_key",
				Type:    []types.AvroPrimitiveType{types.AvroLONG},
				Default: nil,
			},
			types.AvroField{
				Name:    "row_key",
				Type:    []types.AvroPrimitiveType{types.AvroBYTES},
				Default: nil,
			},
			types.AvroField{
				Name:    "is_deleted",
				Type:    []types.AvroPrimitiveType{types.AvroNULL, types.AvroBOOLEAN},
				Default: nil,
			},
		)
	}

	return json.Marshal(&types.AvroSchema{
		Name:      recordName,
		Type:      types.AvroRECORD,
		Namespace: namespace,
		Fields:    fields,
		Owner:     tblSchema.DBName,
	})
}