pkg/seata/synchronizers.go (102 lines of code) (raw):
package seata
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
seatav1alpha1 "github.com/apache/seata-k8s/api/v1alpha1"
"github.com/apache/seata-k8s/pkg/utils"
"golang.org/x/sync/errgroup"
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/log"
)
func SyncService(curr *apiv1.Service, next *apiv1.Service) {
curr.Spec.Ports = next.Spec.Ports
}
func SyncStatefulSet(curr *appsv1.StatefulSet, next *appsv1.StatefulSet) {
curr.Spec.Template = next.Spec.Template
curr.Spec.Replicas = next.Spec.Replicas
}
type rspData struct {
Code string `json:"code"`
Message string `json:"message"`
Data string `json:"data"`
Success bool `json:"success"`
}
func changeCluster(s *seatav1alpha1.SeataServer, i int32, username string, password string) error {
client := http.Client{}
host := fmt.Sprintf("%s-%d.%s.%s.svc.cluster.local:%d", s.Name, i, s.Spec.ServiceName, s.Namespace, s.Spec.Ports.ConsolePort)
values := map[string]string{"username": username, "password": password}
jsonValue, _ := json.Marshal(values)
loginUrl := fmt.Sprintf("http://%s/api/v1/auth/login", host)
rsp, err := client.Post(loginUrl, "application/json", bytes.NewBuffer(jsonValue))
if err != nil {
return err
}
defer rsp.Body.Close()
d := &rspData{}
var tokenStr string
if rsp.StatusCode != http.StatusOK {
return errors.New("login failed")
}
body, err := io.ReadAll(rsp.Body)
if err != nil {
return err
}
if err = json.Unmarshal(body, &d); err != nil {
return err
}
if !d.Success {
return errors.New(d.Message)
}
tokenStr = d.Data
targetUrl := fmt.Sprintf("http://%s/metadata/v1/changeCluster?raftClusterStr=%s",
host, url.QueryEscape(utils.ConcatRaftServerAddress(s)))
req, _ := http.NewRequest("POST", targetUrl, nil)
req.Header.Set("Authorization", tokenStr)
rsp, err = client.Do(req)
if err != nil {
return err
}
defer rsp.Body.Close()
d = &rspData{}
if rsp.StatusCode != http.StatusOK {
return errors.New("failed to changeCluster")
}
body, err = io.ReadAll(rsp.Body)
if err != nil {
return err
}
if err = json.Unmarshal(body, &d); err != nil {
return err
}
if !d.Success {
return errors.New(d.Message)
}
return nil
}
func SyncRaftCluster(ctx context.Context, s *seatav1alpha1.SeataServer, username string, password string) error {
logger := log.FromContext(ctx)
group, childContext := errgroup.WithContext(ctx)
for i := int32(0); i < s.Spec.Replicas; i++ {
finalI := i
group.Go(func() error {
select {
case <-childContext.Done():
return nil
default:
err := changeCluster(s, finalI, username, password)
if err != nil {
logger.Error(err, fmt.Sprintf("fail to SyncRaftCluster at %d-th pod", finalI))
}
return err
}
})
}
return group.Wait()
}