banyand/queue/pub/batch.go (222 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package pub import ( "context" "fmt" "sync" "time" "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/logger" ) type writeStream struct { client clusterv1.Service_SendClient ctxDoneCh <-chan struct{} } type batchPublisher struct { pub *pub streams map[string]writeStream topic *bus.Topic failedNodes map[string]*common.Error f batchFuture timeout time.Duration } // NewBatchPublisher returns a new batch publisher. func (p *pub) NewBatchPublisher(timeout time.Duration) queue.BatchPublisher { return &batchPublisher{ pub: p, streams: make(map[string]writeStream), timeout: timeout, f: batchFuture{errNodes: make(map[string]struct{}), errors: make(map[string]batchEvent), l: p.log}, } } func (bp *batchPublisher) Publish(ctx context.Context, topic bus.Topic, messages ...bus.Message) (bus.Future, error) { if bp.topic == nil { bp.topic = &topic } var err error for _, m := range messages { r, errM2R := messageToRequest(topic, m) if errM2R != nil { err = multierr.Append(err, fmt.Errorf("failed to marshal message %T: %w", m, errM2R)) continue } node := m.Node() sendData := func() (success bool) { if stream, ok := bp.streams[node]; ok { defer func() { if !success { delete(bp.streams, node) } }() select { case <-ctx.Done(): return false case <-stream.client.Context().Done(): return false default: } errSend := stream.client.Send(r) if errSend != nil { err = multierr.Append(err, fmt.Errorf("failed to send message to node %s: %w", node, errSend)) return false } return errSend == nil } return false } if sendData() { continue } select { case <-ctx.Done(): return nil, ctx.Err() default: } if bp.failedNodes != nil { if ce := bp.failedNodes[node]; ce != nil { err = multierr.Append(err, ce) } continue } var client *client // nolint: contextcheck if func() bool { bp.pub.mu.RLock() defer bp.pub.mu.RUnlock() var ok bool client, ok = bp.pub.active[node] if !ok { err = multierr.Append(err, fmt.Errorf("failed to get client for node %s", node)) return true } succeed, ce := bp.pub.checkWritable(node, topic) if succeed { return false } if bp.failedNodes == nil { bp.failedNodes = make(map[string]*common.Error) } bp.failedNodes[node] = ce err = multierr.Append(err, ce) return true }() { continue } streamCtx, cancel := context.WithTimeout(ctx, bp.timeout) // this assignment is for getting around the go vet lint deferFn := cancel stream, errCreateStream := client.client.Send(streamCtx) if errCreateStream != nil { err = multierr.Append(err, fmt.Errorf("failed to get stream for node %s: %w", node, errCreateStream)) continue } bp.streams[node] = writeStream{ client: stream, ctxDoneCh: streamCtx.Done(), } bp.f.events = append(bp.f.events, make(chan batchEvent)) _ = sendData() go func(s clusterv1.Service_SendClient, deferFn func(), bc chan batchEvent) { defer func() { close(bc) deferFn() }() select { case <-ctx.Done(): return default: } resp, errRecv := s.Recv() if errRecv != nil { if isFailoverError(errRecv) { bc <- batchEvent{n: node, e: common.NewErrorWithStatus(modelv1.Status_STATUS_INTERNAL_ERROR, errRecv.Error())} } return } if resp == nil { return } if resp.Error == "" { return } if isFailoverStatus(resp.Status) { ce := common.NewErrorWithStatus(resp.Status, resp.Error) bc <- batchEvent{n: node, e: ce} } }(stream, deferFn, bp.f.events[len(bp.f.events)-1]) } return nil, err } func (bp *batchPublisher) Close() (cee map[string]*common.Error, err error) { for i := range bp.streams { err = multierr.Append(err, bp.streams[i].client.CloseSend()) } for i := range bp.streams { <-bp.streams[i].ctxDoneCh } batchEvents := bp.f.get() if len(batchEvents) < 1 { return nil, err } if bp.pub.closer.AddRunning() { go func() { defer bp.pub.closer.Done() for n, e := range batchEvents { if bp.topic == nil { bp.pub.failover(n, e.e, data.TopicCommon) continue } bp.pub.failover(n, e.e, *bp.topic) } }() } cee = make(map[string]*common.Error, len(batchEvents)) for n, be := range batchEvents { cee[n] = be.e } return cee, err } type batchEvent struct { e *common.Error n string } type batchFuture struct { errNodes map[string]struct{} errors map[string]batchEvent l *logger.Logger events []chan batchEvent } func (b *batchFuture) get() map[string]batchEvent { var wg sync.WaitGroup var mux sync.Mutex wg.Add(len(b.events)) for _, e := range b.events { go func(e chan batchEvent, mux *sync.Mutex) { defer wg.Done() for evt := range e { func() { mux.Lock() defer mux.Unlock() b.l.Error().Str("err_msg", evt.e.Error()).Str("code", modelv1.Status_name[int32(evt.e.Status())]).Msgf("node %s returns error", evt.n) b.errors[evt.n] = evt }() } }(e, &mux) } wg.Wait() mux.Lock() defer mux.Unlock() if len(b.errors) < 1 { return nil } result := make(map[string]batchEvent, len(b.errors)) for k, v := range b.errors { result[k] = v } return result } func isFailoverStatus(s modelv1.Status) bool { return s == modelv1.Status_STATUS_DISK_FULL }