pkg/sql-util/BatchInsertManager.go (128 lines of code) (raw):
package sql_util
import (
"context"
"fmt"
"log/slog"
"runtime"
"sync"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"golang.org/x/sync/errgroup"
)
// BatchInsertManager accumulates rows into a ClickHouse batch and sends full
// batches through a bounded errgroup. Other managers can be registered as
// dependencies; they are flushed before this manager's own batch is sent.
type BatchInsertManager struct {
// mutex is held by PrepareForAppend and prepareForFlush while they read or
// modify batch and queuedItems.
mutex sync.Mutex
// batch is the batch currently being filled; nil until PrepareForAppend
// creates one, and reset to nil when it is handed off for sending.
batch driver.Batch
// Db is the connection used to prepare new batches.
Db driver.Conn
// insertSql is the statement passed to Db.PrepareBatch.
insertSql string
// BatchSize is the queuedItems threshold at which PrepareForAppend schedules
// a flush. The constructor sets it to 2000.
BatchSize int
// queuedItems is incremented by PrepareForAppend and reset to 0 by
// prepareForFlush.
queuedItems int
logger *slog.Logger
// group runs the sends scheduled by ScheduleSendBatch; Close waits on it.
group *errgroup.Group
// InsertContext is the context passed to Db.PrepareBatch. The constructor
// sets it to the context returned by errgroup.WithContext.
InsertContext context.Context
// dependencies are flushed with SendBatchNow, in order, before this
// manager's batch is sent.
dependencies []*BatchInsertManager
}
// NewBatchInsertManager creates a manager that inserts rows using insertSql
// over db and sends batches on a bounded pool of goroutines.
//
// insertWorkerCount selects the pool size:
//   - -1 derives it from the CPU count (NumCPU-4, but at least 1);
//   - 0 is treated as 1, because an errgroup limit of zero would block every
//     scheduled send forever;
//   - values above 99 are capped at 99;
//   - any other negative value is passed to errgroup.SetLimit unchanged, which
//     treats it as "no limit".
//
// The manager's InsertContext is derived from insertContext via
// errgroup.WithContext, so it is cancelled once any scheduled send fails.
// logger must not be nil.
func NewBatchInsertManager(insertContext context.Context, db driver.Conn, insertSql string, insertWorkerCount int, logger *slog.Logger) *BatchInsertManager {
	poolCapacity := insertWorkerCount
	switch {
	case insertWorkerCount == -1:
		// Leave some CPUs free: memory can be tight (e.g. Docker limited to
		// 4 GB on a 16 GB machine), so do not use every core for inserts.
		poolCapacity = runtime.NumCPU() - 4
		if poolCapacity < 2 {
			poolCapacity = 1
		}
	case insertWorkerCount == 0:
		// SetLimit(0) prevents any goroutine from being started, which would
		// make ScheduleSendBatch block forever.
		poolCapacity = 1
	case insertWorkerCount > 99:
		poolCapacity = 99
	}
	logger.Info("insert pool capacity", "count", poolCapacity)

	errorGroup, ctx := errgroup.WithContext(insertContext)
	errorGroup.SetLimit(poolCapacity)

	return &BatchInsertManager{
		queuedItems:   0,
		Db:            db,
		InsertContext: ctx,
		insertSql:     insertSql,
		logger:        logger,
		group:         errorGroup,
		// Large inserts lead to large memory usage, so insert in chunks of
		// 2000 items.
		BatchSize: 2000,
	}
}
// AddDependency registers another manager whose pending batch is flushed
// (via SendBatchNow) before any batch of this manager is sent. Dependencies
// are flushed in the order they were added.
func (t *BatchInsertManager) AddDependency(dependency *BatchInsertManager) {
	extended := append(t.dependencies, dependency)
	t.dependencies = extended
}
// GetQueuedItemCount returns the number of items counted for the batch that
// is currently being filled. The counter is read under the manager's mutex
// because PrepareForAppend and prepareForFlush modify it under that mutex.
func (t *BatchInsertManager) GetQueuedItemCount() int {
	t.mutex.Lock()
	defer t.mutex.Unlock()
	return t.queuedItems
}
// ScheduleSendBatch detaches the batch currently being filled and hands it to
// the manager's errgroup to be sent. It does nothing when there is no batch.
// Send errors surface through the errgroup, i.e. from Close.
func (t *BatchInsertManager) ScheduleSendBatch() {
	pending := t.prepareForFlush()
	if pending == nil {
		return
	}

	t.group.Go(func() error {
		return t.sendBatch(pending)
	})
}
// SendBatchNow detaches the batch currently being filled and sends it
// synchronously on the calling goroutine. It returns nil when there is no
// batch to send, otherwise the result of sending it.
func (t *BatchInsertManager) SendBatchNow() error {
	pending := t.prepareForFlush()
	if pending == nil {
		return nil
	}
	return t.sendBatch(pending)
}
// Close flushes the batch currently being filled, waits for every scheduled
// send to finish and returns the first send error, if any.
//
// If a new batch was started while Close was running (a concurrent
// PrepareForAppend), that batch is aborted rather than sent. Its abort error
// is returned when no send error occurred; otherwise it is only logged.
func (t *BatchInsertManager) Close() error {
	// Flush whatever is still queued, then wait for all in-flight sends.
	t.ScheduleSendBatch()
	err := t.group.Wait()

	// t.batch is guarded by t.mutex everywhere else; detach it under the lock
	// and abort it outside so the lock is not held during the driver call.
	t.mutex.Lock()
	leftover := t.batch
	t.batch = nil
	t.mutex.Unlock()

	if leftover == nil {
		return err
	}

	t.logger.Error("abort batch; was not sent, but close is called")
	abortErr := leftover.Abort()
	if err == nil {
		return abortErr
	}
	if abortErr != nil {
		// The send error takes precedence as the return value; do not lose
		// the abort failure silently.
		t.logger.Error("cannot abort batch", "error", abortErr)
	}
	return err
}
// PrepareForAppend returns the batch the caller should append to, creating a
// new one when none is open. Each successful call is counted as one queued
// item; once the count has reached BatchSize, the current batch is scheduled
// for sending first and a fresh batch is started.
//
// It returns a nil batch and a wrapped error when the driver cannot prepare a
// new batch; in that case nothing is counted.
func (t *BatchInsertManager) PrepareForAppend() (driver.Batch, error) {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	if t.queuedItems >= t.BatchSize {
		// ScheduleSendBatch takes the mutex itself (via prepareForFlush) and
		// may block waiting for a free worker, so release the lock around it.
		t.mutex.Unlock()
		t.ScheduleSendBatch()
		t.mutex.Lock()
	}

	if t.batch == nil {
		batch, err := t.Db.PrepareBatch(t.InsertContext, t.insertSql, driver.WithReleaseConnection())
		if err != nil {
			return nil, fmt.Errorf("cannot prepare batch: %w", err)
		}
		t.batch = batch
	}

	// Count the item only once a batch is known to exist, and also on the
	// flush path: the item that triggered the flush goes into the new batch
	// and must be counted there.
	t.queuedItems++
	return t.batch, nil
}
// prepareForFlush detaches the batch currently being filled and resets the
// queued-item counter, all under the manager's mutex. It returns the detached
// batch, or nil when no batch is open (in which case nothing is changed).
func (t *BatchInsertManager) prepareForFlush() driver.Batch {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	if t.batch == nil {
		return nil
	}

	pending, count := t.batch, t.queuedItems
	t.batch, t.queuedItems = nil, 0

	t.logger.Info("items scheduled to be sent", "count", count)
	return pending
}
// sendBatch first flushes every registered dependency synchronously, in
// registration order, and then sends batch. The first failure is logged and
// returned; when a dependency fails, batch itself is not sent.
func (t *BatchInsertManager) sendBatch(batch driver.Batch) error {
	deps := t.dependencies
	for i := range deps {
		if depErr := deps[i].SendBatchNow(); depErr != nil {
			t.logger.Error("cannot commit dependency", "error", depErr)
			return depErr
		}
	}

	t.logger.Info("send batch")
	if sendErr := batch.Send(); sendErr != nil {
		t.logger.Error("cannot send batch", "error", sendErr)
		return sendErr
	}
	return nil
}