func()

in api/key.go [105:191]


// PutKey creates the key volume/bucket/key through the OM client, streams the
// content of source to datanodes, and then commits the key.
//
// Data is read from source into a 4096-byte buffer; every non-empty read is
// written as one chunk. After at most 64 reads (or at end of input) the
// collected chunks are registered with PutBlock, and a fresh block is requested
// via AllocateBlock when more input remains. Each written block is recorded as
// its own key location (offset 0, length = bytes written to that block) and the
// whole list is passed to CommitKey together with the total key size.
//
// The datanode client in use is closed on every return path.
//
// On success the returned common.Key is always the zero value; on failure the
// first error from the OM client, the datanode client, or source is returned
// unchanged.
func (ozoneClient *OzoneClient) PutKey(volume string, bucket string, key string, source io.Reader) (common.Key, error) {
	const (
		chunkSize      = 4096 // size of the read buffer, i.e. the maximum chunk length
		chunksPerBlock = 64   // maximum number of reads issued per block
	)

	createKey, err := ozoneClient.OmClient.CreateKey(volume, bucket, key)
	if err != nil {
		return common.Key{}, err
	}

	// NOTE(review): this indexing panics if CreateKey returns no pre-allocated
	// location; confirm the OM always supplies at least one.
	location := createKey.KeyInfo.KeyLocationList[0].KeyLocations[0]

	dnClient, err := datanode.CreateDatanodeClient(location.Pipeline)
	if err != nil {
		return common.Key{}, err
	}
	// The closure reads dnClient at return time, so whichever client is current
	// (it is swapped per block below) gets closed exactly once on every exit.
	defer func() { dnClient.Close() }()

	buffer := make([]byte, chunkSize)
	keySize := uint64(0)
	locations := make([]*omproto.KeyLocation, 0)

	for {
		blockID := ConvertBlockId(location.BlockID)
		chunks := make([]*dnproto.ChunkInfo, 0, chunksPerBlock)
		blockLength := uint64(0)
		eof := false

		for i := 0; i < chunksPerBlock && !eof; i++ {
			count, err := source.Read(buffer)
			if err == io.EOF {
				eof = true
			} else if err != nil {
				return common.Key{}, err
			}
			// A reader may return data together with io.EOF, so the bytes are
			// written before the EOF flag ends the loop.
			if count > 0 {
				chunk, err := dnClient.CreateAndWriteChunk(blockID, blockLength, buffer[:count], uint64(count))
				if err != nil {
					return common.Key{}, err
				}
				blockLength += uint64(count)
				keySize += uint64(count)
				chunks = append(chunks, &chunk)
			}
		}

		if err := dnClient.PutBlock(blockID, chunks); err != nil {
			return common.Key{}, err
		}

		// Record this block with its own length. A trailing block that received
		// no data is skipped, except when it is the only block (empty input),
		// which is still recorded with length 0.
		if blockLength > 0 || len(locations) == 0 {
			length := blockLength
			offset := uint64(0)
			locations = append(locations, &omproto.KeyLocation{
				BlockID:  location.BlockID,
				Pipeline: location.Pipeline,
				Length:   &length,
				Offset:   &offset,
			})
		}

		if eof {
			break
		}

		// More input remains: obtain the next block and switch datanode clients.
		nextBlock, err := ozoneClient.OmClient.AllocateBlock(volume, bucket, key, createKey.ID)
		if err != nil {
			return common.Key{}, err
		}
		// Connect to the new pipeline before releasing the old client so that a
		// failure here leaves dnClient valid for the deferred Close.
		nextClient, err := datanode.CreateDatanodeClient(nextBlock.KeyLocation.Pipeline)
		if err != nil {
			return common.Key{}, err
		}
		dnClient.Close()
		dnClient = nextClient
		location = nextBlock.KeyLocation
	}

	// NOTE(review): the result of CommitKey is discarded here; if it reports an
	// error it should be propagated to the caller. Confirm its signature.
	ozoneClient.OmClient.CommitKey(volume, bucket, key, createKey.ID, locations, keySize)
	return common.Key{}, nil
}