func()

in banyand/measure/topn.go [181:272]


// writeStreamRecord publishes the top-N result carried by record, emitting one
// internal measure write request per group through a batch publisher.
//
// record.Data() must be a map[string][]*streaming.Tuple2 keyed by group name;
// any other type yields an "invalid data type" error. buf is scratch space used
// to marshal each group's value: it is truncated and overwritten on every
// iteration, so its previous contents are lost.
//
// A marshal or publish failure stops processing immediately and is returned
// unchanged; groups published before the failure are not rolled back here.
//
// The per-tuple type assertions are unchecked, so a tuple whose payload does
// not have the expected shape panics instead of returning an error.
func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord, buf []byte) error {
	tuplesGroups, ok := record.Data().(map[string][]*streaming.Tuple2)
	if !ok {
		return errors.New("invalid data type")
	}
	// down-sample the start of the timeWindow to a time-bucket
	eventTime := t.downSampleTimeBucket(record.TimestampMillis())
	var err error
	publisher := t.pipeline.NewBatchPublisher(resultPersistencyTimeout)
	defer publisher.Close()
	topNValue := GenerateTopNValue()
	defer ReleaseTopNValue(topNValue)
	for group, tuples := range tuplesGroups {
		for i, tuple := range tuples {
			// Acquire a fresh event for every log line: once Msgf has been
			// called the event is finished and must not be written to again.
			// NOTE(review): assumes t.l follows zerolog's event contract
			// (events are disposed by Msg/Msgf) — confirm against the logger.
			e := t.l.Debug()
			if !e.Enabled() {
				// Debug logging is off; skip the remaining tuples as well.
				break
			}
			data := tuple.V2.(flow.StreamRecord).Data().(flow.Data)
			e.
				Int("rankNums", i+1).
				Str("entityValues", fmt.Sprintf("%v", data[0])).
				Int("value", int(data[2].(int64))).
				Time("eventTime", eventTime).
				Msgf("Write tuples %s %s", t.topNSchema.GetMetadata().GetName(), group)
		}
		topNValue.Reset()
		topNValue.setMetadata(t.topNSchema.GetFieldName(), t.m.schema.Entity.TagNames)
		// shardID is overwritten by each tuple, so the last tuple of the group
		// decides the shard; it stays 0 when the group has no tuples.
		var shardID uint32
		for _, tuple := range tuples {
			data := tuple.V2.(flow.StreamRecord).Data().(flow.Data)
			topNValue.addValue(
				tuple.V1.(int64),
				data[0].([]*modelv1.TagValue),
			)
			shardID = data[3].(uint32)
		}
		// Entity of the written data point: schema name, sort direction, group.
		entityValues := []*modelv1.TagValue{
			{
				Value: &modelv1.TagValue_Str{
					Str: &modelv1.Str{
						Value: t.topNSchema.GetMetadata().GetName(),
					},
				},
			},
			{
				Value: &modelv1.TagValue_Int{
					Int: &modelv1.Int{
						Value: int64(t.sortDirection),
					},
				},
			},
			{
				Value: &modelv1.TagValue_Str{
					Str: &modelv1.Str{
						Value: group,
					},
				},
			},
		}
		buf = buf[:0]
		if buf, err = topNValue.marshal(buf); err != nil {
			return err
		}
		iwr := &measurev1.InternalWriteRequest{
			Request: &measurev1.WriteRequest{
				MessageId: uint64(time.Now().UnixNano()),
				Metadata:  &commonv1.Metadata{Name: TopNSchemaName, Group: t.topNSchema.GetMetadata().Group},
				DataPoint: &measurev1.DataPointValue{
					Timestamp: timestamppb.New(eventTime),
					TagFamilies: []*modelv1.TagFamilyForWrite{
						{Tags: entityValues},
					},
					Fields: []*modelv1.FieldValue{
						{
							Value: &modelv1.FieldValue_BinaryData{
								// buf is reused for the next group, so the
								// request must own a private copy.
								BinaryData: bytes.Clone(buf),
							},
						},
					},
				},
			},
			EntityValues: entityValues,
			ShardId:      shardID,
		}
		message := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), "local", iwr)
		if _, err = publisher.Publish(context.TODO(), apiData.TopicMeasureWrite, message); err != nil {
			return err
		}
	}
	return nil
}