in runtime/core/protocol/grpc/producer/producer_service.go [49:73]
func (p *ProducerService) Publish(ctx context.Context, msg *pb.SimpleMessage) (*pb.Response, error) {
ctx = context.WithValue(ctx, "UNIQID", msg.UniqueId)
log.Infof("cmd:%v, protocol:grpc, from:%v", "AsyncPublish", msg.Header.Ip)
tmCtx, cancel := context.WithTimeout(ctx, defaultAsyncTimeout)
defer cancel()
var (
resp *pb.Response
errChan = make(chan error)
err error
)
p.sendPool.Submit(func() {
resp, err = p.process.AsyncMessage(ctx, p.producerMgr, msg)
errChan <- err
})
select {
case <-tmCtx.Done():
log.Warnf("timeout in subscribe")
case <-errChan:
break
}
if err != nil {
log.Warnf("failed to handle publish, err:%v", err)
}
return resp, err
}