in banyand/queue/pub/batch.go [61:180]
// Publish sends every message to the node reported by its Node method, using
// one stream per node that is created lazily on first use and cached in
// bp.streams.
//
// The topic of the first call is remembered in bp.topic. Per-message failures
// (marshalling, missing client, non-writable node, stream creation, send) are
// accumulated and returned as one combined error; processing continues with
// the next message so that one bad node does not block the others. When a
// message was not delivered through an already open stream and ctx is done,
// Publish returns ctx.Err() immediately and the accumulated errors are not
// reported.
//
// The returned bus.Future is always nil.
func (bp *batchPublisher) Publish(ctx context.Context, topic bus.Topic, messages ...bus.Message) (bus.Future, error) {
	if bp.topic == nil {
		bp.topic = &topic
	}
	var err error
	for _, m := range messages {
		r, errM2R := messageToRequest(topic, m)
		if errM2R != nil {
			err = multierr.Append(err, fmt.Errorf("failed to marshal message %T: %w", m, errM2R))
			continue
		}
		node := m.Node()
		// sendData writes r to the stream cached for node. It reports false
		// when no stream is cached, when ctx or the stream context is done,
		// or when the send fails; in every failure case with a cached stream
		// the stream is removed from bp.streams so a fresh one can be created.
		sendData := func() (success bool) {
			stream, ok := bp.streams[node]
			if !ok {
				return false
			}
			defer func() {
				if !success {
					delete(bp.streams, node)
				}
			}()
			select {
			case <-ctx.Done():
				return false
			case <-stream.client.Context().Done():
				return false
			default:
			}
			if errSend := stream.client.Send(r); errSend != nil {
				err = multierr.Append(err, fmt.Errorf("failed to send message to node %s: %w", node, errSend))
				return false
			}
			return true
		}
		if sendData() {
			continue
		}
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
		}
		// Skip only nodes that are recorded as failed. Nodes without an entry
		// must fall through so that a stream gets created for them; skipping
		// them here would drop their messages without reporting any error.
		if bp.failedNodes != nil {
			if ce := bp.failedNodes[node]; ce != nil {
				err = multierr.Append(err, ce)
				continue
			}
		}
		var client *client
		// Look up the active client and check writability under the read lock.
		// The closure returns true when the message must be skipped.
		// nolint: contextcheck
		if func() bool {
			bp.pub.mu.RLock()
			defer bp.pub.mu.RUnlock()
			var ok bool
			client, ok = bp.pub.active[node]
			if !ok {
				err = multierr.Append(err, fmt.Errorf("failed to get client for node %s", node))
				return true
			}
			succeed, ce := bp.pub.checkWritable(node, topic)
			if succeed {
				return false
			}
			// Remember the failure so later messages for this node are
			// rejected without repeating the check.
			if bp.failedNodes == nil {
				bp.failedNodes = make(map[string]*common.Error)
			}
			bp.failedNodes[node] = ce
			err = multierr.Append(err, ce)
			return true
		}() {
			continue
		}
		streamCtx, cancel := context.WithTimeout(ctx, bp.timeout)
		// this assignment is for getting around the go vet lint
		deferFn := cancel
		stream, errCreateStream := client.client.Send(streamCtx)
		if errCreateStream != nil {
			// No goroutine will take ownership of the context on this path,
			// so release it here instead of waiting for the timeout.
			deferFn()
			err = multierr.Append(err, fmt.Errorf("failed to get stream for node %s: %w", node, errCreateStream))
			continue
		}
		bp.streams[node] = writeStream{
			client:    stream,
			ctxDoneCh: streamCtx.Done(),
		}
		bp.f.events = append(bp.f.events, make(chan batchEvent))
		// Any send error is already recorded in err by sendData.
		_ = sendData()
		// Wait for the single response of the stream in the background. The
		// goroutine owns the stream context: it closes the event channel and
		// cancels the context when it returns.
		go func(s clusterv1.Service_SendClient, deferFn func(), bc chan batchEvent) {
			defer func() {
				close(bc)
				deferFn()
			}()
			select {
			case <-ctx.Done():
				return
			default:
			}
			resp, errRecv := s.Recv()
			if errRecv != nil {
				if isFailoverError(errRecv) {
					bc <- batchEvent{n: node, e: common.NewErrorWithStatus(modelv1.Status_STATUS_INTERNAL_ERROR, errRecv.Error())}
				}
				return
			}
			if resp == nil {
				return
			}
			if resp.Error == "" {
				return
			}
			if isFailoverStatus(resp.Status) {
				ce := common.NewErrorWithStatus(resp.Status, resp.Error)
				bc <- batchEvent{n: node, e: ce}
			}
		}(stream, deferFn, bp.f.events[len(bp.f.events)-1])
	}
	return nil, err
}