func()

in nimo-shake/incr-sync/syncer.go [232:317]


// Run drives incremental sync for this dispatcher's shard.
//
// It resolves a starting shard iterator, marks the shard's checkpoint as
// in-processing, and then hands the iterator to getRecords. The iterator is
// taken from checkpoint.GlobalShardIteratorMap when an entry exists (the entry
// is deleted so it is consumed only once); otherwise it is derived from the
// stored checkpoint:
//
//   - LATEST with a processing status and an empty ShardIt: crash, a full
//     sync is required.
//   - LATEST with a processing status and ShardIt == InitShardIt: request a
//     new AT_SEQUENCE_NUMBER iterator at the checkpointed sequence number.
//   - LATEST with a processing status and any other ShardIt: reuse the
//     saved iterator as-is.
//   - TRIM_HORIZON: request a new TRIM_HORIZON iterator.
//   - anything else: request a new AFTER_SEQUENCE_NUMBER iterator after the
//     checkpointed sequence number.
//
// Every checkpoint or GetShardIterator failure is fatal (LOG.Crashf).
func (d *Dispatcher) Run() {
	shardID := *d.shard.Shard.ShardId

	// newShardIterator requests a fresh iterator of the given type for this
	// shard. sequenceNumber is not sent for TRIM_HORIZON requests.
	newShardIterator := func(iteratorType, sequenceNumber string) string {
		input := &dynamodbstreams.GetShardIteratorInput{
			ShardId:           d.shard.Shard.ShardId,
			ShardIteratorType: aws.String(iteratorType),
			StreamArn:         aws.String(d.shard.ShardArn),
		}
		if iteratorType != checkpoint.IteratorTypeTrimHorizon {
			input.SequenceNumber = aws.String(sequenceNumber)
		}

		out, err := d.dynamoStreamSession.GetShardIterator(input)
		if err != nil {
			// Report the iterator type that was actually requested.
			LOG.Crashf("%s get shard iterator[SequenceNumber:%v, ShardIteratorType:%s, StreamArn:%s] "+
				"failed[%v]", d.String(), sequenceNumber, iteratorType, d.shard.ShardArn, err)
		}
		return *out.ShardIterator
	}

	// Prefer an iterator handed over via the global map; consume it once.
	shardIt, ok := checkpoint.GlobalShardIteratorMap.Get(shardID)
	if ok {
		checkpoint.GlobalShardIteratorMap.Delete(shardID)

		LOG.Info("%s current shard already in ShardIteratorMap", d.String())
	} else {
		// Fall back to the stored checkpoint.
		ckpt, err := d.ckptWriter.Query(shardID, d.ns.Collection)
		if err != nil {
			LOG.Crashf("%s query current[%v] checkpoint fail[%v]", d.String(), shardID, err)
		}

		switch {
		case ckpt.IteratorType == checkpoint.IteratorTypeLatest && checkpoint.IsStatusProcessing(ckpt.Status):
			switch {
			case ckpt.ShardIt == "":
				/*
				 * iterator_type == "LATEST" means this shard has been found before full-sync.
				 * When checkpoint updated, this field will be updated to "AT_SEQUENCE_NUMBER" in incr_sync stage,
				 * so this case only happened when nimo-shake finished full-sync and then crashed before incr_sync
				 */
				LOG.Crashf("%s shard[%v] iterator type[%v] abnormal, status[%v], need full sync", d.String(),
					shardID, ckpt.IteratorType, ckpt.Status)
			case ckpt.ShardIt == checkpoint.InitShardIt:
				// Placeholder iterator: build a real one from the saved sequence number.
				shardIt = newShardIterator(checkpoint.IteratorTypeAtSequence, ckpt.SequenceNumber)
			default:
				// A saved shard iterator is only valid for a limited time (AWS documents
				// 15 minutes for DynamoDB Streams), so this path only works on a quick restart.
				shardIt = ckpt.ShardIt
			}
		case ckpt.IteratorType == checkpoint.IteratorTypeTrimHorizon:
			shardIt = newShardIterator(checkpoint.IteratorTypeTrimHorizon, "")
		default:
			shardIt = newShardIterator(checkpoint.IteratorTypeAfterSequence, ckpt.SequenceNumber)
		}
	}

	LOG.Info("%s start with shard iterator[%v]", d.String(), shardIt)

	// update checkpoint: in-processing
	err := d.ckptWriter.UpdateWithSet(shardID, map[string]interface{}{
		checkpoint.FieldStatus: checkpoint.StatusInProcessing,
	}, d.ns.Collection)
	if err != nil {
		LOG.Crashf("%s update checkpoint to in-processing failed[%v]", d.String(), err)
	}
	LOG.Info("%s shard-id[%v] finish updating checkpoint", d.String(), shardID)

	// get records
	d.getRecords(shardIt)
	LOG.Info("%s finish shard", d.String())

	LOG.Info("%s shard fetch done, Run() func exit", d.String())
}