in component/block_cache/block_cache.go [1564:1657]
// commitBlocks stages whatever blocks are still pending on the handle and then
// commits the resulting block list through the next component.
//
// Flow:
//  1. Up to three rounds of stageBlocks + waitAndFreeUploadedBlocks are run
//     while the handle's Cooking list is non-empty.
//  2. If all three rounds were used and a dirty block is still present in
//     Cooking, the call fails with "failed to stage blocks".
//  3. The block ID list is obtained from getBlockIDList and committed via
//     CommitData; a non-empty ETag reported by the commit is stored on the
//     handle under "ETAG".
//  4. Every entry of the handle's "blockList" map is flagged as committed.
//  5. For each ID returned for restaging, the block at that list position is
//     marked dirty and the whole procedure is run again recursively.
//
// The handle's dirty flag is cleared only on the pass where nothing had to be
// restaged. The first error encountered is logged and returned as-is.
//
// NOTE(review): the original code states that the lock on the handle is
// already held when this runs — confirm against callers.
func (bc *BlockCache) commitBlocks(handle *handlemap.Handle) error {
	log.Debug("BlockCache::commitBlocks : Staging blocks for %s", handle.Path)

	// Make three attempts to upload all pending blocks
	cnt := 0
	for ; cnt < 3; cnt++ {
		if handle.Buffers.Cooking.Len() == 0 {
			break
		}

		err := bc.stageBlocks(handle, MAX_BLOCKS)
		if err != nil {
			log.Err("BlockCache::commitBlocks : Failed to stage blocks for %s [%s]", handle.Path, err.Error())
			return err
		}

		bc.waitAndFreeUploadedBlocks(handle, MAX_BLOCKS)
	}

	// cnt reaches 3 only when none of the attempts found Cooking empty up front,
	// so verify that nothing dirty was left behind before committing.
	if cnt == 3 {
		for node := handle.Buffers.Cooking.Front(); node != nil; node = node.Next() {
			block := node.Value.(*Block)
			if block.IsDirty() {
				log.Err("BlockCache::commitBlocks : Failed to stage blocks for %s after 3 attempts", handle.Path)
				return fmt.Errorf("failed to stage blocks")
			}
		}
	}

	blockIDList, restageIds, err := bc.getBlockIDList(handle)
	if err != nil {
		log.Err("BlockCache::commitBlocks : Failed to get block id list for %v [%v]", handle.Path, err.Error())
		return err
	}

	log.Debug("BlockCache::commitBlocks : Committing blocks for %s", handle.Path)

	// Commit the block list now
	var newEtag string
	err = bc.NextComponent().CommitData(internal.CommitDataOptions{Name: handle.Path, List: blockIDList, BlockSize: bc.blockSize, NewETag: &newEtag})
	if err != nil {
		log.Err("BlockCache::commitBlocks : Failed to commit blocks for %s [%s]", handle.Path, err.Error())
		return err
	}

	// Lock was already acquired on the handle.
	if newEtag != "" {
		handle.SetValue("ETAG", newEtag)
	}

	// set all the blocks as committed
	// The checked assertion below also covers a missing "blockList" entry (nil
	// value), which would otherwise panic on an unchecked assertion.
	list, _ := handle.GetValue("blockList")
	listMap, ok := list.(map[int64]*blockInfo)
	if !ok {
		log.Err("BlockCache::commitBlocks : Block list missing or of unexpected type for %s", handle.Path)
		return fmt.Errorf("block list not found for %s", handle.Path)
	}
	for k := range listMap {
		listMap[k].committed = true
	}

	restaged := false
	for _, restageID := range restageIds {
		// We need to restage these blocks
		for i := range blockIDList {
			if blockIDList[i] != restageID {
				continue
			}

			// The list position is used as the block index to compute the offset.
			// Marking the block dirty makes the recursive pass below stage it again.
			block, err := bc.getOrCreateBlock(handle, uint64(i)*bc.blockSize)
			if err != nil {
				log.Err("BlockCache::commitBlocks : Failed to get block for %v [%v]", handle.Path, err.Error())
				return err
			}

			block.Dirty()
			restaged = true

			// Next item after this block was a semi zero filler so remove that from the list now.
			// Dropping it presumably keeps later list positions aligned with block indices.
			// Guarded so that a restage ID sitting at the very end of the list
			// (no filler after it) does not trigger a slice-bounds panic.
			if i+1 < len(blockIDList) {
				blockIDList = append(blockIDList[:i+1], blockIDList[i+2:]...)
			}
			break
		}
	}

	if restaged {
		// If any block was restaged then commit the blocks again
		return bc.commitBlocks(handle)
	}

	handle.Flags.Clear(handlemap.HandleFlagDirty)
	return nil
}