func CheckCkpt()

in nimo-shake/checkpoint/manager.go [17:97]


func CheckCkpt(ckptWriter Writer,
	dynamoStreams *dynamodbstreams.DynamoDBStreams) (bool, map[string]*dynamodbstreams.Stream, error) {
	// fetch target checkpoint status table
	status, err := ckptWriter.FindStatus()
	if err != nil {
		return false, nil, fmt.Errorf("find checkpoint status failed[%v]", err)
	}
	// need full sync if status != CheckpointStatusValueIncrSync which means last time sync isn't incr sync
	if status != CheckpointStatusValueIncrSync {
		LOG.Info("checkpoint status[%v] != %v, need full sync", status, CheckpointStatusValueIncrSync)
		return false, nil, nil
	}

	// extract checkpoint from mongodb, map[tableName] = (map[shardId] = checkpoint element)
	ckptMap, err := ckptWriter.ExtractCheckpoint()
	if err != nil {
		LOG.Error("extract checkpoint failed[%v]", err)
		return false, nil, err
	}
	LOG.Info("checkpoint map: %v", ckptMap)

	// fetch source stream shard information
	/*
	 * Currently, nimo-shake can't handle DDL like create table and drop table.
	 * So, if user crate or drop table and restart, nimo-shake will run full-sync.
	 * Plus, DDL can't be run when nimo-shake starts, no matter in full-sync or
	 * incr-sync.
	 */
	streamMap := make(map[string]*dynamodbstreams.Stream, len(ckptMap))
	var lastEvaluateString *string = nil
	for {
		var listStreamInput *dynamodbstreams.ListStreamsInput
		if lastEvaluateString != nil {
			listStreamInput = &dynamodbstreams.ListStreamsInput{ExclusiveStartStreamArn: lastEvaluateString}
		} else {
			listStreamInput = &dynamodbstreams.ListStreamsInput{}
		}

		streamList, err := dynamoStreams.ListStreams(listStreamInput)
		if err != nil {
			fmt.Errorf("fetch dynamodb stream list failed[%v]", err)
		}

		// handle stream list, one table may have several streams
		for _, stream := range streamList.Streams {
			if _, ok := streamMap[*stream.TableName]; ok {
				continue
			}

			if filter.IsFilter(*stream.TableName) {
				LOG.Info("table[%v] filtered", *stream.TableName)
				continue
			}

			if exist, err := CheckSingleStream(stream, dynamoStreams, ckptMap); err != nil {
				return false, nil, err
			} else if exist {
				streamMap[*stream.TableName] = stream
			} else {
				// need full sync
				LOG.Warn("table[%v] not exists on checkpoint, need full sync", *stream.TableName)
				return false, nil, nil
			}
		}

		if streamList.LastEvaluatedStreamArn == nil {
			// end
			break
		} else {
			lastEvaluateString = streamList.LastEvaluatedStreamArn
		}
	}

	for key := range ckptMap {
		if _, ok := streamMap[key]; !ok {
			LOG.Info("table[%v] not exist on streams", key)
			return false, nil, nil
		}
	}
	return true, streamMap, nil
}