api/datanode/datanode.go (213 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
"context"
"errors"
"fmt"
dnapi "github.com/apache/ozone-go/api/proto/datanode"
"github.com/apache/ozone-go/api/proto/hdds"
"github.com/apache/ozone-go/api/proto/ratis"
"google.golang.org/grpc"
"io"
"strconv"
)
type ChunkInfo struct {
Name string
Offset uint64
Len uint64
}
type DatanodeClient struct {
ratisClient *ratis.RaftClientProtocolService_UnorderedClient
ratisReceiver chan ratis.RaftClientReplyProto
standaloneClient *dnapi.XceiverClientProtocolService_SendClient
standaloneReceiver chan dnapi.ContainerCommandResponseProto
ctx context.Context
datanodes []*hdds.DatanodeDetailsProto
currentDatanode hdds.DatanodeDetailsProto
grpcConnection *grpc.ClientConn
pipelineId *hdds.PipelineID
memberIndex int
}
func (dn *DatanodeClient) GetCurrentDnUUid() *string {
uid := dn.currentDatanode.GetUuid()
return &uid
}
func (dnClient *DatanodeClient) connectToNext() error {
if dnClient.grpcConnection != nil {
dnClient.grpcConnection.Close()
}
dnClient.memberIndex = dnClient.memberIndex + 1
if dnClient.memberIndex == len(dnClient.datanodes) {
dnClient.memberIndex = 0
}
selectedDatanode := dnClient.datanodes[dnClient.memberIndex]
dnClient.currentDatanode = *selectedDatanode
standalonePort := 0
for _, port := range dnClient.currentDatanode.Ports {
if *port.Name == "STANDALONE" {
standalonePort = int(*port.Value)
}
}
address := *dnClient.currentDatanode.IpAddress + ":" + strconv.Itoa(standalonePort)
println("Connecting to the " + address)
conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
return err
}
dnClient.ratisReceiver = make(chan ratis.RaftClientReplyProto)
dnClient.standaloneReceiver = make(chan dnapi.ContainerCommandResponseProto)
client, err := dnapi.NewXceiverClientProtocolServiceClient(conn).Send(dnClient.ctx)
dnClient.standaloneClient = &client
go dnClient.StandaloneReceive()
return nil
}
func CreateDatanodeClient(pipeline *hdds.Pipeline) (*DatanodeClient, error) {
dnClient := &DatanodeClient{
ctx: context.Background(),
pipelineId: pipeline.Id,
datanodes: pipeline.Members,
memberIndex: -1,
}
err := dnClient.connectToNext()
if err != nil {
return nil, err
}
return dnClient, nil
}
func (dnClient *DatanodeClient) RaftReceiver() {
for {
proto, err := (*dnClient.ratisClient).Recv()
if err == io.EOF {
return
}
if err != nil {
fmt.Println(err)
return
}
dnClient.ratisReceiver <- *proto
}
}
func (dnClient *DatanodeClient) CreateAndWriteChunk(id *dnapi.DatanodeBlockID, blockOffset uint64, buffer []byte, length uint64) (dnapi.ChunkInfo, error) {
bpc := uint32(12)
checksumType := dnapi.ChecksumType_NONE
checksumDataProto := dnapi.ChecksumData{
Type: &checksumType,
BytesPerChecksum: &bpc,
}
chunkName := fmt.Sprintf("chunk_%d", blockOffset)
chunkInfoProto := dnapi.ChunkInfo{
ChunkName: &chunkName,
Offset: &blockOffset,
Len: &length,
ChecksumData: &checksumDataProto,
}
return dnClient.WriteChunk(id, chunkInfoProto, buffer[0:length])
}
func (dnClient *DatanodeClient) WriteChunk(id *dnapi.DatanodeBlockID, info dnapi.ChunkInfo, data []byte) (dnapi.ChunkInfo, error) {
req := dnapi.WriteChunkRequestProto{
BlockID: id,
ChunkData: &info,
Data: data,
}
commandType := dnapi.Type_WriteChunk
uuid := dnClient.currentDatanode.GetUuid()
proto := dnapi.ContainerCommandRequestProto{
CmdType: &commandType,
WriteChunk: &req,
ContainerID: id.ContainerID,
DatanodeUuid: &uuid,
}
_, err := dnClient.sendDatanodeCommand(proto)
if err != nil {
return info, err
}
return info, nil
}
func (dnClient *DatanodeClient) ReadChunk(id *dnapi.DatanodeBlockID, info ChunkInfo) ([]byte, error) {
result := make([]byte, 0)
bpc := uint32(12)
checksumType := dnapi.ChecksumType_NONE
checksumDataProto := dnapi.ChecksumData{
Type: &checksumType,
BytesPerChecksum: &bpc,
}
chunkInfoProto := dnapi.ChunkInfo{
ChunkName: &info.Name,
Offset: &info.Offset,
Len: &info.Len,
ChecksumData: &checksumDataProto,
}
req := dnapi.ReadChunkRequestProto{
BlockID: id,
ChunkData: &chunkInfoProto,
}
commandType := dnapi.Type_ReadChunk
uuid := dnClient.currentDatanode.GetUuid()
proto := dnapi.ContainerCommandRequestProto{
CmdType: &commandType,
ReadChunk: &req,
ContainerID: id.ContainerID,
DatanodeUuid: &uuid,
}
resp, err := dnClient.sendDatanodeCommand(proto)
if err != nil {
return result, err
}
if resp.GetResult() != dnapi.Result_SUCCESS {
return nil, errors.New(resp.GetResult().String() + " " + resp.GetMessage())
}
return resp.GetReadChunk().Data, nil
}
func (dnClient *DatanodeClient) PutBlock(id *dnapi.DatanodeBlockID, chunks []*dnapi.ChunkInfo) error {
flags := int64(0)
req := dnapi.PutBlockRequestProto{
BlockData: &dnapi.BlockData{
BlockID: id,
Flags: &flags,
Metadata: make([]*dnapi.KeyValue, 0),
Chunks: chunks,
},
}
commandType := dnapi.Type_PutBlock
proto := dnapi.ContainerCommandRequestProto{
CmdType: &commandType,
PutBlock: &req,
ContainerID: id.ContainerID,
DatanodeUuid: dnClient.GetCurrentDnUUid(),
}
_, err := dnClient.sendDatanodeCommand(proto)
if err != nil {
return err
}
return nil
}
func (dnClient *DatanodeClient) GetBlock(id *dnapi.DatanodeBlockID) ([]ChunkInfo, error) {
result := make([]ChunkInfo, 0)
req := dnapi.GetBlockRequestProto{
BlockID: id,
}
commandType := dnapi.Type_GetBlock
proto := dnapi.ContainerCommandRequestProto{
CmdType: &commandType,
GetBlock: &req,
ContainerID: id.ContainerID,
DatanodeUuid: dnClient.GetCurrentDnUUid(),
}
resp, err := dnClient.sendDatanodeCommand(proto)
if err != nil {
return result, err
}
for _, chunkInfo := range resp.GetGetBlock().GetBlockData().Chunks {
result = append(result, ChunkInfo{
Name: chunkInfo.GetChunkName(),
Offset: chunkInfo.GetOffset(),
Len: chunkInfo.GetLen(),
})
}
return result, nil
}
func (dnClient *DatanodeClient) sendDatanodeCommand(proto dnapi.ContainerCommandRequestProto) (dnapi.ContainerCommandResponseProto, error) {
return dnClient.sendStandaloneDatanodeCommand(proto)
}
func (dn *DatanodeClient) Close() {
(*dn.standaloneClient).CloseSend()
}