in pulsar/table_view_impl.go [93:146]
func (tv *TableViewImpl) partitionUpdateCheck() error {
partitionsArray, err := tv.client.TopicPartitions(tv.options.Topic)
if err != nil {
return fmt.Errorf("tv.client.TopicPartitions(%s) failed: %w", tv.options.Topic, err)
}
partitions := make(map[string]bool, len(partitionsArray))
for _, partition := range partitionsArray {
partitions[partition] = true
}
tv.readersMu.Lock()
defer tv.readersMu.Unlock()
for partition, cancelReader := range tv.cancelReaders {
if _, ok := partitions[partition]; !ok {
cancelReader.cancelFunc()
cancelReader.reader.Close()
delete(tv.cancelReaders, partition)
}
}
for partition := range partitions {
if _, ok := tv.cancelReaders[partition]; !ok {
reader, err := newReader(tv.client, ReaderOptions{
Topic: partition,
StartMessageID: EarliestMessageID(),
ReadCompacted: true,
// TODO: Pooling?
Schema: tv.options.Schema,
})
if err != nil {
return fmt.Errorf("create new reader failed for %s: %w", partition, err)
}
for reader.HasNext() {
msg, err := reader.Next(context.Background())
if err != nil {
tv.logger.Errorf("read next message failed for %s: %v", partition, err)
}
if msg != nil {
tv.handleMessage(msg)
}
}
ctx, cancelFunc := context.WithCancel(context.Background())
tv.cancelReaders[partition] = cancelReader{
reader: reader,
cancelFunc: cancelFunc,
}
go tv.watchReaderForNewMessages(ctx, reader)
}
}
return nil
}