func()

in client/connector.go [285:378]


func (u *UpsertBatchBuilderImpl) prepareEnumCases(isEnumArrayCol bool, tableName, columnName string, colIndex, columnID int, rows []Row, abandonRows map[int]struct{}, caseInsensitive bool, disableAutoExpand bool) error {
	enumCaseSet := make(map[string]struct{})
	for rowIndex, row := range rows {
		if _, exist := abandonRows[rowIndex]; exist {
			continue
		}
		value := row[colIndex]

		if value == nil {
			continue
		}

		abandonRow := false
		if enumCase, ok := value.(string); ok {
			if len(enumCase) > defaultStringEnumLength {
				abandonRow = true
				u.logger.With(
					"name", "prepareEnumCases",
					"error", "Enum string value is too long",
					"table", tableName,
					"columnID", columnID,
					"value", value).Debug("Enum string value is too long")
				u.metricScope.Tagged(map[string]string{"table": tableName, "columnID": strconv.Itoa(columnID)}).
					Counter("abandoned_rows_long_string").Inc(1)
			} else {
				if isEnumArrayCol {
					arrVal := make([]interface{}, 0)
					if err := json.Unmarshal([]byte(enumCase), &arrVal); err != nil {
						abandonRow = true
						u.logger.With(
							"name", "prepareEnumCases",
							"error", "Fail to unmarshal enum array value",
							"table", tableName,
							"columnID", columnID,
							"value", value).Debug("Fail to unmarshal enum array value")
						u.metricScope.Tagged(map[string]string{"table": tableName, "columnID": strconv.Itoa(columnID)}).
							Counter("abandoned_rows_enum_array_parse_error").Inc(1)
					} else {
						for _, val := range arrVal {
							if val != nil {
								item, ok := val.(string)
								if !ok {
									abandonRow = true
									u.logger.With(
										"name", "prepareEnumCases",
										"error", "Enum array value should be string",
										"table", tableName,
										"columnID", columnID,
										"value", value).Debug("Enum array value is not string")
									break
								}
								if caseInsensitive {
									item = strings.ToLower(item)
								}
								enumCaseSet[item] = struct{}{}
							}
						}
					}
				} else {
					if caseInsensitive {
						enumCase = strings.ToLower(enumCase)
					}
					enumCaseSet[enumCase] = struct{}{}
				}
			}
		} else {
			abandonRow = true
			u.logger.With(
				"name", "prepareEnumCases",
				"error", "Enum value should be string",
				"table", tableName,
				"columnID", columnID,
				"value", value).Debug("Enum value is not string")

		}
		if abandonRow {
			u.metricScope.Tagged(map[string]string{"table": tableName, "columnID": strconv.Itoa(columnID)}).
				Counter("abandoned_rows").Inc(1)
			abandonRows[rowIndex] = struct{}{}
		}
	}

	if len(enumCaseSet) > 0 {
		enumCases := make([]string, 0, len(enumCaseSet))
		for enumCase := range enumCaseSet {
			enumCases = append(enumCases, enumCase)
		}
		err := u.schemaHandler.PrepareEnumCases(tableName, columnName, enumCases)
		if err != nil {
			return err
		}
	}
	return nil
}