nimo-shake/checkpoint/manager.go (352 lines of code) (raw):
package checkpoint
import (
"fmt"
"time"
"nimo-shake/common"
"nimo-shake/filter"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodbstreams"
LOG "github.com/vinllen/log4go"
)
// CheckCkpt checks whether the stored checkpoint can be used to resume
// incremental sync, or whether a full sync is required.
//
// Returns:
//   - (true, streamMap, nil): the checkpoint is usable; streamMap maps each
//     table name to the stream that passed CheckSingleStream.
//   - (false, nil, nil): a full sync is needed (status is not incr-sync, a
//     table's checkpoint is missing/invalid, or a checkpointed table has no stream).
//   - (false, nil, err): reading the checkpoint or querying the streams failed.
func CheckCkpt(ckptWriter Writer,
	dynamoStreams *dynamodbstreams.DynamoDBStreams) (bool, map[string]*dynamodbstreams.Stream, error) {
	// fetch target checkpoint status table
	status, err := ckptWriter.FindStatus()
	if err != nil {
		return false, nil, fmt.Errorf("find checkpoint status failed[%v]", err)
	}

	// need full sync if status != CheckpointStatusValueIncrSync which means last time sync isn't incr sync
	if status != CheckpointStatusValueIncrSync {
		LOG.Info("checkpoint status[%v] != %v, need full sync", status, CheckpointStatusValueIncrSync)
		return false, nil, nil
	}

	// extract checkpoint, map[tableName] = (map[shardId] = checkpoint element)
	ckptMap, err := ckptWriter.ExtractCheckpoint()
	if err != nil {
		LOG.Error("extract checkpoint failed[%v]", err)
		return false, nil, err
	}
	LOG.Info("checkpoint map: %v", ckptMap)

	// fetch source stream shard information
	/*
	 * Currently, nimo-shake can't handle DDL like create table and drop table.
	 * So, if the user creates or drops a table and restarts, nimo-shake will run
	 * full-sync. Plus, DDL can't be run while nimo-shake is running, no matter
	 * whether it is in full-sync or incr-sync.
	 */
	streamMap := make(map[string]*dynamodbstreams.Stream, len(ckptMap))
	var lastEvaluateString *string
	for {
		// a nil ExclusiveStartStreamArn requests the first page
		streamList, err := dynamoStreams.ListStreams(&dynamodbstreams.ListStreamsInput{
			ExclusiveStartStreamArn: lastEvaluateString,
		})
		if err != nil {
			// the error must be returned: streamList is not usable here
			return false, nil, fmt.Errorf("fetch dynamodb stream list failed[%v]", err)
		}

		// handle stream list, one table may have several streams
		for _, stream := range streamList.Streams {
			// only the first accepted stream of a table is kept
			if _, ok := streamMap[*stream.TableName]; ok {
				continue
			}

			if filter.IsFilter(*stream.TableName) {
				LOG.Info("table[%v] filtered", *stream.TableName)
				continue
			}

			exist, err := CheckSingleStream(stream, dynamoStreams, ckptMap)
			if err != nil {
				return false, nil, err
			}
			if !exist {
				// need full sync
				LOG.Warn("table[%v] not exists on checkpoint, need full sync", *stream.TableName)
				return false, nil, nil
			}
			streamMap[*stream.TableName] = stream
		}

		if streamList.LastEvaluatedStreamArn == nil {
			// end
			break
		}
		lastEvaluateString = streamList.LastEvaluatedStreamArn
	}

	// every table recorded in the checkpoint must still have a stream
	for key := range ckptMap {
		if _, ok := streamMap[key]; !ok {
			LOG.Info("table[%v] not exist on streams", key)
			return false, nil, nil
		}
	}

	return true, streamMap, nil
}
// CheckSingleStream checks that the checkpoint stored for the stream's table
// (ckptMap[*stream.TableName]) is still consistent with the live stream, i.e.
// the checkpointed shards and sequence numbers have not expired (been deleted).
//
// It returns (false, nil) when the table has no checkpoint, the stream is
// DISABLED, a checkpointed shard that still needs processing is missing from
// the stream, or an in-processing shard's starting sequence number is already
// past the checkpointed one. It returns a non-nil error only when
// DescribeStream fails.
func CheckSingleStream(stream *dynamodbstreams.Stream, dynamoStreams *dynamodbstreams.DynamoDBStreams,
	ckptMap map[string]map[string]*Checkpoint) (bool, error) {
	// check table exists on checkpoint map
	ckptInnerMap, ok := ckptMap[*stream.TableName]
	if !ok {
		LOG.Warn("collection[%v] isn't exist on checkpoint map", *stream.TableName)
		return false, nil
	}

	// collect all shards of the stream, following DescribeStream pagination
	shardMap := make(map[string]*dynamodbstreams.Shard)
	var lastShardIdString *string
	for {
		// a nil ExclusiveStartShardId requests the first page
		describeResult, err := dynamoStreams.DescribeStream(&dynamodbstreams.DescribeStreamInput{
			StreamArn:             stream.StreamArn,
			ExclusiveStartShardId: lastShardIdString,
		})
		if err != nil {
			// dereference so the message shows the names, not pointer addresses
			return false, fmt.Errorf("describe stream[%v] with table[%v] failed[%v]", *stream.StreamArn,
				*stream.TableName, err)
		}
		if *describeResult.StreamDescription.StreamStatus == "DISABLED" {
			// stream is disabled
			return false, nil
		}

		// convert shard list to map
		for _, shard := range describeResult.StreamDescription.Shards {
			shardMap[*shard.ShardId] = shard
		}

		if describeResult.StreamDescription.LastEvaluatedShardId == nil {
			break
		}
		lastShardIdString = describeResult.StreamDescription.LastEvaluatedShardId
		LOG.Info("table[%v] have next shardId,LastEvaluatedShardId[%v]",
			*stream.TableName, *describeResult.StreamDescription.LastEvaluatedShardId)
	}
	LOG.Info("dynamo stream shard map: %v", shardMap)

	// check shards exist
	for key, ckpt := range ckptInnerMap {
		shard, ok := shardMap[key]
		if !ok {
			LOG.Warn("collection[%v] with shard[%v] isn't exist on the stream, status[%v]", *stream.TableName,
				key, ckpt.Status)
			// a vanished shard is only acceptable if it no longer needs processing
			if !IsStatusNoNeedProcess(ckpt.Status) {
				return false, nil
			}
			continue
		}

		// NOTE(review): this is a lexicographic string comparison; it matches a
		// numeric comparison only if both sequence numbers have the same
		// length — confirm against the sequence numbers seen in practice.
		if ckpt.Status == StatusInProcessing && ckpt.SequenceNumber != "" &&
			*shard.SequenceNumberRange.StartingSequenceNumber > ckpt.SequenceNumber {
			LOG.Warn("collection[%v] with shard[%v] shard.StartingSequenceNumber[%v] > checkpoint.SequenceNumber[%v]",
				*stream.TableName, key, *shard.SequenceNumberRange.StartingSequenceNumber, ckpt.SequenceNumber)
			return false, nil
		}
	}

	return true, nil
}
// PrepareFullSyncCkpt writes a new checkpoint for every (non-filtered) source
// table before full sync starts.
//
// Steps:
//  1. List the source tables and find those without an enabled stream.
//  2. Enable a stream on each of those tables and wait 30 seconds for the
//     streams to be created.
//  3. For every enabled stream of a source table, build its shard tree and
//     insert one checkpoint per visited shard through ckptManager.
//
// It returns a map from table name to the stream that incr-sync should read,
// or an error when a DynamoDB / DynamoDB Streams call fails. A failure while
// traversing a shard tree is reported through LOG.Crashf.
func PrepareFullSyncCkpt(ckptManager Writer, dynamoSession *dynamodb.DynamoDB,
	dynamoStreams *dynamodbstreams.DynamoDBStreams) (map[string]*dynamodbstreams.Stream, error) {
	// fetch source tables
	sourceTableList, err := utils.FetchTableList(dynamoSession)
	if err != nil {
		return nil, fmt.Errorf("fetch dynamodb table list failed[%v]", err)
	}

	// filter
	sourceTableList = filter.FilterList(sourceTableList)
	sourceTableMap := utils.StringListToMap(sourceTableList)

	LOG.Info("traverse current streams")

	// traverse streams to check whether all streams enabled
	var lastEvaluateString *string
	for {
		// a nil ExclusiveStartStreamArn requests the first page
		streamList, err := dynamoStreams.ListStreams(&dynamodbstreams.ListStreamsInput{
			ExclusiveStartStreamArn: lastEvaluateString,
		})
		if err != nil {
			return nil, fmt.Errorf("fetch dynamodb stream list failed[%v]", err)
		}
		LOG.Info("PrepareFullSyncCkpt streamList[%v]", streamList)

		for _, stream := range streamList.Streams {
			LOG.Info("check stream with table[%v]", *stream.TableName)

			// filtered tables were already dropped from sourceTableMap by
			// FilterList, so skip them before paying for a DescribeStream call
			if filter.IsFilter(*stream.TableName) {
				LOG.Info("table[%v] filtered", *stream.TableName)
				continue
			}

			describeStreamResult, err := dynamoStreams.DescribeStream(&dynamodbstreams.DescribeStreamInput{
				StreamArn: stream.StreamArn,
			})
			if err != nil {
				return nil, fmt.Errorf("describe stream[%v] with table[%v] failed[%v]", *stream.StreamArn,
					*stream.TableName, err)
			}

			if *describeStreamResult.StreamDescription.StreamStatus == "DISABLED" {
				// stream is disabled
				continue
			}

			// remove from sourceTableMap marks this stream already enabled
			delete(sourceTableMap, *stream.TableName)
		}

		LOG.Info("PrepareFullSyncCkpt after traverse current streams")

		if streamList.LastEvaluatedStreamArn == nil {
			// end
			break
		}
		lastEvaluateString = streamList.LastEvaluatedStreamArn
	}

	// whatever is left in sourceTableMap has no enabled stream yet
	if len(sourceTableMap) != 0 {
		LOG.Info("enable and marks streams: %v", sourceTableMap)
		// enable and marks new stream
		for table := range sourceTableMap {
			_, err := dynamoSession.UpdateTable(&dynamodb.UpdateTableInput{
				TableName: aws.String(table),
				StreamSpecification: &dynamodb.StreamSpecification{
					StreamEnabled:  aws.Bool(true),
					StreamViewType: aws.String(StreamViewType),
				},
			})
			if err != nil {
				return nil, fmt.Errorf("enable stream for table[%v] failed[%v]", table, err)
			}
		}

		LOG.Info("wait 30 seconds for new streams created[%v]...", sourceTableMap)
		time.Sleep(30 * time.Second) // wait new stream created
	}

	// re-create a new map to mark whether current table exists on the target checkpoint table
	sourceCkptTableMap := utils.StringListToMap(sourceTableList)
	LOG.Info("traverse all streams: %v", sourceCkptTableMap)

	streamMap := make(map[string]*dynamodbstreams.Stream)
	// traverse streams
	lastEvaluateString = nil
	for {
		streamList, err := dynamoStreams.ListStreams(&dynamodbstreams.ListStreamsInput{
			ExclusiveStartStreamArn: lastEvaluateString,
		})
		if err != nil {
			return nil, fmt.Errorf("fetch dynamodb stream list failed[%v]", err)
		}

		for _, stream := range streamList.Streams {
			if filter.IsFilter(*stream.TableName) {
				LOG.Info("table[%v] filtered", *stream.TableName)
				continue
			}

			LOG.Info("check checkpoint stream with table[%v]", *stream.TableName)
			preDescribeStreamResult, preErr := dynamoStreams.DescribeStream(&dynamodbstreams.DescribeStreamInput{
				StreamArn: stream.StreamArn,
			})
			if preErr != nil {
				// report preErr (the outer err is nil here) and the names rather than pointers
				return nil, fmt.Errorf("describe stream[%v] with table[%v] failed[%v]", *stream.StreamArn,
					*stream.TableName, preErr)
			}
			if *preDescribeStreamResult.StreamDescription.StreamStatus == "DISABLED" {
				// stream is disabled
				LOG.Info("stream with table[%v] disabled", *stream.TableName)
				continue
			}

			// a table missing from the map is either not a source table or was
			// already handled through another of its streams
			if _, ok := sourceCkptTableMap[*stream.TableName]; !ok {
				LOG.Info("table[%v] already in checkpoint table", *stream.TableName)
				continue
			}
			delete(sourceCkptTableMap, *stream.TableName)

			// add into steamMap which will be used in incr-sync fetcher
			streamMap[*stream.TableName] = stream

			// gather every shard of the stream, following DescribeStream pagination
			var allShards []*dynamodbstreams.Shard
			var lastShardIdString *string
			for {
				describeStreamResult, err := dynamoStreams.DescribeStream(&dynamodbstreams.DescribeStreamInput{
					StreamArn:             stream.StreamArn,
					ExclusiveStartShardId: lastShardIdString,
				})
				if err != nil {
					return nil, fmt.Errorf("describe stream[%v] with table[%v] failed[%v]", *stream.StreamArn,
						*stream.TableName, err)
				}
				allShards = append(allShards, describeStreamResult.StreamDescription.Shards...)

				if describeStreamResult.StreamDescription.LastEvaluatedShardId == nil {
					break
				}
				lastShardIdString = describeStreamResult.StreamDescription.LastEvaluatedShardId
				LOG.Info("table[%v] have next shardId,LastEvaluatedShardId[%v]",
					*stream.TableName, *describeStreamResult.StreamDescription.LastEvaluatedShardId)
			}
			LOG.Info("PrepareFullSyncCkpt table[%v] allShards[%v]", *stream.TableName, allShards)

			// traverse shard
			LOG.Info("table[%v] doesn't have checkpoint, try to insert", *stream.TableName)
			rootNode := utils.BuildShardTree(allShards, *stream.TableName,
				*stream.StreamArn)
			// printing the tree is informational only; a failure is just logged
			if tree, err := utils.PrintShardTree(rootNode); err != nil {
				LOG.Info("table[%v] traverse to print tree failed[%v]", *stream.TableName, err)
			} else {
				LOG.Info("traverse stream tree for table[%v]: \n-----\n%v\n-----", *stream.TableName, tree)
			}

			err = utils.TraverseShard(rootNode, func(node *utils.ShardNode) error {
				var father string
				if node.Shard.ParentShardId != nil {
					father = *node.Shard.ParentShardId
				}

				ckpt := &Checkpoint{
					ShardId:        *node.Shard.ShardId,
					FatherId:       father,
					SequenceNumber: "",
					Status:         StatusNotProcess,
					WorkerId:       "unknown-worker",
					IteratorType:   IteratorTypeAtSequence,
				}
				LOG.Info("TraverseShard ShardId[%v] FatherId[%v] begin", *node.Shard.ShardId, father)

				if node.Shard.SequenceNumberRange.EndingSequenceNumber != nil {
					// shard already closed
					ckpt.Status = StatusNoNeedProcess
					LOG.Info("insert table[%v] checkpoint[%v]: %v", *stream.TableName, *ckpt, ckpt.Status)
					return ckptManager.Insert(ckpt, *stream.TableName)
				}

				// open shard: incr-sync will start from its latest position
				ckpt.Status = StatusPrepareProcess
				ckpt.IteratorType = IteratorTypeLatest
				outShardIt, err := dynamoStreams.GetShardIterator(&dynamodbstreams.GetShardIteratorInput{
					ShardId:           node.Shard.ShardId,
					ShardIteratorType: aws.String(IteratorTypeLatest),
					StreamArn:         stream.StreamArn,
				})
				if err != nil {
					return fmt.Errorf("construct shard[%v] iterator failed[%v]", *node.Shard.ShardId, err)
				}

				// the fetched iterator is not stored; the checkpoint carries the
				// InitShardIt placeholder instead
				// ckpt.ShardIt = *outShardIt.ShardIterator
				ckpt.ShardIt = InitShardIt
				LOG.Info("TraverseShard noEndsequence ShardId[%v] FatherId[%v] outShardIt[%v]",
					*node.Shard.ShardId, father, outShardIt)

				// fetch sequence number based on first record
				seq, approximate, err := fetchSeqNumber(outShardIt.ShardIterator, dynamoStreams, node.Table)
				if err != nil {
					return fmt.Errorf("fetch shard[%v] sequence number failed[%v]", *node.Shard.ShardId, err)
				}
				if seq != "" {
					ckpt.SequenceNumber = seq
					ckpt.ApproximateTime = approximate
				} else {
					// set the shard start sequence number as sequence number if no data return
					ckpt.SequenceNumber = *node.Shard.SequenceNumberRange.StartingSequenceNumber
				}

				// v1.0.4: do not set
				// set shard iterator map which will be used in incr-sync
				// GlobalShardIteratorMap.Set(*node.Shard.ShardId, *outShardIt.ShardIterator)

				LOG.Info("insert table[%v] checkpoint[%v]", *stream.TableName, *ckpt)
				if err = ckptManager.Insert(ckpt, *stream.TableName); err != nil {
					return fmt.Errorf("shard[%v] insert checkpoint failed[%v]", *node.Shard.ShardId, err)
				}

				if len(node.Sons) != 0 {
					// do not descend into the sons of a shard that is still open
					LOG.Info("shard[%v] has sons[%v], wait current node finished", *node.Shard.ShardId, node.Sons)
					return utils.StopTraverseSonErr
				}
				return nil
			})
			if err != nil {
				LOG.Crashf("traverse stream tree for table[%v] failed[%v]", *stream.TableName, err)
			}
		}

		if streamList.LastEvaluatedStreamArn == nil {
			// end
			break
		}
		lastEvaluateString = streamList.LastEvaluatedStreamArn
	}

	return streamMap, nil
}
// fetchSeqNumber looks for the first record readable from shardIt and returns
// its sequence number and approximate creation time (as a string).
//
// It polls GetRecords up to 7 times, sleeping 1 second between attempts and
// following NextShardIterator. The lookup is best-effort: when no record is
// found, the iterator runs out, or GetRecords fails, it returns two empty
// strings and a nil error so the caller can fall back to another sequence
// number. The error result is currently always nil.
func fetchSeqNumber(shardIt *string, dynamoStreams *dynamodbstreams.DynamoDBStreams, table string) (string, string, error) {
	// aws.StringValue keeps the log calls safe when the iterator is nil
	LOG.Info("fetch sequence number of shard[%v] table[%v]", aws.StringValue(shardIt), table)

	for i := 0; i < 7; i++ {
		if shardIt == nil {
			// A nil NextShardIterator means the shard has been closed and will
			// not return any more data; calling GetRecords with it would only
			// produce a validation error.
			break
		}

		records, err := dynamoStreams.GetRecords(&dynamodbstreams.GetRecordsInput{
			ShardIterator: shardIt,
			Limit:         aws.Int64(1),
		})
		if err != nil {
			// deliberately swallowed: the caller treats an empty result as "no data"
			LOG.Error("fetch sequence number of table[%v]: err[%v]", table, err)
			return "", "", nil
		}

		if len(records.Records) > 0 {
			LOG.Info("fetch sequence number of shard[%v] table[%v]:found", *shardIt, table)
			return *records.Records[0].Dynamodb.SequenceNumber,
				records.Records[0].Dynamodb.ApproximateCreationDateTime.String(), nil
		}

		time.Sleep(1 * time.Second)
		shardIt = records.NextShardIterator
	}

	LOG.Info("fetch sequence number of shard[%v] table[%v]: not found, return empty",
		aws.StringValue(shardIt), table)

	// what if no data? return empty
	return "", "", nil
}