func()

in banyand/queue/pub/batch.go [61:180]


// Publish sends every message to the node it targets, reusing the stream
// already opened for that node in this batch or opening a new one on demand.
//
// The topic of the first call is remembered in bp.topic. Per-message failures
// (marshalling, unknown node, node not writable, stream creation, send) are
// accumulated with multierr and returned together once all messages have been
// attempted. If ctx is already done when a message cannot be sent over an
// existing stream, Publish stops immediately and returns ctx.Err().
//
// The returned bus.Future is always nil.
func (bp *batchPublisher) Publish(ctx context.Context, topic bus.Topic, messages ...bus.Message) (bus.Future, error) {
	if bp.topic == nil {
		bp.topic = &topic
	}
	var err error
	for _, m := range messages {
		r, errM2R := messageToRequest(topic, m)
		if errM2R != nil {
			err = multierr.Append(err, fmt.Errorf("failed to marshal message %T: %w", m, errM2R))
			continue
		}
		node := m.Node()
		// sendData tries to push r over the stream registered for node. It
		// reports false when there is no such stream or the attempt failed;
		// on failure the stream is dropped from bp.streams so that a later
		// message for the same node goes through stream creation again.
		sendData := func() (success bool) {
			stream, ok := bp.streams[node]
			if !ok {
				return false
			}
			defer func() {
				if !success {
					delete(bp.streams, node)
				}
			}()
			// Do not attempt a send on a cancelled call or a finished stream.
			select {
			case <-ctx.Done():
				return false
			case <-stream.client.Context().Done():
				return false
			default:
			}
			if errSend := stream.client.Send(r); errSend != nil {
				err = multierr.Append(err, fmt.Errorf("failed to send message to node %s: %w", node, errSend))
				return false
			}
			return true
		}
		if sendData() {
			continue
		}

		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
		}
		// Skip only nodes that were already recorded as failed in this batch;
		// other nodes must still get a chance to open a stream.
		if bp.failedNodes != nil {
			if ce := bp.failedNodes[node]; ce != nil {
				err = multierr.Append(err, ce)
				continue
			}
		}
		var nodeClient *client
		// Look up the client and check writability under the publisher's read
		// lock. The closure reports true when this message must be skipped.
		// nolint: contextcheck
		if func() bool {
			bp.pub.mu.RLock()
			defer bp.pub.mu.RUnlock()
			var ok bool
			nodeClient, ok = bp.pub.active[node]
			if !ok {
				err = multierr.Append(err, fmt.Errorf("failed to get client for node %s", node))
				return true
			}
			succeed, ce := bp.pub.checkWritable(node, topic)
			if succeed {
				return false
			}
			if bp.failedNodes == nil {
				bp.failedNodes = make(map[string]*common.Error)
			}
			bp.failedNodes[node] = ce
			err = multierr.Append(err, ce)
			return true
		}() {
			continue
		}

		streamCtx, cancel := context.WithTimeout(ctx, bp.timeout)
		stream, errCreateStream := nodeClient.client.Send(streamCtx)
		if errCreateStream != nil {
			// Release the timeout context right away; nothing else will.
			cancel()
			err = multierr.Append(err, fmt.Errorf("failed to get stream for node %s: %w", node, errCreateStream))
			continue
		}
		bp.streams[node] = writeStream{
			client:    stream,
			ctxDoneCh: streamCtx.Done(),
		}
		events := make(chan batchEvent)
		bp.f.events = append(bp.f.events, events)
		// Any send error is recorded in err by sendData itself.
		_ = sendData()
		// Wait for the node's response in the background. The goroutine owns
		// cancelFn and the event channel: it closes the channel and releases
		// the stream context when it finishes.
		go func(s clusterv1.Service_SendClient, cancelFn func(), bc chan batchEvent) {
			defer func() {
				close(bc)
				cancelFn()
			}()
			select {
			case <-ctx.Done():
				return
			default:
			}
			resp, errRecv := s.Recv()
			if errRecv != nil {
				if isFailoverError(errRecv) {
					bc <- batchEvent{n: node, e: common.NewErrorWithStatus(modelv1.Status_STATUS_INTERNAL_ERROR, errRecv.Error())}
				}
				return
			}
			if resp == nil || resp.Error == "" {
				return
			}
			if isFailoverStatus(resp.Status) {
				bc <- batchEvent{n: node, e: common.NewErrorWithStatus(resp.Status, resp.Error)}
			}
		}(stream, cancel, events)
	}
	return nil, err
}