syncer/rpc/server.go (81 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rpc
import (
"context"
"fmt"
"time"
"github.com/apache/servicecomb-service-center/pkg/log"
v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
"github.com/apache/servicecomb-service-center/syncer/config"
"github.com/apache/servicecomb-service-center/syncer/service/replicator"
"github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
)
const (
HealthStatusConnected = "CONNECTED"
HealthStatusAbnormal = "ABNORMAL"
HealthStatusClose = "CLOSE"
HealthStatusAuthFail = "AuthFail"
RbacAllowedAccountName = "sync-user"
RbacAllowedRoleName = "sync-admin"
)
func NewServer() *Server {
return &Server{
replicator: replicator.Manager(),
}
}
type Server struct {
v1sync.UnimplementedEventServiceServer
replicator replicator.Replicator
}
func (s *Server) Sync(ctx context.Context, events *v1sync.EventList) (*v1sync.Results, error) {
err := auth(ctx)
if err != nil {
log.Error("auth failed", err)
return generateFailedResults(events, err)
}
log.Info(fmt.Sprintf("start sync: %s", events.Flag()))
res := s.replicator.Persist(ctx, events)
return s.toResults(res), nil
}
func generateFailedResults(events *v1sync.EventList, err error) (*v1sync.Results, error) {
if events == nil || len(events.Events) == 0 {
return &v1sync.Results{Results: map[string]*v1sync.Result{}}, nil
}
rsts := make(map[string]*v1sync.Result, len(events.Events))
for _, evt := range events.Events {
rsts[evt.Id] = &v1sync.Result{
Code: resource.Fail,
Message: err.Error(),
}
}
return &v1sync.Results{Results: rsts}, nil
}
func (s *Server) toResults(results []*resource.Result) *v1sync.Results {
syncResult := make(map[string]*v1sync.Result, len(results))
for _, r := range results {
syncResult[r.EventID] = &v1sync.Result{
Code: r.Status,
Message: r.Message,
}
}
return &v1sync.Results{
Results: syncResult,
}
}
func (s *Server) Health(ctx context.Context, _ *v1sync.HealthRequest) (*v1sync.HealthReply, error) {
resp := &v1sync.HealthReply{
Status: HealthStatusConnected,
LocalTimestamp: time.Now().UnixNano(),
}
err := auth(ctx)
if err != nil {
resp.Status = HealthStatusAuthFail
log.Error("auth failed", err)
return resp, nil
}
// TODO enable to close syncer
if !config.GetConfig().Sync.EnableOnStart {
resp.Status = HealthStatusClose
log.Error("unexpected health check when syncer is closed", nil)
return resp, nil
}
return resp, nil
}