in nimo-shake/checkpoint/manager.go [100:168]
func CheckSingleStream(stream *dynamodbstreams.Stream, dynamoStreams *dynamodbstreams.DynamoDBStreams,
ckptMap map[string]map[string]*Checkpoint) (bool, error) {
// check table exists on checkpoint map
ckptInnerMap, ok := ckptMap[*stream.TableName]
if !ok {
LOG.Warn("collection[%v] isn't exist on checkpoint map", *stream.TableName)
return false, nil
}
shardMap := make(map[string]*dynamodbstreams.Shard)
var lastShardIdString *string = nil
for {
var describeStreamInput *dynamodbstreams.DescribeStreamInput
if lastShardIdString != nil {
describeStreamInput = &dynamodbstreams.DescribeStreamInput{
StreamArn: stream.StreamArn,
ExclusiveStartShardId: lastShardIdString,
}
} else {
describeStreamInput = &dynamodbstreams.DescribeStreamInput{
StreamArn: stream.StreamArn,
}
}
describeResult, err := dynamoStreams.DescribeStream(describeStreamInput)
if err != nil {
return false, fmt.Errorf("describe stream[%v] with table[%v] failed[%v]", stream.StreamArn,
stream.TableName, err)
}
if *describeResult.StreamDescription.StreamStatus == "DISABLED" {
// stream is disabled
return false, nil
}
// convert shard list to map
for _, shard := range describeResult.StreamDescription.Shards {
shardMap[*shard.ShardId] = shard
}
if describeResult.StreamDescription.LastEvaluatedShardId == nil {
break
} else {
lastShardIdString = describeResult.StreamDescription.LastEvaluatedShardId
LOG.Info("table[%v] have next shardId,LastEvaluatedShardId[%v]",
*stream.TableName, *describeResult.StreamDescription.LastEvaluatedShardId)
}
}
LOG.Info("dynamo stream shard map: %v", shardMap)
// check shards exist
for key, ckpt := range ckptInnerMap {
if shard, ok := shardMap[key]; !ok {
LOG.Warn("collection[%v] with shard[%v] isn't exist on the stream, status[%v]", *stream.TableName,
key, ckpt.Status)
if !IsStatusNoNeedProcess(ckpt.Status) {
return false, nil
}
} else if ckpt.Status == StatusInProcessing && *shard.SequenceNumberRange.StartingSequenceNumber > ckpt.SequenceNumber &&
ckpt.SequenceNumber != "" {
LOG.Warn("collection[%v] with shard[%v] shard.StartingSequenceNumber[%v] > checkpoint.SequenceNumber[%v]",
*stream.TableName, key, *shard.SequenceNumberRange.StartingSequenceNumber, ckpt.SequenceNumber)
return false, nil
}
}
return true, nil
}