func()

in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/bigtable/bigtable.go [163:248]


// mutateRow builds a single Bigtable mutation for rowKey in tableName and
// applies it, optionally guarded by an IF EXISTS / IF NOT EXISTS condition.
//
// The mutation is assembled in this order:
//  1. every family in deleteColumnFamilies is cleared;
//  2. each entry of ComplexOperation (keyed by column family) is processed:
//     a list-index update sets the cell whose qualifier is returned by
//     getIndexOpTimestamp, and a list delete is delegated to
//     setMutationforListDelete;
//  3. every column in deleteQualifiers is cleared;
//  4. columns[i] is set to values[i], which must be a []byte.
//
// A zero timestamp is replaced with the current time. When ifSpec.IfExists or
// ifSpec.IfNotExists is set, the mutation is applied conditionally on the row
// containing at least one cell, and the returned result is built by
// GenerateAppliedRowsResult from whether the mutation was applied. Otherwise
// the mutation is applied unconditionally and an empty result with
// LastContinuousPage set is returned.
//
// An error is returned when keyspace has no registered client, when values is
// shorter than columns, when a value is not a []byte, when a complex operation
// entry is nil or carries a non-numeric list index, or when any Bigtable call
// fails.
func (btc *BigtableClient) mutateRow(ctx context.Context, tableName, rowKey string, columns []translator.Column, values []any, deleteColumnFamilies []string, deleteQualifiers []translator.Column, timestamp bigtable.Timestamp, ifSpec translator.IfSpec, keyspace string, ComplexOperation map[string]*translator.ComplexOperation) (*message.RowsResult, error) {
	otelgo.AddAnnotation(ctx, applyingBigtableMutation)
	mut := bigtable.NewMutation()

	btc.Logger.Info("mutating row", zap.String("key", hex.EncodeToString([]byte(rowKey))))

	client, ok := btc.Clients[keyspace]
	if !ok {
		return nil, fmt.Errorf("invalid keySpace - `%s`", keyspace)
	}

	// Validate before doing any remote lookups: indexing values[i] below would
	// otherwise panic when fewer values than columns are supplied.
	if len(values) < len(columns) {
		return nil, fmt.Errorf("got %d values for %d columns", len(values), len(columns))
	}

	tbl := client.Open(tableName)
	if timestamp == 0 {
		// bigtable.Now already yields a microsecond-resolution Timestamp.
		timestamp = bigtable.Now()
	}

	// Delete column families
	for _, cf := range deleteColumnFamilies {
		mut.DeleteCellsInFamily(cf)
	}

	// Handle complex updates
	for cf, meta := range ComplexOperation {
		if meta == nil {
			return nil, fmt.Errorf("nil complex operation for column family %s", cf)
		}
		if meta.UpdateListIndex != "" {
			index, err := strconv.Atoi(meta.UpdateListIndex)
			if err != nil {
				return nil, fmt.Errorf("invalid list index %q for column family %s: %w", meta.UpdateListIndex, cf, err)
			}
			// The helper resolves the list position to the qualifier of the
			// existing cell, which is then overwritten in place.
			reqTimestamp, err := btc.getIndexOpTimestamp(ctx, tableName, rowKey, cf, keyspace, index)
			if err != nil {
				return nil, err
			}
			mut.Set(cf, reqTimestamp, timestamp, meta.Value)
		}
		if meta.ListDelete {
			if err := btc.setMutationforListDelete(ctx, tableName, rowKey, cf, keyspace, meta.ListDeleteValues, mut); err != nil {
				return nil, err
			}
		}
	}

	// Delete specific column qualifiers
	for _, q := range deleteQualifiers {
		mut.DeleteCellsInColumn(q.ColumnFamily, q.Name)
	}

	// Set values for columns
	for i, column := range columns {
		bv, ok := values[i].([]byte)
		if !ok {
			btc.Logger.Error("Value is not of type []byte", zap.String("column", column.Name), zap.Any("value", values[i]))
			return nil, fmt.Errorf("value for column %s is not of type []byte", column.Name)
		}
		mut.Set(column.ColumnFamily, column.Name, timestamp, bv)
	}

	if ifSpec.IfExists || ifSpec.IfNotExists {
		// The predicate matches when the row has at least one cell, i.e. the
		// row exists.
		predicateFilter := bigtable.CellsPerRowLimitFilter(1)
		matched := true
		// IF EXISTS: run the mutation when the predicate matches.
		conditionalMutation := bigtable.NewCondMutation(predicateFilter, mut, nil)
		if ifSpec.IfNotExists {
			// IF NOT EXISTS: run the mutation when the predicate does not match.
			conditionalMutation = bigtable.NewCondMutation(predicateFilter, nil, mut)
		}

		err := tbl.Apply(ctx, rowKey, conditionalMutation, bigtable.GetCondMutationResult(&matched))
		otelgo.AddAnnotation(ctx, bigtableMutationApplied)
		if err != nil {
			return nil, err
		}

		// Applied when (IF EXISTS and matched) or (IF NOT EXISTS and not matched).
		return GenerateAppliedRowsResult(keyspace, tableName, ifSpec.IfExists == matched), nil
	}

	// If no conditions, apply the mutation directly
	err := tbl.Apply(ctx, rowKey, mut)
	otelgo.AddAnnotation(ctx, bigtableMutationApplied)
	if err != nil {
		return nil, err
	}

	return &message.RowsResult{
		Metadata: &message.RowsMetadata{
			LastContinuousPage: true,
		},
	}, nil
}