func()

in component/block_cache/block_cache.go [1564:1657]


func (bc *BlockCache) commitBlocks(handle *handlemap.Handle) error {
	log.Debug("BlockCache::commitBlocks : Staging blocks for %s", handle.Path)

	// Make three attempts to upload all pending blocks
	cnt := 0
	for cnt = 0; cnt < 3; cnt++ {
		if handle.Buffers.Cooking.Len() == 0 {
			break
		}

		err := bc.stageBlocks(handle, MAX_BLOCKS)
		if err != nil {
			log.Err("BlockCache::commitBlocks : Failed to stage blocks for %s [%s]", handle.Path, err.Error())
			return err
		}

		bc.waitAndFreeUploadedBlocks(handle, MAX_BLOCKS)
	}

	if cnt == 3 {
		nodeList := handle.Buffers.Cooking
		node := nodeList.Front()
		for node != nil {
			block := node.Value.(*Block)
			node = node.Next()

			if block.IsDirty() {
				log.Err("BlockCache::commitBlocks : Failed to stage blocks for %s after 3 attempts", handle.Path)
				return fmt.Errorf("failed to stage blocks")
			}
		}
	}

	blockIDList, restageIds, err := bc.getBlockIDList(handle)
	if err != nil {
		log.Err("BlockCache::commitBlocks : Failed to get block id list for %v [%v]", handle.Path, err.Error())
		return err
	}

	log.Debug("BlockCache::commitBlocks : Committing blocks for %s", handle.Path)

	// Commit the block list now
	var newEtag string = ""
	err = bc.NextComponent().CommitData(internal.CommitDataOptions{Name: handle.Path, List: blockIDList, BlockSize: bc.blockSize, NewETag: &newEtag})
	if err != nil {
		log.Err("BlockCache::commitBlocks : Failed to commit blocks for %s [%s]", handle.Path, err.Error())
		return err
	}

	// Lock was already acquired on the handle.
	if newEtag != "" {
		handle.SetValue("ETAG", newEtag)
	}

	// set all the blocks as committed
	list, _ := handle.GetValue("blockList")
	listMap := list.(map[int64]*blockInfo)
	for k := range listMap {
		listMap[k].committed = true
	}

	restaged := false
	for _, restageID := range restageIds {
		// We need to restage these blocks
		for i := range blockIDList {
			if blockIDList[i] == restageID {
				// Read one block from offset of this block, which shall effectively read this block and the next block
				// The stage this block again with correct length
				// Remove the next block from blockIDList
				// Commit the block list again
				block, err := bc.getOrCreateBlock(handle, uint64(i)*bc.blockSize)
				if err != nil {
					log.Err("BlockCache::commitBlocks : Failed to get block for %v [%v]", handle.Path, err.Error())
					return err
				}

				block.Dirty()
				restaged = true

				// Next item after this block was a semi zero filler so remove that from the list now
				blockIDList = append(blockIDList[:i+1], blockIDList[i+2:]...)
				break
			}
		}
	}

	if restaged {
		// If any block was restaged then commit the blocks again
		return bc.commitBlocks(handle)
	}

	handle.Flags.Clear(handlemap.HandleFlagDirty)
	return nil
}