in nimo-shake/checkpoint/manager.go [17:97]
// CheckCkpt decides whether an incremental sync can resume from the stored
// checkpoint or whether a full sync is required.
//
// It reads the checkpoint status and checkpoint map through ckptWriter, then
// pages through all streams returned by dynamoStreams.ListStreams and checks
// each non-filtered table against the checkpoint via CheckSingleStream.
//
// Return values:
//   - (true, streamMap, nil): the status is CheckpointStatusValueIncrSync, every
//     non-filtered stream passed CheckSingleStream, and every table in the
//     checkpoint has a stream. streamMap is keyed by table name.
//   - (false, nil, nil): no error occurred, but a full sync is needed.
//   - (false, nil, err): reading the checkpoint, listing the streams, or
//     checking a single stream failed.
func CheckCkpt(ckptWriter Writer,
	dynamoStreams *dynamodbstreams.DynamoDBStreams) (bool, map[string]*dynamodbstreams.Stream, error) {
	// fetch target checkpoint status table
	status, err := ckptWriter.FindStatus()
	if err != nil {
		return false, nil, fmt.Errorf("find checkpoint status failed[%v]", err)
	}

	// need full sync if status != CheckpointStatusValueIncrSync, which means the last sync wasn't incr sync
	if status != CheckpointStatusValueIncrSync {
		LOG.Info("checkpoint status[%v] != %v, need full sync", status, CheckpointStatusValueIncrSync)
		return false, nil, nil
	}

	// extract checkpoint, map[tableName] = (map[shardId] = checkpoint element)
	ckptMap, err := ckptWriter.ExtractCheckpoint()
	if err != nil {
		LOG.Error("extract checkpoint failed[%v]", err)
		return false, nil, err
	}
	LOG.Info("checkpoint map: %v", ckptMap)

	// fetch source stream shard information
	/*
	 * Currently, nimo-shake can't handle DDL like create table and drop table.
	 * So, if the user creates or drops a table and restarts, nimo-shake will run
	 * full-sync. Plus, DDL can't be run when nimo-shake starts, no matter in
	 * full-sync or incr-sync.
	 */
	streamMap := make(map[string]*dynamodbstreams.Stream, len(ckptMap))

	// Pagination cursor: nil on the first request, afterwards the
	// LastEvaluatedStreamArn of the previous page.
	var lastEvaluateString *string
	for {
		// A nil ExclusiveStartStreamArn is equivalent to an empty input.
		listStreamInput := &dynamodbstreams.ListStreamsInput{ExclusiveStartStreamArn: lastEvaluateString}
		streamList, err := dynamoStreams.ListStreams(listStreamInput)
		if err != nil {
			// The error must be returned: streamList cannot be used after a
			// failed call, and continuing would dereference it below.
			return false, nil, fmt.Errorf("fetch dynamodb stream list failed[%v]", err)
		}

		// handle stream list, one table may have several streams
		for _, stream := range streamList.Streams {
			// only the first stream seen for a table is checked and recorded
			if _, ok := streamMap[*stream.TableName]; ok {
				continue
			}
			if filter.IsFilter(*stream.TableName) {
				LOG.Info("table[%v] filtered", *stream.TableName)
				continue
			}

			exist, err := CheckSingleStream(stream, dynamoStreams, ckptMap)
			if err != nil {
				return false, nil, err
			}
			if !exist {
				// need full sync
				LOG.Warn("table[%v] not exists on checkpoint, need full sync", *stream.TableName)
				return false, nil, nil
			}
			streamMap[*stream.TableName] = stream
		}

		if streamList.LastEvaluatedStreamArn == nil {
			// end: no more pages
			break
		}
		lastEvaluateString = streamList.LastEvaluatedStreamArn
	}

	// every table recorded in the checkpoint must still have a stream,
	// otherwise fall back to full sync
	for key := range ckptMap {
		if _, ok := streamMap[key]; !ok {
			LOG.Info("table[%v] not exist on streams", key)
			return false, nil, nil
		}
	}

	return true, streamMap, nil
}