func()

in banyand/liaison/grpc/discovery.go [63:136]


// initialize preloads the shard and entity repositories from the metadata
// registries and then registers both repositories as handlers on
// ds.metadataRepo.
//
// It lists all groups and, when ds.kind is schema.KindMeasure or
// schema.KindStream, walks every group to:
//   - list the group's shards and pass each one to ds.shardRepo.OnAddOrUpdate;
//   - list the group's measures (KindMeasure) or streams (KindStream) and
//     pass each one to ds.entityRepo.OnAddOrUpdate.
//
// For any other kind nothing is preloaded, but the group listing is still
// performed and the handlers are still registered.
//
// Every registry call runs under its own 5-second timeout. The first registry
// error aborts initialization: it is returned wrapped with the failing
// operation (and group, where applicable), and no handler is registered.
func (ds *discoveryService) initialize() error {
	// Each registry call gets a fresh deadline of this length.
	const listTimeout = 5 * time.Second

	ctx, cancel := context.WithTimeout(context.Background(), listTimeout)
	groups, err := ds.metadataRepo.GroupRegistry().ListGroup(ctx)
	cancel()
	if err != nil {
		return fmt.Errorf("listing groups: %w", err)
	}

	// ds.kind is not modified by this function, so decide once (rather than
	// once per group) whether there is anything to preload. Kinds other than
	// measure and stream skip the preload entirely and only register handlers.
	if ds.kind != schema.KindMeasure && ds.kind != schema.KindStream {
		groups = nil
	}

	for _, g := range groups {
		group := g.Metadata.Name
		opt := schema.ListOpt{Group: group}

		// cancel is called right after each listing instead of being deferred:
		// a defer inside the loop would keep every context alive until return.
		ctx, cancel = context.WithTimeout(context.Background(), listTimeout)
		shards, shardErr := ds.metadataRepo.ShardRegistry().ListShard(ctx, opt)
		cancel()
		if shardErr != nil {
			return fmt.Errorf("listing shards of group %q: %w", group, shardErr)
		}
		for _, s := range shards {
			ds.shardRepo.OnAddOrUpdate(schema.Metadata{
				TypeMeta: schema.TypeMeta{
					Kind:  schema.KindShard,
					Name:  s.Metadata.Name,
					Group: s.Metadata.Group,
				},
				Spec: s,
			})
		}

		// Only the two kinds admitted by the guard above can reach this switch,
		// so no default branch is needed.
		switch ds.kind {
		case schema.KindMeasure:
			ctx, cancel = context.WithTimeout(context.Background(), listTimeout)
			measures, measureErr := ds.metadataRepo.MeasureRegistry().ListMeasure(ctx, opt)
			cancel()
			if measureErr != nil {
				return fmt.Errorf("listing measures of group %q: %w", group, measureErr)
			}
			for _, m := range measures {
				ds.entityRepo.OnAddOrUpdate(schema.Metadata{
					TypeMeta: schema.TypeMeta{
						Kind:  schema.KindMeasure,
						Name:  m.Metadata.Name,
						Group: m.Metadata.Group,
					},
					Spec: m,
				})
			}
		case schema.KindStream:
			ctx, cancel = context.WithTimeout(context.Background(), listTimeout)
			streams, streamErr := ds.metadataRepo.StreamRegistry().ListStream(ctx, opt)
			cancel()
			if streamErr != nil {
				return fmt.Errorf("listing streams of group %q: %w", group, streamErr)
			}
			for _, s := range streams {
				ds.entityRepo.OnAddOrUpdate(schema.Metadata{
					TypeMeta: schema.TypeMeta{
						Kind:  schema.KindStream,
						Name:  s.Metadata.Name,
						Group: s.Metadata.Group,
					},
					Spec: s,
				})
			}
		}
	}

	// Handlers are registered only after the preload has succeeded.
	ds.metadataRepo.RegisterHandler(schema.KindShard, ds.shardRepo)
	ds.metadataRepo.RegisterHandler(ds.kind, ds.entityRepo)
	return nil
}