in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/bigtable/bigtable.go [163:248]
// mutateRow builds a single Bigtable mutation for rowKey in tableName and
// applies it through the client registered for keyspace.
//
// The mutation is assembled in this order:
//  1. every column family in deleteColumnFamilies is cleared,
//  2. complex (list) operations from ComplexOperation are added, keyed by
//     column family,
//  3. every column in deleteQualifiers is cleared,
//  4. columns[i] is set to values[i], which must be a []byte.
//
// A zero timestamp is replaced with the current time. When ifSpec.IfExists or
// ifSpec.IfNotExists is set, the mutation is applied conditionally on the row
// containing at least one cell, and the result reports whether it was applied.
// Otherwise the mutation is applied unconditionally.
//
// An error is returned when the keyspace is unknown, when there are fewer
// values than columns, when a complex operation entry is nil or carries a
// non-numeric list index, when a value is not a []byte, or when any underlying
// Bigtable call fails.
func (btc *BigtableClient) mutateRow(ctx context.Context, tableName, rowKey string, columns []translator.Column, values []any, deleteColumnFamilies []string, deleteQualifiers []translator.Column, timestamp bigtable.Timestamp, ifSpec translator.IfSpec, keyspace string, ComplexOperation map[string]*translator.ComplexOperation) (*message.RowsResult, error) {
	otelgo.AddAnnotation(ctx, applyingBigtableMutation)
	mut := bigtable.NewMutation()
	btc.Logger.Info("mutating row", zap.String("key", hex.EncodeToString([]byte(rowKey))))

	client, ok := btc.Clients[keyspace]
	if !ok {
		return nil, fmt.Errorf("invalid keySpace - `%s`", keyspace)
	}

	// Every column needs a matching value; reject a short values slice here,
	// before any Bigtable read or write, instead of panicking on values[i]
	// further down.
	if len(values) < len(columns) {
		return nil, fmt.Errorf("mismatched columns and values: %d columns but only %d values", len(columns), len(values))
	}

	tbl := client.Open(tableName)
	if timestamp == 0 {
		// bigtable.Now already yields a microsecond-resolution Timestamp.
		timestamp = bigtable.Now()
	}

	// Delete column families.
	for _, cf := range deleteColumnFamilies {
		mut.DeleteCellsInFamily(cf)
	}

	// Handle complex updates; the map key is the column family.
	for cf, meta := range ComplexOperation {
		if meta == nil {
			return nil, fmt.Errorf("nil complex operation for column family %s", cf)
		}
		if meta.UpdateListIndex != "" {
			index, err := strconv.Atoi(meta.UpdateListIndex)
			if err != nil {
				return nil, err
			}
			// The value returned for the list index is used as the column
			// qualifier of the cell being overwritten.
			reqTimestamp, err := btc.getIndexOpTimestamp(ctx, tableName, rowKey, cf, keyspace, index)
			if err != nil {
				return nil, err
			}
			mut.Set(cf, reqTimestamp, timestamp, meta.Value)
		}
		if meta.ListDelete {
			if err := btc.setMutationforListDelete(ctx, tableName, rowKey, cf, keyspace, meta.ListDeleteValues, mut); err != nil {
				return nil, err
			}
		}
	}

	// Delete specific column qualifiers.
	for _, q := range deleteQualifiers {
		mut.DeleteCellsInColumn(q.ColumnFamily, q.Name)
	}

	// Set values for columns.
	for i, column := range columns {
		bv, isBytes := values[i].([]byte)
		if !isBytes {
			btc.Logger.Error("Value is not of type []byte", zap.String("column", column.Name), zap.Any("value", values[i]))
			return nil, fmt.Errorf("value for column %s is not of type []byte", column.Name)
		}
		mut.Set(column.ColumnFamily, column.Name, timestamp, bv)
	}

	if ifSpec.IfExists || ifSpec.IfNotExists {
		// The predicate matches when the row has at least one cell.
		predicateFilter := bigtable.CellsPerRowLimitFilter(1)
		matched := true
		// IF EXISTS: run the mutation when the predicate matches.
		conditionalMutation := bigtable.NewCondMutation(predicateFilter, mut, nil)
		if ifSpec.IfNotExists {
			// IF NOT EXISTS: run the mutation only when the predicate does not match.
			conditionalMutation = bigtable.NewCondMutation(predicateFilter, nil, mut)
		}

		err := tbl.Apply(ctx, rowKey, conditionalMutation, bigtable.GetCondMutationResult(&matched))
		otelgo.AddAnnotation(ctx, bigtableMutationApplied)
		if err != nil {
			return nil, err
		}

		// Applied when IF EXISTS matched, or when IF NOT EXISTS did not match.
		return GenerateAppliedRowsResult(keyspace, tableName, ifSpec.IfExists == matched), nil
	}

	// If no conditions, apply the mutation directly.
	err := tbl.Apply(ctx, rowKey, mut)
	otelgo.AddAnnotation(ctx, bigtableMutationApplied)
	if err != nil {
		return nil, err
	}

	return &message.RowsResult{
		Metadata: &message.RowsMetadata{
			LastContinuousPage: true,
		},
	}, nil
}