in memstore/common/upsert_batch.go [437:538]
// readUpsertBatch parses a serialized upsert batch from buffer and returns an
// UpsertBatch whose per-column vectors are sub-slices of buffer (no copying),
// so buffer must stay alive and unmodified for as long as the batch is used.
//
// Fields read from the fixed-size part of the header, as used below:
//   - offset 4:  number of rows (int32, must be >= 0)
//   - offset 8:  number of columns (uint16)
//   - offset 24: arrival time (uint32)
//   - offset 28: start of the per-column header section
//
// The bytes at offsets 0-3 are not interpreted by this function.
//
// An error is returned when any header field cannot be read, when the buffer
// is too short for the column header section, or when a column's vectors
// described by the header do not fit inside buffer.
func readUpsertBatch(buffer []byte) (*UpsertBatch, error) {
	batch := &UpsertBatch{
		buffer:      buffer,
		columnsByID: make(map[int]int),
	}

	reader := utils.NewBufferReader(buffer)

	// numRows.
	numRows, err := reader.ReadInt32(4)
	if err != nil {
		return nil, utils.StackError(err, "Failed to read number of rows")
	}
	if numRows < 0 {
		// There is no underlying error to wrap here.
		return nil, utils.StackError(nil, "Number of rows should be >= 0")
	}
	batch.NumRows = int(numRows)

	// numColumns.
	numColumns, err := reader.ReadUint16(4 + 4)
	if err != nil {
		return nil, utils.StackError(err, "Failed to read number of columns")
	}
	batch.NumColumns = int(numColumns)

	// Arrival time sits 14 bytes after the 2 byte column count.
	arrivalTime, err := reader.ReadUint32(4 + 4 + 2 + 14)
	if err != nil {
		return nil, utils.StackError(err, "Failed to read arrival time")
	}
	batch.ArrivalTime = arrivalTime

	// Header too small, error out.
	if len(buffer) < 28+ColumnHeaderSize(batch.NumColumns) {
		return nil, utils.StackError(nil, "Invalid upsert batch data with incomplete header section")
	}
	header := NewUpsertBatchHeader(buffer[28:], batch.NumColumns)

	// section returns buffer[start:end]. The offsets come from the serialized
	// header, so a malformed batch must yield an error rather than a slice
	// bounds panic (or a slice reaching past len(buffer) into spare capacity).
	section := func(start, end, col int, what string) ([]byte, error) {
		if start < 0 || end < start || end > len(buffer) {
			return nil, utils.StackError(nil,
				"Invalid upsert batch data: %s of column %d spans [%d, %d) in a buffer of %d bytes",
				what, col, start, end, len(buffer))
		}
		return buffer[start:end], nil
	}

	// Vector lengths depend only on the row count, so compute them once.
	// One bit per row, rounded up to whole bytes.
	nullVectorLength := utils.AlignOffset(batch.NumRows, 8) / 8
	// One 4 byte entry per row plus one trailing entry.
	offsetVectorLength := (batch.NumRows + 1) * 4

	columns := make([]*columnReader, batch.NumColumns)
	for i := range columns {
		columnType, err := header.ReadColumnType(i)
		if err != nil {
			return nil, utils.StackError(err, "Failed to read type for column %d", i)
		}

		columnID, err := header.ReadColumnID(i)
		if err != nil {
			return nil, utils.StackError(err, "Failed to read id for column %d", i)
		}
		batch.columnsByID[columnID] = i

		columnMode, columnUpdateMode, err := header.ReadColumnFlag(i)
		if err != nil {
			return nil, utils.StackError(err, "Failed to read mode for column %d", i)
		}

		columns[i] = &columnReader{
			columnID:         columnID,
			columnMode:       columnMode,
			columnUpdateMode: columnUpdateMode,
			dataType:         columnType,
			cmpFunc:          GetCompareFunc(columnType),
		}

		// A column's data ends where the next column's data starts.
		columnStartOffset, err := header.ReadColumnOffset(i)
		if err != nil {
			return nil, utils.StackError(err, "Failed to read start offset for column %d", i)
		}
		columnEndOffset, err := header.ReadColumnOffset(i + 1)
		if err != nil {
			return nil, utils.StackError(err, "Failed to read end offset for column %d", i)
		}

		isGoType := IsGoType(columnType)
		isArrayType := IsArrayType(columnType)

		currentOffset := columnStartOffset
		switch columnMode {
		case AllValuesDefault:
			// Nothing to slice: the reader keeps nil vectors.
		case HasNullVector:
			if !isGoType {
				// Null vector points to the beginning of the column data section.
				columns[i].nullVector, err = section(currentOffset, currentOffset+nullVectorLength, i, "null vector")
				if err != nil {
					return nil, err
				}
				currentOffset += nullVectorLength
			}
			// The remaining vectors are laid out as for AllValuesPresent.
			fallthrough
		case AllValuesPresent:
			if isGoType || isArrayType {
				// Offset vector starts on a 4 byte boundary.
				currentOffset = utils.AlignOffset(currentOffset, 4)
				columns[i].offsetVector, err = section(currentOffset, currentOffset+offsetVectorLength, i, "offset vector")
				if err != nil {
					return nil, err
				}
				currentOffset += offsetVectorLength
			}
			// Round up to 8 byte padding.
			currentOffset = utils.AlignOffset(currentOffset, 8)
			columns[i].valueVector, err = section(currentOffset, columnEndOffset, i, "value vector")
			if err != nil {
				return nil, err
			}
		}
	}

	batch.columns = columns
	return batch, nil
}