func()

in component/xload/splitter.go [124:293]


// Process downloads the blob described by item into the local path by splitting it into
// block-sized chunks and scheduling every chunk on the next component in the pipeline.
//
// Return values:
//   - (local file size, nil) when a regular file of matching size is already present locally
//   - (0, nil) when the download completes, including the 0 byte file case
//   - (-1, error) when the local path is a directory, or the file cannot be opened, truncated,
//     downloaded, written or (when md5 validation is enabled) validated
//
// On truncate or download failure the partially created local file is removed.
func (ds *downloadSplitter) Process(item *WorkItem) (int, error) {
	log.Debug("downloadSplitter::Process : Splitting data for %s, size %v, mode %v, priority %v, access time %v, modified time %v", item.Path, item.DataLen,
		item.Mode, item.Priority, item.Atime.Format(time.DateTime), item.Mtime.Format(time.DateTime))

	var err error
	localPath := filepath.Join(ds.path, item.Path)

	// if priority is false, it means that it has been scheduled by the lister and not by the OpenFile call.
	// So, get a lock. If the locking goes into wait state, it means the file is already under download by the OpenFile thread.
	// Otherwise, if there are no other locks, acquire a lock to prevent any OpenFile call from adding a request again.
	// OpenFile thread already takes a lock on the file in its code, so don't take it again here.
	if !item.Priority {
		flock := ds.fileLocks.Get(item.Path)
		flock.Lock()
		defer flock.Unlock()
	}

	filePresent, isDir, size := isFilePresent(localPath)
	if filePresent {
		if isDir {
			log.Err("downloadSplitter::Process : %s is a directory", item.Path)
			return -1, fmt.Errorf("%s is a directory", item.Path)
		} else if item.DataLen == uint64(size) {
			// a local file of the expected size already exists, so skip the download
			log.Debug("downloadSplitter::Process : %s will be served from local path, priority %v", item.Path, item.Priority)
			return int(size), nil
		}
	}

	// TODO:: xload : should we delete the file if it already exists
	// TODO:: xload : what should be the flags
	// TODO:: xload : verify if the mode is set correctly
	// TODO:: xload : handle case if blob is a symlink
	item.FileHandle, err = os.OpenFile(localPath, os.O_RDWR|os.O_CREATE, item.Mode)
	if err != nil {
		log.Err("downloadSplitter::Process : Failed to create file %s [%s]", item.Path, err.Error())
		return -1, fmt.Errorf("failed to open file %s [%w]", item.Path, err)
	}

	defer item.FileHandle.Close()

	// truncate the file to its size
	// This is done before the 0 byte check because the file is opened without O_TRUNC: a local file of a
	// different size that already exists would otherwise keep its stale contents when the blob is empty.
	err = item.FileHandle.Truncate(int64(item.DataLen))
	if err != nil {
		log.Err("downloadSplitter::Process : Failed to truncate file %s, so deleting it from local path [%s]", item.Path, err.Error())

		// delete the file which failed to truncate from the local path
		err1 := os.Remove(localPath)
		if err1 != nil {
			log.Err("downloadSplitter::Process : Failed to delete file %s [%s]", item.Path, err1.Error())
		}

		return -1, fmt.Errorf("failed to truncate file %s [%w]", item.Path, err)
	}

	if item.DataLen == 0 {
		// nothing to download, the empty local file is the complete result
		log.Debug("downloadSplitter::Process : 0 byte file %s", item.Path)
		// send the status to stats manager
		ds.GetStatsManager().AddStats(&StatsItem{
			Component: SPLITTER,
			Name:      item.Path,
			Success:   true,
			Download:  true,
		})
		return 0, nil
	}

	// number of blocks needed to cover DataLen bytes (ceiling division, DataLen is non-zero here)
	numBlocks := ((item.DataLen - 1) / ds.blockPool.GetBlockSize()) + 1
	offset := int64(0)

	wg := sync.WaitGroup{}
	wg.Add(1)

	// buffered to numBlocks so that a response can be queued for every block without blocking the sender
	responseChannel := make(chan *WorkItem, numBlocks)
	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	// operationSuccess is written only by the response goroutine below and read only after wg.Wait() returns
	operationSuccess := true
	go func() {
		defer wg.Done()

		// exactly one response is consumed per block, whether it succeeded or failed
		for i := 0; i < int(numBlocks); i++ {
			respSplitItem := <-responseChannel
			if respSplitItem.Err != nil {
				log.Err("downloadSplitter::Process : Failed to download data for file %s [%s]", item.Path, respSplitItem.Err.Error())
				operationSuccess = false
				cancel() // cancel the context to stop download of other chunks
			} else {
				_, writeErr := item.FileHandle.WriteAt(respSplitItem.Block.Data[:respSplitItem.DataLen], respSplitItem.Block.Offset)
				if writeErr != nil {
					log.Err("downloadSplitter::Process : Failed to write data to file %s [%s]", item.Path, writeErr.Error())
					operationSuccess = false
					cancel() // cancel the context to stop download of other chunks
				}
			}

			// responses created for a failed block allocation carry no block, so there is nothing to release
			if respSplitItem.Block != nil {
				// log.Debug("downloadSplitter::process : Download successful %s index %d offset %v", item.path, respSplitItem.block.index, respSplitItem.block.offset)
				ds.blockPool.Release(respSplitItem.Block)
			}
		}
	}()

	for i := 0; i < int(numBlocks); i++ {
		block := ds.blockPool.GetBlock(item.Priority)
		if block == nil {
			// report the allocation failure through the response channel so that the response goroutine
			// still receives one response for this block
			responseChannel <- &WorkItem{Err: fmt.Errorf("failed to get block from pool for file %s, offset %v", item.Path, offset)}
		} else {
			block.Index = i
			block.Offset = offset
			// NOTE(review): Length is the full block size for every block, including the last one;
			// presumably the next component limits the final block using DataLen - confirm
			block.Length = int64(ds.blockPool.GetBlockSize())

			splitItem := &WorkItem{
				CompName:        ds.GetNext().GetName(),
				Path:            item.Path,
				DataLen:         item.DataLen,
				FileHandle:      item.FileHandle,
				Block:           block,
				ResponseChannel: responseChannel,
				Download:        true,
				Priority:        item.Priority,
				Ctx:             ctx,
			}
			// log.Debug("downloadSplitter::Process : Scheduling download for %s offset %v", item.Path, offset)
			ds.GetNext().Schedule(splitItem)
		}

		offset += int64(ds.blockPool.GetBlockSize())
	}

	// wait until a response has been processed for every block
	wg.Wait()

	// update the last modified time
	// TODO:: xload : verify if the lmt is updated correctly
	err = os.Chtimes(localPath, item.Atime, item.Mtime)
	if err != nil {
		// failure to set the times is logged but does not fail the download
		log.Err("downloadSplitter::Process : Failed to change times of file %s [%s]", item.Path, err.Error())
	}

	if ds.validateMD5 && operationSuccess {
		err = ds.checkConsistency(item)
		if err != nil {
			// TODO:: xload : retry if md5 validation fails
			log.Err("downloadSplitter::Process : unable to validate md5 for %s [%s]", item.Path, err.Error())
			operationSuccess = false
		}
	}

	// send the download status to stats manager
	ds.GetStatsManager().AddStats(&StatsItem{
		Component: SPLITTER,
		Name:      item.Path,
		Success:   operationSuccess,
		Download:  true,
	})

	if !operationSuccess {
		log.Err("downloadSplitter::Process : Failed to download data for file %s, so deleting it from local path", item.Path)

		// delete the file which failed to download from the local path
		err = os.Remove(localPath)
		if err != nil {
			log.Err("downloadSplitter::Process : Failed to delete file %s [%s]", item.Path, err.Error())
		}

		return -1, fmt.Errorf("failed to download data for file %s", item.Path)
	}

	log.Debug("downloadSplitter::Process : Download completed for file %s, priority %v", item.Path, item.Priority)
	return 0, nil
}