in odps/tunnel/record_protoc_writer.go [167:307]
// writeField encodes a single column value through r.protocWriter and, for
// scalar types, feeds the same value into r.recordCrc before writing it.
//
// Encoding by type, as implemented below:
//   - Double, Float, Bool, BigInt: passed to the checksum as-is and written
//     with the matching float64/float32/bool/sint64 writer.
//   - IntervalYearMonth, Int, SmallInt, TinyInt: widened to int64 and written
//     as sint64.
//   - String, VarChar, Char, Decimal, Binary: written as raw bytes.
//   - DateTime: milliseconds since the Unix epoch, as sint64.
//   - Date: whole days since the Unix epoch, as sint64.
//   - IntervalDayTime, Timestamp, TimestampNtz: seconds as sint64 followed by
//     the nanosecond fraction as sint32.
//   - Array, Map, Struct, Json: delegated to writeArray, writeMap, writeStruct
//     and writeJson respectively; no checksum update happens in this function
//     for those types.
//
// It returns an error (with a stack trace attached) if the underlying writer
// fails, if val is nil, or if val has a type not handled here.
func (r *RecordProtocWriter) writeField(val data.Data) error {
	// A nil interface matches none of the cases below, and the fallback error
	// path calls val.Type(), which would panic on nil. Report it as an error.
	if val == nil {
		return errors.New("invalid data type: nil value")
	}

	switch val := val.(type) {
	case data.Double:
		r.recordCrc.Update(val)
		return errors.WithStack(r.protocWriter.WriteFloat64(float64(val)))
	case data.Float:
		r.recordCrc.Update(val)
		return errors.WithStack(r.protocWriter.WriteFloat32(float32(val)))
	case data.Bool:
		r.recordCrc.Update(val)
		return errors.WithStack(r.protocWriter.WriteBool(bool(val)))
	case data.BigInt:
		r.recordCrc.Update(val)
		return errors.WithStack(r.protocWriter.WriteSInt64(int64(val)))

	// The narrower integer types are widened to int64 for both the checksum
	// and the wire value.
	case data.IntervalYearMonth:
		r.recordCrc.Update(int64(val))
		return errors.WithStack(r.protocWriter.WriteSInt64(int64(val)))
	case data.Int:
		r.recordCrc.Update(int64(val))
		return errors.WithStack(r.protocWriter.WriteSInt64(int64(val)))
	case data.SmallInt:
		r.recordCrc.Update(int64(val))
		return errors.WithStack(r.protocWriter.WriteSInt64(int64(val)))
	case data.TinyInt:
		r.recordCrc.Update(int64(val))
		return errors.WithStack(r.protocWriter.WriteSInt64(int64(val)))

	// Textual and binary types are written as raw bytes.
	case *data.String:
		b := []byte(string(*val))
		r.recordCrc.Update(b)
		return errors.WithStack(r.protocWriter.WriteBytes(b))
	case data.String:
		b := []byte(string(val))
		r.recordCrc.Update(b)
		return errors.WithStack(r.protocWriter.WriteBytes(b))
	case *data.VarChar:
		b := []byte(val.Data())
		r.recordCrc.Update(b)
		return errors.WithStack(r.protocWriter.WriteBytes(b))
	case data.VarChar:
		b := []byte(val.Data())
		r.recordCrc.Update(b)
		return errors.WithStack(r.protocWriter.WriteBytes(b))
	case *data.Char:
		b := []byte(val.Data())
		r.recordCrc.Update(b)
		return errors.WithStack(r.protocWriter.WriteBytes(b))
	case data.Char:
		b := []byte(val.Data())
		r.recordCrc.Update(b)
		return errors.WithStack(r.protocWriter.WriteBytes(b))
	case data.Binary:
		r.recordCrc.Update(val)
		return errors.WithStack(r.protocWriter.WriteBytes(val))

	case data.DateTime:
		t := val.Time()
		// Ideally this would simply be t.UnixMilli(), but Time.UnixMilli was
		// only added in Go 1.17, so the same value is computed by hand.
		// Unix() rounds toward negative infinity and Nanosecond() is never
		// negative, so the sum is also correct for instants before 1970.
		unixSec := t.Unix()
		nanoSec := t.Nanosecond()
		milliSeconds := unixSec*1e3 + int64(nanoSec)/1e6
		// TODO: decide, based on the schema's shouldTransform flag, whether
		// the time should be converted to the local time zone.
		r.recordCrc.Update(milliSeconds)
		return errors.WithStack(r.protocWriter.WriteSInt64(milliSeconds))
	case data.Date:
		t := val.Time()
		// Number of whole days since 1970-01-01. Go's integer division
		// truncates toward zero, which would map a pre-1970 instant that is
		// not an exact multiple of a day onto the following day; round
		// toward negative infinity instead.
		unixSec := t.Unix()
		days := unixSec / data.SecondsPerDay
		if unixSec%data.SecondsPerDay < 0 {
			days--
		}
		r.recordCrc.Update(days)
		return errors.WithStack(r.protocWriter.WriteSInt64(days))

	// The remaining time-like types are written as a (seconds, nanoseconds)
	// pair; both parts go into the checksum before anything is written.
	case data.IntervalDayTime:
		seconds := val.Seconds()
		nanoSeconds := val.NanosFraction()
		r.recordCrc.Update(seconds)
		r.recordCrc.Update(nanoSeconds)
		err := r.protocWriter.WriteSInt64(seconds)
		if err != nil {
			return errors.WithStack(err)
		}
		return errors.WithStack(r.protocWriter.WriteSInt32(nanoSeconds))
	case data.Timestamp:
		t := val.Time()
		seconds := t.Unix()
		nanoSeconds := int32(t.Nanosecond())
		r.recordCrc.Update(seconds)
		r.recordCrc.Update(nanoSeconds)
		err := r.protocWriter.WriteSInt64(seconds)
		if err != nil {
			return errors.WithStack(err)
		}
		return errors.WithStack(r.protocWriter.WriteSInt32(nanoSeconds))
	case data.TimestampNtz:
		t := val.Time()
		seconds := t.Unix()
		nanoSeconds := int32(t.Nanosecond())
		r.recordCrc.Update(seconds)
		r.recordCrc.Update(nanoSeconds)
		err := r.protocWriter.WriteSInt64(seconds)
		if err != nil {
			return errors.WithStack(err)
		}
		return errors.WithStack(r.protocWriter.WriteSInt32(nanoSeconds))

	case data.Decimal:
		b := []byte(val.Value())
		r.recordCrc.Update(b)
		return errors.WithStack(r.protocWriter.WriteBytes(b))
	case *data.Decimal:
		b := []byte(val.Value())
		r.recordCrc.Update(b)
		return errors.WithStack(r.protocWriter.WriteBytes(b))

	// Composite types are handled by dedicated writers; value forms are
	// passed by address where the helper takes a pointer.
	case data.Array:
		return errors.WithStack(r.writeArray(val.ToSlice()))
	case *data.Array:
		return errors.WithStack(r.writeArray(val.ToSlice()))
	case data.Map:
		return errors.WithStack(r.writeMap(&val))
	case *data.Map:
		return errors.WithStack(r.writeMap(val))
	case data.Struct:
		return errors.WithStack(r.writeStruct(&val))
	case *data.Struct:
		return errors.WithStack(r.writeStruct(val))
	case data.Json:
		return errors.WithStack(r.writeJson(&val))
	case *data.Json:
		return errors.WithStack(r.writeJson(val))
	}

	// No case matched: val here is the original interface value.
	return errors.Errorf("invalid data type %v", val.Type())
}