in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/bigtable/bigtable.go [734:813]
func (btc *BigtableClient) ApplyBulkMutation(ctx context.Context, tableName string, mutationData []MutationData, keyspace string) (BulkOperationResponse, error) {
rowKeyToMutationMap := make(map[string]*bigtable.Mutation)
for _, md := range mutationData {
btc.Logger.Info("mutating row BULK", zap.String("key", hex.EncodeToString([]byte(md.RowKey))))
if _, exists := rowKeyToMutationMap[md.RowKey]; !exists {
rowKeyToMutationMap[md.RowKey] = bigtable.NewMutation()
}
mut := rowKeyToMutationMap[md.RowKey]
switch md.MutationType {
case mutationTypeInsert:
{
for _, column := range md.Columns {
mut.Set(column.ColumnFamily, column.Name, bigtable.Now(), column.Contents)
}
}
case mutationTypeDelete:
{
mut.DeleteRow()
}
case mutationTypeDeleteColumnFamily:
{
mut.DeleteCellsInFamily(md.ColumnFamily)
}
case mutationTypeUpdate:
{
for _, column := range md.Columns {
mut.Set(column.ColumnFamily, column.Name, bigtable.Now(), column.Contents)
}
}
default:
return BulkOperationResponse{
FailedRows: "",
}, fmt.Errorf("invalid mutation type `%s`", md.MutationType)
}
}
// create mutations from mutation data
var mutations []*bigtable.Mutation
var rowKeys []string
for key, mutation := range rowKeyToMutationMap {
mutations = append(mutations, mutation)
rowKeys = append(rowKeys, key)
}
otelgo.AddAnnotation(ctx, applyingBulkMutation)
client, ok := btc.Clients[keyspace]
if !ok {
return BulkOperationResponse{
FailedRows: "All Rows are failed",
}, fmt.Errorf("invalid keySpace - `%s`", keyspace)
}
tbl := client.Open(tableName)
errs, err := tbl.ApplyBulk(ctx, rowKeys, mutations)
if err != nil {
return BulkOperationResponse{
FailedRows: "All Rows are failed",
}, fmt.Errorf("ApplyBulk: %w", err)
}
var failedRows []string
for i, e := range errs {
if e != nil {
failedRows = append(failedRows, rowKeys[i])
}
}
var res BulkOperationResponse
if len(failedRows) > 0 {
res = BulkOperationResponse{
FailedRows: fmt.Sprintf("failed rowkeys: %v", failedRows),
}
} else {
res = BulkOperationResponse{
FailedRows: "",
}
}
otelgo.AddAnnotation(ctx, bulkMutationApplied)
return res, nil
}