def invoke()

in src/dubbo/protocol/triple/invoker.py [0:0]


    def invoke(self, invocation: RpcInvocation) -> Result:
        """
        Invoke the invocation.
        :param invocation: The invocation to invoke.
        :type invocation: RpcInvocation
        :return: The result of the invocation.
        :rtype: Result
        """
        future = concurrent.futures.Future()
        result = TriResult(future)
        if not self._client.is_connected():
            result.set_exception(RemotingError("The client is not connected to the server."))
            return result

        # get method descriptor
        method_descriptor: MethodDescriptor = invocation.get_attribute(common_constants.METHOD_DESCRIPTOR_KEY)

        # get arg_serializer
        arg_serializing_function = method_descriptor.get_arg_serializer()
        arg_serializer = CustomSerializer(arg_serializing_function) if arg_serializing_function else DirectSerializer()

        # get return_deserializer
        return_deserializing_function = method_descriptor.get_return_deserializer()
        return_deserializer = (
            CustomDeserializer(return_deserializing_function) if return_deserializing_function else DirectDeserializer()
        )

        write_stream = TriClientWriteStream()
        read_stream = TriReadStream()

        # create listener
        rpc_type = method_descriptor.get_rpc_type()
        is_unary = not rpc_type.client_stream and not rpc_type.server_stream
        if is_unary:
            listener = FutureToClientCallListenerAdapter(future)
        else:
            read_stream = TriReadStream()
            listener = ReadStreamToClientCallListenerAdapter(read_stream)

        # Create a new TriClientCall
        tri_client_call = TripleClientCall(
            self._stream_multiplexer,
            listener,
            arg_serializer,
            return_deserializer,
        )
        write_stream.set_call(tri_client_call)

        if not is_unary:
            write_stream = TriReadWriteStream(write_stream, read_stream)

        # start the call
        try:
            metadata = self._create_metadata(invocation)
            tri_client_call.start(metadata)
        except ExtensionError as e:
            result.set_exception(e)
            return result

        # write the message
        if not rpc_type.client_stream:
            # if the client call is not a stream, we send the message directly
            FunctionHelper.call_func(write_stream.write, invocation.get_argument())
            write_stream.done_writing()
        else:
            # try to get first argument and check if it is an iterable
            args, _ = invocation.get_argument()
            if args and isinstance(args[0], Iterator):
                # if the argument is an iterator, we need to write the stream
                for arg in args[0]:
                    write_stream.write(arg)
                write_stream.done_writing()

        # If the call is not unary, we need to return the stream
        # server_stream -> return read_stream
        # client_stream or bidirectional_stream -> return write_read_stream
        if not is_unary:
            if rpc_type.server_stream and not rpc_type.client_stream:
                result.set_value(read_stream)
            else:
                result.set_value(write_stream)

        return result