func()

in nmxact/xact/image.go [329:401]


// Run uploads c.Data over the session s using a window of in-flight chunks.
//
// Chunks are built with nextImageUploadReq starting at c.StartOff and sent
// with txReqAsync; each sent chunk gets its own goroutine that waits for
// either a response or an error and hands it to the tracker. The send loop
// keeps going until the tracker's MaxRxOff reaches len(c.Data).
//
// It returns the accumulated upload result when MaxRxOff ends up exactly at
// len(c.Data). It returns a nil Result and an error if building a request
// fails, if txReqAsync fails (the cause is included in the error text), or if
// the loop ends with MaxRxOff at any other value.
func (c *ImageUploadCmd) Run(s sesn.Sesn) (Result, error) {
	res := newImageUploadResult()
	// ch is unbuffered: the send loop parks on "ch <- 1" when the window is
	// full, and a response goroutine releases it with "<-ch".
	ch := make(chan int)
	// Response and error channels are buffered up to the maximum window size.
	rspc := make(chan nmp.NmpRsp, c.MaxWinSz)
	errc := make(chan error, c.MaxWinSz)

	t := ImageUploadIntTracker{
		TuneWS:   true,
		WCount:   0,
		WCap:     IMAGE_UPLOAD_START_WS,
		Off:      c.StartOff,
		RspMap:   make(map[int]int),
		MaxRxOff: 0,
	}

	// txErr keeps the transmit failure that aborted the loop, if any, so the
	// caller sees the cause instead of only a debug log line.
	var txErr error

	// MaxRxOff is loaded atomically because the response goroutines started
	// below run concurrently with this loop.
	for int(atomic.LoadInt32(&t.MaxRxOff)) < len(c.Data) {
		// Block if window is full
		if !t.CheckWindow() {
			ch <- 1
		}

		t.ProcessMissedChunks()

		// Every byte has already been handed to the transport; there is
		// nothing new to send, so go around again and re-check MaxRxOff.
		// NOTE(review): t.Off is read here without holding t.Mutex, and this
		// path spins without blocking — confirm both are intended.
		if t.Off == len(c.Data) {
			continue
		}

		t.Mutex.Lock()
		r, err := nextImageUploadReq(s, c.Upgrade, c.Data, t.Off, c.ImageNum)
		if err != nil {
			t.Mutex.Unlock()
			return nil, err
		}

		// Advance the send offset past the chunk that was just built.
		t.Off = (int(r.Off) + len(r.Data))

		// Use up a chunk in window
		t.WCount += 1
		err = txReqAsync(s, r.Msg(), &c.CmdBase, rspc, errc)
		if err != nil {
			log.Debugf("err txReqAsync %v", err)
			t.Mutex.Unlock()
			txErr = err
			break
		}
		// Mark the expected offset in successful tx of this chunk. i.e off + len
		t.UpdateTracker(int(r.Off)+len(r.Data), IMAGE_UPLOAD_STATUS_EXPECTED)
		t.Mutex.Unlock()

		// One goroutine per sent chunk: it consumes a single error or
		// response and, when the tracker handler returns true, receives from
		// ch to release a send loop parked on a full window.
		go func(off int) {
			select {
			case err := <-errc:
				sig := t.HandleError(off, err)
				if sig {
					<-ch
				}
				return
			case rsp := <-rspc:
				sig := t.HandleResponse(c, rsp, res)
				if sig {
					<-ch
				}
				return
			}
		}(int(r.Off))
	}

	// Take one atomic snapshot and use it for both the success check and the
	// error text; response goroutines may still be updating MaxRxOff.
	rxOff := int(atomic.LoadInt32(&t.MaxRxOff))
	if rxOff == len(c.Data) {
		return res, nil
	}

	if txErr != nil {
		return nil, fmt.Errorf(
			"ImageUpload unexpected error after %d/%d bytes: %v",
			rxOff, len(c.Data), txErr)
	}
	return nil, fmt.Errorf("ImageUpload unexpected error after %d/%d bytes",
		rxOff, len(c.Data))
}