in api/key.go [105:191]
// PutKey reads source until io.EOF and stores the data under
// volume/bucket/key.
//
// The key is opened with OmClient.CreateKey, which supplies the first block
// location. Data is requested from source 4096 bytes at a time; every
// non-empty read is written to the datanode as one chunk. After 64 reads (or
// at EOF) the block is finalized with PutBlock, and, unless EOF was reached,
// a further block is requested through OmClient.AllocateBlock. Each block
// gets its own datanode client, which is closed as soon as the block is done,
// including on error paths.
//
// Every block that was written is passed to OmClient.CommitKey as a separate
// location carrying that block's own length and offset 0, together with the
// total number of bytes written.
//
// The first error from CreateKey, the datanode client, source.Read or
// AllocateBlock is returned together with an empty common.Key. On success the
// returned common.Key is also empty.
func (ozoneClient *OzoneClient) PutKey(volume string, bucket string, key string, source io.Reader) (common.Key, error) {
	const (
		chunkSize      = 4096 // bytes requested from source per read
		chunksPerBlock = 64   // reads performed before moving on to a new block
	)

	createKey, err := ozoneClient.OmClient.CreateKey(volume, bucket, key)
	if err != nil {
		return common.Key{}, err
	}
	// NOTE(review): this indexing panics if CreateKey returns no locations;
	// confirm the server always pre-allocates one block.
	location := createKey.KeyInfo.KeyLocationList[0].KeyLocations[0]

	buffer := make([]byte, chunkSize)
	eof := false

	// writeBlock fills the block described by the current location from
	// source and returns the number of bytes stored in it. It sets eof once
	// source is exhausted. The datanode client lives only for this call so
	// that it is released on every return path.
	writeBlock := func() (uint64, error) {
		dnClient, err := datanode.CreateDatanodeClient(location.Pipeline)
		if err != nil {
			return 0, err
		}
		defer dnClient.Close()

		blockID := ConvertBlockId(location.BlockID)
		chunks := make([]*dnproto.ChunkInfo, 0, chunksPerBlock)
		written := uint64(0)
		for i := 0; i < chunksPerBlock && !eof; i++ {
			count, err := source.Read(buffer)
			if err == io.EOF {
				// A reader may hand back the final bytes together with
				// io.EOF, so the data is still written below.
				eof = true
			} else if err != nil {
				return 0, err
			}
			if count > 0 {
				// written doubles as the chunk's offset inside the block.
				chunk, err := dnClient.CreateAndWriteChunk(blockID, written, buffer[:count], uint64(count))
				if err != nil {
					return 0, err
				}
				written += uint64(count)
				chunks = append(chunks, &chunk)
			}
		}
		if err := dnClient.PutBlock(blockID, chunks); err != nil {
			return 0, err
		}
		return written, nil
	}

	locations := make([]*omproto.KeyLocation, 0)
	keySize := uint64(0)
	for {
		// blockLength and blockOffset are declared per iteration because the
		// location stores pointers to them.
		blockLength, err := writeBlock()
		if err != nil {
			return common.Key{}, err
		}
		blockOffset := uint64(0)
		locations = append(locations, &omproto.KeyLocation{
			BlockID:  location.BlockID,
			Pipeline: location.Pipeline,
			Length:   &blockLength,
			Offset:   &blockOffset,
		})
		keySize += blockLength
		if eof {
			break
		}

		// More data is pending: ask for the next block to write into.
		nextBlockResponse, err := ozoneClient.OmClient.AllocateBlock(volume, bucket, key, createKey.ID)
		if err != nil {
			return common.Key{}, err
		}
		location = nextBlockResponse.KeyLocation
	}

	// NOTE(review): the result of CommitKey is discarded, as it was before.
	// Its signature is not visible here; if it reports an error, that error
	// should be returned so a failed commit is not reported as success.
	ozoneClient.OmClient.CommitKey(volume, bucket, key, createKey.ID, locations, keySize)
	return common.Key{}, nil
}