func()

in redolog/kafka_redolog_manager.go [203:277]


func (k *kafkaRedoLogManager) Iterator() (NextUpsertFunc, error) {
	if k.partitionConsumer != nil {
		// close previous created partition consumer
		k.partitionConsumer.Close()
	}
	offsetFrom, offsetTo := k.getKafkaOffsets()
	var err error
	k.partitionConsumer, err = k.consumer.ConsumePartition(k.Topic, int32(k.Shard), offsetFrom)
	if err != nil {
		utils.GetLogger().Panic("Failed to consumer kafka partition", err)
	}

	if k.includeRecovery {
		utils.GetLogger().With("action", "recover", "table", k.TableName, "shard", k.Shard, "offsetFrom", offsetFrom, "offsetTo", offsetTo).
			Info("start recover from kafka")
	} else {
		utils.GetLogger().With("action", "ingestion", "table", k.TableName, "shard", k.Shard, "offsetFrom", offsetFrom).
			Info("start play redolog from kafka")
	}

	return func() *NextUpsertBatchInfo {
		if k.partitionConsumer == nil {
			// partition consumer closed
			return nil
		}
		if !k.recoveryDone && (offsetTo == 0 || offsetTo <= offsetFrom) {
			k.setRecoveryDone()
		}
		for {
			select {
			case msg, ok := <-k.partitionConsumer.Messages():
				if !ok {
					// consumer closed
					utils.GetLogger().With(
						"table", k.TableName,
						"shard", k.Shard).Error("partition consumer channel closed")
					return nil
				}
				if msg != nil {
					upsertBatch, err := common.NewUpsertBatch(msg.Value)
					if err != nil {
						utils.GetLogger().With(
							"table", k.TableName,
							"shard", k.Shard, "error", err.Error()).Error("failed to create upsert batch from msg")
					}
					if !k.recoveryDone && msg.Offset > offsetTo {
						k.setRecoveryDone()
					}

					fileID, fileOffset := k.getFileOffset(msg.Offset)
					k.addMessage(fileID, msg.Offset, len(upsertBatch.GetBuffer()))
					return &NextUpsertBatchInfo{
						Batch:       upsertBatch,
						RedoLogFile: fileID,
						BatchOffset: fileOffset,
						Recovery:    !k.recoveryDone,
					}
				}
			case err, ok := <-k.partitionConsumer.Errors():
				if !ok {
					// consumer closed
					utils.GetLogger().With(
						"table", k.TableName,
						"shard", k.Shard).Error("partition consumer error channel closed")
					return nil
				} else {
					utils.GetLogger().With("table", k.TableName, "shard", k.Shard, "error", err.Error()).
						Error("received consumer error")
				}
			case <-k.done:
				return nil
			}
		}
	}, nil
}