in nmxact/xact/image.go [329:401]
func (c *ImageUploadCmd) Run(s sesn.Sesn) (Result, error) {
res := newImageUploadResult()
ch := make(chan int)
rspc := make(chan nmp.NmpRsp, c.MaxWinSz)
errc := make(chan error, c.MaxWinSz)
t := ImageUploadIntTracker{
TuneWS: true,
WCount: 0,
WCap: IMAGE_UPLOAD_START_WS,
Off: c.StartOff,
RspMap: make(map[int]int),
MaxRxOff: 0,
}
for int(atomic.LoadInt32(&t.MaxRxOff)) < len(c.Data) {
// Block if window is full
if !t.CheckWindow() {
ch <- 1
}
t.ProcessMissedChunks()
if t.Off == len(c.Data) {
continue
}
t.Mutex.Lock()
r, err := nextImageUploadReq(s, c.Upgrade, c.Data, t.Off, c.ImageNum)
if err != nil {
t.Mutex.Unlock()
return nil, err
}
t.Off = (int(r.Off) + len(r.Data))
// Use up a chunk in window
t.WCount += 1
err = txReqAsync(s, r.Msg(), &c.CmdBase, rspc, errc)
if err != nil {
log.Debugf("err txReqAsync %v", err)
t.Mutex.Unlock()
break
}
// Mark the expected offset in successful tx of this chunk. i.e off + len
t.UpdateTracker(int(r.Off)+len(r.Data), IMAGE_UPLOAD_STATUS_EXPECTED)
t.Mutex.Unlock()
go func(off int) {
select {
case err := <-errc:
sig := t.HandleError(off, err)
if sig {
<-ch
}
return
case rsp := <-rspc:
sig := t.HandleResponse(c, rsp, res)
if sig {
<-ch
}
return
}
}(int(r.Off))
}
if int(t.MaxRxOff) == len(c.Data) {
return res, nil
} else {
return nil, fmt.Errorf("ImageUpload unexpected error after %d/%d bytes",
t.MaxRxOff, len(c.Data))
}
}