in encoder/avro.go [110:162]
func convertCommonFormatToAvroRecord(rs goavro.RecordSetter, cfEvent *types.CommonFormatEvent, rec *goavro.Record, filter []int) error {
err := rec.Set("row_key", []byte(GetCommonFormatKey(cfEvent))) //TODO: Revisit row_key from primary_key
if err != nil {
return err
}
err = rec.Set("ref_key", int64(cfEvent.SeqNo))
if err != nil {
return err
}
err = rec.Set("is_deleted", strings.EqualFold(cfEvent.Type, "delete"))
if err != nil {
return err
}
if cfEvent.Fields == nil {
return nil
}
for i, j := 0, 0; i < len(*cfEvent.Fields); i++ {
if filteredField(filter, i, &j) {
continue
}
field := (*cfEvent.Fields)[i]
/* If the field is integer convert it from JSON's float number */
switch r := field.Value.(type) {
case float64:
s, err := rec.GetFieldSchema(field.Name)
if err != nil {
return err
}
for _, t := range s.(map[string]interface{})["type"].([]interface{}) {
switch t.(string) {
case "int":
field.Value = int32(r)
case "long":
field.Value = int64(r)
}
}
case time.Time:
if !r.Equal(time.Time{}) {
field.Value = r.UnixNano() / 1000000
} else {
field.Value = nil
}
}
err = rec.Set(field.Name, field.Value)
if err != nil {
return err
}
}
return nil
}