in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/translator/utils.go [788:875]
func processCollectionColumnsForPrepareQueries(input ProcessPrepareCollectionsInput) (*ProcessPrepareCollectionsOutput, error) {
output := &ProcessPrepareCollectionsOutput{
Unencrypted: make(map[string]interface{}),
}
var compleUpdateMeta *ComplexOperation
for i, column := range input.ColumnsResponse {
// todo validate the column exists again, in case the column was dropped after the query was prepared.
if input.Translator.IsCollection(input.KeySpace, input.TableName, column.Name) {
colFamily := input.Translator.GetColumnFamily(input.KeySpace, input.TableName, column.Name)
dt, err := utilities.GetCassandraColumnType(column.CQLType)
if err != nil {
return nil, err
}
if meta, ok := input.ComplexMeta[column.Name]; ok {
collectionType := strings.Split(column.CQLType, "<")[0]
switch collectionType {
case "list":
compleUpdateMeta = meta
if compleUpdateMeta.UpdateListIndex != "" {
// list index operation e.g. list[index]=val
// list update for specific index
listType := ExtractListType(column.CQLType)
valInInterface, err := DataConversionInInsertionIfRequired(input.Values[i].Contents, input.ProtocolV, listType, "byte")
if err != nil {
return nil, fmt.Errorf("error while encoding list<%s> value: %w", listType, err)
}
compleUpdateMeta.Value = valInInterface.([]byte)
continue
}
// other cases like prepend, append for list are handled in processListColumnGeneric()
// append is handled the same as normal list insert but in that case we are not deleting the column family
case "set": //handling Delete for set e.g set=set-['key1','key2']
// append case is same as normal set insert just no deletion of column family
if meta.Delete {
err := processDeleteOperationForMapAndSet(column, meta, input, i, output)
if err != nil {
return nil, err
}
continue
}
case "map":
if meta.Append && meta.mapKey != "" { //handling update for specific key e.g map[key]=val
mapType := dt.(datatype.MapType)
err := processMapKeyAppend(column, meta, input, i, output, mapType.GetValueType().String())
if err != nil {
return nil, err
}
continue
}
if meta.Delete { //handling Delete for specific key e.g map=map-['key1','key2']
err := processDeleteOperationForMapAndSet(column, meta, input, i, output)
if err != nil {
return nil, err
}
continue
}
}
} else {
// case of when new value is being set for collection type {e.g collection=newCollection}
output.DelColumnFamily = append(output.DelColumnFamily, column.Name)
}
if err := handleCollectionColumn(column, colFamily, input.Values[i], input.ProtocolV, &output.NewColumns, &output.NewValues, compleUpdateMeta); err != nil {
return nil, err
}
} else {
valInInterface, err := DataConversionInInsertionIfRequired(input.Values[i].Contents, input.ProtocolV, column.CQLType, "byte")
if err != nil {
return nil, fmt.Errorf("error converting primitives: %w", err)
}
input.Values[i].Contents = valInInterface.([]byte)
output.NewColumns = append(output.NewColumns, column)
output.NewValues = append(output.NewValues, input.Values[i].Contents)
if slices.Contains(input.PrimaryKeys, column.Name) {
ColPrimitiveType, err := utilities.GetCassandraColumnType(column.CQLType)
if err != nil {
return nil, fmt.Errorf("invalid CQL Type %s err: %w", column.CQLType, err)
}
val, _ := utilities.DecodeBytesToCassandraColumnType(input.Values[i].Contents, ColPrimitiveType, input.ProtocolV)
output.Unencrypted[column.Name] = val
}
}
output.IndexEnd = i
}
return output, nil
}