in banyand/measure/measure_topn.go [163:222]
// writeData converts a single topN result entry into an internal measure
// write request and publishes it on apiData.TopicMeasureWrite.
//
// Parameters:
//   - eventTime: timestamp stored on the written data point.
//   - timeBucket: time-bucket label; the last component of the measure ID.
//   - fieldValue: value written as the data point's single integer field.
//   - group: group key; the first component of the measure ID.
//   - data: the topN result record. data[0] must hold a tsdb.EntityValues and,
//     when the topN schema declares group-by tag names, data[3] must hold a
//     []*modelv1.TagValue.
//   - rankNum: rank of this entry; the middle component of the measure ID and
//     an input to t.locate.
//
// It returns an error when the expected elements cannot be extracted from
// data, when t.locate fails, or when publishing to the pipeline fails.
func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket string, fieldValue int64,
	group string, data flow.Data, rankNum int,
) error {
	// Group-by tag values are only read when the schema declares group-by tags.
	var tagValues []*modelv1.TagValue
	if len(t.topNSchema.GetGroupByTagNames()) > 0 {
		// Guard the index so a short record yields an error instead of a panic.
		if len(data) <= 3 {
			return errors.New("fail to extract tag values from topN result")
		}
		var ok bool
		if tagValues, ok = data[3].([]*modelv1.TagValue); !ok {
			return errors.New("fail to extract tag values from topN result")
		}
	}

	// Use a checked assertion for the entity values carried in data[0]; an
	// unchecked one would panic on an unexpected or missing element.
	if len(data) == 0 {
		return errors.New("fail to extract entity values from topN result")
	}
	dataEntityValues, ok := data[0].(tsdb.EntityValues)
	if !ok {
		return errors.New("fail to extract entity values from topN result")
	}

	entity, entityValues, shardID, err := t.locate(tagValues, rankNum)
	if err != nil {
		return err
	}

	// measureID consists of three parts joined by "_":
	// 1. groupValues
	// 2. rankNumber
	// 3. timeBucket
	measureID := group + "_" + strconv.Itoa(rankNum) + "_" + timeBucket

	// The tag family carries the measure ID first, followed by the entity
	// values taken from data[0].
	tags := make([]*modelv1.TagValue, 0, len(dataEntityValues)+1)
	tags = append(tags, &modelv1.TagValue{
		Value: &modelv1.TagValue_Str{
			Str: &modelv1.Str{
				Value: measureID,
			},
		},
	})
	tags = append(tags, dataEntityValues...)

	iwr := &measurev1.InternalWriteRequest{
		Request: &measurev1.WriteRequest{
			Metadata: t.topNSchema.GetMetadata(),
			DataPoint: &measurev1.DataPointValue{
				Timestamp: timestamppb.New(eventTime),
				TagFamilies: []*modelv1.TagFamilyForWrite{
					{
						Tags: tags,
					},
				},
				Fields: []*modelv1.FieldValue{
					{
						Value: &modelv1.FieldValue_Int{
							Int: &modelv1.Int{
								Value: fieldValue,
							},
						},
					},
				},
			},
		},
		ShardId:    uint32(shardID),
		SeriesHash: tsdb.HashEntity(entity),
	}
	// The encoded entity values are attached only when debug logging is enabled.
	if t.l.Debug().Enabled() {
		iwr.EntityValues = entityValues.Encode()
	}

	// The message ID is derived from the current wall-clock time in nanoseconds.
	message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), iwr)
	_, errWritePub := t.pipeline.Publish(apiData.TopicMeasureWrite, message)
	return errWritePub
}