in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/translator/utils.go [2017:2093]
// convertAllValuesToRowKeyType normalizes the supplied primary key values into
// the Go types stored in the returned map: int64 for Int and Bigint columns and
// string for Varchar and Blob columns.
//
// Entries of primaryKeys whose IsPrimaryKey flag is false are skipped. An error
// is returned when a primary key has no entry in values, when
// utilities.GetCassandraColumnType fails for the column, when the column type
// is not one of the supported key types, or when the value cannot be converted
// to the column's type. The returned map is nil whenever an error is returned.
func convertAllValuesToRowKeyType(primaryKeys []schemaMapping.Column, values map[string]interface{}) (map[string]interface{}, error) {
	result := make(map[string]interface{}, len(primaryKeys))
	for _, pmk := range primaryKeys {
		if !pmk.IsPrimaryKey {
			continue
		}
		value, exists := values[pmk.ColumnName]
		if !exists {
			return nil, fmt.Errorf("missing primary key `%s`", pmk.ColumnName)
		}
		cqlType, err := utilities.GetCassandraColumnType(pmk.ColumnType)
		if err != nil {
			return nil, err
		}
		switch cqlType {
		case datatype.Int:
			// bigtable row keys don't support int32 so convert all int32 values to int64.
			// String input is still parsed as a 32-bit value so that out-of-range
			// literals are rejected instead of being silently accepted.
			i, err := integerRowKeyValue(value, 32, "Int", pmk.ColumnName)
			if err != nil {
				return nil, err
			}
			result[pmk.ColumnName] = i
		case datatype.Bigint:
			// Parse with an explicit 64-bit width; a bit size of 0 would mean the
			// platform's int, which is only 32 bits wide on 32-bit builds.
			i, err := integerRowKeyValue(value, 64, "BigInt", pmk.ColumnName)
			if err != nil {
				return nil, err
			}
			result[pmk.ColumnName] = i
		case datatype.Varchar:
			v, ok := value.(string)
			if !ok {
				return nil, fmt.Errorf("failed to convert %T to Varchar for key %s", value, pmk.ColumnName)
			}
			// todo move this validation to all columns not just keys
			if !utf8.ValidString(v) {
				return nil, fmt.Errorf("invalid utf8 value provided for varchar row key field %s", pmk.ColumnName)
			}
			result[pmk.ColumnName] = v
		case datatype.Blob:
			// Blob keys are accepted only as strings and are stored unchanged,
			// without any UTF-8 validation.
			v, ok := value.(string)
			if !ok {
				return nil, fmt.Errorf("failed to convert %T to Blob for key %s", value, pmk.ColumnName)
			}
			result[pmk.ColumnName] = v
		default:
			return nil, fmt.Errorf("unsupported primary key type %s for key %s", cqlType.String(), pmk.ColumnName)
		}
	}
	return result, nil
}

// integerRowKeyValue converts value to the int64 stored for an integer key
// column. int, int32 and int64 values are widened as-is; strings are parsed as
// base-10 integers that must fit in bitSize bits (32 for Int, 64 for Bigint).
// typeName and key are used only to build error messages.
func integerRowKeyValue(value interface{}, bitSize int, typeName, key string) (int64, error) {
	switch v := value.(type) {
	case int:
		return int64(v), nil
	case int32:
		return int64(v), nil
	case int64:
		return v, nil
	case string:
		i, err := strconv.ParseInt(v, 10, bitSize)
		if err != nil {
			// Keep the strconv error so callers can tell a syntax error from a range error.
			return 0, fmt.Errorf("failed to convert %s value %s for key %s: %w", typeName, v, key, err)
		}
		return i, nil
	default:
		return 0, fmt.Errorf("failed to convert %T to %s for key %s", value, typeName, key)
	}
}