in banyand/measure/topn.go [181:272]
// writeStreamRecord publishes the top-N results carried by record as measure
// write requests.
//
// record.Data() must be a map[string][]*streaming.Tuple2 keyed by group name;
// any other payload yields an "invalid data type" error. For each group, every
// tuple's value (V1) and entity tag values (element 0 of the tuple's inner
// flow.Data) are collected into one pooled TopNValue, which is marshaled into
// buf and published on apiData.TopicMeasureWrite as a single
// InternalWriteRequest. The request's timestamp is the record timestamp passed
// through downSampleTimeBucket.
//
// buf is scratch space: it is truncated and overwritten for every group, so
// its previous content is lost.
//
// The first marshal or publish error is returned immediately and the remaining
// groups are skipped. Unchecked type assertions on the tuple contents panic if
// the tuples do not have the expected layout.
func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord, buf []byte) error {
	tuplesGroups, ok := record.Data().(map[string][]*streaming.Tuple2)
	if !ok {
		return errors.New("invalid data type")
	}
	// down-sample the start of the timeWindow to a time-bucket
	eventTime := t.downSampleTimeBucket(record.TimestampMillis())
	var err error
	publisher := t.pipeline.NewBatchPublisher(resultPersistencyTimeout)
	defer publisher.Close()
	// topNValue comes from GenerateTopNValue and is handed back on return; it is
	// Reset and refilled for every group.
	topNValue := GenerateTopNValue()
	defer ReleaseTopNValue(topNValue)
	for group, tuples := range tuplesGroups {
		for i, tuple := range tuples {
			// Acquire a fresh log event for every line: an event must not be
			// touched again once Msgf has been called on it, so a single event
			// cannot be shared across iterations.
			e := t.l.Debug()
			if !e.Enabled() {
				// Debug logging is off; skip the formatting work entirely.
				break
			}
			data := tuple.V2.(flow.StreamRecord).Data().(flow.Data)
			e.
				Int("rankNums", i+1).
				Str("entityValues", fmt.Sprintf("%v", data[0])).
				Int("value", int(data[2].(int64))).
				Time("eventTime", eventTime).
				Msgf("Write tuples %s %s", t.topNSchema.GetMetadata().GetName(), group)
		}
		topNValue.Reset()
		topNValue.setMetadata(t.topNSchema.GetFieldName(), t.m.schema.Entity.TagNames)
		var shardID uint32
		for _, tuple := range tuples {
			data := tuple.V2.(flow.StreamRecord).Data().(flow.Data)
			topNValue.addValue(
				tuple.V1.(int64),
				data[0].([]*modelv1.TagValue),
			)
			// The shard ID of the last tuple wins; presumably all tuples of a
			// group carry the same shard ID — TODO confirm against the producer.
			shardID = data[3].(uint32)
		}
		// The entity of the stored data point is (topN schema name, sort
		// direction, group).
		entityValues := []*modelv1.TagValue{
			{
				Value: &modelv1.TagValue_Str{
					Str: &modelv1.Str{
						Value: t.topNSchema.GetMetadata().GetName(),
					},
				},
			},
			{
				Value: &modelv1.TagValue_Int{
					Int: &modelv1.Int{
						Value: int64(t.sortDirection),
					},
				},
			},
			{
				Value: &modelv1.TagValue_Str{
					Str: &modelv1.Str{
						Value: group,
					},
				},
			},
		}
		buf = buf[:0]
		if buf, err = topNValue.marshal(buf); err != nil {
			return err
		}
		iwr := &measurev1.InternalWriteRequest{
			Request: &measurev1.WriteRequest{
				MessageId: uint64(time.Now().UnixNano()),
				Metadata:  &commonv1.Metadata{Name: TopNSchemaName, Group: t.topNSchema.GetMetadata().Group},
				DataPoint: &measurev1.DataPointValue{
					Timestamp: timestamppb.New(eventTime),
					TagFamilies: []*modelv1.TagFamilyForWrite{
						{Tags: entityValues},
					},
					Fields: []*modelv1.FieldValue{
						{
							Value: &modelv1.FieldValue_BinaryData{
								// buf is reused for the next group, so the
								// request needs its own copy of the bytes.
								BinaryData: bytes.Clone(buf),
							},
						},
					},
				},
			},
			EntityValues: entityValues,
			ShardId:      shardID,
		}
		message := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), "local", iwr)
		if _, err = publisher.Publish(context.TODO(), apiData.TopicMeasureWrite, message); err != nil {
			return err
		}
	}
	return nil
}