in pkg/discovery/content/provider/provider.go [28:81]
func Provide(ctx context.Context, r routing.Router, containerdStore containerd.Store, filesChan <-chan string) {
l := zerolog.Ctx(ctx).With().Str("component", "state").Logger()
l.Debug().Msg("advertising start")
s := time.Now()
defer func() {
l.Debug().Dur("duration", time.Since(s)).Msg("advertising stop")
}()
eventCh, errCh := containerdStore.Subscribe(ctx)
immediate := make(chan time.Time, 1)
immediate <- time.Now()
expirationTicker := time.NewTicker(routing.MaxRecordAge - time.Minute)
defer expirationTicker.Stop()
ticker := merge(immediate, expirationTicker.C)
for {
select {
case <-ctx.Done():
return
case <-ticker:
l.Info().Msg("scheduled advertisement")
err := provideAll(ctx, l, containerdStore, r)
if err != nil {
l.Error().Err(err).Msg("schedule: error advertising")
continue
}
case ref := <-eventCh:
l.Debug().Str("image", ref.Name()).Str("digest", ref.Digest().String()).Msg("advertising image")
_, err := provideRef(ctx, l, containerdStore, r, ref)
if err != nil {
l.Error().Err(err).Msg("image: advertising error")
continue
}
case blob := <-filesChan:
l.Debug().Str("blob", blob).Msg("advertising file")
err := r.Provide(ctx, []string{blob})
if err != nil {
l.Error().Err(err).Str("blob", blob).Msg("file: advertising error")
continue
}
case err := <-errCh:
l.Error().Err(err).Msg("channel error")
continue
}
}
}