func CheckSingleStream()

in nimo-shake/checkpoint/manager.go [100:168]


// CheckSingleStream reports whether the checkpoints stored for the table
// behind stream are still consistent with the shards the stream currently
// exposes.
//
// It pages through DescribeStream (following LastEvaluatedShardId) to collect
// every shard of the stream, then validates each checkpoint of the table
// against that shard set.
//
// It returns (false, nil) when:
//   - ckptMap has no entry for the stream's table;
//   - the stream status is "DISABLED";
//   - a checkpointed shard is missing from the stream and
//     IsStatusNoNeedProcess reports false for the checkpoint status;
//   - a checkpoint in StatusInProcessing carries a non-empty sequence number
//     that is lower than the starting sequence number of its shard.
//
// It returns (false, err) when DescribeStream fails and (true, nil) otherwise.
func CheckSingleStream(stream *dynamodbstreams.Stream, dynamoStreams *dynamodbstreams.DynamoDBStreams,
	ckptMap map[string]map[string]*Checkpoint) (bool, error) {

	tableName := *stream.TableName

	// check table exists on checkpoint map
	ckptInnerMap, ok := ckptMap[tableName]
	if !ok {
		LOG.Warn("collection[%v] isn't exist on checkpoint map", tableName)
		return false, nil
	}

	shardMap := make(map[string]*dynamodbstreams.Shard)

	// DescribeStream is paginated: a nil ExclusiveStartShardId requests the
	// first page, afterwards the previous LastEvaluatedShardId is passed back.
	var lastShardIdString *string
	for {
		describeStreamInput := &dynamodbstreams.DescribeStreamInput{
			StreamArn:             stream.StreamArn,
			ExclusiveStartShardId: lastShardIdString,
		}
		describeResult, err := dynamoStreams.DescribeStream(describeStreamInput)
		if err != nil {
			// Dereference the pointers so the message carries the actual ARN
			// and table name instead of pointer addresses.
			streamArn := "<nil>"
			if stream.StreamArn != nil {
				streamArn = *stream.StreamArn
			}
			return false, fmt.Errorf("describe stream[%v] with table[%v] failed[%v]", streamArn,
				tableName, err)
		}
		if *describeResult.StreamDescription.StreamStatus == "DISABLED" {
			// stream is disabled
			return false, nil
		}

		// convert shard list to map
		for _, shard := range describeResult.StreamDescription.Shards {
			shardMap[*shard.ShardId] = shard
		}

		if describeResult.StreamDescription.LastEvaluatedShardId == nil {
			break
		}
		lastShardIdString = describeResult.StreamDescription.LastEvaluatedShardId
		LOG.Info("table[%v] have next shardId,LastEvaluatedShardId[%v]",
			tableName, *lastShardIdString)
	}

	LOG.Info("dynamo stream shard map: %v", shardMap)

	// check shards exist
	for key, ckpt := range ckptInnerMap {
		shard, ok := shardMap[key]
		if !ok {
			LOG.Warn("collection[%v] with shard[%v] isn't exist on the stream, status[%v]", tableName,
				key, ckpt.Status)
			if !IsStatusNoNeedProcess(ckpt.Status) {
				return false, nil
			}
			continue
		}

		// Only an in-progress checkpoint with a recorded position can have
		// fallen behind the start of its shard.
		if ckpt.Status != StatusInProcessing || ckpt.SequenceNumber == "" {
			continue
		}

		startSeq := *shard.SequenceNumberRange.StartingSequenceNumber
		if sequenceNumberGreater(startSeq, ckpt.SequenceNumber) {
			LOG.Warn("collection[%v] with shard[%v] shard.StartingSequenceNumber[%v] > checkpoint.SequenceNumber[%v]",
				tableName, key, startSeq, ckpt.SequenceNumber)
			return false, nil
		}
	}

	return true, nil
}

// sequenceNumberGreater reports whether sequence number a is greater than b.
//
// When both arguments consist solely of decimal digits they are compared as
// unsigned integers of arbitrary size, so numbers with different digit counts
// order correctly ("9" < "10"). Otherwise it falls back to plain byte-wise
// string comparison.
func sequenceNumberGreater(a, b string) bool {
	allDigits := func(s string) bool {
		if s == "" {
			return false
		}
		for i := 0; i < len(s); i++ {
			if s[i] < '0' || s[i] > '9' {
				return false
			}
		}
		return true
	}
	if !allDigits(a) || !allDigits(b) {
		return a > b
	}

	// Drop leading zeros so that the digit count reflects the magnitude.
	for len(a) > 1 && a[0] == '0' {
		a = a[1:]
	}
	for len(b) > 1 && b[0] == '0' {
		b = b[1:]
	}
	if len(a) != len(b) {
		return len(a) > len(b)
	}
	// Equal length digit strings order the same byte-wise and numerically.
	return a > b
}