func()

in odps/tunnel/record_protoc_reader.go [151:353]


func (r *RecordProtocReader) readField(dt datatype.DataType) (data.Data, error) {
	var fieldValue data.Data

	switch dt.ID() {
	case datatype.DOUBLE:
		v, err := r.protocReader.ReadFloat64()
		if err != nil {
			return nil, errors.WithStack(err)
		}

		r.recordCrc.Update(v)
		fieldValue = data.Double(v)
	case datatype.FLOAT:
		v, err := r.protocReader.ReadFloat32()
		if err != nil {
			return nil, errors.WithStack(err)
		}

		r.recordCrc.Update(v)
		fieldValue = data.Float(v)
	case datatype.BOOLEAN:
		v, err := r.protocReader.ReadBool()
		if err != nil {
			return nil, errors.WithStack(err)
		}

		r.recordCrc.Update(v)
		fieldValue = data.Bool(v)
	case datatype.BIGINT:
		v, err := r.protocReader.ReadSInt64()
		if err != nil {
			return nil, errors.WithStack(err)
		}

		r.recordCrc.Update(v)
		fieldValue = data.BigInt(v)
	case datatype.IntervalYearMonth:
		v, err := r.protocReader.ReadSInt64()
		if err != nil {
			return nil, errors.WithStack(err)
		}

		r.recordCrc.Update(v)
		fieldValue = data.IntervalYearMonth(v)
	case datatype.INT:
		v, err := r.protocReader.ReadSInt64()
		if err != nil {
			return nil, errors.WithStack(err)
		}

		r.recordCrc.Update(v)
		fieldValue = data.Int(v)
	case datatype.SMALLINT:
		v, err := r.protocReader.ReadSInt64()
		if err != nil {
			return nil, errors.WithStack(err)
		}

		r.recordCrc.Update(v)
		fieldValue = data.SmallInt(v)
	case datatype.TINYINT:
		v, err := r.protocReader.ReadSInt64()
		if err != nil {
			return nil, errors.WithStack(err)
		}

		r.recordCrc.Update(v)
		fieldValue = data.TinyInt(v)
	case datatype.STRING:
		v, err := r.protocReader.ReadBytes()
		if err != nil {
			return nil, errors.WithStack(err)
		}
		r.recordCrc.Update(v)
		fieldValue = data.String(v)
	case datatype.VARCHAR:
		v, err := r.protocReader.ReadBytes()
		if err != nil {
			return nil, errors.WithStack(err)
		}
		r.recordCrc.Update(v)
		t := dt.(datatype.VarcharType)
		fieldValue, _ = data.MakeVarChar(t.Length, string(v))
	case datatype.CHAR:
		v, err := r.protocReader.ReadBytes()
		if err != nil {
			return nil, errors.WithStack(err)
		}
		r.recordCrc.Update(v)
		t := dt.(datatype.CharType)
		fieldValue, _ = data.MakeChar(t.Length, string(v))
	case datatype.BINARY:
		v, err := r.protocReader.ReadBytes()
		if err != nil {
			return nil, errors.WithStack(err)
		}
		r.recordCrc.Update(v)
		fieldValue = data.Binary(v)
	case datatype.DATETIME:
		v, err := r.protocReader.ReadSInt64()
		if err != nil {
			return nil, errors.WithStack(err)
		}

		r.recordCrc.Update(v)
		// TODO 需要根据schema中的shouldTransform,来确定是否将时间转换为本地时区的时间
		seconds := v / 1000
		nanoSeconds := (v % 1000) * 1000_000
		// time.Unix获取的时间已经带本地时区信息
		fieldValue = data.DateTime(time.Unix(seconds, nanoSeconds))
	case datatype.DATE:
		v, err := r.protocReader.ReadSInt64()
		if err != nil {
			return nil, errors.WithStack(err)
		}

		r.recordCrc.Update(v)
		// v为从1970-01-01以来的天数
		d := epochDay.AddDate(0, 0, int(v))
		fieldValue = data.Date(d)
	case datatype.IntervalDayTime:
		seconds, err := r.protocReader.ReadSInt64()
		if err != nil {
			return nil, errors.WithStack(err)
		}
		nanoSeconds, err := r.protocReader.ReadSInt32()
		if err != nil {
			return nil, errors.WithStack(err)
		}

		r.recordCrc.Update(seconds)
		r.recordCrc.Update(nanoSeconds)

		fieldValue = data.NewIntervalDayTime(seconds, nanoSeconds)
	case datatype.TIMESTAMP:
		seconds, err := r.protocReader.ReadSInt64()
		if err != nil {
			return nil, errors.WithStack(err)
		}
		nanoSeconds, err := r.protocReader.ReadSInt32()
		if err != nil {
			return nil, errors.WithStack(err)
		}

		r.recordCrc.Update(seconds)
		r.recordCrc.Update(nanoSeconds)

		fieldValue = data.Timestamp(time.Unix(seconds, int64(nanoSeconds)))
	case datatype.TIMESTAMP_NTZ:
		seconds, err := r.protocReader.ReadSInt64()
		if err != nil {
			return nil, errors.WithStack(err)
		}
		nanoSeconds, err := r.protocReader.ReadSInt32()
		if err != nil {
			return nil, errors.WithStack(err)
		}
		r.recordCrc.Update(seconds)
		r.recordCrc.Update(nanoSeconds)
		fieldValue = data.TimestampNtz(time.Unix(seconds, int64(nanoSeconds)).UTC())
	case datatype.DECIMAL:
		v, err := r.protocReader.ReadBytes()
		if err != nil {
			return nil, errors.WithStack(err)
		}
		r.recordCrc.Update(v)
		decimalType := dt.(datatype.DecimalType)

		fieldValue = data.NewDecimal(int(decimalType.Precision), int(decimalType.Scale), string(v))
	case datatype.ARRAY:
		var err error
		fieldValue, err = r.readArray(dt.(datatype.ArrayType).ElementType)
		if err != nil {
			return nil, errors.WithStack(err)
		}
	case datatype.MAP:
		var err error
		fieldValue, err = r.readMap(dt.(datatype.MapType))
		if err != nil {
			return nil, errors.WithStack(err)
		}
	case datatype.STRUCT:
		var err error
		fieldValue, err = r.readStruct(dt.(datatype.StructType))
		if err != nil {
			return nil, errors.WithStack(err)
		}
	case datatype.JSON:
		v, err := r.protocReader.ReadBytes()
		if err != nil {
			return nil, errors.WithStack(err)
		}

		r.recordCrc.Update(v)

		fieldValue = &data.Json{
			Data:  string(v),
			Valid: true,
		}
	}

	return fieldValue, nil
}