in odps/tunnel/record_protoc_reader.go [56:128]
// Read decodes the next record from the underlying protoc stream.
//
// Tags are consumed in a loop until the record is complete:
//
//   - Any tag other than EndRecord/MetaCount is treated as a 1-based column
//     index into r.columns. The field is decoded with readField using that
//     column's type and stored at the matching position of the result. The
//     index is also folded into the per-record CRC. Columns whose tag never
//     appears keep the zero value of data.Data.
//   - EndRecord is followed by the CRC of the record; it must equal the
//     locally accumulated CRC. On success the per-record CRC is reset, the
//     value is folded into the stream-level checksum, and the record is
//     returned after incrementing the record counter.
//   - MetaCount is followed by the number of records, which must equal the
//     number of records read so far, then by a MetaChecksum tag and the
//     stream-level checksum, after which no further bytes may follow.
//     NOTE(review): after a valid trailer the loop continues, so the caller
//     receives whatever error the next ReadTag yields (presumably a wrapped
//     end-of-stream error) — confirm against ReadTag.
//
// A nil record is returned together with any error.
func (r *RecordProtocReader) Read() (data.Record, error) {
	record := make([]data.Data, len(r.columns))

LOOP:
	for {
		tag, _, err := r.protocReader.ReadTag()
		if err != nil {
			return nil, errors.WithStack(err)
		}

		switch tag {
		case EndRecord:
			// Capture the local CRC before reading the one sent by the peer.
			crc := r.recordCrc.Value()
			uint32V, err := r.protocReader.ReadUInt32()
			if err != nil {
				return nil, errors.WithStack(err)
			}
			if crc != uint32V {
				return nil, errors.New("crc value is error")
			}
			r.recordCrc.Reset()
			r.crcOfCrc.Update(crc)
			break LOOP

		case MetaCount:
			sInt64, err := r.protocReader.ReadSInt64()
			if err != nil {
				return nil, errors.WithStack(err)
			}
			if sInt64 != r.count {
				return nil, errors.New("record count does not match")
			}

			tag, _, err := r.protocReader.ReadTag()
			if err != nil {
				return nil, errors.WithStack(err)
			}
			if tag != MetaChecksum {
				return nil, errors.New("invalid stream")
			}

			crcOfCrc, err := r.protocReader.ReadUInt32()
			if err != nil {
				// Report the real read failure instead of comparing an
				// unread checksum and returning a misleading mismatch.
				return nil, errors.WithStack(err)
			}

			// The checksum must be the last thing in the stream: one more
			// read has to hit end-of-stream.
			_, err = r.protocReader.inner.Read([]byte{'0'})
			if (!errors.Is(err, io.EOF)) && (!errors.Is(err, io.ErrUnexpectedEOF)) {
				return nil, errors.New("expect end of stream, but not")
			}

			if r.crcOfCrc.Value() != crcOfCrc {
				return nil, errors.New("checksum is invalid")
			}

		default:
			columnIndex := int32(tag)
			// Column tags are 1-based; reject anything outside [1, len(columns)]
			// so a malformed stream cannot cause an out-of-range index below.
			if columnIndex < 1 || int(columnIndex) > len(r.columns) {
				return nil, errors.New("invalid protobuf tag")
			}

			r.recordCrc.Update(columnIndex)

			c := r.columns[columnIndex-1]
			fv, err := r.readField(c.Type)
			if err != nil {
				return nil, errors.WithStack(err)
			}
			record[columnIndex-1] = fv
		}
	}

	r.count += 1
	return record, nil
}