commander/ipc/client/client.go (137 lines of code) (raw):
package client
import (
"context"
"fmt"
"sync"
"time"
"github.com/aliyun/aliyun_assist_client/commander/ipc/endpoint"
"github.com/aliyun/aliyun_assist_client/commander/model"
"github.com/aliyun/aliyun_assist_client/commander/taskerrors"
pb "github.com/aliyun/aliyun_assist_client/interprocess/commander/agrpc"
"github.com/aliyun/aliyun_assist_client/interprocess/messagebus/buses"
messagebus_client "github.com/aliyun/aliyun_assist_client/interprocess/messagebus/client"
"github.com/aliyun/aliyun_assist_client/thirdparty/sirupsen/logrus"
grpc "google.golang.org/grpc"
)
type assistAgentClient struct {
Conn *grpc.ClientConn
Client pb.AssistAgentClient
}
const (
dialTimeout = 5
)
var (
_client *assistAgentClient
_endpoint buses.Endpoint // Endpoint of agent server
_needRebuildClient bool // _endpoint changed, create new client
_lock sync.Mutex
)
func getClient(logger logrus.FieldLogger) (*assistAgentClient, error) {
_lock.Lock()
defer _lock.Unlock()
if _needRebuildClient {
_needRebuildClient = false
if _client != nil {
_client.Conn.Close()
}
} else if _client != nil {
return _client, nil
}
conn, err := messagebus_client.ConnectWithTimeout(logger, _endpoint, time.Duration(dialTimeout)*time.Second)
if err != nil {
return nil, err
}
client := &assistAgentClient{}
client.Conn = conn
client.Client = pb.NewAssistAgentClient(client.Conn)
_client = client
return _client, nil
}
func UpdateEndpoint(endpoint buses.Endpoint) {
_lock.Lock()
defer _lock.Unlock()
_needRebuildClient = true
_endpoint = endpoint
}
func RegisterCommander(logger logrus.FieldLogger, handshakeToken string) error {
logger = logger.WithField("client", "RegisterCommander")
client, err := getClient(logger)
logger.Info("Client RegisterCommander handshakeToken:" + handshakeToken)
if err != nil {
logger.Error("Create new client failed: ", err)
return err
}
commanderName, commanderSupportedApiVersion := model.GetCommanderBaseInfo()
req := &pb.RegisterCommanderReq{
PluginName: commanderName,
HandshakeToken: handshakeToken,
Endpoint: endpoint.GetEndpoint(false).String(),
SupportedApiVersion: fmt.Sprintf("[\"%s\"]", commanderSupportedApiVersion),
}
resp, err := client.Client.RegisterCommander(context.Background(), req)
if err != nil {
return err
}
if resp.Status.StatusCode != 0 {
return fmt.Errorf(resp.Status.ErrMessage)
}
return nil
}
func WriteSubmissionOutput(logger logrus.FieldLogger, index int, submissionId, output string) error {
logger = logger.WithField("client", "WriteSubmissionOutput")
client, err := getClient(logger)
if err != nil {
logger.Error("Create new client failed: ", err)
return err
}
req := &pb.WriteSubmissionOutputReq{
SubmissionId: submissionId,
Output: &pb.TaskOutput{
Index: int32(index),
Output: output,
},
}
resp, err := client.Client.WriteSubmissionOutput(context.Background(), req)
if err != nil {
return err
}
if resp.Status.StatusCode != 0 {
return fmt.Errorf(resp.Status.ErrMessage)
}
return nil
}
func FinalizeSubmissionStatus(logger logrus.FieldLogger, index int, submissionId, output string,
exitCode, taskStatus int, taskError *taskerrors.TaskError) error {
logger = logger.WithField("client", "FinalizeSubmissionStatus")
client, err := getClient(logger)
if err != nil {
logger.Error("Create new client failed: ", err)
return err
}
req := &pb.FinalizeSubmissionStatusReq{
SubmissionId: submissionId,
Output: &pb.TaskOutput{
Index: int32(index),
Output: output,
},
Result: &pb.TaskRes{
ExitCode: int32(exitCode),
TaskStatus: int32(taskStatus),
},
}
if taskError != nil {
req.Result.TaskError = &pb.TaskError{
ErrorCode: taskError.ErrorCode,
ErrorSubCode: taskError.ErrorSubCode,
ErrorMessage: taskError.ErrorMessage,
}
}
resp, err := client.Client.FinalizeSubmissionStatus(context.Background(), req)
if err != nil {
return err
}
if resp.Status.StatusCode != 0 {
return fmt.Errorf(resp.Status.ErrMessage)
}
return nil
}