in banyand/liaison/grpc/discovery.go [63:136]
func (ds *discoveryService) initialize() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
groups, err := ds.metadataRepo.GroupRegistry().ListGroup(ctx)
cancel()
if err != nil {
return err
}
for _, g := range groups {
switch ds.kind {
case schema.KindMeasure:
case schema.KindStream:
default:
continue
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
shards, innerErr := ds.metadataRepo.ShardRegistry().ListShard(ctx, schema.ListOpt{Group: g.Metadata.Name})
cancel()
if innerErr != nil {
return innerErr
}
for _, s := range shards {
ds.shardRepo.OnAddOrUpdate(schema.Metadata{
TypeMeta: schema.TypeMeta{
Kind: schema.KindShard,
Name: s.Metadata.Name,
Group: s.Metadata.Group,
},
Spec: s,
})
}
switch ds.kind {
case schema.KindMeasure:
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
mm, innerErr := ds.metadataRepo.MeasureRegistry().ListMeasure(ctx, schema.ListOpt{Group: g.Metadata.Name})
cancel()
if innerErr != nil {
return innerErr
}
for _, m := range mm {
ds.entityRepo.OnAddOrUpdate(schema.Metadata{
TypeMeta: schema.TypeMeta{
Kind: schema.KindMeasure,
Name: m.Metadata.Name,
Group: m.Metadata.Group,
},
Spec: m,
})
}
case schema.KindStream:
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
ss, innerErr := ds.metadataRepo.StreamRegistry().ListStream(ctx, schema.ListOpt{Group: g.Metadata.Name})
cancel()
if innerErr != nil {
return innerErr
}
for _, s := range ss {
ds.entityRepo.OnAddOrUpdate(schema.Metadata{
TypeMeta: schema.TypeMeta{
Kind: schema.KindStream,
Name: s.Metadata.Name,
Group: s.Metadata.Group,
},
Spec: s,
})
}
default:
return fmt.Errorf("unsupported kind: %d", ds.kind)
}
}
ds.metadataRepo.RegisterHandler(schema.KindShard, ds.shardRepo)
ds.metadataRepo.RegisterHandler(ds.kind, ds.entityRepo)
return nil
}