func()

in client/connector.go [382:547]


// PrepareUpsertBatch builds a serialized upsert batch for tableName from rows.
//
// columnNames, updateModes and every row are parallel: updateModes[i] is the
// update mode for columnNames[i], and row[i] holds that column's value. Names
// that are not in the table schema are skipped (their updateModes/row entries
// are never read). Rows with invalid data — a nil primary key, a nil event time
// on a fact table that does not allow missing event time, too few values, an
// enum value that cannot be parsed or translated, an HLL value that cannot be
// computed, or a value the batch builder rejects — are dropped and logged
// instead of failing the whole batch.
//
// It returns the encoded batch and the number of rows it contains. An error is
// returned when the schema cannot be fetched, a schema column has no matching
// update mode, an overwrite-only column is given a non-overwrite update mode,
// a column cannot be added to the batch, enum case preparation fails, or the
// batch cannot be serialized.
func (u *UpsertBatchBuilderImpl) PrepareUpsertBatch(tableName string, columnNames []string,
	updateModes []memCom.ColumnUpdateMode, rows []Row) ([]byte, int, error) {
	upsertBatchBuilder := memCom.NewUpsertBatchBuilder()

	schema, err := u.schemaHandler.FetchSchema(tableName)
	if err != nil {
		return nil, 0, err
	}

	if schema.Table.IsFactTable {
		upsertBatchBuilder.MarkFactTable()
	}

	// batchColumn links a column of the upsert batch to its position in the
	// input (columnNames, updateModes and each row) and to its schema column.
	// isPrimaryKey is cached so the per-row nil check does not rescan the
	// primary key list for every value.
	type batchColumn struct {
		inputIndex   int
		columnID     int
		isPrimaryKey bool
	}
	// batchColumns holds the input columns found in the schema, in batch
	// column order; its index is the batch column index passed to SetValue.
	batchColumns := make([]batchColumn, 0, len(columnNames))

	// abandonRows records indexes of rows abandoned due to invalid data. It is
	// handed to prepareEnumCases, which is the only code here that can fill it.
	abandonRows := make(map[int]struct{})

	for colIndex, columnName := range columnNames {
		columnID, exist := schema.ColumnDict[columnName]
		if !exist {
			continue
		}
		// Without this check a short updateModes slice panics below.
		if colIndex >= len(updateModes) {
			return nil, 0, utils.StackError(nil, "no update mode for column %s", columnName)
		}
		column := schema.Table.Columns[columnID]
		updateMode := updateModes[colIndex]
		isPrimaryKey := utils.IndexOfInt(schema.Table.PrimaryKeyColumns, columnID) >= 0

		// Only overwrite is supported for:
		// 1. dimension tables (TODO: might support min/max in the future if needed)
		// 2. primary key columns
		// 3. archiving sort columns
		// 4. data types not in uint8, int8, uint16, int16, uint32, int32, float32
		overwriteOnly := !schema.Table.IsFactTable ||
			isPrimaryKey ||
			utils.IndexOfInt(schema.Table.ArchivingSortColumns, columnID) >= 0 ||
			column.IsOverwriteOnlyDataType()
		if overwriteOnly && updateMode > memCom.UpdateForceOverwrite {
			return nil, 0, utils.StackError(nil, "column %s only supports overwrite", columnName)
		}

		dataType := memCom.DataTypeForColumn(column)
		if err = upsertBatchBuilder.AddColumnWithUpdateMode(columnID, dataType, updateMode); err != nil {
			return nil, 0, err
		}

		if column.IsEnumBasedColumn() {
			if err = u.prepareEnumCases(column.IsEnumArrayColumn(), tableName, columnName, colIndex, columnID, rows, abandonRows, column.CaseInsensitive, column.DisableAutoExpand); err != nil {
				return nil, 0, err
			}
		}

		batchColumns = append(batchColumns, batchColumn{
			inputIndex:   colIndex,
			columnID:     columnID,
			isPrimaryKey: isPrimaryKey,
		})
	}

	// On fact tables the time column (column ID 0) must be non-nil unless the
	// table config allows missing event times.
	requireEventTime := schema.Table.IsFactTable && !schema.Table.Config.AllowMissingEventTime

rowLoop:
	for rowIndex, row := range rows {
		if _, abandoned := abandonRows[rowIndex]; abandoned {
			continue
		}
		upsertBatchBuilder.AddRow()

		// Every `continue rowLoop` below is preceded by RemoveRow: it discards
		// the partially built row and moves on to the next input row.
		for batchColIndex, col := range batchColumns {
			column := schema.Table.Columns[col.columnID]

			// Guard against rows shorter than columnNames, which would
			// otherwise panic on the index below.
			if col.inputIndex >= len(row) {
				upsertBatchBuilder.RemoveRow()
				u.logger.With(
					"name", "PrepareUpsertBatch",
					"table", tableName,
					"columnID", col.columnID,
					"rowLength", len(row)).Error("Row is missing column value")
				continue rowLoop
			}
			value := row[col.inputIndex]

			// prevent primary key being nil
			if value == nil && col.isPrimaryKey {
				upsertBatchBuilder.RemoveRow()
				u.logger.With(
					"name", "PrepareUpsertBatch",
					"table", tableName,
					"columnID", col.columnID,
					"value", value).Error("PrimaryKey column is nil")
				continue rowLoop
			}

			// skip rows if time column is nil for fact table
			if value == nil && requireEventTime && col.columnID == 0 {
				upsertBatchBuilder.RemoveRow()
				u.logger.With(
					"name", "PrepareUpsertBatch",
					"table", tableName,
					"columnID", col.columnID,
					"value", value).Error("Time column is nil")
				continue rowLoop
			}

			if column.IsEnumBasedColumn() {
				if column.IsEnumArrayColumn() {
					if value != nil {
						// prepareEnumCases is expected to have rejected malformed
						// arrays already; these checks are a defensive fallback so
						// a bad value drops the row instead of panicking on the
						// type assertion or silently becoming an empty array.
						jsonArray, isString := value.(string)
						if !isString {
							upsertBatchBuilder.RemoveRow()
							u.logger.With(
								"name", "PrepareUpsertBatch",
								"table", tableName,
								"columnID", col.columnID,
								"value", value).Error("Enum array value is not a string")
							continue rowLoop
						}
						elems := make([]interface{}, 0)
						if err = json.Unmarshal([]byte(jsonArray), &elems); err != nil {
							upsertBatchBuilder.RemoveRow()
							u.logger.With(
								"name", "PrepareUpsertBatch",
								"error", err.Error(),
								"table", tableName,
								"columnID", col.columnID,
								"value", value).Error("Failed to parse enum array")
							continue rowLoop
						}
						for i, elem := range elems {
							// elem keeps the untranslated input so it can be logged on failure.
							elems[i], err = u.schemaHandler.TranslateEnum(tableName, col.columnID, elem, column.CaseInsensitive)
							if err != nil {
								upsertBatchBuilder.RemoveRow()
								u.logger.With(
									"name", "prepareUpsertBatch",
									"error", err.Error(),
									"table", tableName,
									"columnID", col.columnID,
									"value", elem).Error("Failed to translate enum")
								continue rowLoop
							}
							// Enum value not found in predefined cases and no default set: store nil.
							if elems[i] == -1 {
								elems[i] = nil
							}
						}
						value = elems
					}
				} else {
					// rawValue keeps the untranslated input so it can be logged on failure.
					rawValue := value
					value, err = u.schemaHandler.TranslateEnum(tableName, col.columnID, rawValue, column.CaseInsensitive)
					if err != nil {
						upsertBatchBuilder.RemoveRow()
						u.logger.With(
							"name", "prepareUpsertBatch",
							"error", err.Error(),
							"table", tableName,
							"columnID", col.columnID,
							"value", rawValue).Error("Failed to translate enum")
						continue rowLoop
					}

					// If enum value is not found from predefined enum cases and default value is not set, we set it to nil.
					if value == -1 {
						value = nil
					}
				}
			}

			if column.HLLConfig.IsHLLColumn {
				// Compute the HLL value from the column's original data type;
				// rawValue keeps the input so it can be logged on failure.
				rawValue := value
				value, err = computeHLLValue(memCom.DataTypeFromString(column.Type), rawValue)
				if err != nil {
					upsertBatchBuilder.RemoveRow()
					u.logger.With("name", "PrepareUpsertBatch", "error", err.Error(), "table", tableName, "columnID", col.columnID, "value", rawValue).Error("Failed to set value")
					continue rowLoop
				}
			}

			// Set value on the last (current) row.
			if err = upsertBatchBuilder.SetValue(upsertBatchBuilder.NumRows-1, batchColIndex, value); err != nil {
				upsertBatchBuilder.RemoveRow()
				u.logger.With("name", "PrepareUpsertBatch", "error", err.Error(), "table", tableName, "columnID", col.columnID, "value", value).Error("Failed to set value")
				continue rowLoop
			}
		}
	}

	batchBytes, err := upsertBatchBuilder.ToByteArray()
	return batchBytes, upsertBatchBuilder.NumRows, err
}