in encoder/avro.go [398:438]
// decodeEventFields fills c.Fields (and c.Key) from the Avro record r, walking
// the columns of e.inSchema in order.
//
// Columns for which filteredField reports true are skipped. For every other
// column the value is looked up in r by column name; a lookup error whose text
// contains "no such field" is tolerated and the field is still emitted with
// whatever value Get returned (presumably nil — confirm against goavro).
// Non-nil values of "timestamp" and "datetime" columns must be int64 and are
// interpreted as milliseconds since the Unix epoch; "datetime" values are
// additionally converted to UTC. Non-nil values of columns whose Key is "PRI"
// are appended to c.Key.
//
// If no column produced a non-nil value, c.Fields is reset to nil. Any other
// lookup error, or a timestamp/datetime value that is not an int64, is
// returned to the caller.
func (e *avroEncoder) decodeEventFields(c *types.CommonFormatEvent, r *goavro.Record) error {
	c.Fields = new([]types.CommonFormatField)

	sawValue := false
	filterPos := 0 // cursor shared with filteredField across iterations
	for idx := 0; idx < len(e.inSchema.Columns); idx++ {
		if filteredField(e.filter, idx, &filterPos) {
			continue
		}
		col := e.inSchema.Columns[idx]

		val, err := r.Get(col.Name)
		if err != nil && !strings.Contains(err.Error(), "no such field") {
			return err
		}

		if val != nil {
			sawValue = true
			switch col.DataType {
			case "timestamp", "datetime":
				millis, isInt64 := val.(int64)
				if !isInt64 {
					return fmt.Errorf("timestamp and datetime formats expected to be int64")
				}
				// Split milliseconds into whole seconds plus the nanosecond remainder.
				ts := time.Unix(millis/1000, (millis%1000)*1000000)
				if col.DataType == "datetime" {
					ts = ts.In(time.UTC)
				}
				val = ts
			}
		}

		*c.Fields = append(*c.Fields, types.CommonFormatField{Name: col.Name, Value: val})
		if val != nil && col.Key == "PRI" {
			c.Key = append(c.Key, val)
		}
	}

	// An event with nothing but nil values carries no field list at all.
	if !sawValue {
		c.Fields = nil
	}
	return nil
}