func readUpsertBatch()

in memstore/common/upsert_batch.go [437:538]


func readUpsertBatch(buffer []byte) (*UpsertBatch, error) {
	batch := &UpsertBatch{
		buffer:      buffer,
		columnsByID: make(map[int]int),
	}

	// numRows.
	reader := utils.NewBufferReader(buffer)

	numRows, err := reader.ReadInt32(4)
	if err != nil {
		return nil, utils.StackError(err, "Failed to read number of rows")
	}
	if numRows < 0 {
		return nil, utils.StackError(err, "Number of rows should be >= 0")
	}

	batch.NumRows = int(numRows)

	// numColumns.
	numColumns, err := reader.ReadUint16(4 + 4)
	if err != nil {
		return nil, utils.StackError(err, "Failed to read number of columns")
	}
	batch.NumColumns = int(numColumns)

	// 2 byte num columns
	arrivalTime, err := reader.ReadUint32(4 + 4 + 2 + 14)
	if err != nil {
		return nil, utils.StackError(err, "Failed to read arrival time")
	}
	batch.ArrivalTime = arrivalTime

	// Header too small, error out.
	if len(buffer) < 28+ColumnHeaderSize(batch.NumColumns) {
		return nil, utils.StackError(nil, "Invalid upsert batch data with incomplete header section")
	}

	header := NewUpsertBatchHeader(buffer[28:], batch.NumColumns)

	columns := make([]*columnReader, batch.NumColumns)
	for i := range columns {
		columnType, err := header.ReadColumnType(i)
		if err != nil {
			return nil, utils.StackError(err, "Failed to read type for column %d", i)
		}

		columnID, err := header.ReadColumnID(i)
		if err != nil {
			return nil, utils.StackError(err, "Failed to read id for column %d", i)
		}
		batch.columnsByID[columnID] = i

		columnMode, columnUpdateMode, err := header.ReadColumnFlag(i)
		if err != nil {
			return nil, utils.StackError(err, "Failed to read mode for column %d", i)
		}

		columns[i] = &columnReader{columnID: columnID, columnMode: columnMode, columnUpdateMode: columnUpdateMode, dataType: columnType,
			cmpFunc: GetCompareFunc(columnType)}

		columnStartOffset, err := header.ReadColumnOffset(i)
		if err != nil {
			return nil, utils.StackError(err, "Failed to read start offset for column %d", i)
		}

		columnEndOffset, err := header.ReadColumnOffset(i + 1)
		if err != nil {
			return nil, utils.StackError(err, "Failed to read end offset for column %d", i)
		}

		isGoType := IsGoType(columnType)
		isArrayType := IsArrayType(columnType)

		currentOffset := columnStartOffset
		switch columnMode {
		case AllValuesDefault:
		case HasNullVector:
			if !isGoType {
				// Null vector points to the beginning of the column data section.
				nullVectorLength := utils.AlignOffset(batch.NumRows, 8) / 8
				columns[i].nullVector = buffer[currentOffset : currentOffset+nullVectorLength]
				currentOffset += nullVectorLength
			}
			fallthrough
		case AllValuesPresent:
			if isGoType || isArrayType {
				currentOffset = utils.AlignOffset(currentOffset, 4)
				offsetVectorLength := (batch.NumRows + 1) * 4
				columns[i].offsetVector = buffer[currentOffset : currentOffset+offsetVectorLength]
				currentOffset += offsetVectorLength
			}
			// Round up to 8 byte padding.
			currentOffset = utils.AlignOffset(currentOffset, 8)
			columns[i].valueVector = buffer[currentOffset:columnEndOffset]
		}
	}
	batch.columns = columns

	return batch, nil

}