in redolog/kafka_redolog_manager.go [203:277]
// Iterator closes any partition consumer left over from a previous call, opens
// a new one on k.Topic / partition k.Shard starting at the offsetFrom reported
// by getKafkaOffsets, and returns a function that yields one upsert batch per
// call.
//
// The returned function blocks until it can produce a batch. It returns nil
// when k.partitionConsumer is nil, when the consumer's message or error
// channel has been closed, or when a receive on k.done succeeds. Consumer
// errors are logged and do not end iteration. Messages that cannot be decoded
// into an upsert batch are logged and skipped.
//
// Batches are flagged Recovery while k.recoveryDone is false. setRecoveryDone
// is invoked when there is nothing to replay (offsetTo is zero or not beyond
// offsetFrom) or as soon as a message with an offset greater than offsetTo is
// received.
//
// A failure to open the partition consumer is reported through the logger's
// Panic; on the paths visible here the returned error is always nil.
func (k *kafkaRedoLogManager) Iterator() (NextUpsertFunc, error) {
	if k.partitionConsumer != nil {
		// close previous created partition consumer
		k.partitionConsumer.Close()
	}

	offsetFrom, offsetTo := k.getKafkaOffsets()

	var err error
	k.partitionConsumer, err = k.consumer.ConsumePartition(k.Topic, int32(k.Shard), offsetFrom)
	if err != nil {
		utils.GetLogger().Panic("Failed to consumer kafka partition", err)
	}

	if k.includeRecovery {
		utils.GetLogger().With("action", "recover", "table", k.TableName, "shard", k.Shard, "offsetFrom", offsetFrom, "offsetTo", offsetTo).
			Info("start recover from kafka")
	} else {
		utils.GetLogger().With("action", "ingestion", "table", k.TableName, "shard", k.Shard, "offsetFrom", offsetFrom).
			Info("start play redolog from kafka")
	}

	return func() *NextUpsertBatchInfo {
		if k.partitionConsumer == nil {
			// partition consumer closed
			return nil
		}

		// Nothing to replay: the recovery range is empty.
		if !k.recoveryDone && (offsetTo == 0 || offsetTo <= offsetFrom) {
			k.setRecoveryDone()
		}

		for {
			select {
			case msg, ok := <-k.partitionConsumer.Messages():
				if !ok {
					// consumer closed
					utils.GetLogger().With(
						"table", k.TableName,
						"shard", k.Shard).Error("partition consumer channel closed")
					return nil
				}
				if msg == nil {
					// Nothing to decode; wait for the next event.
					continue
				}

				upsertBatch, decodeErr := common.NewUpsertBatch(msg.Value)
				if decodeErr != nil {
					utils.GetLogger().With(
						"table", k.TableName,
						"shard", k.Shard, "error", decodeErr.Error()).Error("failed to create upsert batch from msg")
					// upsertBatch must not be used when decoding failed; skip
					// this message instead of dereferencing it below and
					// handing an invalid batch to the caller.
					continue
				}

				// The first message past offsetTo marks the end of recovery.
				if !k.recoveryDone && msg.Offset > offsetTo {
					k.setRecoveryDone()
				}

				fileID, fileOffset := k.getFileOffset(msg.Offset)
				k.addMessage(fileID, msg.Offset, len(upsertBatch.GetBuffer()))
				return &NextUpsertBatchInfo{
					Batch:       upsertBatch,
					RedoLogFile: fileID,
					BatchOffset: fileOffset,
					Recovery:    !k.recoveryDone,
				}
			case consumerErr, ok := <-k.partitionConsumer.Errors():
				if !ok {
					// consumer closed
					utils.GetLogger().With(
						"table", k.TableName,
						"shard", k.Shard).Error("partition consumer error channel closed")
					return nil
				}
				// A consumer error is logged only; keep waiting for messages.
				utils.GetLogger().With("table", k.TableName, "shard", k.Shard, "error", consumerErr.Error()).
					Error("received consumer error")
			case <-k.done:
				return nil
			}
		}
	}, nil
}