in datahub/resultmodel.go [598:649]
// NewGetBatchRecordsResult builds a GetRecordsResult from a raw response body.
//
// data is first passed through util.UnwrapMessage and the returned bytes are
// unmarshalled into a pbmodel.GetBinaryRecordsResponse. The NextCursor,
// StartSequence, LatestSequence and LatestTime fields are copied to the result
// when the response carries them; otherwise they keep their zero value. Every
// entry in the response's Records is handed to deserializer together with its
// per-entry metadata, and the records it yields are appended in order.
//
// schema is stored on the result as-is. commonResp is dereferenced and copied
// into the result, so it must not be nil.
//
// The first error from unwrapping, unmarshalling or deserializing is returned
// unchanged, together with a nil result.
func NewGetBatchRecordsResult(data []byte, schema *RecordSchema, commonResp *CommonResponseResult, deserializer *batchDeserializer) (*GetRecordsResult, error) {
	payload, err := util.UnwrapMessage(data)
	if err != nil {
		return nil, err
	}

	resp := &pbmodel.GetBinaryRecordsResponse{}
	if err = proto.Unmarshal(payload, resp); err != nil {
		return nil, err
	}

	result := &GetRecordsResult{
		CommonResponseResult: *commonResp,
		RecordSchema:         schema,
	}

	// Optional response fields: copy only those that are actually set.
	if cursor := resp.NextCursor; cursor != nil {
		result.NextCursor = *cursor
	}
	if start := resp.StartSequence; start != nil {
		result.StartSequence = *start
	}
	if latest := resp.LatestSequence; latest != nil {
		result.LatestSequence = *latest
	}
	if latestTime := resp.LatestTime; latestTime != nil {
		result.LatestTime = *latestTime
	}

	// RecordCount here is the number of batches, not the number of records, so
	// it only gates the loop and seeds the initial capacity; the real record
	// count is computed from the decoded slice below.
	if batchCount := resp.RecordCount; batchCount != nil && *batchCount > 0 {
		result.Records = make([]IRecord, 0, *batchCount)
		for _, batch := range resp.Records {
			meta := &respMeta{
				cursor:     batch.GetCursor(),
				nextCursor: batch.GetNextCursor(),
				sequence:   batch.GetSequence(),
				systemTime: batch.GetSystemTime(),
				serial:     int64(batch.GetSerial()),
			}
			decoded, err := deserializer.deserialize(batch.Data, meta)
			if err != nil {
				return nil, err
			}
			result.Records = append(result.Records, decoded...)
		}
	}

	result.RecordCount = len(result.Records)
	return result, nil
}