func processCollectionColumnsForPrepareQueries()

in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/translator/utils.go [788:875]


func processCollectionColumnsForPrepareQueries(input ProcessPrepareCollectionsInput) (*ProcessPrepareCollectionsOutput, error) {
	output := &ProcessPrepareCollectionsOutput{
		Unencrypted: make(map[string]interface{}),
	}
	var compleUpdateMeta *ComplexOperation

	for i, column := range input.ColumnsResponse {
		// todo validate the column exists again, in case the column was dropped after the query was prepared.
		if input.Translator.IsCollection(input.KeySpace, input.TableName, column.Name) {
			colFamily := input.Translator.GetColumnFamily(input.KeySpace, input.TableName, column.Name)
			dt, err := utilities.GetCassandraColumnType(column.CQLType)
			if err != nil {
				return nil, err
			}

			if meta, ok := input.ComplexMeta[column.Name]; ok {
				collectionType := strings.Split(column.CQLType, "<")[0]
				switch collectionType {
				case "list":
					compleUpdateMeta = meta
					if compleUpdateMeta.UpdateListIndex != "" {
						// list index operation e.g. list[index]=val
						// list update for specific index
						listType := ExtractListType(column.CQLType)
						valInInterface, err := DataConversionInInsertionIfRequired(input.Values[i].Contents, input.ProtocolV, listType, "byte")
						if err != nil {
							return nil, fmt.Errorf("error while encoding list<%s> value: %w", listType, err)
						}
						compleUpdateMeta.Value = valInInterface.([]byte)
						continue
					}
					// other cases like prepend, append for list are handled in processListColumnGeneric()
					// append is handled the same as normal list insert but in that case we are not deleting the column family
				case "set": //handling Delete for set e.g set=set-['key1','key2']
					// append case is same as normal set insert just no deletion of column family
					if meta.Delete {
						err := processDeleteOperationForMapAndSet(column, meta, input, i, output)
						if err != nil {
							return nil, err
						}
						continue
					}
				case "map":
					if meta.Append && meta.mapKey != "" { //handling update for specific key e.g map[key]=val
						mapType := dt.(datatype.MapType)
						err := processMapKeyAppend(column, meta, input, i, output, mapType.GetValueType().String())
						if err != nil {
							return nil, err
						}
						continue
					}
					if meta.Delete { //handling Delete for specific key e.g map=map-['key1','key2']
						err := processDeleteOperationForMapAndSet(column, meta, input, i, output)
						if err != nil {
							return nil, err
						}
						continue
					}
				}
			} else {
				// case of when new value is being set for collection type {e.g collection=newCollection}
				output.DelColumnFamily = append(output.DelColumnFamily, column.Name)
			}
			if err := handleCollectionColumn(column, colFamily, input.Values[i], input.ProtocolV, &output.NewColumns, &output.NewValues, compleUpdateMeta); err != nil {
				return nil, err
			}
		} else {
			valInInterface, err := DataConversionInInsertionIfRequired(input.Values[i].Contents, input.ProtocolV, column.CQLType, "byte")
			if err != nil {
				return nil, fmt.Errorf("error converting primitives: %w", err)
			}
			input.Values[i].Contents = valInInterface.([]byte)
			output.NewColumns = append(output.NewColumns, column)
			output.NewValues = append(output.NewValues, input.Values[i].Contents)
			if slices.Contains(input.PrimaryKeys, column.Name) {
				ColPrimitiveType, err := utilities.GetCassandraColumnType(column.CQLType)
				if err != nil {
					return nil, fmt.Errorf("invalid CQL Type %s err: %w", column.CQLType, err)
				}

				val, _ := utilities.DecodeBytesToCassandraColumnType(input.Values[i].Contents, ColPrimitiveType, input.ProtocolV)
				output.Unencrypted[column.Name] = val
			}
		}
		output.IndexEnd = i
	}
	return output, nil
}