in nimo-shake/incr-sync/syncer.go [232:317]
// Run drives one shard's incremental sync: it resolves the shard iterator to
// start from (either a pre-registered one or one derived from the persisted
// checkpoint), marks the checkpoint in-processing, and then consumes records
// until the shard is exhausted. Any unrecoverable error crashes the process
// via LOG.Crashf, matching the file's fail-fast convention.
func (d *Dispatcher) Run() {
	// fetch shardIt: prefer an iterator registered during the discovery phase
	shardIt, ok := checkpoint.GlobalShardIteratorMap.Get(*d.shard.Shard.ShardId)
	if ok {
		// consume-once semantics: remove the entry so a later restart re-derives it
		checkpoint.GlobalShardIteratorMap.Delete(*d.shard.Shard.ShardId)
		LOG.Info("%s current shard already in ShardIteratorMap", d.String())
	} else {
		// no pre-registered iterator: derive one from the persisted checkpoint
		ckpt, err := d.ckptWriter.Query(*d.shard.Shard.ShardId, d.ns.Collection)
		if err != nil {
			LOG.Crashf("%s query current[%v] checkpoint fail[%v]", d.String(), *d.shard.Shard.ShardId, err)
		}
		if ckpt.IteratorType == checkpoint.IteratorTypeLatest && checkpoint.IsStatusProcessing(ckpt.Status) {
			if ckpt.ShardIt == "" {
				/*
				 * iterator_type == "LATEST" means this shard has been found before full-sync.
				 * When checkpoint updated, this field will be updated to "AT_SEQUENCE_NUMBER" in incr_sync stage,
				 * so this case only happened when nimo-shake finished full-sync and then crashed before incr_sync
				 */
				LOG.Crashf("%s shard[%v] iterator type[%v] abnormal, status[%v], need full sync", d.String(),
					*d.shard.Shard.ShardId, ckpt.IteratorType, ckpt.Status)
			} else if ckpt.ShardIt == checkpoint.InitShardIt {
				// placeholder iterator: generate a fresh one at the stored sequence number
				shardItOut, err := d.dynamoStreamSession.GetShardIterator(&dynamodbstreams.GetShardIteratorInput{
					ShardId:           d.shard.Shard.ShardId,
					SequenceNumber:    aws.String(ckpt.SequenceNumber),
					ShardIteratorType: aws.String(checkpoint.IteratorTypeAtSequence),
					StreamArn:         aws.String(d.shard.ShardArn),
				})
				if err != nil {
					LOG.Crashf("%s get shard iterator[SequenceNumber:%v, ShardIteratorType:%s, StreamArn:%s] "+
						"failed[%v]", d.String(), ckpt.SequenceNumber, checkpoint.IteratorTypeAtSequence,
						d.shard.ShardArn, err)
				}
				shardIt = *shardItOut.ShardIterator
			} else {
				// dynamodb rule: a stored iterator is only valid for ~15 minutes,
				// so reusing it is only safe on a quick restart
				shardIt = ckpt.ShardIt
			}
		} else if ckpt.IteratorType == checkpoint.IteratorTypeTrimHorizon {
			// start from the oldest untrimmed record; no sequence number applies here
			shardItOut, err := d.dynamoStreamSession.GetShardIterator(&dynamodbstreams.GetShardIteratorInput{
				ShardId:           d.shard.Shard.ShardId,
				ShardIteratorType: aws.String(checkpoint.IteratorTypeTrimHorizon),
				StreamArn:         aws.String(d.shard.ShardArn),
			})
			if err != nil {
				// fix: previously logged IteratorTypeAtSequence here (copy-paste),
				// misreporting the iterator type actually requested
				LOG.Crashf("%s get shard iterator[SequenceNumber:%v, ShardIteratorType:%s, StreamArn:%s] "+
					"failed[%v]", d.String(), ckpt.SequenceNumber, checkpoint.IteratorTypeTrimHorizon,
					d.shard.ShardArn, err)
			}
			shardIt = *shardItOut.ShardIterator
		} else {
			// default: resume just after the last acknowledged sequence number
			shardItOut, err := d.dynamoStreamSession.GetShardIterator(&dynamodbstreams.GetShardIteratorInput{
				ShardId:           d.shard.Shard.ShardId,
				SequenceNumber:    aws.String(ckpt.SequenceNumber),
				ShardIteratorType: aws.String(checkpoint.IteratorTypeAfterSequence),
				StreamArn:         aws.String(d.shard.ShardArn),
			})
			if err != nil {
				// fix: previously logged IteratorTypeAtSequence here (copy-paste),
				// misreporting the iterator type actually requested
				LOG.Crashf("%s get shard iterator[SequenceNumber:%v, ShardIteratorType:%s, StreamArn:%s] "+
					"failed[%v]", d.String(), ckpt.SequenceNumber, checkpoint.IteratorTypeAfterSequence,
					d.shard.ShardArn, err)
			}
			shardIt = *shardItOut.ShardIterator
		}
	}
	LOG.Info("%s start with shard iterator[%v]", d.String(), shardIt)
	// update checkpoint: in-processing
	err := d.ckptWriter.UpdateWithSet(*d.shard.Shard.ShardId, map[string]interface{}{
		checkpoint.FieldStatus: checkpoint.StatusInProcessing,
	}, d.ns.Collection)
	if err != nil {
		LOG.Crashf("%s update checkpoint to in-processing failed[%v]", d.String(), err)
	}
	// fix: previously passed shardIt (the iterator token) for the shard-id placeholder
	LOG.Info("%s shard-id[%v] finish updating checkpoint", d.String(), *d.shard.Shard.ShardId)
	// get records: blocks until the shard is fully drained
	d.getRecords(shardIt)
	LOG.Info("%s finish shard", d.String())
	LOG.Info("%s shard fetch done, Run() func exit", d.String())
}