in src/dubbo/protocol/triple/invoker.py [0:0]
def invoke(self, invocation: RpcInvocation) -> Result:
"""
Invoke the invocation.
:param invocation: The invocation to invoke.
:type invocation: RpcInvocation
:return: The result of the invocation.
:rtype: Result
"""
future = concurrent.futures.Future()
result = TriResult(future)
if not self._client.is_connected():
result.set_exception(RemotingError("The client is not connected to the server."))
return result
# get method descriptor
method_descriptor: MethodDescriptor = invocation.get_attribute(common_constants.METHOD_DESCRIPTOR_KEY)
# get arg_serializer
arg_serializing_function = method_descriptor.get_arg_serializer()
arg_serializer = CustomSerializer(arg_serializing_function) if arg_serializing_function else DirectSerializer()
# get return_deserializer
return_deserializing_function = method_descriptor.get_return_deserializer()
return_deserializer = (
CustomDeserializer(return_deserializing_function) if return_deserializing_function else DirectDeserializer()
)
write_stream = TriClientWriteStream()
read_stream = TriReadStream()
# create listener
rpc_type = method_descriptor.get_rpc_type()
is_unary = not rpc_type.client_stream and not rpc_type.server_stream
if is_unary:
listener = FutureToClientCallListenerAdapter(future)
else:
read_stream = TriReadStream()
listener = ReadStreamToClientCallListenerAdapter(read_stream)
# Create a new TriClientCall
tri_client_call = TripleClientCall(
self._stream_multiplexer,
listener,
arg_serializer,
return_deserializer,
)
write_stream.set_call(tri_client_call)
if not is_unary:
write_stream = TriReadWriteStream(write_stream, read_stream)
# start the call
try:
metadata = self._create_metadata(invocation)
tri_client_call.start(metadata)
except ExtensionError as e:
result.set_exception(e)
return result
# write the message
if not rpc_type.client_stream:
# if the client call is not a stream, we send the message directly
FunctionHelper.call_func(write_stream.write, invocation.get_argument())
write_stream.done_writing()
else:
# try to get first argument and check if it is an iterable
args, _ = invocation.get_argument()
if args and isinstance(args[0], Iterator):
# if the argument is an iterator, we need to write the stream
for arg in args[0]:
write_stream.write(arg)
write_stream.done_writing()
# If the call is not unary, we need to return the stream
# server_stream -> return read_stream
# client_stream or bidirectional_stream -> return write_read_stream
if not is_unary:
if rpc_type.server_stream and not rpc_type.client_stream:
result.set_value(read_stream)
else:
result.set_value(write_stream)
return result