in protocol/triple/server.go [248:355]
// handleServiceWithInfo registers one triple handler on s.triServer for every
// method listed in info.Methods. The procedure name is derived from
// interfaceName and the method name, and the handler flavor (unary,
// client-stream, server-stream or bidi-stream) is chosen by the method's Type.
// Methods whose Type matches none of these are skipped, and the errors returned
// by the Register* calls are discarded.
//
// Every handler assembles the argument list, turns the request headers into
// attachments, publishes them on ctx under constant.AttachmentKey and then
// delegates the call to invoker. opts is forwarded unchanged to each
// registration.
func (s *Server) handleServiceWithInfo(interfaceName string, invoker protocol.Invoker, info *common.ServiceInfo, opts ...tri.HandlerOption) {
	for _, method := range info.Methods {
		// Per-iteration copy so that each handler closure captures its own method.
		m := method
		procedure := joinProcedure(interfaceName, m.Name)

		switch m.Type {
		case constant.CallUnary:
			unary := func(ctx context.Context, req *tri.Request) (*tri.Response, error) {
				var params []any
				switch msg := req.Msg.(type) {
				case []any:
					// Non-idl mode: req.Msg carries several arguments. As noted for
					// createServiceInfoWithReflection, ReqInitFunc produces pointers
					// to the real arguments, so each one is dereferenced via Elem.
					for _, ptr := range msg {
						params = append(params, reflect.ValueOf(ptr).Elem().Interface())
					}
				default:
					// Triple idl mode and old triple idl mode: the message itself is
					// the single argument.
					params = append(params, msg)
				}

				reqAttachments := generateAttachments(req.Header())
				// Inject the incoming attachments into the context.
				ctx = context.WithValue(ctx, constant.AttachmentKey, reqAttachments)
				// A fresh, empty map is published under AttachmentServerKey; nothing
				// in this function reads it back (presumably consumed downstream --
				// verify against the invoker).
				ctx = context.WithValue(ctx, constant.AttachmentServerKey, make(map[string]any))

				res := invoker.Invoke(ctx, invocation.NewRPCInvocation(m.Name, params, reqAttachments))

				// todo(DMwangnima): modify InfoInvoker to get a unified processing logic
				// A result that already is a *tri.Response is used as is (see
				// server/InfoInvoker.Invoke()); anything else is wrapped (see
				// proxy/proxy_factory/ProxyInvoker.Invoke).
				resp, alreadyWrapped := res.Result().(*tri.Response)
				if !alreadyWrapped {
					resp = tri.NewResponse([]any{res.Result()})
				}

				// Copy the result attachments onto the response: string values and the
				// first element of non-empty []string values go to the trailer, every
				// other value is formatted with %v and written to the header.
				for key, raw := range res.Attachments() {
					if text, isString := raw.(string); isString {
						resp.Trailer().Set(key, text)
						continue
					}
					if list, isList := raw.([]string); isList {
						if len(list) > 0 {
							resp.Trailer().Set(key, list[0])
						}
						continue
					}
					resp.Header().Set(key, fmt.Sprintf("%v", raw))
				}
				return resp, res.Error()
			}
			_ = s.triServer.RegisterUnaryHandler(procedure, m.ReqInitFunc, unary, opts...)

		case constant.CallClientStream:
			clientStream := func(ctx context.Context, stream *tri.ClientStream) (*tri.Response, error) {
				// The only argument is whatever StreamInitFunc builds from the stream.
				params := []any{m.StreamInitFunc(stream)}
				reqAttachments := generateAttachments(stream.RequestHeader())
				// Inject the incoming attachments into the context.
				ctx = context.WithValue(ctx, constant.AttachmentKey, reqAttachments)

				res := invoker.Invoke(ctx, invocation.NewRPCInvocation(m.Name, params, reqAttachments))

				// Wrap the result unless it already is a *tri.Response
				// (see proxy/proxy_factory/ProxyInvoker.Invoke).
				resp, alreadyWrapped := res.Result().(*tri.Response)
				if !alreadyWrapped {
					resp = tri.NewResponse([]any{res.Result()})
				}
				return resp, res.Error()
			}
			_ = s.triServer.RegisterClientStreamHandler(procedure, clientStream, opts...)

		case constant.CallServerStream:
			serverStream := func(ctx context.Context, req *tri.Request, stream *tri.ServerStream) error {
				// Arguments: the request message followed by the value built from the stream.
				params := []any{req.Msg, m.StreamInitFunc(stream)}
				reqAttachments := generateAttachments(req.Header())
				// Inject the incoming attachments into the context.
				ctx = context.WithValue(ctx, constant.AttachmentKey, reqAttachments)

				res := invoker.Invoke(ctx, invocation.NewRPCInvocation(m.Name, params, reqAttachments))
				return res.Error()
			}
			_ = s.triServer.RegisterServerStreamHandler(procedure, m.ReqInitFunc, serverStream, opts...)

		case constant.CallBidiStream:
			bidiStream := func(ctx context.Context, stream *tri.BidiStream) error {
				// The only argument is whatever StreamInitFunc builds from the stream.
				params := []any{m.StreamInitFunc(stream)}
				reqAttachments := generateAttachments(stream.RequestHeader())
				// Inject the incoming attachments into the context.
				ctx = context.WithValue(ctx, constant.AttachmentKey, reqAttachments)

				res := invoker.Invoke(ctx, invocation.NewRPCInvocation(m.Name, params, reqAttachments))
				return res.Error()
			}
			_ = s.triServer.RegisterBidiStreamHandler(procedure, bidiStream, opts...)
		}
	}
}