api/key.go (157 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package api import ( "errors" "github.com/apache/ozone-go/api/common" "github.com/apache/ozone-go/api/datanode" dnproto "github.com/apache/ozone-go/api/proto/datanode" "github.com/apache/ozone-go/api/proto/hdds" omproto "github.com/apache/ozone-go/api/proto/ozone" "io" ) func (ozoneClient *OzoneClient) ListKeys(volume string, bucket string) ([]common.Key, error) { keys, err := ozoneClient.OmClient.ListKeys(volume, bucket) if err != nil { return make([]common.Key, 0), err } ret := make([]common.Key, 0) for _, r := range keys { ret = append(ret, KeyFromProto(r)) } return ret, nil } func (ozoneClient *OzoneClient) ListKeysPrefix(volume string, bucket string, prefix string) ([]common.Key, error) { keys, err := ozoneClient.OmClient.ListKeysPrefix(volume, bucket, prefix) if err != nil { return make([]common.Key, 0), err } ret := make([]common.Key, 0) for _, r := range keys { ret = append(ret, KeyFromProto(r)) } return ret, nil } func (ozoneClient *OzoneClient) InfoKey(volume string, bucket string, key string) (common.Key, error) { k, err := ozoneClient.OmClient.GetKey(volume, bucket, key) return KeyFromProto(k), err } func (ozoneClient *OzoneClient) GetKey(volume string, bucket string, key string, destination io.Writer) (common.Key, error) { keyInfo, err := ozoneClient.OmClient.GetKey(volume, bucket, key) if err != nil { return common.Key{}, err } if len(keyInfo.KeyLocationList) == 0 { return common.Key{}, errors.New("Get key returned with zero key location version " + volume + "/" + bucket + "/" + key) } if len(keyInfo.KeyLocationList[0].KeyLocations) == 0 { return common.Key{}, errors.New("Key location doesn't have any datanode for key " + volume + "/" + bucket + "/" + key) } for _, location := range keyInfo.KeyLocationList[0].KeyLocations { pipeline := location.Pipeline dnBlockId := ConvertBlockId(location.BlockID) dnClient, err := datanode.CreateDatanodeClient(pipeline) chunks, err := dnClient.GetBlock(dnBlockId) if err != nil { return common.Key{}, err } for _, chunk := range chunks { data, err := dnClient.ReadChunk(dnBlockId, chunk) if err != nil { return common.Key{}, err } destination.Write(data) } dnClient.Close() } return common.Key{}, nil } func ConvertBlockId(bid *hdds.BlockID) *dnproto.DatanodeBlockID { id := dnproto.DatanodeBlockID{ ContainerID: bid.ContainerBlockID.ContainerID, LocalID: bid.ContainerBlockID.LocalID, } return &id } func (ozoneClient *OzoneClient) PutKey(volume string, bucket string, key string, source io.Reader) (common.Key, error) { createKey, err := ozoneClient.OmClient.CreateKey(volume, bucket, key) if err != nil { return common.Key{}, err } keyInfo := createKey.KeyInfo location := keyInfo.KeyLocationList[0].KeyLocations[0] pipeline := location.Pipeline dnClient, err := datanode.CreateDatanodeClient(pipeline) if err != nil { return common.Key{}, err } chunkSize := 4096 buffer := make([]byte, chunkSize) chunks := make([]*dnproto.ChunkInfo, 0) keySize := uint64(0) locations := make([]*omproto.KeyLocation, 0) blockId := ConvertBlockId(location.BlockID) eof := false for { blockOffset := uint64(0) for i := 0; i < 64; i++ { count, err := source.Read(buffer) if err == io.EOF { eof = true } else if err != nil { return common.Key{}, err } if count > 0 { chunk, err := dnClient.CreateAndWriteChunk(blockId, blockOffset, buffer[0:count], uint64(count)) if err != nil { return common.Key{}, err } blockOffset += uint64(count) keySize += uint64(count) chunks = append(chunks, &chunk) } if eof { break } } err = dnClient.PutBlock(blockId, chunks) if err != nil { return common.Key{}, err } if eof { break } //get new block and reset counters nextBlockResponse, err := ozoneClient.OmClient.AllocateBlock(volume, bucket, key, createKey.ID) if err != nil { return common.Key{}, err } dnClient.Close() location = nextBlockResponse.KeyLocation pipeline = location.Pipeline dnClient, err = datanode.CreateDatanodeClient(pipeline) if err != nil { return common.Key{}, err } blockId = ConvertBlockId(location.BlockID) blockOffset = 0 chunks = make([]*dnproto.ChunkInfo, 0) } zero := uint64(0) locations = append(locations, &omproto.KeyLocation{ BlockID: location.BlockID, Pipeline: location.Pipeline, Length: &keySize, Offset: &zero, }) ozoneClient.OmClient.CommitKey(volume, bucket, key, createKey.ID, locations, keySize) return common.Key{}, nil } func KeyFromProto(keyProto *omproto.KeyInfo) common.Key { replicationType := common.ReplicationType(*keyProto.Type) result := common.Key{ Name: *keyProto.KeyName, Replication: replicationType, VolumeName: *keyProto.VolumeName, BucketName: *keyProto.BucketName, Size: *keyProto.DataSize, } return result }