nimo-shake/incr-sync/syncer.go
package incr_sync
import (
"fmt"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodbstreams"
nimo "github.com/gugemichael/nimo4go"
LOG "github.com/vinllen/log4go"

"nimo-shake/checkpoint"
utils "nimo-shake/common"
conf "nimo-shake/configure"
"nimo-shake/protocal"
"nimo-shake/qps"
"nimo-shake/writer"
)
const (
ShardChanSize = 4096
WaitFatherFinishInterval = 20 // seconds
GetRecordsInterval = 3 // seconds
CheckpointFlushInterval = 20 // seconds
DispatcherBatcherChanSize = 4096
DispatcherExecuterChanSize = 4096
IncrBatcherTimeout = 1 // seconds
EventInsert = "INSERT"
EventMODIFY = "MODIFY"
EventRemove = "REMOVE"
)
var (
GlobalShardMap = make(map[string]int) // shard-id -> state: 0/absent = not started, 1 = running, 2 = finished
GlobalShardLock sync.Mutex
GlobalFetcherMoreFlag = make(map[string]int) // table name -> epoch, incremented each time one of its shards finishes
GlobalFetcherLock sync.Mutex
// moved from const to var so unit tests can override them
BatcherNumber = 25
BatcherSize = 2 * utils.MB
// incr sync metric
replMetric *utils.ReplicationMetric
ckptWriterG checkpoint.Writer
)
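// Illustrative only: GlobalShardMap implements a tiny per-shard state machine,
// 0/absent = not started, 1 = running, 2 = finished. Start inlines the transitions
// under GlobalShardLock; the hypothetical helpers below are a minimal sketch of the
// same locking pattern and are not called anywhere in this package.
func getShardState(shardId string) int {
GlobalShardLock.Lock()
defer GlobalShardLock.Unlock()
return GlobalShardMap[shardId]
}
func setShardState(shardId string, state int) {
GlobalShardLock.Lock()
defer GlobalShardLock.Unlock()
GlobalShardMap[shardId] = state
}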
type IncrSyncMetric struct {
recordsGet uint64
recordsWrite uint64
}
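// Start launches incremental sync: one fetcher goroutine per table stream plus
// conf.Options.IncreaseConcurrency dispatcher workers per table, then blocks forever.
// Illustrative call site (a sketch only; the real caller lives outside this package
// and stream discovery is not shown):
//
//	// streamMap maps table name -> its DynamoDB stream
//	ckptWriter := checkpoint.NewWriter(conf.Options.CheckpointType, conf.Options.CheckpointAddress, conf.Options.CheckpointDb)
//	incr_sync.Start(streamMap, ckptWriter) // never returns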
func Start(streamMap map[string]*dynamodbstreams.Stream, ckptWriter checkpoint.Writer) {
replMetric = utils.NewMetric(utils.TypeIncr, utils.METRIC_CKPT_TIMES|utils.METRIC_SUCCESS|utils.METRIC_TPS)
ckptWriterG = ckptWriter
for table, stream := range streamMap {
LOG.Info("table[%v] stream[%v] begin", table, *stream.StreamArn)
shardChan := make(chan *utils.ShardNode, ShardChanSize)
fetcher := NewFetcher(table, stream, shardChan, ckptWriter, replMetric)
if fetcher == nil {
LOG.Crashf("table[%v] stream[%v] start fetcher failed", table, *stream.StreamArn)
}
go fetcher.Run()
for i := 0; i < int(conf.Options.IncreaseConcurrency); i++ {
go func(id int, table string) {
for {
shard := <-shardChan
LOG.Info("table[%s] dispatch id[%v] starts shard[%v]", table, id, *shard.Shard.ShardId)
// check whether current shard is running or finished
GlobalShardLock.Lock()
flag := GlobalShardMap[*shard.Shard.ShardId]
GlobalShardLock.Unlock()
switch flag {
case 0:
LOG.Info("table[%s] dispatch id[%v] shard[%v] isn't running, need to run",
table, id, *shard.Shard.ShardId)
case 1:
LOG.Warn("table[%s] dispatch id[%v] shard[%v] is running, no need to run again",
table, id, *shard.Shard.ShardId)
continue
case 2:
LOG.Warn("table[%s] dispatch id[%v] shard[%v] is finished, no need to run again",
table, id, *shard.Shard.ShardId)
continue
}
// set running flag
GlobalShardLock.Lock()
GlobalShardMap[*shard.Shard.ShardId] = 1
GlobalShardLock.Unlock()
d := NewDispatcher(id, shard, ckptWriter, replMetric)
d.Run()
// set finished flag
GlobalShardLock.Lock()
GlobalShardMap[*shard.Shard.ShardId] = 2
GlobalShardLock.Unlock()
// update table epoch
GlobalFetcherLock.Lock()
GlobalFetcherMoreFlag[shard.Table] += 1
GlobalFetcherLock.Unlock()
LOG.Info("dispatch id[%v] finishes shard[%v]", id, *shard.Shard.ShardId)
}
}(i, table)
}
}
select {}
}
/*-----------------------------------------------------------*/
// one Dispatcher per shard
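// Data flow inside one Dispatcher (goroutines started by NewDispatcher):
//
//	getRecords --(batchChan)--> batcher --(executorChan)--> executor --> targetWriter
//
// ckptManager runs alongside and periodically persists checkpointPosition /
// checkpointApproximateTime, which executor advances after each successful write.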
type Dispatcher struct {
id int
shard *utils.ShardNode
table string
dynamoStreamSession *dynamodbstreams.DynamoDBStreams
targetWriter writer.Writer
batchChan chan *dynamodbstreams.Record
executorChan chan *ExecuteNode
converter protocal.Converter
ns utils.NS
checkpointPosition string // last sequence number successfully written to the target
checkpointApproximateTime string // ApproximateCreationDateTime of that last written record
shardIt string // latest shard iterator; only persisted while no sequence-number checkpoint exists yet
unitTestStr string // used for UT only
close bool // set by executor when the shard is done; stops ckptManager
ckptWriter checkpoint.Writer
metric *utils.ReplicationMetric
}
func NewDispatcher(id int, shard *utils.ShardNode, ckptWriter checkpoint.Writer,
metric *utils.ReplicationMetric) *Dispatcher {
// create dynamo stream client
dynamoStreamSession, err := utils.CreateDynamoStreamSession(conf.Options.LogLevel)
if err != nil {
LOG.Crashf("table[%s] create dynamodb stream session failed[%v]", shard.Table, err)
return nil
}
ns := utils.NS{
Database: conf.Options.Id,
Collection: shard.Table,
}
// create target writer
targetWriter := writer.NewWriter(conf.Options.TargetType, conf.Options.TargetAddress, ns, conf.Options.LogLevel)
if targetWriter == nil {
LOG.Crashf("table[%s] create target-writer with type[%v] and address[%v] failed", ns.Collection,
conf.Options.TargetType, conf.Options.TargetAddress)
}
// converter
converter := protocal.NewConverter(conf.Options.ConvertType)
if converter == nil {
LOG.Crashf("table[%s] create converter[%v] failed", conf.Options.ConvertType)
}
d := &Dispatcher{
id: id,
shard: shard,
dynamoStreamSession: dynamoStreamSession,
targetWriter: targetWriter,
batchChan: make(chan *dynamodbstreams.Record, DispatcherBatcherChanSize),
executorChan: make(chan *ExecuteNode, DispatcherExecuterChanSize),
converter: converter,
ns: ns,
ckptWriter: ckptWriter,
metric: metric,
}
go d.batcher()
go d.executor()
go d.ckptManager()
return d
}
func (d *Dispatcher) String() string {
if d.unitTestStr != "" {
return d.unitTestStr
}
return fmt.Sprintf("dispatcher[%v] table[%v] shard-id[%v]", d.id, d.ns.Collection, *d.shard.Shard.ShardId)
}
func (d *Dispatcher) waitFatherShardFinished() {
// re-check father shard finished
var father string
if d.shard.Shard.ParentShardId != nil {
father = *d.shard.Shard.ParentShardId
}
LOG.Info("%s begins, check father shard[%v] status", d.String(), father)
if father != "" {
// check father finished
for {
fatherCkpt, err := d.ckptWriter.Query(father, d.ns.Collection)
if err != nil && err.Error() != utils.NotFountErr {
LOG.Crashf("%s query father[%v] checkpoint fail[%v]", d.String(), father, err)
}
// err != nil here can only be utils.NotFountErr: no checkpoint for the father means it is not processing
if err != nil || !checkpoint.IsStatusProcessing(fatherCkpt.Status) {
break
}
LOG.Warn("%s father shard[%v] is still running, waiting...", d.String(), father)
time.Sleep(WaitFatherFinishInterval * time.Second)
}
}
LOG.Info("%s father shard[%v] finished", d.String(), father)
}
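// Run resolves the shard iterator to start from, marks this shard's checkpoint as
// in-processing, and then drains the shard via getRecords. Iterator selection,
// summarized from the branches below:
//
//	iterator cached in GlobalShardIteratorMap       -> reuse the iterator handed over by the fetcher
//	LATEST + processing, ShardIt == ""              -> abnormal, crash: a new full sync is required
//	LATEST + processing, ShardIt == InitShardIt     -> AT_SEQUENCE_NUMBER from the checkpointed sequence number
//	LATEST + processing, ShardIt == <real iterator> -> reuse the stored iterator (only valid shortly after restart)
//	TRIM_HORIZON                                    -> TRIM_HORIZON iterator
//	anything else                                   -> AFTER_SEQUENCE_NUMBER from the checkpointed sequence number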
func (d *Dispatcher) Run() {
// fetch shardIt
shardIt, ok := checkpoint.GlobalShardIteratorMap.Get(*d.shard.Shard.ShardId)
if ok {
checkpoint.GlobalShardIteratorMap.Delete(*d.shard.Shard.ShardId)
LOG.Info("%s current shard already in ShardIteratorMap", d.String())
} else {
// check current checkpoint
ckpt, err := d.ckptWriter.Query(*d.shard.Shard.ShardId, d.ns.Collection)
if err != nil {
LOG.Crashf("%s query current[%v] checkpoint fail[%v]", d.String(), *d.shard.Shard.ShardId, err)
}
if ckpt.IteratorType == checkpoint.IteratorTypeLatest && checkpoint.IsStatusProcessing(ckpt.Status) {
if ckpt.ShardIt == "" {
/*
* iterator_type == "LATEST" means this shard was discovered before full-sync.
* Once the checkpoint is updated during the incr_sync stage, this field becomes "AT_SEQUENCE_NUMBER",
* so this case only happens when nimo-shake finished full-sync and then crashed before incr_sync started.
*/
LOG.Crashf("%s shard[%v] iterator type[%v] abnormal, status[%v], need full sync", d.String(),
*d.shard.Shard.ShardId, ckpt.IteratorType, ckpt.Status)
} else if ckpt.ShardIt == checkpoint.InitShardIt {
// placeholder value: generate a fresh shard iterator from the checkpointed sequence number
shardItOut, err := d.dynamoStreamSession.GetShardIterator(&dynamodbstreams.GetShardIteratorInput{
ShardId: d.shard.Shard.ShardId,
SequenceNumber: aws.String(ckpt.SequenceNumber),
ShardIteratorType: aws.String(checkpoint.IteratorTypeAtSequence),
StreamArn: aws.String(d.shard.ShardArn),
})
if err != nil {
LOG.Crashf("%s get shard iterator[SequenceNumber:%v, ShardIteratorType:%s, StreamArn:%s] "+
"failed[%v]", d.String(), ckpt.SequenceNumber, checkpoint.IteratorTypeAtSequence,
d.shard.ShardArn, err)
}
shardIt = *shardItOut.ShardIterator
} else {
// dynamodb rule: a stored shard iterator expires quickly, so this path only works when nimo-shake restarts within about 30 minutes
shardIt = ckpt.ShardIt
}
} else if ckpt.IteratorType == checkpoint.IteratorTypeTrimHorizon {
shardItOut, err := d.dynamoStreamSession.GetShardIterator(&dynamodbstreams.GetShardIteratorInput{
ShardId: d.shard.Shard.ShardId,
ShardIteratorType: aws.String(checkpoint.IteratorTypeTrimHorizon),
StreamArn: aws.String(d.shard.ShardArn),
})
if err != nil {
LOG.Crashf("%s get shard iterator[SequenceNumber:%v, ShardIteratorType:%s, StreamArn:%s] "+
"failed[%v]", d.String(), ckpt.SequenceNumber, checkpoint.IteratorTypeAtSequence,
d.shard.ShardArn, err)
}
shardIt = *shardItOut.ShardIterator
} else {
shardItOut, err := d.dynamoStreamSession.GetShardIterator(&dynamodbstreams.GetShardIteratorInput{
ShardId: d.shard.Shard.ShardId,
SequenceNumber: aws.String(ckpt.SequenceNumber),
ShardIteratorType: aws.String(checkpoint.IteratorTypeAfterSequence),
StreamArn: aws.String(d.shard.ShardArn),
})
if err != nil {
LOG.Crashf("%s get shard iterator[SequenceNumber:%v, ShardIteratorType:%s, StreamArn:%s] "+
"failed[%v]", d.String(), ckpt.SequenceNumber, checkpoint.IteratorTypeAtSequence,
d.shard.ShardArn, err)
}
shardIt = *shardItOut.ShardIterator
}
}
LOG.Info("%s start with shard iterator[%v]", d.String(), shardIt)
// update checkpoint: in-processing
err := d.ckptWriter.UpdateWithSet(*d.shard.Shard.ShardId, map[string]interface{}{
checkpoint.FieldStatus: checkpoint.StatusInProcessing,
}, d.ns.Collection)
if err != nil {
LOG.Crashf("%s update checkpoint to in-processing failed[%v]", d.String(), err)
}
LOG.Info("%s shard-id[%v] finish updating checkpoint", d.String(), shardIt)
// get records
d.getRecords(shardIt)
LOG.Info("%s finish shard", d.String())
LOG.Info("%s shard fetch done, Run() func exit", d.String())
}
func (d *Dispatcher) getRecords(shardIt string) {
qos := qps.StartQoS(int(conf.Options.QpsIncr))
defer qos.Close()
next := &shardIt
for {
<-qos.Bucket
// LOG.Info("%s bbbb0 ", d.String())
records, err := d.dynamoStreamSession.GetRecords(&dynamodbstreams.GetRecordsInput{
ShardIterator: next,
Limit: aws.Int64(conf.Options.QpsIncrBatchNum),
})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case dynamodb.ErrCodeProvisionedThroughputExceededException:
LOG.Warn("%s getRecords get records with iterator[%v] recv ProvisionedThroughputExceededException continue",
d.String(), *next)
time.Sleep(5 * time.Second)
continue
case request.ErrCodeSerialization:
LOG.Warn("%s getRecords get records with iterator[%v] recv SerializationError[%v] continue",
d.String(), *next, err)
time.Sleep(5 * time.Second)
continue
case request.ErrCodeRequestError, request.CanceledErrorCode,
request.ErrCodeResponseTimeout, request.HandlerResponseTimeout,
request.WaiterResourceNotReadyErrorCode, request.ErrCodeRead,
dynamodb.ErrCodeInternalServerError:
LOG.Warn("%s getRecords get records with iterator[%v] recv Error[%v] continue",
d.String(), *next, err)
time.Sleep(5 * time.Second)
continue
default:
LOG.Crashf("%s getRecords scan failed[%v] errcode[%v]", d.String(), err, aerr.Code())
}
} else {
LOG.Crashf("%s get records with iterator[%v] failed[%v]", d.String(), *next, err)
}
}
// LOG.Info("%s bbbb1 %v", d.String(), *next)
next = records.NextShardIterator
LOG.Debug("getRecords shardIt[%s] record_number[%d]", shardIt, len(records.Records))
if len(records.Records) == 0 && next != nil {
d.shardIt = *next // update shardIt
time.Sleep(GetRecordsInterval * time.Second)
continue
}
d.metric.AddGet(uint64(len(records.Records)))
// LOG.Info("bbbb2 ", records.Records)
// do write
for _, record := range records.Records {
d.batchChan <- record
}
if next == nil {
break
}
}
close(d.batchChan)
LOG.Info("%s getRecords exit", d.String())
}
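// Illustrative only (getRecords above inlines the same switch; this hypothetical helper is
// not called anywhere): the retry classification expressed as a predicate. Throttling,
// serialization, transport and internal-server errors are retried after a short sleep,
// everything else is treated as fatal.
func isRetriableStreamError(err error) bool {
aerr, ok := err.(awserr.Error)
if !ok {
return false
}
switch aerr.Code() {
case dynamodb.ErrCodeProvisionedThroughputExceededException,
request.ErrCodeSerialization,
request.ErrCodeRequestError, request.CanceledErrorCode,
request.ErrCodeResponseTimeout, request.HandlerResponseTimeout,
request.WaiterResourceNotReadyErrorCode, request.ErrCodeRead,
dynamodb.ErrCodeInternalServerError:
return true
default:
return false
}
}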
type ExecuteNode struct {
tp string
operate []interface{}
index []interface{}
lastSequenceNumber string
approximateCreationDateTime string
}
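// batcher groups consecutive records that share one event type into a single ExecuteNode.
// A node is flushed to executorChan when the event type changes, when BatcherNumber records
// have accumulated (the BatcherSize limit is currently inert, see the disabled size accounting
// below), when no record arrives within IncrBatcherTimeout seconds, or when batchChan closes.
// The lastSequenceNumber of the final record in each node is what ckptManager later persists.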
func (d *Dispatcher) batcher() {
node := &ExecuteNode{
operate: make([]interface{}, 0, BatcherNumber),
index: make([]interface{}, 0, BatcherNumber),
}
var preEvent string
var batchNr int
var batchSize int
for {
var record *dynamodbstreams.Record
ok := true
timeout := false
select {
case record, ok = <-d.batchChan:
case <-time.After(time.Second * IncrBatcherTimeout):
timeout = true
record = nil
}
if !ok || timeout {
if len(node.operate) != 0 || len(node.index) != 0 {
d.executorChan <- node
node = &ExecuteNode{
tp: "",
operate: make([]interface{}, 0, BatcherNumber),
}
preEvent = ""
batchNr = 0
batchSize = 0
}
if !ok {
// channel close
break
}
// timeout
continue
}
if *record.EventName != preEvent || batchNr >= BatcherNumber || batchSize >= BatcherSize {
// event type changed or batch limits reached: flush the current node and start a new one
if len(node.operate) != 0 || len(node.index) != 0 {
// the current node is non-empty (preEvent != "")
d.executorChan <- node
}
node = &ExecuteNode{
tp: *record.EventName,
operate: make([]interface{}, 0, BatcherNumber), // full documents; for RawData the inner Data field is appended below
index: make([]interface{}, 0, BatcherNumber), // key documents used to match/index on the target
}
preEvent = *record.EventName
batchNr = 0
batchSize = 0
}
// parse index
index, err := d.converter.Run(record.Dynamodb.Keys)
if err != nil {
LOG.Crashf("%s convert parse[%v] failed[%v]", d.String(), record.Dynamodb.Keys, err)
}
// LOG.Info("~~~op[%v] data: %v", *record.EventName, record)
// batch into list
switch *record.EventName {
case EventInsert:
value, err := d.converter.Run(record.Dynamodb.NewImage)
if err != nil {
LOG.Crashf("%s converter do insert meets error[%v]", d.String(), err)
}
switch d.targetWriter.(type) {
case *writer.MongoCommunityWriter:
node.operate = append(node.operate, value.(protocal.RawData).Data)
node.index = append(node.index, index.(protocal.RawData).Data)
case *writer.DynamoProxyWriter:
node.operate = append(node.operate, value)
node.index = append(node.index, index)
default:
LOG.Crashf("unknown operator")
}
case EventMODIFY:
value, err := d.converter.Run(record.Dynamodb.NewImage)
if err != nil {
LOG.Crashf("%s converter do insert meets error[%v]", d.String(), err)
}
switch d.targetWriter.(type) {
case *writer.MongoCommunityWriter:
node.operate = append(node.operate, value.(protocal.RawData).Data)
node.index = append(node.index, index.(protocal.RawData).Data)
case *writer.DynamoProxyWriter:
node.operate = append(node.operate, value)
node.index = append(node.index, index)
default:
LOG.Crashf("unknown operator")
}
case EventRemove:
switch d.targetWriter.(type) {
case *writer.MongoCommunityWriter:
node.index = append(node.index, index.(protocal.RawData).Data)
case *writer.DynamoProxyWriter:
node.index = append(node.index, index)
default:
LOG.Crashf("unknown operator")
}
default:
LOG.Crashf("%s unknown event name[%v]", d.String(), *record.EventName)
}
node.lastSequenceNumber = *record.Dynamodb.SequenceNumber
if record.Dynamodb.ApproximateCreationDateTime != nil {
node.approximateCreationDateTime = record.Dynamodb.ApproximateCreationDateTime.String()
}
batchNr += 1
// size accounting is disabled (batchSize += index.Size), so the BatcherSize limit above never triggers a split
}
LOG.Info("%s batcher exit", d.String())
close(d.executorChan)
}
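// executor waits until full sync is done (when running in parallel incr mode) and until the
// father shard has finished, then applies each ExecuteNode through the target writer and
// records the last applied sequence number for ckptManager. When executorChan drains it marks
// the shard checkpoint done and closes the writer five minutes later.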
func (d *Dispatcher) executor() {
if conf.Options.SyncMode == utils.SyncModeAll && conf.Options.IncrSyncParallel {
ckptWriter := checkpoint.NewWriter(conf.Options.CheckpointType, conf.Options.CheckpointAddress,
conf.Options.CheckpointDb)
for {
status, err := ckptWriter.FindStatus()
if err != nil || status != checkpoint.CheckpointStatusValueIncrSync {
LOG.Info("%s wait for full_sync_done[err:%v][status:%s][d.executorChan:%d]",
d.String(), err, status, len(d.executorChan))
time.Sleep(5 * time.Second)
} else {
LOG.Info("%s full_sync_done, do incr sync", d.String())
break
}
}
}
d.waitFatherShardFinished()
for node := range d.executorChan {
LOG.Info("%s try write data with length[%v], tp[%v] approximate[%v] [d.executorChan:%d]",
d.String(), len(node.index), node.tp, node.approximateCreationDateTime, len(d.executorChan))
var err error
switch node.tp {
case EventInsert:
err = d.targetWriter.Insert(node.operate, node.index)
case EventMODIFY:
err = d.targetWriter.Update(node.operate, node.index)
case EventRemove:
err = d.targetWriter.Delete(node.index)
default:
LOG.Crashf("unknown write operation[%v]", node.tp)
}
if err != nil {
LOG.Crashf("execute command[%v] failed[%v]", node.tp, err)
}
d.metric.AddSuccess(uint64(len(node.index)))
d.metric.AddCheckpoint(1)
d.checkpointPosition = node.lastSequenceNumber
d.checkpointApproximateTime = node.approximateCreationDateTime
}
// update checkpoint: finish
err := d.ckptWriter.UpdateWithSet(*d.shard.Shard.ShardId, map[string]interface{}{
checkpoint.FieldStatus: checkpoint.StatusDone,
}, d.ns.Collection)
if err != nil {
LOG.Crashf("%s update checkpoint to done failed[%v]", d.String(), err)
}
LOG.Info("%s executor exit", d.String())
d.close = true
go func() {
<-time.NewTimer(time.Minute * 5).C
d.targetWriter.Close()
LOG.Info("%s incr sync writer close", d.String())
}()
LOG.Info("%s close in 5 minutes", d.String())
}
// ckptManager periodically persists this shard's checkpoint until the executor finishes.
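// The two checkpoint shapes it writes via UpdateWithSet (field values illustrative):
//
//	no record written yet:  {ShardIt: "<latest iterator>", Timestamp: "<now>", ApproximateTime: "<last seen>"}
//	records written:        {SequenceNumber: "<last seq>", IteratorType: "AT_SEQUENCE_NUMBER", Timestamp: "<now>", ApproximateTime: "<record time>"}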
func (d *Dispatcher) ckptManager() {
var prevCkptPosition string
initCkpt, err := d.ckptWriter.Query(*d.shard.Shard.ShardId, d.ns.Collection)
if err != nil && err.Error() != utils.NotFountErr {
LOG.Crashf("%s query checkpoint failed[%v]", d.String(), err)
}
for range time.NewTicker(CheckpointFlushInterval * time.Second).C {
if d.close {
break
}
var ckpt map[string]interface{}
if d.checkpointPosition == "" {
if d.shardIt != "" {
// nothing written yet: persist the latest shard iterator so a restart can resume from it
ckpt = map[string]interface{}{
checkpoint.FieldShardIt: d.shardIt,
checkpoint.FieldTimestamp: time.Now().Format(utils.GolangSecurityTime),
checkpoint.FieldApproximateTime: d.checkpointApproximateTime,
}
} else {
continue
}
} else {
if d.checkpointPosition == prevCkptPosition {
continue
}
// do not update when checkpoint < init checkpoint
if d.checkpointPosition < initCkpt.SequenceNumber {
LOG.Warn("%s current checkpoint[%v] < init checkpoint[%v], no need to update", d.String(),
d.checkpointPosition, initCkpt.SequenceNumber)
continue
}
ckpt = map[string]interface{}{
checkpoint.FieldSeqNum: d.checkpointPosition,
checkpoint.FieldIteratorType: checkpoint.IteratorTypeAtSequence,
checkpoint.FieldTimestamp: time.Now().Format(utils.GolangSecurityTime),
checkpoint.FieldApproximateTime: d.checkpointApproximateTime,
}
}
prevCkptPosition = d.checkpointPosition
err := d.ckptWriter.UpdateWithSet(*d.shard.Shard.ShardId, ckpt, d.ns.Collection)
if err != nil {
LOG.Error("%s update table[%v] shard[%v] input[%v] failed[%v]", d.String(), d.ns.Collection,
*d.shard.Shard.ShardId, ckpt, err)
} else {
LOG.Info("%s update table[%v] shard[%v] input[%v] ok", d.String(), d.ns.Collection,
*d.shard.Shard.ShardId, ckpt)
}
}
}
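// RestAPI registers the incremental-sync /metric endpoint. Example response shape
// (all values below are illustrative, not real output; only shards whose checkpoint
// status is still processing are listed under checkpoint_update_times):
//
//	{
//	  "records_get": 1024,
//	  "records_write": 1000,
//	  "checkpoint_times": 12,
//	  "checkpoint_update_times": {
//	    "my_table": {
//	      "shardId-00000001234567890000-abcdef": {
//	        "update_time": "2023-01-01 12:00:00",
//	        "sync_approximate_time": "2023-01-01 12:00:00 +0000 UTC",
//	        "father_shard_id": "shardId-00000001234567880000-abcdef",
//	        "sequence_number": "49590338271490256608559692538361571095921575989136588898"
//	      }
//	    }
//	  },
//	  "error": ""
//	}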
func RestAPI() {
type IncrSyncInfo struct {
Get uint64 `json:"records_get"`
Write uint64 `json:"records_write"`
CkptTimes uint64 `json:"checkpoint_times"`
UpdateTimes interface{} `json:"checkpoint_update_times"`
Error string `json:"error"`
}
type CheckpointInfo struct {
UpdateTime string `json:"update_time"`
ApproximateTime string `json:"sync_approximate_time"`
FatherShardId string `json:"father_shard_id"`
SequenceNumber string `json:"sequence_number"`
}
utils.IncrSyncHttpApi.RegisterAPI("/metric", nimo.HttpGet, func([]byte) interface{} {
ckpt, err := ckptWriterG.ExtractCheckpoint()
if err != nil {
return &IncrSyncInfo{
Error: err.Error(),
}
}
retCkptMap := make(map[string]map[string]interface{}, len(ckpt))
for table, ckptShardMap := range ckpt {
shardMap := make(map[string]interface{}, 1)
for shard, ckptVal := range ckptShardMap {
if ckptVal.Status != string(utils.StatusProcessing) {
continue
}
shardMap[shard] = &CheckpointInfo{
UpdateTime: ckptVal.UpdateDate,
FatherShardId: ckptVal.FatherId,
ApproximateTime: ckptVal.ApproximateTime,
SequenceNumber: ckptVal.SequenceNumber,
}
}
retCkptMap[table] = shardMap
}
return &IncrSyncInfo{
Get: replMetric.Get(),
Write: replMetric.Success(),
CkptTimes: replMetric.CheckpointTimes,
UpdateTimes: retCkptMap,
}
})
}