func()

in nimo-shake/incr-sync/fetcher.go [48:205]


func (f *Fetcher) Run() {
	md5Map := make(map[string]uint64)
	tableEpoch := make(map[string]int) // GlobalFetcherMoreFlag, restore previous epoch

	qos := qps.StartQoS(10)
	defer qos.Close()

	for range time.NewTicker(FetcherInterval * time.Second).C {
		shardList := make([]*utils.ShardNode, 0)
		// LOG.Debug("fetch table[%v] stream", table)

		preEpoch, ok := tableEpoch[f.table]
		if !ok {
			tableEpoch[f.table] = 0
		}

		var allShards []*dynamodbstreams.Shard
		var lastShardIdString *string = nil
		for {
			var describeStreamInput *dynamodbstreams.DescribeStreamInput
			if lastShardIdString != nil {
				describeStreamInput = &dynamodbstreams.DescribeStreamInput{
					StreamArn:             f.stream.StreamArn,
					ExclusiveStartShardId: lastShardIdString,
				}
			} else {
				describeStreamInput = &dynamodbstreams.DescribeStreamInput{
					StreamArn: f.stream.StreamArn,
				}
			}

			// limit qos of api DescribeStreamInput
			<-qos.Bucket

			desStream, err := f.dynamoClient.DescribeStream(describeStreamInput)
			if err != nil {
				LOG.Crashf("describe table[%v] with stream[%v] failed[%v]", f.table, *f.stream.StreamArn, err)
			}
			if *desStream.StreamDescription.StreamStatus == "DISABLED" {
				LOG.Crashf("table[%v] with stream[%v] has already been disabled", f.table, *f.stream.StreamArn)
			}

			allShards = append(allShards, desStream.StreamDescription.Shards...)

			if desStream.StreamDescription.LastEvaluatedShardId == nil {
				break
			} else {
				lastShardIdString = desStream.StreamDescription.LastEvaluatedShardId
				LOG.Info("table[%v] have next shardId,LastEvaluatedShardId[%v]",
					f.table, *desStream.StreamDescription.LastEvaluatedShardId)
			}
		}
		LOG.Info("fetch.Run table[%v] allShards(len:%d)[%v]", f.table, len(allShards), allShards)

		rootNode := utils.BuildShardTree(allShards, f.table, *f.stream.StreamArn)
		md5 := utils.CalMd5(rootNode)

		GlobalFetcherLock.Lock()
		curEpoch := GlobalFetcherMoreFlag[f.table]
		GlobalFetcherLock.Unlock()

		if val, ok := md5Map[f.table]; !ok || val != md5 {
			// shards is changed
			LOG.Info("table[%v] md5 changed from old[%v] to new[%v], need fetch shard", f.table, val, md5)
			md5Map[f.table] = md5
		} else if preEpoch != curEpoch {
			// old shard has already been finished
			LOG.Info("table[%v] curEpoch[%v] != preEpoch[%v]", f.table, curEpoch, preEpoch)
			tableEpoch[f.table] = curEpoch
		} else {
			LOG.Info("table[%v] md5-old[%v] md5-new[%v]", f.table, val, md5)

			continue
		}

		// extract checkpoint from mongodb
		ckptSingleMap, err := f.ckptWriter.ExtractSingleCheckpoint(f.table)
		if err != nil {
			LOG.Crashf("extract checkpoint failed[%v]", err)
		} else {
			LOG.Info("table:[%v] ckptSingleMap:[%v]", f.table, ckptSingleMap)
		}

		if tree, err := utils.PrintShardTree(rootNode); err != nil {
			LOG.Info("table[%v] traverse to print tree failed[%v]", f.table, err)
		} else {
			LOG.Info("traverse stream tree for table[%v](father->child): \n-----\n%v\n-----", f.table, tree)
		}

		// traverse shards
		err = utils.TraverseShard(rootNode, func(node *utils.ShardNode) error {
			LOG.Info("traverse shard[%v]", *node.Shard.ShardId)
			id := *node.Shard.ShardId
			var father string
			if node.Shard.ParentShardId != nil {
				father = *node.Shard.ParentShardId
			}

			ckpt, ok := ckptSingleMap[id]
			if !ok {
				// insert checkpoint
				newCkpt := &checkpoint.Checkpoint{
					ShardId:        id,
					SequenceNumber: *node.Shard.SequenceNumberRange.StartingSequenceNumber,
					Status:         checkpoint.StatusPrepareProcess,
					WorkerId:       "unknown",
					FatherId:       father,
					IteratorType:   checkpoint.IteratorTypeTrimHorizon,
					UpdateDate:     "", // empty at first
				}
				f.ckptWriter.Insert(newCkpt, f.table)
				shardList = append(shardList, node)
				LOG.Info("insert new checkpoint: %v ckptSingleMap[id]:%v", *newCkpt, ckptSingleMap[id])
				return utils.StopTraverseSonErr
			}
			switch ckpt.Status {
			case checkpoint.StatusNoNeedProcess:
				LOG.Info("no need to process: %v", *ckpt)
				return nil
			case checkpoint.StatusPrepareProcess:
				LOG.Info("status already in prepare: %v", *ckpt)
				shardList = append(shardList, node)
				return utils.StopTraverseSonErr
			case checkpoint.StatusInProcessing:
				LOG.Info("status already in processing: %v", *ckpt)
				shardList = append(shardList, node)
				return utils.StopTraverseSonErr
			case checkpoint.StatusNotProcess:
				fallthrough
			case checkpoint.StatusWaitFather:
				LOG.Info("status need to process: %v", *ckpt)
				ckpt.SequenceNumber = *node.Shard.SequenceNumberRange.StartingSequenceNumber
				ckpt.Status = checkpoint.StatusPrepareProcess
				ckpt.IteratorType = checkpoint.IteratorTypeTrimHorizon
				f.ckptWriter.Update(ckpt.ShardId, ckpt, f.table)
				shardList = append(shardList, node)
				return utils.StopTraverseSonErr
			case checkpoint.StatusDone:
				LOG.Info("already done: %v", *ckpt)
				return nil
			default:
				return fmt.Errorf("unknown checkpoint status[%v]", ckpt.Status)
			}

			return nil
		})
		if err != nil {
			LOG.Crashf("traverse shard tree failed[%v]", err)
		}

		// dispatch shard list
		for _, shard := range shardList {
			LOG.Info("need to dispatch shard[%v]", *shard.Shard.ShardId)
			f.shardChan <- shard
		}
	}
	LOG.Crashf("can't see me!")
}