func()

in nimo-shake/full-sync/document-syncer.go [70:143]


func (ds *documentSyncer) Run() {
	batchNumber := int(conf.Options.FullDocumentWriteBatch)
	LOG.Info("%s start with batchSize[%v]", ds.String(), batchNumber)

	var data interface{}
	var ok bool
	batchGroup := make([]interface{}, 0, batchNumber)
	timeout := false
	batchGroupSize := 0
	exit := false
	for {
		StartT := time.Now()
		select {
		case data, ok = <-ds.inputChan:
			if !ok {
				exit = true
				LOG.Info("%s channel already closed, flushing cache and exiting...", ds.String())
			}
		case <-time.After(time.Second * batchTimeout):
			// timeout
			timeout = true
			data = nil
		}
		readParserChanDuration := time.Since(StartT)

		LOG.Debug("exit[%v], timeout[%v], len(batchGroup)[%v], batchGroupSize[%v], data[%v]", exit, timeout,
			len(batchGroup), batchGroupSize, data)

		if data != nil {
			if UT_TestDocumentSyncer {
				batchGroup = append(batchGroup, data)
			} else {
				switch v := data.(type) {
				case protocal.RawData:
					if v.Size > 0 {
						batchGroup = append(batchGroup, v.Data)
						batchGroupSize += v.Size
					}
				case map[string]*dynamodb.AttributeValue:
					batchGroup = append(batchGroup, v)
					// meaningless batchGroupSize
				}
			}
		}

		if exit || timeout || len(batchGroup) >= batchNumber || batchGroupSize >= batchSize {
			StartT = time.Now()
			batchGroupLen := len(batchGroup)
			if len(batchGroup) != 0 {
				if err := ds.write(batchGroup); err != nil {
					LOG.Crashf("%s write data failed[%v]", ds.String(), err)
				}

				batchGroup = make([]interface{}, 0, batchNumber)
				batchGroupSize = 0
			}
			writeDestDBDuration := time.Since(StartT)
			LOG.Info("%s write db batch[%v] parserChan.len[%v] readParserChanTime[%v] writeDestDbTime[%v]",
				ds.String(), batchGroupLen, len(ds.inputChan), readParserChanDuration, writeDestDBDuration)

			if exit {
				break
			}
			timeout = false
		}
	}

	go func() {
		<-time.NewTimer(time.Minute * 5).C
		ds.writer.Close()
		LOG.Info("%s full-sync writer close", ds.String())
	}()
	LOG.Info("%s finish writing", ds.String())
}