func()

in nimo-shake/incr-sync/syncer.go [404:526]


// batcher reads stream records from d.batchChan, groups consecutive records
// that share the same event name into an ExecuteNode, and hands every
// finished node to d.executorChan.
//
// The pending node is flushed when:
//   - the incoming record's event name differs from the pending one,
//   - the pending node already holds BatcherNumber records, or batchSize has
//     reached BatcherSize (batchSize is currently never increased, see below),
//   - no record arrives within IncrBatcherTimeout seconds, or
//   - d.batchChan is closed.
//
// Once d.batchChan is closed the pending node (if any) is flushed,
// d.executorChan is closed and the method returns.
func (d *Dispatcher) batcher() {
	// newNode builds an empty batch for the given event type with both
	// slices pre-sized to the maximum batch length.
	newNode := func(tp string) *ExecuteNode {
		return &ExecuteNode{
			tp:      tp,
			operate: make([]interface{}, 0, BatcherNumber), // need fetch data field when type is RawData
			index:   make([]interface{}, 0, BatcherNumber), // index list
		}
	}

	node := newNode("")

	var preEvent string
	var batchNr int
	var batchSize int

	// A single reusable timer replaces a time.After call per iteration, which
	// would allocate a fresh timer for every record received.
	flushAfter := time.Second * IncrBatcherTimeout
	timer := time.NewTimer(flushAfter)
	defer timer.Stop()

	for {
		// Re-arm the timer for this wait. Stop reports false when the timer
		// already fired; drain a possibly pending tick (non-blocking, because
		// it may already have been consumed) so Reset starts from a clean state.
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		timer.Reset(flushAfter)

		var record *dynamodbstreams.Record
		ok := true
		timeout := false

		select {
		case record, ok = <-d.batchChan:
		case <-timer.C:
			timeout = true
		}

		if !ok || timeout {
			// Nothing more is coming right now: push out whatever is pending.
			if len(node.operate) != 0 || len(node.index) != 0 {
				d.executorChan <- node
				// Placeholder node; with preEvent cleared, the next record
				// always takes the split branch and replaces it.
				node = newNode("")
				preEvent = ""
				batchNr = 0
				batchSize = 0
			}
			if !ok {
				// channel close
				break
			}
			// timeout
			continue
		}

		eventName := *record.EventName

		if eventName != preEvent || batchNr >= BatcherNumber || batchSize >= BatcherSize {
			// need split: the event type changed or the pending batch is full
			if len(node.operate) != 0 || len(node.index) != 0 {
				// preEvent != ""
				d.executorChan <- node
			}

			node = newNode(eventName)
			preEvent = eventName
			batchNr = 0
			batchSize = 0
		}

		// parse index
		index, err := d.converter.Run(record.Dynamodb.Keys)
		if err != nil {
			LOG.Crashf("%s convert parse[%v] failed[%v]", d.String(), record.Dynamodb.Keys, err)
		}

		// batch into list
		switch eventName {
		case EventInsert, EventMODIFY:
			// Insert and modify both carry the new image as the payload and
			// the key as the index.
			value, err := d.converter.Run(record.Dynamodb.NewImage)
			if err != nil {
				action := "insert"
				if eventName == EventMODIFY {
					action = "modify"
				}
				LOG.Crashf("%s converter do %s meets error[%v]", d.String(), action, err)
			}

			switch d.targetWriter.(type) {
			case *writer.MongoCommunityWriter:
				// This writer is given the Data field of the converted RawData.
				node.operate = append(node.operate, value.(protocal.RawData).Data)
				node.index = append(node.index, index.(protocal.RawData).Data)
			case *writer.DynamoProxyWriter:
				// This writer is given the converted values as they are.
				node.operate = append(node.operate, value)
				node.index = append(node.index, index)
			default:
				LOG.Crashf("unknown operator")
			}
		case EventRemove:
			// Removal only needs the key; no payload is appended to operate.
			switch d.targetWriter.(type) {
			case *writer.MongoCommunityWriter:
				node.index = append(node.index, index.(protocal.RawData).Data)
			case *writer.DynamoProxyWriter:
				node.index = append(node.index, index)
			default:
				LOG.Crashf("unknown operator")
			}
		default:
			LOG.Crashf("%s unknown event name[%v]", d.String(), eventName)
		}

		// Record the position and creation time of the newest record in the batch.
		node.lastSequenceNumber = *record.Dynamodb.SequenceNumber
		if record.Dynamodb.ApproximateCreationDateTime != nil {
			node.approximateCreationDateTime = record.Dynamodb.ApproximateCreationDateTime.String()
		}
		batchNr++
		// batchSize += index.Size
	}

	LOG.Info("%s batcher exit", d.String())
	close(d.executorChan)
}