in schema/mysql_avro_schema_converter.go [80:133]
func ConvertToAvroFromSchema(tblSchema *types.TableSchema, formatType string) ([]byte, error) {
avroSchema := &types.AvroSchema{
Name: fmt.Sprintf("%s_%s", strings.Replace(tblSchema.DBName, "-", "__", -1), strings.Replace(tblSchema.TableName, "-", "__", -1)),
Type: types.AvroRECORD,
Namespace: namespace,
Fields: []types.AvroField{},
Owner: tblSchema.DBName,
}
for _, colSchema := range tblSchema.Columns {
avroType := MySQLToAvroType[strings.ToUpper(colSchema.DataType)]
if avroType == "" {
continue
}
if colSchema.Type == types.MySQLBoolean {
avroType = MySQLToAvroType["BOOLEAN"]
}
fieldTypes := []types.AvroPrimitiveType{types.AvroNULL, avroType}
avroField := types.AvroField{
Name: colSchema.Name,
Type: fieldTypes,
Default: nil,
}
avroSchema.Fields = append(avroSchema.Fields, avroField)
}
if formatType == "avro" {
fieldTypes := []types.AvroPrimitiveType{types.AvroLONG}
avroField := types.AvroField{
Name: "ref_key",
Type: fieldTypes,
Default: nil,
}
avroSchema.Fields = append(avroSchema.Fields, avroField)
fieldTypes = []types.AvroPrimitiveType{types.AvroBYTES}
avroField = types.AvroField{
Name: "row_key",
Type: fieldTypes,
Default: nil,
}
avroSchema.Fields = append(avroSchema.Fields, avroField)
fieldTypes = []types.AvroPrimitiveType{types.AvroNULL, types.AvroBOOLEAN}
avroField = types.AvroField{
Name: "is_deleted",
Type: fieldTypes,
Default: nil,
}
avroSchema.Fields = append(avroSchema.Fields, avroField)
}
return json.Marshal(avroSchema)
}