func PrepareFullSyncCkpt()

in nimo-shake/checkpoint/manager.go [171:422]


// PrepareFullSyncCkpt prepares the stream checkpoints that are written before
// full sync starts.
//
// It works in three phases:
//
//  1. Fetch the source table list and apply the table filter.
//  2. Walk every existing stream; each table that already owns a non-disabled
//     stream is removed from the "needs a stream" set. Streams are then enabled
//     (via UpdateTable) for the remaining tables, followed by a fixed 30 second
//     wait for the new streams to be created.
//  3. Walk every stream again. For the first non-disabled stream seen for each
//     source table, collect all of its shards, build the shard tree and insert
//     one checkpoint per traversed shard through ckptManager.
//
// Parameters:
//   - ckptManager: checkpoint writer used to insert the per-shard checkpoints.
//   - dynamoSession: DynamoDB client used to list tables and enable streams.
//   - dynamoStreams: DynamoDB Streams client used to list/describe streams and
//     to create shard iterators.
//
// It returns a map from table name to the stream selected for that table, or an
// error when listing tables/streams, describing a stream or enabling a stream
// fails. A failure while traversing a shard tree is reported through
// LOG.Crashf instead of the returned error.
func PrepareFullSyncCkpt(ckptManager Writer, dynamoSession *dynamodb.DynamoDB,
	dynamoStreams *dynamodbstreams.DynamoDBStreams) (map[string]*dynamodbstreams.Stream, error) {
	// fetch source tables
	sourceTableList, err := utils.FetchTableList(dynamoSession)
	if err != nil {
		return nil, fmt.Errorf("fetch dynamodb table list failed[%v]", err)
	}

	// filter
	sourceTableList = filter.FilterList(sourceTableList)
	// sourceTableMap holds the tables that still need a stream; entries are
	// removed below once an enabled stream is found for them.
	sourceTableMap := utils.StringListToMap(sourceTableList)

	LOG.Info("traverse current streams")
	// traverse streams to check whether all streams enabled
	var lastEvaluateString *string
	for {
		// A nil ExclusiveStartStreamArn requests the first page.
		streamList, err := dynamoStreams.ListStreams(&dynamodbstreams.ListStreamsInput{
			ExclusiveStartStreamArn: lastEvaluateString,
		})
		if err != nil {
			return nil, fmt.Errorf("fetch dynamodb stream list failed[%v]", err)
		}

		LOG.Info("PrepareFullSyncCkpt streamList[%v]", streamList)

		for _, stream := range streamList.Streams {
			LOG.Info("check stream with table[%v]", *stream.TableName)
			describeStreamResult, err := dynamoStreams.DescribeStream(&dynamodbstreams.DescribeStreamInput{
				StreamArn: stream.StreamArn,
			})
			if err != nil {
				return nil, fmt.Errorf("describe stream[%v] with table[%v] failed[%v]", *stream.StreamArn,
					*stream.TableName, err)
			}

			if *describeStreamResult.StreamDescription.StreamStatus == "DISABLED" {
				// stream is disabled
				continue
			}

			if filter.IsFilter(*stream.TableName) {
				LOG.Info("table[%v] filtered", *stream.TableName)
				continue
			}

			// remove from sourceTableMap marks this stream already enabled
			delete(sourceTableMap, *stream.TableName)
		}

		LOG.Info("PrepareFullSyncCkpt after traverse current streams")

		if streamList.LastEvaluatedStreamArn == nil {
			// end
			break
		}
		lastEvaluateString = streamList.LastEvaluatedStreamArn
	}

	if len(sourceTableMap) != 0 {
		LOG.Info("enable and marks streams: %v", sourceTableMap)
		// enable and marks new stream
		for table := range sourceTableMap {
			_, err := dynamoSession.UpdateTable(&dynamodb.UpdateTableInput{
				TableName: aws.String(table),
				StreamSpecification: &dynamodb.StreamSpecification{
					StreamEnabled:  aws.Bool(true),
					StreamViewType: aws.String(StreamViewType),
				},
			})
			if err != nil {
				return nil, fmt.Errorf("enable stream for table[%v] failed[%v]", table, err)
			}
		}

		LOG.Info("wait 30 seconds for new streams created[%v]...", sourceTableMap)
		time.Sleep(30 * time.Second) // wait new stream created
	}

	// re-create a new map to mark whether current table exists on the target checkpoint table
	sourceCkptTableMap := utils.StringListToMap(sourceTableList)

	LOG.Info("traverse all streams: %v", sourceCkptTableMap)
	streamMap := make(map[string]*dynamodbstreams.Stream)
	// traverse streams
	lastEvaluateString = nil
	for {
		streamList, err := dynamoStreams.ListStreams(&dynamodbstreams.ListStreamsInput{
			ExclusiveStartStreamArn: lastEvaluateString,
		})
		if err != nil {
			return nil, fmt.Errorf("fetch dynamodb stream list failed[%v]", err)
		}

		for _, stream := range streamList.Streams {
			if filter.IsFilter(*stream.TableName) {
				LOG.Info("table[%v] filtered", *stream.TableName)
				continue
			}
			LOG.Info("check checkpoint stream with table[%v]", *stream.TableName)

			preDescribeStreamResult, preErr := dynamoStreams.DescribeStream(&dynamodbstreams.DescribeStreamInput{
				StreamArn: stream.StreamArn,
			})
			if preErr != nil {
				// Report preErr (the DescribeStream failure), not the already
				// checked ListStreams error, and print the pointed-to values.
				return nil, fmt.Errorf("describe stream[%v] with table[%v] failed[%v]", *stream.StreamArn,
					*stream.TableName, preErr)
			}
			if *preDescribeStreamResult.StreamDescription.StreamStatus == "DISABLED" {
				// stream is disabled
				LOG.Info("stream with table[%v] disabled", *stream.TableName)
				continue
			}
			// The table is skipped when it is no longer in sourceCkptTableMap:
			// either it is not a source table, or a previous stream of the
			// same table was already handled in this pass.
			if _, ok := sourceCkptTableMap[*stream.TableName]; !ok {
				LOG.Info("table[%v] already in checkpoint table", *stream.TableName)
				continue
			}
			delete(sourceCkptTableMap, *stream.TableName)

			// add into steamMap which will be used in incr-sync fetcher
			streamMap[*stream.TableName] = stream

			// DescribeStream pages its shard list; collect every page.
			var allShards []*dynamodbstreams.Shard
			var lastShardIdString *string
			for {
				// A nil ExclusiveStartShardId requests the first page.
				describeStreamResult, err := dynamoStreams.DescribeStream(&dynamodbstreams.DescribeStreamInput{
					StreamArn:             stream.StreamArn,
					ExclusiveStartShardId: lastShardIdString,
				})
				if err != nil {
					return nil, fmt.Errorf("describe stream[%v] with table[%v] failed[%v]", *stream.StreamArn,
						*stream.TableName, err)
				}

				allShards = append(allShards, describeStreamResult.StreamDescription.Shards...)

				if describeStreamResult.StreamDescription.LastEvaluatedShardId == nil {
					break
				}
				lastShardIdString = describeStreamResult.StreamDescription.LastEvaluatedShardId
				LOG.Info("table[%v] have next shardId,LastEvaluatedShardId[%v]",
					*stream.TableName, *lastShardIdString)
			}
			LOG.Info("PrepareFullSyncCkpt table[%v] allShards[%v]", *stream.TableName, allShards)

			// traverse shard
			LOG.Info("table[%v] doesn't have checkpoint, try to insert", *stream.TableName)
			rootNode := utils.BuildShardTree(allShards, *stream.TableName,
				*stream.StreamArn)

			// Printing the tree is diagnostic only; a failure is logged and ignored.
			if tree, err := utils.PrintShardTree(rootNode); err != nil {
				LOG.Info("table[%v] traverse to print tree failed[%v]", *stream.TableName, err)
			} else {
				LOG.Info("traverse stream tree for table[%v]: \n-----\n%v\n-----", *stream.TableName, tree)
			}

			err = utils.TraverseShard(rootNode, func(node *utils.ShardNode) error {
				var father string
				if node.Shard.ParentShardId != nil {
					father = *node.Shard.ParentShardId
				}
				ckpt := &Checkpoint{
					ShardId:        *node.Shard.ShardId,
					FatherId:       father,
					SequenceNumber: "",
					Status:         StatusNotProcess,
					WorkerId:       "unknown-worker",
					IteratorType:   IteratorTypeAtSequence,
				}

				LOG.Info("TraverseShard ShardId[%v] FatherId[%v] begin", *node.Shard.ShardId, father)
				if node.Shard.SequenceNumberRange.EndingSequenceNumber != nil {
					// shard already closed
					ckpt.Status = StatusNoNeedProcess
					LOG.Info("insert table[%v] checkpoint[%v]: %v", *stream.TableName, *ckpt, ckpt.Status)
					return ckptManager.Insert(ckpt, *stream.TableName)
				}

				// Shard has no ending sequence number: position the checkpoint
				// using a LATEST iterator.
				ckpt.Status = StatusPrepareProcess
				ckpt.IteratorType = IteratorTypeLatest
				outShardIt, err := dynamoStreams.GetShardIterator(&dynamodbstreams.GetShardIteratorInput{
					ShardId:           node.Shard.ShardId,
					ShardIteratorType: aws.String(IteratorTypeLatest),
					StreamArn:         stream.StreamArn,
				})
				if err != nil {
					return fmt.Errorf("construct shard[%v] iterator failed[%v]", *node.Shard.ShardId, err)
				}
				// ckpt.ShardIt = *outShardIt.ShardIterator
				ckpt.ShardIt = InitShardIt

				LOG.Info("TraverseShard noEndsequence ShardId[%v] FatherId[%v] outShardIt[%v]",
					*node.Shard.ShardId, father, outShardIt)
				// fetch sequence number based on first record
				seq, approximate, err := fetchSeqNumber(outShardIt.ShardIterator, dynamoStreams, node.Table)
				if err != nil {
					return fmt.Errorf("fetch shard[%v] sequence number failed[%v]", *node.Shard.ShardId, err)
				}
				if seq != "" {
					ckpt.SequenceNumber = seq
					ckpt.ApproximateTime = approximate
				} else {
					// set the shard start sequence number as sequence number if no data return
					ckpt.SequenceNumber = *node.Shard.SequenceNumberRange.StartingSequenceNumber
				}

				// v1.0.4: do not set
				// set shard iterator map which will be used in incr-sync
				// GlobalShardIteratorMap.Set(*node.Shard.ShardId, *outShardIt.ShardIterator)

				LOG.Info("insert table[%v] checkpoint[%v]", *stream.TableName, *ckpt)
				if err = ckptManager.Insert(ckpt, *stream.TableName); err != nil {
					return fmt.Errorf("shard[%v] insert checkpoint failed[%v]", *node.Shard.ShardId, err)
				}

				// Returning StopTraverseSonErr presumably tells TraverseShard to
				// skip this node's sons without failing — confirm in utils.
				if len(node.Sons) != 0 {
					LOG.Info("shard[%v] has sons[%v], wait current node finished", *node.Shard.ShardId, node.Sons)
					return utils.StopTraverseSonErr
				}
				return nil
			})
			if err != nil {
				LOG.Crashf("traverse stream tree for table[%v] failed[%v]", *stream.TableName, err)
			}
		}

		if streamList.LastEvaluatedStreamArn == nil {
			// end
			break
		}
		lastEvaluateString = streamList.LastEvaluatedStreamArn
	}

	return streamMap, nil
}