api/om/om.go (172 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package om import ( "errors" "github.com/apache/ozone-go/api/common" "github.com/apache/ozone-go/api/proto/hdds" ozone_proto "github.com/apache/ozone-go/api/proto/ozone" "github.com/hortonworks/gohadoop" hadoop_ipc_client "github.com/hortonworks/gohadoop/hadoop_common/ipc/client" uuid "github.com/nu7hatch/gouuid" "net" "strconv" "sync" ) var OM_PROTOCOL = "org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol" type OmClient struct { OmHost string client *hadoop_ipc_client.Client clientId string mu sync.Mutex } func CreateOmClient(omhost string) OmClient { clientId, _ := uuid.NewV4() ugi, _ := gohadoop.CreateSimpleUGIProto() c := &hadoop_ipc_client.Client{ ClientId: clientId, Ugi: ugi, ServerAddress: net.JoinHostPort(omhost, strconv.Itoa(9862))} return OmClient{ OmHost: omhost, client: c, } } func (om *OmClient) GetKey(volume string, bucket string, key string) (*ozone_proto.KeyInfo, error) { keyArgs := &ozone_proto.KeyArgs{ VolumeName: &volume, BucketName: &bucket, KeyName: &key, } req := ozone_proto.LookupKeyRequest{ KeyArgs: keyArgs, } requestType := ozone_proto.Type_LookupKey wrapperRequest := ozone_proto.OMRequest{ CmdType: &requestType, LookupKeyRequest: &req, ClientId: &om.clientId, } resp, err := om.submitRequest(&wrapperRequest) if err != nil { return nil, err } keyProto := resp.GetLookupKeyResponse().GetKeyInfo() println(keyProto) return keyProto, nil } func (om *OmClient) ListKeys(volume string, bucket string) ([]*ozone_proto.KeyInfo, error) { return om.ListKeysPrefix(volume, bucket, "") } func (om *OmClient) AllocateBlock(volume string, bucket string, key string, clientID *uint64) (*ozone_proto.AllocateBlockResponse, error) { req := ozone_proto.AllocateBlockRequest{ KeyArgs: &ozone_proto.KeyArgs{ VolumeName: &volume, BucketName: &bucket, KeyName: &key, }, ClientID: clientID, } msgType := ozone_proto.Type_AllocateBlock wrapperRequest := ozone_proto.OMRequest{ CmdType: &msgType, AllocateBlockRequest: &req, ClientId: &om.clientId, } resp, err := om.submitRequest(&wrapperRequest) if err != nil { return nil, err } return resp.AllocateBlockResponse, nil } func (om *OmClient) CreateKey(volume string, bucket string, key string) (*ozone_proto.CreateKeyResponse, error) { req := ozone_proto.CreateKeyRequest{ KeyArgs: &ozone_proto.KeyArgs{ VolumeName: &volume, BucketName: &bucket, KeyName: &key, }, } createKeys := ozone_proto.Type_CreateKey wrapperRequest := ozone_proto.OMRequest{ CmdType: &createKeys, CreateKeyRequest: &req, ClientId: &om.clientId, } resp, err := om.submitRequest(&wrapperRequest) if err != nil { return nil, err } return resp.CreateKeyResponse, nil } func (om *OmClient) CommitKey(volume string, bucket string, key string, id *uint64, keyLocations []*ozone_proto.KeyLocation, size uint64) (common.Key, error) { one := hdds.ReplicationFactor_ONE standalone := hdds.ReplicationType_STAND_ALONE req := ozone_proto.CommitKeyRequest{ KeyArgs: &ozone_proto.KeyArgs{ VolumeName: &volume, BucketName: &bucket, KeyName: &key, KeyLocations: keyLocations, DataSize: &size, Factor: &one, Type: &standalone, }, ClientID: id, } messageType := ozone_proto.Type_CommitKey wrapperRequest := ozone_proto.OMRequest{ CmdType: &messageType, CommitKeyRequest: &req, ClientId: &om.clientId, } _, err := om.submitRequest(&wrapperRequest) if err != nil { return common.Key{}, err } return common.Key{}, nil } func (om *OmClient) ListKeysPrefix(volume string, bucket string, prefix string) ([]*ozone_proto.KeyInfo, error) { req := ozone_proto.ListKeysRequest{ VolumeName: &volume, BucketName: &bucket, Prefix: ptr(prefix), Count: ptri(1000), } listKeys := ozone_proto.Type_ListKeys wrapperRequest := ozone_proto.OMRequest{ CmdType: &listKeys, ListKeysRequest: &req, ClientId: &om.clientId, } resp, err := om.submitRequest(&wrapperRequest) if err != nil { return nil, err } return resp.GetListKeysResponse().GetKeyInfo(), nil } func getRpcPort(ports []*hdds.Port) uint32 { for _, port := range ports { if port.GetName() == "RATIS" { return port.GetValue() } } return 0 } func ptri(i int32) *int32 { return &i } func ptr(s string) *string { return &s } func (om *OmClient) submitRequest(request *ozone_proto.OMRequest) (*ozone_proto.OMResponse, error) { wrapperResponse := ozone_proto.OMResponse{} om.mu.Lock() err := om.client.Call(gohadoop.GetCalleeRPCRequestHeaderProto(&OM_PROTOCOL), request, &wrapperResponse) om.mu.Unlock() if err != nil { return nil, err } if *wrapperResponse.Status != ozone_proto.Status_OK { return nil, errors.New("Error on calling OM " + wrapperResponse.Status.String() + " " + *wrapperResponse.Message) } return &wrapperResponse, nil }