func()

in banyand/measure/measure_topn.go [163:222]


// writeData converts one ranked topN result into an InternalWriteRequest and
// publishes it on the measure-write topic of t.pipeline.
//
// eventTime becomes the data point timestamp and fieldValue its single int
// field. group, rankNum and timeBucket are joined with "_" to form the
// measureID, which is written as the first tag; the tsdb.EntityValues held in
// data[0] follow it. When the topN schema declares group-by tag names, data[3]
// must hold the tag values that are handed to t.locate.
//
// An error is returned when data[3] or data[0] holds an unexpected type, when
// t.locate fails, or when publishing the message fails.
func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket string, fieldValue int64,
	group string, data flow.Data, rankNum int,
) error {
	var tagValues []*modelv1.TagValue
	if len(t.topNSchema.GetGroupByTagNames()) > 0 {
		var ok bool
		if tagValues, ok = data[3].([]*modelv1.TagValue); !ok {
			return errors.New("fail to extract tag values from topN result")
		}
	}
	entity, entityValues, shardID, err := t.locate(tagValues, rankNum)
	if err != nil {
		return err
	}

	// Use the comma-ok form so that an unexpected payload surfaces as an error,
	// like the data[3] extraction above, instead of panicking the processor.
	groupValues, ok := data[0].(tsdb.EntityValues)
	if !ok {
		return errors.New("fail to extract entity values from topN result")
	}

	// measureID consists of three parts:
	// 1. groupValues
	// 2. rankNumber
	// 3. timeBucket
	measureID := group + "_" + strconv.Itoa(rankNum) + "_" + timeBucket
	iwr := &measurev1.InternalWriteRequest{
		Request: &measurev1.WriteRequest{
			Metadata: t.topNSchema.GetMetadata(),
			DataPoint: &measurev1.DataPointValue{
				Timestamp: timestamppb.New(eventTime),
				TagFamilies: []*modelv1.TagFamilyForWrite{
					{
						// The measureID tag always comes first, followed by
						// the values taken from data[0].
						Tags: append([]*modelv1.TagValue{
							{
								Value: &modelv1.TagValue_Str{
									Str: &modelv1.Str{
										Value: measureID,
									},
								},
							},
						}, groupValues...),
					},
				},
				Fields: []*modelv1.FieldValue{
					{
						Value: &modelv1.FieldValue_Int{
							Int: &modelv1.Int{
								Value: fieldValue,
							},
						},
					},
				},
			},
		},
		ShardId:    uint32(shardID),
		SeriesHash: tsdb.HashEntity(entity),
	}
	// The encoded entity values are attached only when debug logging is enabled.
	if t.l.Debug().Enabled() {
		iwr.EntityValues = entityValues.Encode()
	}
	// The message ID is derived from the current wall-clock time in nanoseconds.
	message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), iwr)
	_, errWritePub := t.pipeline.Publish(apiData.TopicMeasureWrite, message)
	return errWritePub
}