in odps/tunnel/record_protoc_reader.go [151:353]
func (r *RecordProtocReader) readField(dt datatype.DataType) (data.Data, error) {
var fieldValue data.Data
switch dt.ID() {
case datatype.DOUBLE:
v, err := r.protocReader.ReadFloat64()
if err != nil {
return nil, errors.WithStack(err)
}
r.recordCrc.Update(v)
fieldValue = data.Double(v)
case datatype.FLOAT:
v, err := r.protocReader.ReadFloat32()
if err != nil {
return nil, errors.WithStack(err)
}
r.recordCrc.Update(v)
fieldValue = data.Float(v)
case datatype.BOOLEAN:
v, err := r.protocReader.ReadBool()
if err != nil {
return nil, errors.WithStack(err)
}
r.recordCrc.Update(v)
fieldValue = data.Bool(v)
case datatype.BIGINT:
v, err := r.protocReader.ReadSInt64()
if err != nil {
return nil, errors.WithStack(err)
}
r.recordCrc.Update(v)
fieldValue = data.BigInt(v)
case datatype.IntervalYearMonth:
v, err := r.protocReader.ReadSInt64()
if err != nil {
return nil, errors.WithStack(err)
}
r.recordCrc.Update(v)
fieldValue = data.IntervalYearMonth(v)
case datatype.INT:
v, err := r.protocReader.ReadSInt64()
if err != nil {
return nil, errors.WithStack(err)
}
r.recordCrc.Update(v)
fieldValue = data.Int(v)
case datatype.SMALLINT:
v, err := r.protocReader.ReadSInt64()
if err != nil {
return nil, errors.WithStack(err)
}
r.recordCrc.Update(v)
fieldValue = data.SmallInt(v)
case datatype.TINYINT:
v, err := r.protocReader.ReadSInt64()
if err != nil {
return nil, errors.WithStack(err)
}
r.recordCrc.Update(v)
fieldValue = data.TinyInt(v)
case datatype.STRING:
v, err := r.protocReader.ReadBytes()
if err != nil {
return nil, errors.WithStack(err)
}
r.recordCrc.Update(v)
fieldValue = data.String(v)
case datatype.VARCHAR:
v, err := r.protocReader.ReadBytes()
if err != nil {
return nil, errors.WithStack(err)
}
r.recordCrc.Update(v)
t := dt.(datatype.VarcharType)
fieldValue, _ = data.MakeVarChar(t.Length, string(v))
case datatype.CHAR:
v, err := r.protocReader.ReadBytes()
if err != nil {
return nil, errors.WithStack(err)
}
r.recordCrc.Update(v)
t := dt.(datatype.CharType)
fieldValue, _ = data.MakeChar(t.Length, string(v))
case datatype.BINARY:
v, err := r.protocReader.ReadBytes()
if err != nil {
return nil, errors.WithStack(err)
}
r.recordCrc.Update(v)
fieldValue = data.Binary(v)
case datatype.DATETIME:
v, err := r.protocReader.ReadSInt64()
if err != nil {
return nil, errors.WithStack(err)
}
r.recordCrc.Update(v)
// TODO 需要根据schema中的shouldTransform,来确定是否将时间转换为本地时区的时间
seconds := v / 1000
nanoSeconds := (v % 1000) * 1000_000
// time.Unix获取的时间已经带本地时区信息
fieldValue = data.DateTime(time.Unix(seconds, nanoSeconds))
case datatype.DATE:
v, err := r.protocReader.ReadSInt64()
if err != nil {
return nil, errors.WithStack(err)
}
r.recordCrc.Update(v)
// v为从1970-01-01以来的天数
d := epochDay.AddDate(0, 0, int(v))
fieldValue = data.Date(d)
case datatype.IntervalDayTime:
seconds, err := r.protocReader.ReadSInt64()
if err != nil {
return nil, errors.WithStack(err)
}
nanoSeconds, err := r.protocReader.ReadSInt32()
if err != nil {
return nil, errors.WithStack(err)
}
r.recordCrc.Update(seconds)
r.recordCrc.Update(nanoSeconds)
fieldValue = data.NewIntervalDayTime(seconds, nanoSeconds)
case datatype.TIMESTAMP:
seconds, err := r.protocReader.ReadSInt64()
if err != nil {
return nil, errors.WithStack(err)
}
nanoSeconds, err := r.protocReader.ReadSInt32()
if err != nil {
return nil, errors.WithStack(err)
}
r.recordCrc.Update(seconds)
r.recordCrc.Update(nanoSeconds)
fieldValue = data.Timestamp(time.Unix(seconds, int64(nanoSeconds)))
case datatype.TIMESTAMP_NTZ:
seconds, err := r.protocReader.ReadSInt64()
if err != nil {
return nil, errors.WithStack(err)
}
nanoSeconds, err := r.protocReader.ReadSInt32()
if err != nil {
return nil, errors.WithStack(err)
}
r.recordCrc.Update(seconds)
r.recordCrc.Update(nanoSeconds)
fieldValue = data.TimestampNtz(time.Unix(seconds, int64(nanoSeconds)).UTC())
case datatype.DECIMAL:
v, err := r.protocReader.ReadBytes()
if err != nil {
return nil, errors.WithStack(err)
}
r.recordCrc.Update(v)
decimalType := dt.(datatype.DecimalType)
fieldValue = data.NewDecimal(int(decimalType.Precision), int(decimalType.Scale), string(v))
case datatype.ARRAY:
var err error
fieldValue, err = r.readArray(dt.(datatype.ArrayType).ElementType)
if err != nil {
return nil, errors.WithStack(err)
}
case datatype.MAP:
var err error
fieldValue, err = r.readMap(dt.(datatype.MapType))
if err != nil {
return nil, errors.WithStack(err)
}
case datatype.STRUCT:
var err error
fieldValue, err = r.readStruct(dt.(datatype.StructType))
if err != nil {
return nil, errors.WithStack(err)
}
case datatype.JSON:
v, err := r.protocReader.ReadBytes()
if err != nil {
return nil, errors.WithStack(err)
}
r.recordCrc.Update(v)
fieldValue = &data.Json{
Data: string(v),
Valid: true,
}
}
return fieldValue, nil
}