oplog/txn_buffer.go (273 lines of code) (raw):
// Copyright (C) MongoDB, Inc. 2019-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
// Package txn implements functions for examining and processing transaction
// oplog entries.
package oplog
import (
"errors"
"fmt"
"go.mongodb.org/mongo-driver/bson/primitive"
"sync"
"go.mongodb.org/mongo-driver/bson"
)
var ErrBufferClosed = errors.New("transaction buffer already closed")
var ErrTxnAborted = errors.New("transaction aborted")
var ErrNotTransaction = errors.New("oplog entry is not a transaction")
type txnTask struct {
meta TxnMeta
op ParsedLog
}
// txnState tracks an individual transaction, including storage of related ops
// and communication channels. It includes a WaitGroup for waiting on
// transaction-related goroutines.
type txnState struct {
buffer []ParsedLog
ingestChan chan txnTask
ingestDone chan struct{}
ingestErr error
stopChan chan struct{}
startTime TxnOpTime
wg sync.WaitGroup
}
func newTxnState(op ParsedLog) *txnState {
return &txnState{
ingestChan: make(chan txnTask),
ingestDone: make(chan struct{}),
stopChan: make(chan struct{}),
buffer: make([]ParsedLog, 0),
startTime: GetTxnOpTimeFromOplogEntry(&op),
}
}
// Because state is currently kept in memory, purge merely drops the reference
// so the GC will eventually clean up. Eventually, this might clean up a file
// on disk.
func (ts *txnState) purge() error {
ts.buffer = nil
return nil
}
// TxnBuffer stores transaction oplog entries until they are needed
// to commit them to a desination. It includes a WaitGroup for tracking
// all goroutines across all transactions for use in global shutdown.
type TxnBuffer struct {
sync.Mutex
stopped bool
txns map[TxnID]*txnState
wg sync.WaitGroup
}
// NewBuffer initializes a transaction oplog buffer.
func NewBuffer() *TxnBuffer {
return &TxnBuffer{
txns: make(map[TxnID]*txnState),
}
}
func (b *TxnBuffer) Size() int {
return len(b.txns)
}
// Concurrency notes:
//
// We require that AddOp, GetTxnStream and PurgeTxn be called serially as part
// of orchestrating replay of oplog entries. The only method that could run
// concurrently is Stop. If Stop is called, we're in some sort of global
// shutdown, so we don't care how other methods and goroutines resolve, only
// that they do so without panicking.
//
// AddOp sends a transaction oplog entry to a background goroutine (starting
// one for a new transaction TxnID) for asynchronous pre-processing and storage.
// If the oplog entry is not a transaction, an error will be returned. Any
// errors during processing can be discovered later via the error channel from
// `GetTxnStream`.
//
// Must not be called concurrently with other transaction-related operations.
// Must not be called for a given transaction after starting to stream that
// transaction.
func (b *TxnBuffer) AddOp(m TxnMeta, op ParsedLog) error {
b.Lock()
defer b.Unlock()
if b.stopped {
return ErrBufferClosed
}
if !m.IsTxn() {
return ErrNotTransaction
}
// Get or initialize transaction state
state, ok := b.txns[m.id]
if !ok {
state = newTxnState(op)
b.txns[m.id] = state
b.wg.Add(1)
state.wg.Add(1)
go b.ingester(state)
}
// Send unless the ingester has shut down, e.g. on error
select {
case <-state.ingestDone:
case state.ingestChan <- txnTask{meta: m, op: op}:
}
return nil
}
func (b *TxnBuffer) ingester(state *txnState) {
LOOP:
for {
select {
case t := <-state.ingestChan:
if t.meta.IsData() {
// process it
innerOps, err := ExtractInnerOps(&t.op)
if err != nil {
state.ingestErr = err
break LOOP
}
// store it
for _, op := range innerOps {
state.buffer = append(state.buffer, op)
}
}
if t.meta.IsFinal() {
break LOOP
}
case <-state.stopChan:
break LOOP
}
}
close(state.ingestDone)
state.wg.Done()
b.wg.Done()
}
// GetTxnStream returns a channel of Oplog entries in a transaction and a
// channel for errors. If the buffer has been stopped, the returned op channel
// will be closed and the error channel will have an error on it.
//
// Must not be called concurrently with other transaction-related operations.
// For a given transaction, it must not be called until after a final oplog
// entry has been passed to AddOp and it must not be called more than once.
func (b *TxnBuffer) GetTxnStream(m TxnMeta) (<-chan ParsedLog, <-chan error) {
b.Lock()
defer b.Unlock()
opChan := make(chan ParsedLog)
errChan := make(chan error, 1)
if b.stopped {
return sendErrAndClose(opChan, errChan, ErrBufferClosed)
}
if !m.IsTxn() {
return sendErrAndClose(opChan, errChan, ErrNotTransaction)
}
state := b.txns[m.id]
if state == nil {
return sendErrAndClose(opChan, errChan, fmt.Errorf("GetTxnStream found no state for %v", m.id))
}
// The final oplog entry must have been passed to AddOp before calling this
// method, so we know this will be able to make progress.
<-state.ingestDone
if state.ingestErr != nil {
return sendErrAndClose(opChan, errChan, state.ingestErr)
}
// Launch streaming goroutine
b.wg.Add(1)
state.wg.Add(1)
go b.streamer(state, opChan, errChan)
return opChan, errChan
}
func (b *TxnBuffer) streamer(state *txnState, opChan chan<- ParsedLog, errChan chan<- error) {
LOOP:
for _, op := range state.buffer {
select {
case opChan <- op:
case <-state.stopChan:
errChan <- ErrTxnAborted
break LOOP
}
}
close(opChan)
close(errChan)
state.wg.Done()
b.wg.Done()
}
// OldestOpTime returns the optime of the oldest buffered transaction, or
// an empty optime if no transactions are buffered. This will include
// committed transactions until they are purged.
func (b *TxnBuffer) OldestOpTime() TxnOpTime {
b.Lock()
defer b.Unlock()
oldest := TxnOpTime{}
for _, v := range b.txns {
if TxnOpTimeIsEmpty(oldest) || TxnOpTimeLessThan(v.startTime, oldest) {
oldest = v.startTime
}
}
return oldest
}
// PurgeTxn closes any transaction streams in progress and deletes all oplog
// entries associated with a transaction.
//
// Must not be called concurrently with other transaction-related operations.
// For a given transaction, it must not be called until after a final oplog
// entry has been passed to AddOp and it must not be called more than once.
func (b *TxnBuffer) PurgeTxn(m TxnMeta) error {
b.Lock()
defer b.Unlock()
if b.stopped {
return ErrBufferClosed
}
state := b.txns[m.id]
if state == nil {
return fmt.Errorf("PurgeTxn found no state for %v", m.id)
}
// When the lock is dropped, we don't want Stop to find this transaction and
// double-close it.
delete(b.txns, m.id)
close(state.stopChan)
// Wait for goroutines to terminate, then clean up.
state.wg.Wait()
state.purge()
return nil
}
// Stop shuts down processing and cleans up. Subsequent calls to Stop() will return nil.
// All other methods error after this is called.
func (b *TxnBuffer) Stop() error {
b.Lock()
if b.stopped {
b.Unlock()
return nil
}
b.stopped = true
for _, state := range b.txns {
close(state.stopChan)
}
b.Unlock()
// At this point we know any subsequent public method will see the buffer
// is stopped, no new goroutines will be launched, and existing goroutines
// have been signaled to close. Next, wait for goroutines to stop, then
// clean up.
b.wg.Wait()
var firstErr error
for _, state := range b.txns {
err := state.purge()
if err != nil && firstErr == nil {
firstErr = err
}
}
return firstErr
}
// sendErrAndClose is a utility for putting an error on a channel before closing.
func sendErrAndClose(o chan ParsedLog, e chan error, err error) (chan ParsedLog, chan error) {
e <- err
close(o)
close(e)
return o, e
}
func findValueByKey(keyName string, document *bson.D) (interface{}, error) {
for _, key := range *document {
if key.Key == keyName {
return key.Value, nil
}
}
return nil, errors.New("no such field")
}
const extractErrorFmt = "error extracting transaction ops: %s: %v"
// ExtractInnerOps
// doc.applyOps[i].ts(Let ckpt use the last ts to judge complete)
// applyOps[0 - n-1].ts = doc.ts - 1
// applyOps[n-1].ts = doc.ts
func ExtractInnerOps(tranOp *ParsedLog) ([]ParsedLog, error) {
doc := tranOp.Object
rawAO, err := findValueByKey("applyOps", &doc)
if err != nil {
return nil, fmt.Errorf(extractErrorFmt, "applyOps field", err)
}
ao, ok := rawAO.(bson.A)
if !ok {
return nil, fmt.Errorf(extractErrorFmt, "applyOps field", "not a BSON array")
}
tmpTimestamp := tranOp.Timestamp
tmpTimestamp.I = tmpTimestamp.I - 1
ops := make([]ParsedLog, len(ao))
for i, v := range ao {
opDoc, ok := v.(bson.D)
if !ok {
return nil, fmt.Errorf(extractErrorFmt, "applyOps op", "not a BSON document")
}
op, err := bsonDocToOplog(opDoc)
if err != nil {
return nil, fmt.Errorf(extractErrorFmt, "applyOps op", err)
}
// The inner ops doesn't have these fields and they are required by lastAppliedTime.Latest in Mongomirror,
// so we are assigning them from the parent transaction op
op.Timestamp = tmpTimestamp
op.Term = tranOp.Term
op.Hash = tranOp.Hash
ops[i] = *op
}
if len(ops) > 0 {
// applyOps maybe empty : https://jira.mongodb.org/browse/SERVER-50769
ops[len(ops)-1].Timestamp = tranOp.Timestamp
}
return ops, nil
}
const opConvertErrorFmt = "error converting bson.D to op: %s: %v"
func bsonDocToOplog(doc bson.D) (*ParsedLog, error) {
op := ParsedLog{}
for _, v := range doc {
switch v.Key {
case "op":
s, ok := v.Value.(string)
if !ok {
return nil, fmt.Errorf(opConvertErrorFmt, "op field", "not a string")
}
op.Operation = s
case "ns":
s, ok := v.Value.(string)
if !ok {
return nil, fmt.Errorf(opConvertErrorFmt, "ns field", "not a string")
}
op.Namespace = s
case "o":
d, ok := v.Value.(bson.D)
if !ok {
return nil, fmt.Errorf(opConvertErrorFmt, "o field", "not a BSON Document")
}
op.Object = d
case "o2":
d, ok := v.Value.(bson.D)
if !ok {
return nil, fmt.Errorf(opConvertErrorFmt, "o2 field", "not a BSON Document")
}
op.Query = d
case "ui":
u, ok := v.Value.(primitive.Binary)
if !ok {
return nil, fmt.Errorf(opConvertErrorFmt, "ui field", "not binary data")
}
op.UI = &u
}
}
return &op, nil
}