nmxact/omp/dispatch.go (144 lines of code) (raw):

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package omp

import (
	"fmt"
	"sync"

	"github.com/runtimeco/go-coap"

	"mynewt.apache.org/newtmgr/nmxact/nmcoap"
	"mynewt.apache.org/newtmgr/nmxact/nmp"
	"mynewt.apache.org/newtmgr/nmxact/nmxutil"
)

// Listener pairs a CoAP listener with the NMP listener that is handed to
// client code.  Events arriving on the CoAP side are relayed to the NMP side
// by a goroutine owned by the dispatcher.
type Listener struct {
	// nmpl is the client-facing listener; decoded responses and errors are
	// written to its channels.
	nmpl *nmp.Listener
	// coapl is the listener registered with the CoAP dispatcher; it is the
	// source of raw messages and errors.
	coapl *nmcoap.Listener
	// stopCh is closed to tell the relaying goroutine to exit.
	stopCh chan struct{}
}

// The dispatcher is the owner of the listeners it points to.  Only the
// dispatcher writes to these listeners.
type Dispatcher struct { seqListenerMap map[uint8]*Listener coapd *nmcoap.Dispatcher wg sync.WaitGroup rxFilter nmcoap.RxMsgFilter stopped bool logDepth int mtx sync.Mutex } func NewDispatcher(rxFilter nmcoap.RxMsgFilter, isTcp bool, logDepth int) (*Dispatcher, error) { d := &Dispatcher{ seqListenerMap: map[uint8]*Listener{}, coapd: nmcoap.NewDispatcher(isTcp, logDepth+1), rxFilter: rxFilter, logDepth: logDepth + 2, } return d, nil } func (d *Dispatcher) addOmpListener(seq uint8) (*Listener, error) { d.mtx.Lock() defer d.mtx.Unlock() if d.seqListenerMap[seq] != nil { return nil, fmt.Errorf("duplicate OMP listener; seq=%d", seq) } mc := nmcoap.MsgCriteria{ Token: nmxutil.SeqToToken(seq), Path: "", } ol, err := d.AddCoapListener(mc) if err != nil { return nil, err } ompl := &Listener{ nmpl: nmp.NewListener(), coapl: ol, stopCh: make(chan struct{}), } d.seqListenerMap[seq] = ompl d.wg.Add(1) go func() { defer d.RemoveCoapListener(mc) defer d.wg.Done() // Listen for events. All feedback is sent to the client via the NMP // listener channels. It is done this way so that client code can be // OMP/NMP agnostic. 
for { select { case m := <-ompl.coapl.RspChan: rsp, err := DecodeOmp(m, d.rxFilter) if err != nil { ompl.nmpl.ErrChan <- err } else if rsp != nil { ompl.nmpl.RspChan <- rsp } else { /* no error, no response */ } case err := <-ompl.coapl.ErrChan: if err != nil { ompl.nmpl.ErrChan <- err } case <-ompl.stopCh: return } } }() return ompl, nil } func (d *Dispatcher) Stop() { d.mtx.Lock() defer d.mtx.Unlock() if d.stopped { return } d.stopped = true for seq, ompl := range d.seqListenerMap { delete(d.seqListenerMap, seq) close(ompl.stopCh) } d.wg.Wait() } func (d *Dispatcher) Dispatch(data []byte) bool { return d.coapd.Dispatch(data) } func (d *Dispatcher) ProcessCoapReq(data []byte) (coap.Message, error) { return d.coapd.ProcessCoapReq(data) } func (d *Dispatcher) AddCoapListener( mc nmcoap.MsgCriteria) (*nmcoap.Listener, error) { return d.coapd.AddListener(mc) } func (d *Dispatcher) RemoveCoapListener( mc nmcoap.MsgCriteria) *nmcoap.Listener { return d.coapd.RemoveListener(mc) } func (d *Dispatcher) AddNmpListener(seq uint8) (*nmp.Listener, error) { ompl, err := d.addOmpListener(seq) if err != nil { return nil, err } nmxutil.LogAddNmpListener(d.logDepth, seq) return ompl.nmpl, nil } func (d *Dispatcher) RemoveNmpListener(seq uint8) *nmp.Listener { d.mtx.Lock() defer d.mtx.Unlock() ompl := d.seqListenerMap[seq] if ompl == nil { return nil } delete(d.seqListenerMap, seq) close(ompl.stopCh) nmxutil.LogRemoveNmpListener(d.logDepth, seq) return ompl.nmpl } func (d *Dispatcher) ErrorOneNmp(seq uint8, err error) error { d.mtx.Lock() defer d.mtx.Unlock() ompl := d.seqListenerMap[seq] if ompl == nil { return fmt.Errorf("no nmp listener for seq %d", seq) } ompl.nmpl.ErrChan <- err return nil } func (d *Dispatcher) ErrorAll(err error) { d.coapd.ErrorAll(err) } func (d *Dispatcher) SetRxFilter(rxFilter nmcoap.RxMsgFilter) { d.rxFilter = rxFilter } func (d *Dispatcher) RxFilter() nmcoap.RxMsgFilter { return d.rxFilter }