func Start()

in nimo-shake/run/run.go [37:155]


func Start() {
	LOG.Info("check connections")

	utils.FullSyncInitHttpApi(conf.Options.FullSyncHTTPListenPort)
	utils.IncrSyncInitHttpApi(conf.Options.IncrSyncHTTPListenPort)

	// init filter
	filter.Init(conf.Options.FilterCollectionWhite, conf.Options.FilterCollectionBlack)

	if err := utils.InitSession(conf.Options.SourceAccessKeyID, conf.Options.SourceSecretAccessKey,
		conf.Options.SourceSessionToken, conf.Options.SourceRegion, conf.Options.SourceEndpointUrl,
		conf.Options.SourceSessionMaxRetries, conf.Options.SourceSessionTimeout); err != nil {
		LOG.Crashf("init global session failed[%v]", err)
	}

	// check writer connection
	w := writer.NewWriter(conf.Options.TargetType, conf.Options.TargetAddress,
		utils.NS{"nimo-shake", "shake_writer_test"}, conf.Options.LogLevel)
	if w == nil {
		LOG.Crashf("connect type[%v] address[%v] failed[%v]",
			conf.Options.TargetType, conf.Options.TargetAddress)
	}

	// create dynamo session
	dynamoSession, err := utils.CreateDynamoSession(conf.Options.LogLevel)
	if err != nil {
		LOG.Crashf("create dynamodb session failed[%v]", err)
	}

	// create dynamo stream client
	dynamoStreamSession, err := utils.CreateDynamoStreamSession(conf.Options.LogLevel)
	if err != nil {
		LOG.Crashf("create dynamodb stream session failed[%v]", err)
	}

	LOG.Info("create checkpoint writer: type=%v", conf.Options.CheckpointType)
	ckptWriter := checkpoint.NewWriter(conf.Options.CheckpointType, conf.Options.CheckpointAddress,
		conf.Options.CheckpointDb)

	var skipFull bool
	var streamMap map[string]*dynamodbstreams.Stream
	if conf.Options.SyncMode == utils.SyncModeAll {
		LOG.Info("------------------------check checkpoint------------------------")
		skipFull, streamMap, err = checkpoint.CheckCkpt(ckptWriter, dynamoStreamSession)
		if err != nil {
			LOG.Crashf("check checkpoint failed[%v]", err)
		}
		LOG.Info("------------------------end check checkpoint------------------------")
	}

	// full sync
	skipIncrSync := false
	if skipFull == false {
		// register restful api
		full_sync.RestAPI()

		// start http server.
		nimo.GoRoutine(func() {
			// before starting, we must register all interface
			if err := utils.FullSyncHttpApi.Listen(); err != nil {
				LOG.Critical("start full sync server with port[%v] failed: %v", conf.Options.FullSyncHTTPListenPort,
					err)
			}
		})

		if conf.Options.SyncMode == utils.SyncModeAll {
			LOG.Info("------------------------drop old checkpoint------------------------")
			if err := ckptWriter.DropAll(); err != nil && err.Error() != utils.NotFountErr {
				LOG.Crashf("drop checkpoint failed[%v]", err)
			}

			LOG.Info("------------------------prepare checkpoint start------------------------")
			streamMap, err = checkpoint.PrepareFullSyncCkpt(ckptWriter, dynamoSession, dynamoStreamSession)
			if err != nil {
				LOG.Crashf("prepare checkpoint failed[%v]", err)
			}
			LOG.Info("------------------------prepare checkpoint done------------------------")

			// select{}
		} else {
			LOG.Info("sync.mode is 'full', no need to check checkpoint")
		}

		if conf.Options.IncrSyncParallel == true {
			skipIncrSync = true
			go incrStart(streamMap, ckptWriter)
		}

		// update checkpoint
		if err := ckptWriter.UpdateStatus(checkpoint.CheckpointStatusValueFullSync); err != nil {
			LOG.Crashf("set checkpoint to [%v] failed[%v]", checkpoint.CheckpointStatusValueFullSync, err)
		}

		LOG.Info("------------------------start full sync------------------------")
		full_sync.Start(dynamoSession, w)
		LOG.Info("------------------------full sync done!------------------------")
	}

	if conf.Options.SyncMode == utils.SyncModeFull {
		LOG.Info("sync.mode is 'full', finish")
		return
	}

	if conf.Options.SyncSchemaOnly {
		LOG.Info("sync_schema_only enabled, finish")
		return
	}

	// update checkpoint
	if err := ckptWriter.UpdateStatus(checkpoint.CheckpointStatusValueIncrSync); err != nil {
		LOG.Crashf("set checkpoint to [%v] failed[%v]", checkpoint.CheckpointStatusValueIncrSync, err)
	}

	if skipIncrSync == false {
		go incrStart(streamMap, ckptWriter)
	}

	select {}
}