func()

in pulsar/table_view_impl.go [93:146]


func (tv *TableViewImpl) partitionUpdateCheck() error {
	partitionsArray, err := tv.client.TopicPartitions(tv.options.Topic)
	if err != nil {
		return fmt.Errorf("tv.client.TopicPartitions(%s) failed: %w", tv.options.Topic, err)
	}

	partitions := make(map[string]bool, len(partitionsArray))
	for _, partition := range partitionsArray {
		partitions[partition] = true
	}

	tv.readersMu.Lock()
	defer tv.readersMu.Unlock()

	for partition, cancelReader := range tv.cancelReaders {
		if _, ok := partitions[partition]; !ok {
			cancelReader.cancelFunc()
			cancelReader.reader.Close()
			delete(tv.cancelReaders, partition)
		}
	}

	for partition := range partitions {
		if _, ok := tv.cancelReaders[partition]; !ok {
			reader, err := newReader(tv.client, ReaderOptions{
				Topic:          partition,
				StartMessageID: EarliestMessageID(),
				ReadCompacted:  true,
				// TODO: Pooling?
				Schema: tv.options.Schema,
			})
			if err != nil {
				return fmt.Errorf("create new reader failed for %s: %w", partition, err)
			}
			for reader.HasNext() {
				msg, err := reader.Next(context.Background())
				if err != nil {
					tv.logger.Errorf("read next message failed for %s: %v", partition, err)
				}
				if msg != nil {
					tv.handleMessage(msg)
				}
			}
			ctx, cancelFunc := context.WithCancel(context.Background())
			tv.cancelReaders[partition] = cancelReader{
				reader:     reader,
				cancelFunc: cancelFunc,
			}
			go tv.watchReaderForNewMessages(ctx, reader)
		}
	}

	return nil
}