func()

in odps/tunnel/record_protoc_reader.go [56:128]


// Read decodes one record from the underlying protobuf-encoded stream.
//
// It reads tags in a loop and dispatches on each one:
//
//   - A column tag is treated as a 1-based index into r.columns. The index is
//     folded into the per-record CRC, the field is decoded with readField
//     according to the column's type, and the value is stored at index-1 of
//     the result. Columns for which no tag appears keep their zero value.
//   - EndRecord terminates the record. The CRC accumulated so far must equal
//     the uint32 that follows the tag; on success the per-record CRC is reset,
//     its value is folded into the stream-wide checksum, r.count is
//     incremented and the record is returned.
//   - MetaCount introduces the stream trailer: a record count that must equal
//     r.count, followed by a MetaChecksum tag and a uint32 that must equal the
//     stream-wide checksum, after which no further bytes may follow. When the
//     trailer validates, the loop goes on to read another tag, so the error of
//     that read is what the caller receives (presumably io.EOF at a real end of
//     stream; confirm against ReadTag).
//
// On any failure the returned record is nil.
func (r *RecordProtocReader) Read() (data.Record, error) {
	record := make([]data.Data, len(r.columns))

LOOP:
	for {
		tag, _, err := r.protocReader.ReadTag()
		if err != nil {
			return nil, errors.WithStack(err)
		}

		switch tag {
		case EndRecord:
			crc := r.recordCrc.Value()
			uint32V, err := r.protocReader.ReadUInt32()
			if err != nil {
				return nil, errors.WithStack(err)
			}

			if crc != uint32V {
				return nil, errors.New("crc value is error")
			}
			r.recordCrc.Reset()
			r.crcOfCrc.Update(crc)
			break LOOP
		case MetaCount:
			sInt64, err := r.protocReader.ReadSInt64()
			if err != nil {
				return nil, errors.WithStack(err)
			}

			if sInt64 != r.count {
				return nil, errors.New("record count does not match")
			}

			nextTag, _, err := r.protocReader.ReadTag()
			if err != nil {
				return nil, errors.WithStack(err)
			}

			if nextTag != MetaChecksum {
				return nil, errors.New("invalid stream")
			}

			crcOfCrc, err := r.protocReader.ReadUInt32()
			if err != nil {
				// Surface the read failure instead of comparing a zero value
				// against the running checksum. A bare EOF here means the
				// trailer itself was cut short, so report it as unexpected to
				// keep it distinguishable from a clean end of stream.
				if errors.Is(err, io.EOF) {
					err = io.ErrUnexpectedEOF
				}
				return nil, errors.WithStack(err)
			}

			// Probe for one more byte: anything other than an EOF-style error
			// means there is data after the trailer.
			_, err = r.protocReader.inner.Read([]byte{'0'})
			if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
				return nil, errors.New("expect end of stream, but not")
			}

			if r.crcOfCrc.Value() != crcOfCrc {
				return nil, errors.New("checksum is invalid")
			}
		default:
			columnIndex := int32(tag)
			// Column tags are 1-based; reject anything outside [1, len(columns)]
			// so a malformed tag cannot index r.columns out of range.
			if columnIndex < 1 || int(columnIndex) > len(r.columns) {
				return nil, errors.New("invalid protobuf tag")
			}
			r.recordCrc.Update(columnIndex)
			c := r.columns[columnIndex-1]
			fv, err := r.readField(c.Type)
			if err != nil {
				return nil, errors.WithStack(err)
			}

			record[columnIndex-1] = fv
		}
	}

	r.count++
	return record, nil
}