pkg/sql-util/BatchInsertManager.go (128 lines of code) (raw):

package sql_util

import (
	"context"
	"fmt"
	"log/slog"
	"runtime"
	"sync"

	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
	"golang.org/x/sync/errgroup"
)

// BatchInsertManager accumulates rows into a driver.Batch and sends completed
// batches through a bounded errgroup worker pool.
type BatchInsertManager struct {
	// mutex guards batch and queuedItems.
	mutex sync.Mutex
	// batch is the batch currently being filled; nil until the first append
	// after a flush.
	batch driver.Batch

	// Db is the connection used to prepare batches.
	Db driver.Conn
	// insertSql is the statement passed to PrepareBatch.
	insertSql string

	// BatchSize is the number of queued items that triggers a flush on the next
	// PrepareForAppend call.
	BatchSize int
	// queuedItems counts the items handed out for the current batch.
	queuedItems int

	logger *slog.Logger
	// group runs the asynchronous sends and limits their concurrency.
	group *errgroup.Group

	// InsertContext is the errgroup-derived context used when preparing batches.
	InsertContext context.Context

	// dependencies are flushed synchronously before each of this manager's
	// batches is sent.
	dependencies []*BatchInsertManager
}

// NewBatchInsertManager creates a manager that inserts with insertSql over db.
//
// insertWorkerCount sets how many batches may be sent concurrently:
//   - -1 selects runtime.NumCPU()-4, with a minimum of 1;
//   - 0 is treated as 1, because an errgroup limit of zero would block every
//     scheduled send forever;
//   - values above 99 are capped at 99;
//   - other negative values are passed to errgroup.SetLimit unchanged, where a
//     negative limit means "no limit".
//
// The returned manager uses a BatchSize of 2000.
func NewBatchInsertManager(insertContext context.Context, db driver.Conn, insertSql string, insertWorkerCount int, logger *slog.Logger) *BatchInsertManager {
	poolCapacity := insertWorkerCount
	switch {
	case insertWorkerCount == -1:
		// not enough RAM to use every core (if docker has access to 4 GB on a
		// machine where there is only 16 GB)
		poolCapacity = runtime.NumCPU() - 4
		if poolCapacity < 1 {
			poolCapacity = 1
		}
	case insertWorkerCount == 0:
		// SetLimit(0) would make group.Go block forever.
		poolCapacity = 1
	case insertWorkerCount > 99:
		poolCapacity = 99
	}
	logger.Info("insert pool capacity", "count", poolCapacity)

	errorGroup, ctx := errgroup.WithContext(insertContext)
	errorGroup.SetLimit(poolCapacity)

	return &BatchInsertManager{
		queuedItems:   0,
		Db:            db,
		InsertContext: ctx,
		insertSql:     insertSql,
		logger:        logger,
		group:         errorGroup,
		// large inserts lead to large memory usage, so insert by 2000 items
		BatchSize: 2000,
	}
}

// AddDependency registers a manager whose pending batch is sent before each of
// this manager's batches.
//
// NOTE(review): the dependency list is not synchronized; presumably all
// dependencies are added before any batch is scheduled — confirm with callers.
func (t *BatchInsertManager) AddDependency(dependency *BatchInsertManager) {
	t.dependencies = append(t.dependencies, dependency)
}

// GetQueuedItemCount returns the number of items queued in the current batch.
func (t *BatchInsertManager) GetQueuedItemCount() int {
	// queuedItems is modified under the mutex, possibly from worker goroutines
	// flushing this manager as a dependency, so read it under the same lock.
	t.mutex.Lock()
	defer t.mutex.Unlock()
	return t.queuedItems
}

// ScheduleSendBatch detaches the current batch, if any, and sends it on the
// worker pool. The call blocks while the pool is at its concurrency limit.
func (t *BatchInsertManager) ScheduleSendBatch() {
	batch := t.prepareForFlush()
	if batch == nil {
		return
	}
	t.group.Go(func() error {
		return t.sendBatch(batch)
	})
}

// SendBatchNow detaches the current batch, if any, and sends it synchronously
// on the calling goroutine.
func (t *BatchInsertManager) SendBatchNow() error {
	batch := t.prepareForFlush()
	if batch == nil {
		return nil
	}
	return t.sendBatch(batch)
}

// Close flushes the pending batch, waits for all scheduled sends, and aborts
// any batch that was started in the meantime. It returns the first send error,
// or the abort error if all sends succeeded.
func (t *BatchInsertManager) Close() error {
	// flush
	t.ScheduleSendBatch()
	err := t.group.Wait()

	// Detach under the lock so this cannot race with a concurrent flush of the
	// same manager.
	t.mutex.Lock()
	leftover := t.batch
	t.batch = nil
	t.mutex.Unlock()

	if leftover != nil {
		t.logger.Error("abort batch; was not sent, but close is called")
		if abortErr := leftover.Abort(); err == nil {
			err = abortErr
		}
	}
	return err
}
func (t *BatchInsertManager) PrepareForAppend() (driver.Batch, error) { t.mutex.Lock() defer t.mutex.Unlock() if t.queuedItems >= t.BatchSize { t.mutex.Unlock() t.ScheduleSendBatch() t.mutex.Lock() } else { t.queuedItems++ } if t.batch == nil { var err error t.batch, err = t.Db.PrepareBatch(t.InsertContext, t.insertSql, driver.WithReleaseConnection()) if err != nil { return nil, fmt.Errorf("cannot prepare batch: %w", err) } } return t.batch, nil } func (t *BatchInsertManager) prepareForFlush() driver.Batch { t.mutex.Lock() defer t.mutex.Unlock() batch := t.batch if batch == nil { return nil } t.batch = nil queuedItems := t.queuedItems t.queuedItems = 0 t.logger.Info("items scheduled to be sent", "count", queuedItems) return batch } func (t *BatchInsertManager) sendBatch(batch driver.Batch) error { for _, dependency := range t.dependencies { err := dependency.SendBatchNow() if err != nil { t.logger.Error("cannot commit dependency", "error", err) return err } } t.logger.Info("send batch") err := batch.Send() if err != nil { t.logger.Error("cannot send batch", "error", err) return err } return nil }