in nmxact/mgmt/transceiver.go [211:265]
// txRxOmpAsync encodes req as an OMP message, transmits it with txCb, and
// then waits for the outcome in a background goroutine.
//
// Parameters:
//   - txCb:    called once per fragment produced by nmxutil.Fragment(b, mtu).
//   - req:     the NMP request; req.Hdr.Seq is the key under which the
//     response listener is registered.
//   - mtu:     fragment size passed to nmxutil.Fragment. For non-TCP
//     transports an encoded request longer than mtu is rejected.
//   - timeout: passed to the listener's AfterTimeout while waiting.
//   - ch:      receives the response if one arrives on the listener.
//   - errc:    receives the error if the listener reports one or the wait
//     times out.
//
// The returned error covers only the synchronous part (listener
// registration, encoding, size check, transmission). When nil is returned, a
// goroutine has been started that sends exactly one value on either ch or
// errc and then unregisters the listener. Those sends are plain channel
// sends, so the goroutine blocks until the value is accepted.
//
// When a non-nil error is returned, no goroutine is started and the listener
// registered by this call has been removed again.
func (t *Transceiver) txRxOmpAsync(txCb TxFn, req *nmp.NmpMsg, mtu int,
	timeout time.Duration, ch chan nmp.NmpRsp, errc chan error) error {
	seq := req.Hdr.Seq
	nl, err := t.od.AddNmpListener(seq)
	if err != nil {
		return err
	}

	// Until the waiter goroutine takes ownership of the listener, every
	// early return must unregister it; otherwise the listener for this
	// sequence number would stay registered after a failed send.
	handedOff := false
	defer func() {
		if !handedOff {
			t.od.RemoveNmpListener(seq)
		}
	}()

	var b []byte
	if t.isTcp {
		b, err = omp.EncodeOmpTcp(t.txFilter, req)
	} else {
		b, err = omp.EncodeOmpDgram(t.txFilter, req)
	}
	if err != nil {
		return err
	}

	log.Debugf("Tx OMP request: %v %s", seq, hex.Dump(b))

	// Only the non-TCP encoding is limited to a single MTU.
	if !t.isTcp && len(b) > mtu {
		return fmt.Errorf("Request too big")
	}

	frags := nmxutil.Fragment(b, mtu)
	for _, frag := range frags {
		if err := txCb(frag); err != nil {
			log.Debugf("txCb error %v", err)
			return err
		}
	}

	// Now wait for NMP response. From here on the goroutine owns the
	// listener and removes it when it finishes.
	handedOff = true
	go func() {
		defer t.od.RemoveNmpListener(seq)
		for {
			select {
			case err := <-nl.ErrChan:
				log.Debugf("Error reported %v seq %v", err, seq)
				errc <- err
				return
			case rsp := <-nl.RspChan:
				ch <- rsp
				return
			case _, ok := <-nl.AfterTimeout(timeout):
				// ok == false means the timeout channel was closed
				// rather than fired; keep waiting on the other
				// channels. Each pass through the select calls
				// AfterTimeout again.
				if ok {
					errc <- fmt.Errorf("Request timedout")
					return
				}
			}
		}
	}()

	return nil
}