func()

in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/bigtable/bigtable.go [734:813]


// ApplyBulkMutation merges the given mutation entries per row key and submits
// them to tableName in a single ApplyBulk call using the Bigtable client
// registered for keyspace.
//
// Entries that share a row key are folded into one bigtable.Mutation, with
// their operations added in the order the entries appear in mutationData. Row
// keys are sent in first-seen order, so the request layout and the failed-row
// report are deterministic for a given input.
//
// Returns:
//   - a non-nil error when an entry carries an unknown mutation type, when no
//     client is registered for keyspace, or when ApplyBulk itself fails;
//   - a nil error otherwise, even if individual rows failed. In that case the
//     failing row keys are listed in BulkOperationResponse.FailedRows (the
//     per-row error values themselves are not propagated).
func (btc *BigtableClient) ApplyBulkMutation(ctx context.Context, tableName string, mutationData []MutationData, keyspace string) (BulkOperationResponse, error) {
	// The number of distinct row keys is at most len(mutationData), so that is
	// a safe capacity hint for all three containers.
	rowKeyToMutationMap := make(map[string]*bigtable.Mutation, len(mutationData))
	rowKeys := make([]string, 0, len(mutationData))
	mutations := make([]*bigtable.Mutation, 0, len(mutationData))

	for _, md := range mutationData {
		btc.Logger.Info("mutating row BULK", zap.String("key", hex.EncodeToString([]byte(md.RowKey))))

		mut, exists := rowKeyToMutationMap[md.RowKey]
		if !exists {
			mut = bigtable.NewMutation()
			rowKeyToMutationMap[md.RowKey] = mut
			// Record the key and its mutation on first sight so rowKeys[i]
			// and mutations[i] stay paired and follow input order instead of
			// Go's randomized map iteration order.
			rowKeys = append(rowKeys, md.RowKey)
			mutations = append(mutations, mut)
		}

		switch md.MutationType {
		case mutationTypeInsert, mutationTypeUpdate:
			// Inserts and updates are both expressed as cell writes.
			for _, column := range md.Columns {
				mut.Set(column.ColumnFamily, column.Name, bigtable.Now(), column.Contents)
			}
		case mutationTypeDelete:
			mut.DeleteRow()
		case mutationTypeDeleteColumnFamily:
			mut.DeleteCellsInFamily(md.ColumnFamily)
		default:
			return BulkOperationResponse{
				FailedRows: "",
			}, fmt.Errorf("invalid mutation type `%s`", md.MutationType)
		}
	}
	otelgo.AddAnnotation(ctx, applyingBulkMutation)

	client, ok := btc.Clients[keyspace]
	if !ok {
		return BulkOperationResponse{
			FailedRows: "All Rows are failed",
		}, fmt.Errorf("invalid keySpace - `%s`", keyspace)
	}

	tbl := client.Open(tableName)
	errs, err := tbl.ApplyBulk(ctx, rowKeys, mutations)
	if err != nil {
		return BulkOperationResponse{
			FailedRows: "All Rows are failed",
		}, fmt.Errorf("ApplyBulk: %w", err)
	}

	// errs is indexed like rowKeys; a non-nil entry marks a row that failed.
	var failedRows []string
	for i, e := range errs {
		if e != nil {
			failedRows = append(failedRows, rowKeys[i])
		}
	}

	// FailedRows stays empty when every row succeeded.
	var res BulkOperationResponse
	if len(failedRows) > 0 {
		res.FailedRows = fmt.Sprintf("failed rowkeys: %v", failedRows)
	}
	otelgo.AddAnnotation(ctx, bulkMutationApplied)
	return res, nil
}