spanner/writer/batchwriter.go
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package writer provides high-level abstractions for working with
// Cloud Spanner that are not available from the core Cloud Spanner
// libraries.
package writer
import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"

	sp "cloud.google.com/go/spanner"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/logger"
)
// Parameters used to control building batches to write to Spanner.
// Batches are built by adding rows until we hit one of the thresholds.
// Bigger batches are usually more efficient, but we need to be careful
// not to exceed Spanner's limits. Also, sending huge RPCs is potentially
// unreliable.
const (
	countThreshold = 10 * 1000    // Spanner per-operation limit is 20,000.
	byteThreshold  = 20 * 1 << 20 // Spanner per-operation limit is 100MB.
)
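// As a rough illustration (assuming rows of about ten columns each, i.e. ten
// mutations per row as counted in AddRow), countThreshold caps a batch at
// roughly 1,000 rows, well under Spanner's 20,000-mutation limit.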
// BatchWriter accumulates rows of data (via AddRow) and assembles them
// into batches that it asynchronously writes to Spanner. Rows are
// written to Spanner using insert semantics, i.e. if a row already exists
// in the database, the write of that row fails with error 'AlreadyExists'.
// If Spanner returns an error for a batch, BatchWriter splits the batch
// into smaller chunks to retry, attempting to isolate which row(s)
// in the batch are bad. BatchWriter respects Spanner's limits on byte size
// and mutation count, and has configurable limits on the number of
// in-progress writes, the amount of data buffered, and retry behavior.
// BatchWriter is not threadsafe: only one call to AddRow or Flush should
// be active at any time. See ExampleBatchWriter (batchwriter_test.go)
// for sample usage code.
type BatchWriter struct {
	rows       []*row                     // Buffered rows.
	rBytes     int64                      // Estimate of bytes for buffered rows.
	rCount     int64                      // Mutation count for buffered rows.
	write      func([]*sp.Mutation) error // Typically a closure that calls client.Apply, but structured this way for testing.
	wg         sync.WaitGroup             // Tracks in-progress writes.
	writeLimit int64                      // Limit on number of in-progress writes.
	bytesLimit int64                      // Limit on bytes buffered. AddRow blocks if rBytes exceeds this value.
	retryLimit int64                      // Limit on retries.
	verbose    bool                       // If true, print out messages about each write batch.
	async      asyncState
}
type row struct {
	table string
	cols  []string
	vals  []interface{}
}
// Fields in this struct are modified asynchronously, e.g. by goroutines writing
// data to Spanner. Either hold a lock or use atomics, as detailed below.
//
// Note on terminology. A bad row is a row that generated an error. A dropped
// row is a row that we didn't write to Spanner (either because it generated
// an error, or because it was part of a batch that generated errors and we'd
// exhausted our retry budget, so we didn't split the batch and try again).
type asyncState struct {
	writes             int64            // Number of in-progress writes; access using atomic.
	retries            int64            // Number of retries; access using atomic.
	lock               sync.Mutex       // Protects errors, sampleBadRows, sampleBadRowsBytes and droppedRows.
	errors             map[string]int64 // Errors encountered; protected by lock.
	sampleBadRows      []*row           // A sample of rows that generated errors; protected by lock.
	sampleBadRowsBytes int64            // Estimate of bytes for sampleBadRows; protected by lock.
	droppedRows        map[string]int64 // Count of dropped rows, broken down by table; protected by lock.
}
// BatchWriterConfig specifies parameters for configuring BatchWriter.
type BatchWriterConfig struct {
	WriteLimit int64                      // Limit on number of in-progress writes.
	BytesLimit int64                      // Limit on bytes buffered.
	RetryLimit int64                      // Limit on retries.
	Write      func([]*sp.Mutation) error // Function to call to write to Spanner (typically a closure that calls client.Apply).
	Verbose    bool                       // If true, print out messages about each write batch.
}
// NewBatchWriter returns a new BatchWriter with parameters defined by config.
func NewBatchWriter(config BatchWriterConfig) *BatchWriter {
	return &BatchWriter{
		write:      config.Write,
		writeLimit: config.WriteLimit,
		bytesLimit: config.BytesLimit,
		retryLimit: config.RetryLimit,
		verbose:    config.Verbose,
		async: asyncState{
			errors:      make(map[string]int64),
			droppedRows: make(map[string]int64),
		},
	}
}
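// A minimal usage sketch (the client, ctx, limits and row values below are
// illustrative assumptions, not part of this package; ExampleBatchWriter in
// batchwriter_test.go is the canonical example):
//
//	bw := NewBatchWriter(BatchWriterConfig{
//		WriteLimit: 40,
//		BytesLimit: 100 * 1 << 20,
//		RetryLimit: 1000,
//		Write: func(m []*sp.Mutation) error {
//			_, err := client.Apply(ctx, m) // client is a *sp.Client, ctx a context.Context.
//			return err
//		},
//	})
//	bw.AddRow("Singers", []string{"SingerId", "Name"}, []interface{}{int64(1), "Marc"})
//	bw.Flush()
//	fmt.Printf("dropped rows: %v, errors: %v\n", bw.DroppedRowsByTable(), bw.Errors())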
// AddRow appends a new row of data to bw's buffer of rows. Depending on the
// state of BatchWriter, AddRow may immediately return, or it may initiate writes,
// or it may block (waiting for some of the writes already in progress to
// complete) and then initiate writes.
func (bw *BatchWriter) AddRow(table string, cols []string, vals []interface{}) {
	r := &row{table, cols, vals}
	bw.rows = append(bw.rows, r)
	bw.rBytes += byteSize(r)
	bw.rCount += int64(len(r.cols))
	bw.writeData()
}
// Flush initiates writes to Spanner of all buffered rows of data, and waits
// for them to complete.
func (bw *BatchWriter) Flush() {
	for len(bw.rows) > 0 {
		if atomic.LoadInt64(&bw.async.writes) < bw.writeLimit {
			m, count, bytes := bw.getBatch()
			if bw.verbose {
				fmt.Printf("Starting write of %d rows to Spanner (%d bytes, %d mutations) [%d in progress]\n",
					len(m), bytes, count, atomic.LoadInt64(&bw.async.writes))
			}
			logger.Log.Debug(fmt.Sprintf("Starting write of %d rows to Spanner (%d bytes, %d mutations) [%d in progress]\n",
				len(m), bytes, count, atomic.LoadInt64(&bw.async.writes)))
			bw.startWrite(m)
		} else {
			time.Sleep(10 * time.Millisecond)
		}
	}
	bw.wg.Wait()
}
// DroppedRowsByTable returns a map of tables to counts of dropped rows.
// Dropped rows are rows that were not written to Spanner.
func (bw *BatchWriter) DroppedRowsByTable() map[string]int64 {
	// Make a copy of bw.async.droppedRows since it is not thread-safe.
	m := make(map[string]int64)
	bw.async.lock.Lock()
	defer bw.async.lock.Unlock()
	for t, n := range bw.async.droppedRows {
		m[t] = n
	}
	return m
}
// SampleBadRows returns a string-formatted list of sample rows that
// generated errors. Returns at most n rows.
// Note that we split up batches to isolate errors. Each row returned
// by SampleBadRows generated an error when sent to Spanner as a
// single-row batch.
func (bw *BatchWriter) SampleBadRows(n int) []string {
	var l []string
	bw.async.lock.Lock()
	defer bw.async.lock.Unlock()
	for _, x := range bw.async.sampleBadRows {
		if len(l) >= n {
			break
		}
		l = append(l, fmt.Sprintf("table=%s cols=%v data=%v", x.table, x.cols, x.vals))
	}
	return l
}
// Errors returns a map summarizing errors encountered. Keys are error
// strings, and values are the count of that error.
func (bw *BatchWriter) Errors() map[string]int64 {
	// Make a copy of bw.async.errors since it is not thread-safe.
	m := make(map[string]int64)
	bw.async.lock.Lock()
	defer bw.async.lock.Unlock()
	for k, v := range bw.async.errors {
		m[k] = v
	}
	return m
}
// getBadRowsForTest exposes the raw sampleBadRows slice for tests.
func (bw *BatchWriter) getBadRowsForTest() []*row {
	return bw.async.sampleBadRows
}
// getBatch returns a slice of data from the front of bw.rows. The slice
// returned is the largest one not exceeding countThreshold and byteThreshold.
func (bw *BatchWriter) getBatch() (rows []*row, count int64, bytes int64) {
	for i := range bw.rows {
		c := count + int64(len(bw.rows[i].cols))
		b := bytes + byteSize(bw.rows[i])
		// If the next row puts us over the thresholds, then stop. But make sure
		// we have at least one row. If a single row puts us over the
		// thresholds, there's not much we can do: we just try sending it to Spanner
		// (it might succeed, since our thresholds are conservative).
		if (c >= countThreshold || b >= byteThreshold) && len(rows) >= 1 {
			bw.rCount -= count
			bw.rBytes -= bytes
			bw.rows = bw.rows[i:]
			return rows, count, bytes
		}
		count = c
		bytes = b
		rows = append(rows, bw.rows[i])
	}
	bw.rCount = 0
	bw.rBytes = 0
	bw.rows = nil
	return rows, count, bytes
}
func (bw *BatchWriter) errorStats(rows []*row, err error, retry bool) {
	if bw.verbose {
		fmt.Printf("Error while writing %d rows to Spanner: %v\n", len(rows), err)
	}
	logger.Log.Debug(fmt.Sprintf("Error while writing %d rows to Spanner: %v\n", len(rows), err))
	bw.async.lock.Lock()
	defer bw.async.lock.Unlock()
	bw.async.errors[err.Error()]++
	if retry {
		return
	}
	// None of these rows will be written: they are all dropped.
	if len(rows) == 1 {
		// This is a confirmed bad row: add it to the sampleBadRows list.
		r := rows[0]
		n := byteSize(r)
		// Use bw.bytesLimit to cap storage used by sampleBadRows. Keep at least one bad row.
		if bw.async.sampleBadRowsBytes+n < bw.bytesLimit || len(bw.async.sampleBadRows) == 0 {
			bw.async.sampleBadRows = append(bw.async.sampleBadRows, r)
			bw.async.sampleBadRowsBytes += n
		}
	}
	for _, x := range rows {
		bw.async.droppedRows[x.table]++
	}
}
// Note: doWriteAndHandleErrors must be thread-safe because it is run
// inside a goroutine.
func (bw *BatchWriter) doWriteAndHandleErrors(rows []*row) {
	var m []*sp.Mutation
	for _, x := range rows {
		m = append(m, sp.Insert(x.table, x.cols, x.vals))
	}
	if err := bw.write(m); err != nil {
		hitRetryLimit := atomic.LoadInt64(&bw.async.retries) >= bw.retryLimit
		retry := len(rows) > 1 && !hitRetryLimit
		bw.errorStats(rows, err, retry)
		if !retry {
			if hitRetryLimit && bw.verbose {
				fmt.Printf("Have hit %d retries: will not do any more\n", atomic.LoadInt64(&bw.async.retries))
			}
			if hitRetryLimit {
				logger.Log.Debug(fmt.Sprintf("Have hit %d retries: will not do any more\n", atomic.LoadInt64(&bw.async.retries)))
			}
			return
		}
		// Split into 10 pieces and retry. This is useful
		// if a batch contains a bad data row (Spanner
		// will fail the entire batch). In effect we attempt
		// to narrow down which row (or rows) are bad, and
		// write the 'good' rows to Spanner.
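		// For example, a failing batch of 95 rows gives k = 1 + 95/10 = 10, so
		// it is retried as ten sub-batches: nine of 10 rows and one of 5.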
		k := 1 + len(rows)/10
		min := func(i, j int) int {
			if i <= j {
				return i
			}
			return j
		}
		for i := 0; i < len(rows); i += k {
			atomic.AddInt64(&bw.async.retries, 1)
			bw.doWriteAndHandleErrors(rows[i:min(i+k, len(rows))])
		}
	}
}
// Note: backgroundWrite must be thread-safe because it is run as
// a goroutine.
func (bw *BatchWriter) backgroundWrite(rows []*row) {
	defer bw.wg.Done()
	defer atomic.AddInt64(&bw.async.writes, -1)
	bw.doWriteAndHandleErrors(rows)
}
// startWrite initiates an asynchronous write of rows to Spanner.
func (bw *BatchWriter) startWrite(rows []*row) {
	bw.wg.Add(1)
	atomic.AddInt64(&bw.async.writes, 1)
	go bw.backgroundWrite(rows)
}
// writeData initiates writes to Spanner until either:
// a) we have less than a full 'batch' of buffered rows to write, or
// b) we've hit writeLimit and we're under bytesLimit.
// It will block and retry until either (a) or (b) holds.
func (bw *BatchWriter) writeData() {
	for bw.rCount > countThreshold || bw.rBytes > byteThreshold {
		if atomic.LoadInt64(&bw.async.writes) < bw.writeLimit {
			m, count, bytes := bw.getBatch()
			if bw.verbose {
				fmt.Printf("Starting write of %d rows to Spanner (%d bytes, %d mutations) [%d in progress]\n",
					len(m), bytes, count, atomic.LoadInt64(&bw.async.writes))
			}
			logger.Log.Debug(fmt.Sprintf("Starting write of %d rows to Spanner (%d bytes, %d mutations) [%d in progress]\n",
				len(m), bytes, count, atomic.LoadInt64(&bw.async.writes)))
			bw.startWrite(m)
		} else {
			if bw.rBytes < bw.bytesLimit {
				return
			}
			time.Sleep(10 * time.Millisecond)
		}
	}
}
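// byteSize returns a rough estimate of the bytes needed to hold r: the lengths
// of the table name, column names and string values, plus the size of the
// interface header for each non-string value. For example (an illustrative row,
// not from this package), byteSize of {"Singers", ["SingerId", "Name"],
// [int64(1), "Marc"]} is 7 + (8 + 4) + (16 + 4) = 39 bytes on a 64-bit platform.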
func byteSize(r *row) int64 {
	n := int64(len(r.table))
	for _, c := range r.cols {
		n += int64(len(c))
	}
	for _, v := range r.vals {
		switch x := v.(type) {
		case string:
			n += int64(len(x))
		default:
			n += int64(unsafe.Sizeof(v))
		}
	}
	return n
}