func()

in odps/tunnel/record_protoc_writer.go [167:307]


func (r *RecordProtocWriter) writeField(val data.Data) error {
	switch val := val.(type) {
	case data.Double:
		r.recordCrc.Update(val)
		return errors.WithStack(r.protocWriter.WriteFloat64(float64(val)))
	case data.Float:
		r.recordCrc.Update(val)
		return errors.WithStack(r.protocWriter.WriteFloat32(float32(val)))
	case data.Bool:
		r.recordCrc.Update(val)
		return errors.WithStack(r.protocWriter.WriteBool(bool(val)))
	case data.BigInt:
		r.recordCrc.Update(val)
		return errors.WithStack(r.protocWriter.WriteSInt64(int64(val)))
	case data.IntervalYearMonth:
		r.recordCrc.Update(int64(val))
		return errors.WithStack(r.protocWriter.WriteSInt64(int64(val)))
	case data.Int:
		r.recordCrc.Update(int64(val))
		return errors.WithStack(r.protocWriter.WriteSInt64(int64(val)))
	case data.SmallInt:
		r.recordCrc.Update(int64(val))
		return errors.WithStack(r.protocWriter.WriteSInt64(int64(val)))
	case data.TinyInt:
		r.recordCrc.Update(int64(val))
		return errors.WithStack(r.protocWriter.WriteSInt64(int64(val)))
	case *data.String:
		b := []byte(string(*val))
		r.recordCrc.Update(b)
		return errors.WithStack(r.protocWriter.WriteBytes(b))
	case data.String:
		b := []byte(string(val))
		r.recordCrc.Update(b)
		return errors.WithStack(r.protocWriter.WriteBytes(b))
	case *data.VarChar:
		b := []byte(val.Data())
		r.recordCrc.Update(b)
		return errors.WithStack(r.protocWriter.WriteBytes(b))
	case data.VarChar:
		b := []byte(val.Data())
		r.recordCrc.Update(b)
		return errors.WithStack(r.protocWriter.WriteBytes(b))
	case *data.Char:
		b := []byte(val.Data())
		r.recordCrc.Update(b)
		return errors.WithStack(r.protocWriter.WriteBytes(b))
	case data.Char:
		b := []byte(val.Data())
		r.recordCrc.Update(b)
		return errors.WithStack(r.protocWriter.WriteBytes(b))
	case data.Binary:
		r.recordCrc.Update(val)
		return errors.WithStack(r.protocWriter.WriteBytes(val))
	case data.DateTime:
		t := val.Time()
		// 应该直接写成: milliSeconds := t.UnixMilli()
		// 但是 Time.UnixMilli is added in go.1.17
		// func (t Time) UnixMilli() int64 {
		//	return t.unixSec()*1e3 + int64(t.nsec())/1e6
		// }
		unixSec := t.Unix()
		nanoSec := t.Nanosecond()
		milliSeconds := unixSec*1e3 + int64(nanoSec)/1e6

		// TODO 需要根据schema中的shouldTransform,来确定是否将时间转换为本地时区的时间
		r.recordCrc.Update(milliSeconds)
		return errors.WithStack(r.protocWriter.WriteSInt64(milliSeconds))
	case data.Date:
		t := val.Time()
		// 获取从1970年以来的天数
		days := t.Unix() / data.SecondsPerDay
		r.recordCrc.Update(days)
		return errors.WithStack(r.protocWriter.WriteSInt64(days))
	case data.IntervalDayTime:
		seconds := val.Seconds()
		nanoSeconds := val.NanosFraction()

		r.recordCrc.Update(seconds)
		r.recordCrc.Update(nanoSeconds)
		err := r.protocWriter.WriteSInt64(seconds)
		if err != nil {
			return errors.WithStack(err)
		}

		return errors.WithStack(r.protocWriter.WriteSInt32(nanoSeconds))
	case data.Timestamp:
		t := val.Time()
		seconds := t.Unix()
		nanoSeconds := int32(t.Nanosecond())

		r.recordCrc.Update(seconds)
		r.recordCrc.Update(nanoSeconds)

		err := r.protocWriter.WriteSInt64(seconds)
		if err != nil {
			return errors.WithStack(err)
		}

		return errors.WithStack(r.protocWriter.WriteSInt32(nanoSeconds))
	case data.TimestampNtz:
		t := val.Time()
		seconds := t.Unix()
		nanoSeconds := int32(t.Nanosecond())

		r.recordCrc.Update(seconds)
		r.recordCrc.Update(nanoSeconds)

		err := r.protocWriter.WriteSInt64(seconds)
		if err != nil {
			return errors.WithStack(err)
		}

		return errors.WithStack(r.protocWriter.WriteSInt32(nanoSeconds))
	case data.Decimal:
		b := []byte(val.Value())
		r.recordCrc.Update(b)
		return errors.WithStack(r.protocWriter.WriteBytes(b))
	case *data.Decimal:
		b := []byte(val.Value())
		r.recordCrc.Update(b)
		return errors.WithStack(r.protocWriter.WriteBytes(b))
	case data.Array:
		return errors.WithStack(r.writeArray(val.ToSlice()))
	case *data.Array:
		return errors.WithStack(r.writeArray(val.ToSlice()))
	case data.Map:
		return errors.WithStack(r.writeMap(&val))
	case *data.Map:
		return errors.WithStack(r.writeMap(val))
	case data.Struct:
		return errors.WithStack(r.writeStruct(&val))
	case *data.Struct:
		return errors.WithStack(r.writeStruct(val))
	case data.Json:
		return errors.WithStack(r.writeJson(&val))
	case *data.Json:
		return errors.WithStack(r.writeJson(val))
	}

	return errors.Errorf("invalid data type %v", val.Type())
}