func convertAllValuesToRowKeyType()

in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/translator/utils.go [2017:2093]


func convertAllValuesToRowKeyType(primaryKeys []schemaMapping.Column, values map[string]interface{}) (map[string]interface{}, error) {
	result := make(map[string]interface{})
	for _, pmk := range primaryKeys {
		if !pmk.IsPrimaryKey {
			continue
		}

		value, exists := values[pmk.ColumnName]
		if !exists {
			return nil, fmt.Errorf("missing primary key `%s`", pmk.ColumnName)
		}

		cqlType, err := utilities.GetCassandraColumnType(pmk.ColumnType)
		if err != nil {
			return nil, err
		}

		switch cqlType {
		case datatype.Int:
			switch v := value.(type) {
			// bigtable row keys don't support int32 so convert all int32 values to int64
			case int:
				result[pmk.ColumnName] = int64(v)
			case int32:
				result[pmk.ColumnName] = int64(v)
			case int64:
				result[pmk.ColumnName] = v
			case string:
				i, err := strconv.Atoi(v)
				if err != nil {
					return nil, fmt.Errorf("failed to convert Int value %s for key %s", value.(string), pmk.ColumnName)
				}
				result[pmk.ColumnName] = int64(i)
			default:
				return nil, fmt.Errorf("failed to convert %T to Int for key %s", value, pmk.ColumnName)
			}
		case datatype.Bigint:
			switch v := value.(type) {
			case int:
				result[pmk.ColumnName] = int64(v)
			case int32:
				result[pmk.ColumnName] = int64(v)
			case int64:
				result[pmk.ColumnName] = value
			case string:
				i, err := strconv.ParseInt(v, 10, 0)
				if err != nil {
					return nil, fmt.Errorf("failed to convert BigInt value %s for key %s", value.(string), pmk.ColumnName)
				}
				result[pmk.ColumnName] = i
			default:
				return nil, fmt.Errorf("failed to convert %T to BigInt for key %s", value, pmk.ColumnName)
			}
		case datatype.Varchar:
			switch v := value.(type) {
			case string:
				// todo move this validation to all columns not just keys
				if !utf8.Valid([]byte(v)) {
					return nil, fmt.Errorf("invalid utf8 value provided for varchar row key field %s", pmk.ColumnName)
				}
				result[pmk.ColumnName] = v
			default:
				return nil, fmt.Errorf("failed to convert %T to BigInt for key %s", value, pmk.ColumnName)
			}
		case datatype.Blob:
			switch v := value.(type) {
			case string:
				result[pmk.ColumnName] = v
			default:
				return nil, fmt.Errorf("failed to convert %T to BigInt for key %s", value, pmk.ColumnName)
			}
		default:
			return nil, fmt.Errorf("unsupported primary key type %s for key %s", cqlType.String(), pmk.ColumnName)
		}
	}
	return result, nil
}