in pkg/schema/metadata.go [373:431]
func (g *group) StoreResource(resourceSchema ResourceSchema) (Resource, error) {
g.mapMutex.Lock()
defer g.mapMutex.Unlock()
key := resourceSchema.GetMetadata().GetName()
preResource := g.schemaMap[key]
if preResource != nil &&
resourceSchema.GetMetadata().GetModRevision() <= preResource.GetMetadata().GetModRevision() {
// we only need to check the max modifications revision observed for index rules
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
idxRules, errIndexRules := g.metadata.IndexRules(ctx, resourceSchema.GetMetadata())
cancel()
if errIndexRules != nil {
return nil, errIndexRules
}
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
topNAggrs, errTopN := g.metadata.MeasureRegistry().TopNAggregations(ctx, resourceSchema.GetMetadata())
cancel()
if errTopN != nil {
return nil, errTopN
}
if len(idxRules) == len(preResource.GetIndexRules()) && len(topNAggrs) == len(preResource.GetTopN()) {
maxModRevision := int64(math.Max(float64(ParseMaxModRevision(idxRules)), float64(ParseMaxModRevision(topNAggrs))))
if preResource.MaxObservedModRevision() >= maxModRevision {
return preResource, nil
}
}
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
idxRules, err := g.metadata.IndexRules(ctx, resourceSchema.GetMetadata())
if err != nil {
return nil, err
}
var topNAggrs []*databasev1.TopNAggregation
if _, ok := resourceSchema.(*databasev1.Measure); ok {
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
var innerErr error
topNAggrs, innerErr = g.metadata.MeasureRegistry().TopNAggregations(ctx, resourceSchema.GetMetadata())
cancel()
if innerErr != nil {
return nil, innerErr
}
}
sm, errTS := g.resourceSupplier.OpenResource(g.GetSchema().GetResourceOpts().ShardNum, g, ResourceSpec{
Schema: resourceSchema,
IndexRules: idxRules,
Aggregations: topNAggrs,
})
if errTS != nil {
return nil, errTS
}
g.schemaMap[key] = sm
if preResource != nil {
_ = preResource.Close()
}
return sm, nil
}