in nmxact/omp/dispatch.go [64:121]
// addOmpListener registers an OMP listener for the given sequence number and
// starts a goroutine that relays its traffic to an NMP listener.
//
// A CoAP listener is added with criteria built from the token derived from
// seq and an empty path. Each message it receives is run through DecodeOmp
// with d.rxFilter; a decoded response goes to the NMP listener's RspChan and
// a decode error goes to its ErrChan. Non-nil errors reported by the CoAP
// listener are passed on to ErrChan as well. Client code therefore only has
// to watch the NMP listener channels, regardless of the transport encoding.
//
// d.mtx is held for the duration of the call. The goroutine is tracked by
// d.wg and removes the CoAP listener when it exits, which happens once a
// receive on the listener's stopCh succeeds.
//
// It returns an error if a listener is already registered for seq, or if the
// CoAP listener cannot be added.
func (d *Dispatcher) addOmpListener(seq uint8) (*Listener, error) {
	d.mtx.Lock()
	defer d.mtx.Unlock()

	if d.seqListenerMap[seq] != nil {
		return nil, fmt.Errorf("duplicate OMP listener; seq=%d", seq)
	}

	mc := nmcoap.MsgCriteria{
		Token: nmxutil.SeqToToken(seq),
		Path:  "",
	}

	ol, err := d.AddCoapListener(mc)
	if err != nil {
		return nil, err
	}

	ompl := &Listener{
		nmpl:   nmp.NewListener(),
		coapl:  ol,
		stopCh: make(chan struct{}),
	}
	d.seqListenerMap[seq] = ompl

	d.wg.Add(1)
	go func() {
		defer d.RemoveCoapListener(mc)
		defer d.wg.Done()

		// Listen for events. All feedback is sent to the client via the NMP
		// listener channels, so client code can stay OMP/NMP agnostic.
		//
		// Every forward below also selects on stopCh: a plain send would
		// park this goroutine for as long as nobody drains the NMP listener,
		// and a stop request issued in the meantime would never be seen.
		for {
			select {
			case m := <-ompl.coapl.RspChan:
				rsp, err := DecodeOmp(m, d.rxFilter)
				switch {
				case err != nil:
					select {
					case ompl.nmpl.ErrChan <- err:
					case <-ompl.stopCh:
						return
					}
				case rsp != nil:
					select {
					case ompl.nmpl.RspChan <- rsp:
					case <-ompl.stopCh:
						return
					}
				}
				// Neither an error nor a response: nothing to forward.

			case err := <-ompl.coapl.ErrChan:
				if err != nil {
					select {
					case ompl.nmpl.ErrChan <- err:
					case <-ompl.stopCh:
						return
					}
				}

			case <-ompl.stopCh:
				return
			}
		}
	}()

	return ompl, nil
}