in router/pkg/pubsub/nats/nats.go [93:207]
func (p *natsPubSub) Subscribe(ctx context.Context, event pubsub_datasource.NatsSubscriptionEventConfiguration, updater resolve.SubscriptionUpdater) error {
log := p.logger.With(
zap.String("provider_id", event.ProviderID),
zap.String("method", "subscribe"),
zap.Strings("subjects", event.Subjects),
)
if event.StreamConfiguration != nil {
durableConsumerName, err := p.getDurableConsumerName(event.StreamConfiguration.Consumer, event.Subjects)
if err != nil {
return err
}
consumerConfig := jetstream.ConsumerConfig{
Durable: durableConsumerName,
FilterSubjects: event.Subjects,
}
// Durable consumers are removed automatically only if the InactiveThreshold value is set
if event.StreamConfiguration.ConsumerInactiveThreshold > 0 {
consumerConfig.InactiveThreshold = time.Duration(event.StreamConfiguration.ConsumerInactiveThreshold) * time.Second
}
consumer, err := p.js.CreateOrUpdateConsumer(ctx, event.StreamConfiguration.StreamName, consumerConfig)
if err != nil {
log.Error("error creating or updating consumer", zap.Error(err))
return pubsub.NewError(fmt.Sprintf(`failed to create or update consumer for stream "%s"`, event.StreamConfiguration.StreamName), err)
}
p.closeWg.Add(1)
go func() {
defer p.closeWg.Done()
for {
select {
case <-p.ctx.Done():
// When the application context is done, we stop the subscription
return
case <-ctx.Done():
// When the subscription context is done, we stop the subscription
return
default:
msgBatch, consumerFetchErr := consumer.FetchNoWait(300)
if consumerFetchErr != nil {
log.Error("error fetching messages", zap.Error(consumerFetchErr))
return
}
for msg := range msgBatch.Messages() {
log.Debug("subscription update", zap.String("message_subject", msg.Subject()), zap.ByteString("data", msg.Data()))
updater.Update(msg.Data())
// Acknowledge the message after it has been processed
ackErr := msg.Ack()
if ackErr != nil {
log.Error("error acknowledging message", zap.String("message_subject", msg.Subject()), zap.Error(ackErr))
return
}
}
}
}
}()
return nil
}
msgChan := make(chan *nats.Msg)
subscriptions := make([]*nats.Subscription, len(event.Subjects))
for i, subject := range event.Subjects {
subscription, err := p.conn.ChanSubscribe(subject, msgChan)
if err != nil {
log.Error("error subscribing to NATS subject", zap.Error(err), zap.String("subscription_subject", subject))
return pubsub.NewError(fmt.Sprintf(`failed to subscribe to NATS subject "%s"`, subject), err)
}
subscriptions[i] = subscription
}
p.closeWg.Add(1)
go func() {
defer p.closeWg.Done()
for {
select {
case msg := <-msgChan:
log.Debug("subscription update", zap.String("message_subject", msg.Subject), zap.ByteString("data", msg.Data))
updater.Update(msg.Data)
case <-p.ctx.Done():
// When the application context is done, we stop the subscriptions
for _, subscription := range subscriptions {
if err := subscription.Unsubscribe(); err != nil {
log.Error("error unsubscribing from NATS subject after application context cancellation",
zap.Error(err), zap.String("subject", subscription.Subject),
)
}
}
return
case <-ctx.Done():
// When the subscription context is done, we stop the subscription
for _, subscription := range subscriptions {
if err := subscription.Unsubscribe(); err != nil {
log.Error("error unsubscribing from NATS subject after subscription context cancellation",
zap.Error(err), zap.String("subscription_subject", subscription.Subject),
)
}
}
return
}
}
}()
return nil
}