in client/connector.go [382:547]
// PrepareUpsertBatch builds a serialized upsert batch for tableName from rows,
// where rows[i][j] is the value of columnNames[j] and updateModes[j] is the
// update mode requested for that column.
//
// Input columns that are not present in the table schema are ignored. It
// returns nil, 0 and an error when the schema cannot be fetched, when an
// update mode is missing for a schema column, when a column that only
// supports overwrite is given an update mode above
// memCom.UpdateForceOverwrite, or when adding a column or preparing enum
// cases fails.
//
// Individual rows are dropped (and logged) instead of failing the batch when
// prepareEnumCases marks them abandoned, when a row has fewer values than
// columns, when a primary key value is nil, when the time column (column 0)
// of a fact table is nil and the table does not allow missing event time, or
// when enum translation, HLL value computation or setting the value fails.
//
// It returns the serialized batch, the number of rows in it, and any error
// from serialization.
func (u *UpsertBatchBuilderImpl) PrepareUpsertBatch(tableName string, columnNames []string,
	updateModes []memCom.ColumnUpdateMode, rows []Row) ([]byte, int, error) {
	upsertBatchBuilder := memCom.NewUpsertBatchBuilder()
	schema, err := u.schemaHandler.FetchSchema(tableName)
	if err != nil {
		return nil, 0, err
	}
	if schema.Table.IsFactTable {
		upsertBatchBuilder.MarkFactTable()
	}

	// batchColumn describes one input column that exists in the table schema.
	// batchColumns keeps them in the order they were added to the upsert
	// batch, so a column's position in that slice is its batch column index.
	type batchColumn struct {
		inputIndex   int  // index into columnNames, updateModes and each row
		columnID     int  // column ID in the table schema
		isPrimaryKey bool // a nil value in this column drops the whole row
	}
	batchColumns := make([]batchColumn, 0, len(columnNames))

	// abandonRows collects indexes of rows with invalid data; it is filled in
	// by prepareEnumCases and those rows are skipped when building the batch.
	abandonRows := make(map[int]struct{})
	for colIndex, columnName := range columnNames {
		columnID, exist := schema.ColumnDict[columnName]
		if !exist {
			continue
		}
		if colIndex >= len(updateModes) {
			// Report a short updateModes slice instead of panicking on the index.
			return nil, 0, utils.StackError(nil, "missing update mode for column %s", columnName)
		}
		column := schema.Table.Columns[columnID]
		updateMode := updateModes[colIndex]
		isPrimaryKey := utils.IndexOfInt(schema.Table.PrimaryKeyColumns, columnID) >= 0

		// following conditions only overwrite is supported:
		// 1. dimension table (TODO: might support min/max in the future if needed)
		// 2. primary key column
		// 3. archiving sort column
		// 4. data type not in uint8, int8, uint16, int16, uint32, int32, float32
		overwriteOnly := !schema.Table.IsFactTable ||
			isPrimaryKey ||
			utils.IndexOfInt(schema.Table.ArchivingSortColumns, columnID) >= 0 ||
			column.IsOverwriteOnlyDataType()
		if overwriteOnly && updateMode > memCom.UpdateForceOverwrite {
			return nil, 0, utils.StackError(nil, "column %s only supports overwrite", columnName)
		}

		dataType := memCom.DataTypeForColumn(column)
		if err = upsertBatchBuilder.AddColumnWithUpdateMode(columnID, dataType, updateMode); err != nil {
			return nil, 0, err
		}
		if column.IsEnumBasedColumn() {
			if err = u.prepareEnumCases(column.IsEnumArrayColumn(), tableName, columnName, colIndex, columnID, rows, abandonRows, column.CaseInsensitive, column.DisableAutoExpand); err != nil {
				return nil, 0, err
			}
		}

		batchColumns = append(batchColumns, batchColumn{
			inputIndex:   colIndex,
			columnID:     columnID,
			isPrimaryKey: isPrimaryKey,
		})
	}

	// dropRow discards the row currently being built and logs why it was
	// rejected. cause may be nil when there is no underlying error.
	dropRow := func(logName string, columnID int, value interface{}, cause error, msg string) {
		upsertBatchBuilder.RemoveRow()
		fields := []interface{}{"name", logName}
		if cause != nil {
			fields = append(fields, "error", cause.Error())
		}
		fields = append(fields, "table", tableName, "columnID", columnID, "value", value)
		u.logger.With(fields...).Error(msg)
	}

nextRow:
	for rowIndex, row := range rows {
		if _, abandoned := abandonRows[rowIndex]; abandoned {
			continue
		}
		upsertBatchBuilder.AddRow()
		for batchColIndex, bc := range batchColumns {
			columnID := bc.columnID
			column := schema.Table.Columns[columnID]
			if bc.inputIndex >= len(row) {
				dropRow("PrepareUpsertBatch", columnID, nil, nil, "Row has fewer values than columns")
				continue nextRow
			}
			value := row[bc.inputIndex]

			// prevent primary key being nil
			if value == nil && bc.isPrimaryKey {
				dropRow("PrepareUpsertBatch", columnID, value, nil, "PrimaryKey column is nil")
				continue nextRow
			}
			// skip rows if time column is nil for fact table
			if value == nil && schema.Table.IsFactTable && !schema.Table.Config.AllowMissingEventTime && columnID == 0 {
				dropRow("PrepareUpsertBatch", columnID, value, nil, "Time column is nil")
				continue nextRow
			}

			if column.IsEnumBasedColumn() {
				if column.IsEnumArrayColumn() {
					if value != nil {
						// Enum array values arrive as JSON-encoded strings.
						strVal, ok := value.(string)
						if !ok {
							dropRow("prepareUpsertBatch", columnID, value, nil, "Enum array value is not a string")
							continue nextRow
						}
						arrVal := make([]interface{}, 0)
						if err = json.Unmarshal([]byte(strVal), &arrVal); err != nil {
							dropRow("prepareUpsertBatch", columnID, value, err, "Failed to parse enum array")
							continue nextRow
						}
						for i, elem := range arrVal {
							var enumID interface{}
							enumID, err = u.schemaHandler.TranslateEnum(tableName, columnID, elem, column.CaseInsensitive)
							if err != nil {
								// Log the untranslated element, not the failed result.
								dropRow("prepareUpsertBatch", columnID, elem, err, "Failed to translate enum")
								continue nextRow
							}
							// -1: not found in predefined enum cases and no default set.
							if enumID == -1 {
								enumID = nil
							}
							arrVal[i] = enumID
						}
						value = arrVal
					}
				} else {
					var enumID interface{}
					enumID, err = u.schemaHandler.TranslateEnum(tableName, columnID, value, column.CaseInsensitive)
					if err != nil {
						// Log the untranslated input, not the failed result.
						dropRow("prepareUpsertBatch", columnID, value, err, "Failed to translate enum")
						continue nextRow
					}
					// If enum value is not found from predefined enum cases and
					// default value is not set, we set it to nil.
					if enumID == -1 {
						enumID = nil
					}
					value = enumID
				}
			}

			if column.HLLConfig.IsHLLColumn {
				// here use original column data type to compute hll value
				hllValue, hllErr := computeHLLValue(memCom.DataTypeFromString(column.Type), value)
				if hllErr != nil {
					dropRow("PrepareUpsertBatch", columnID, value, hllErr, "Failed to set value")
					continue nextRow
				}
				value = hllValue
			}

			// Set value to the last row.
			if err = upsertBatchBuilder.SetValue(upsertBatchBuilder.NumRows-1, batchColIndex, value); err != nil {
				dropRow("PrepareUpsertBatch", columnID, value, err, "Failed to set value")
				continue nextRow
			}
		}
	}

	batchBytes, err := upsertBatchBuilder.ToByteArray()
	return batchBytes, upsertBatchBuilder.NumRows, err
}