src/dubbo_provider_consumer_reg_table.erl (283 lines of code) (raw):

%%------------------------------------------------------------------------------ %% Licensed to the Apache Software Foundation (ASF) under one or more %% contributor license agreements. See the NOTICE file distributed with %% this work for additional information regarding copyright ownership. %% The ASF licenses this file to You under the Apache License, Version 2.0 %% (the "License"); you may not use this file except in compliance with %% the License. You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%------------------------------------------------------------------------------ -module(dubbo_provider_consumer_reg_table). -behaviour(gen_server). %% API -export([start_link/0, start_consumer/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([update_consumer_connections/2, update_node_conections/2, query_node_connections/1, get_interface_provider_node/1, get_host_connections/2, select_connection/1, update_connection_readonly/2, get_host_flag/1, get_host_flag/2, clean_invalid_provider/1, update_interface_info/1, get_interface_info/1]). -include("dubbo.hrl"). -define(SERVER, ?MODULE). -define(INTERFCE_LIST_TABLE, interface_list). -define(INTERFACE_INFO_TABLE, dubbo_interface_info). -define(PROVIDER_NODE_LIST_TABLE, provider_node_list). -record(state, {}). -ifdef(TEST). -compile([export_all]). -endif. 
%%%===================================================================
%%% API
%%%===================================================================

%%--------------------------------------------------------------------
%% @doc
%% Starts the registry server and registers it locally as ?SERVER.
%% @end
%%--------------------------------------------------------------------
-spec(start_link() ->
    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%%--------------------------------------------------------------------
%% @private
%% @doc
%% gen_server init callback. Creates the ETS tables used by this module
%% and starts with an empty state record.
%% @end
%%--------------------------------------------------------------------
-spec(init(Args :: term()) ->
    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term()} | ignore).
init([]) ->
    init_ets_table(),
    {ok, #state{}}.

%% @private
%% Creates the three public named tables, in a fixed order. Every table is
%% keyed on the second tuple element; the two list tables are bags, the
%% interface-info table uses the default table type.
%% A failure to create a table is logged and otherwise ignored, so this
%% function returns `ok' even when a table could not be created.
init_ets_table() ->
    TableSpecs = [
        {?INTERFCE_LIST_TABLE,
            [bag, public, named_table, {keypos, 2}],
            "new ets table INTERFCE_LIST_TABLE error ~p"},
        {?PROVIDER_NODE_LIST_TABLE,
            [bag, public, named_table, {keypos, 2}],
            "new ets table PROVIDER_NODE_LIST_TABLE error ~p"},
        {?INTERFACE_INFO_TABLE,
            [public, named_table, {keypos, 2}],
            "new ets table INTERFACE_INFO_TABLE error ~p"}
    ],
    lists:foreach(fun new_named_table/1, TableSpecs).

%% @private
%% Creates one named table. Only the ets:new/2 call itself is protected:
%% any exception it raises is logged with the table-specific format string.
new_named_table({Name, Options, ErrorFormat}) ->
    try ets:new(Name, Options) of
        Name ->
            ok
    catch
        _Class:Reason ->
            logger:error(ErrorFormat, [Reason])
    end.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handles synchronous requests. There is no request-specific clause:
%% every call is answered with `ok' and the state is left untouched.
%% This includes the `{add_consumer, Interface, ProviderNodeInfo}'
%% request issued by start_consumer/2.
%% @end
%%--------------------------------------------------------------------
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
    State :: #state{}) ->
    {reply, Reply :: term(), NewState :: #state{}} |
    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
    {stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Msg, _Caller, CurrentState) ->
    {reply, ok, CurrentState}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handles asynchronous requests. All casts are ignored.
%% @end
%%--------------------------------------------------------------------
-spec(handle_cast(Request :: term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
handle_cast(_Msg, CurrentState) ->
    {noreply, CurrentState}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handles every message that is neither a call nor a cast.
%% @end
%%--------------------------------------------------------------------
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Message, CurrentState) ->
    %% Unexpected messages are dropped; the state is unchanged.
    {noreply, CurrentState}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% gen_server terminate callback. Performs no cleanup.
%% @end
%%--------------------------------------------------------------------
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
    State :: #state{}) -> term()).
terminate(_Why, _FinalState) ->
    ok.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% gen_server code_change callback. The state is carried over as is.
%% @end
%%--------------------------------------------------------------------
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
    Extra :: term()) ->
    {ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_FromVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.

%% @doc Sends a synchronous `{add_consumer, Interface, ProviderNodeInfo}'
%% request to the registry server and returns its reply.
%% NOTE(review): handle_call/3 only has a catch-all clause, so the request
%% is acknowledged with `ok' and nothing else happens - confirm intent.
start_consumer(Interface, ProviderNodeInfo) ->
    Request = {add_consumer, Interface, ProviderNodeInfo},
    gen_server:call(?SERVER, Request).

%% @doc Returns every entry stored in the provider node table under the
%% host flag derived from `Host' and `Port' (see get_host_flag/2).
get_host_connections(Host, Port) ->
    ets:lookup(?PROVIDER_NODE_LIST_TABLE, get_host_flag(Host, Port)).

%% @doc Stores `InterfaceInfo' in the interface info table and returns
%% the result of ets:insert/2.
update_interface_info(InterfaceInfo) ->
    ets:insert(?INTERFACE_INFO_TABLE, InterfaceInfo).

%% @doc Looks up the entry stored for `Interface' in the interface info
%% table. Returns the stored term, or `undefined' when there is none.
get_interface_info(Interface) ->
    case ets:lookup(?INTERFACE_INFO_TABLE, Interface) of
        [Info] ->
            Info;
        [] ->
            undefined
    end.
%%%===================================================================
%%% Internal functions
%%%===================================================================

%% @doc Adds each connection to the provider node table unless an entry
%% with the same host flag and pid is already present. Always returns `ok'.
%% The first argument is accepted for interface compatibility but is not
%% used.
update_node_conections(_Interface, Connections) ->
    lists:foreach(fun insert_node_connection_if_absent/1, Connections).

%% @private
%% Inserts one #connection_info{} into the provider node table when no
%% entry with the same host_flag/pid pair exists yet.
insert_node_connection_if_absent(#connection_info{host_flag = HostFlag, pid = Pid} = Connection) ->
    Pattern = #connection_info{host_flag = HostFlag, pid = Pid, _ = '_'},
    case ets:match_object(?PROVIDER_NODE_LIST_TABLE, Pattern) of
        [] ->
            Inserted = ets:insert(?PROVIDER_NODE_LIST_TABLE, Connection),
            logger:debug("update_node_conections insert one record ~p result:~p", [HostFlag, Inserted]);
        _ ->
            logger:debug("update_node_conections hostflag ~p already exit ", [HostFlag])
    end.

%% @doc Returns every entry stored in the provider node table under
%% `Hostflag'.
query_node_connections(Hostflag) ->
    ets:lookup(?PROVIDER_NODE_LIST_TABLE, Hostflag).

%% @doc Records, for `Interface', one #interface_list{} entry per
%% connection in `Connections'. Always returns `ok'.
update_consumer_connections(Interface, Connections) ->
    lists:foreach(
        fun(Connection) -> insert_interface_connection(Interface, Connection) end,
        Connections).

%% @private
%% Inserts the #interface_list{} entry that ties `Interface' to one
%% connection and logs the result of the insert.
insert_interface_connection(Interface, #connection_info{pid = Pid} = Connection) ->
    Entry = #interface_list{interface = Interface, pid = Pid, connection_info = Connection},
    Inserted = ets:insert(?INTERFCE_LIST_TABLE, Entry),
    logger:debug("insert interface conection info ~p ~p ~p", [Interface, Pid, Inserted]),
    ok.

%% @doc Builds the host flag of a #provider_config{} from its host and
%% port fields; see get_host_flag/2 for the format.
get_host_flag(#provider_config{host = Host, port = Port}) ->
    get_host_flag(Host, Port).

%% @doc Builds the host flag binary `<<Host, "_", Port>>', where `Host'
%% is a binary and `Port' an integer rendered in decimal.
get_host_flag(Host, Port) ->
    <<Host/binary, "_", (integer_to_binary(Port))/binary>>.

%% @private
%% Records every connection in `ConnectionList' for `Interface' and, when
%% `IsUpdateProvideNode' is `true', also inserts it into the provider node
%% table. `HostFlag' is only used for logging. Always returns `ok'.
%% This function is not exported; it is reachable from outside only when
%% the module is compiled with TEST defined (export_all).
update_connection_info(Interface, HostFlag, ConnectionList, IsUpdateProvideNode) ->
    lists:foreach(
        fun(Connection) ->
            insert_interface_connection(Interface, Connection),
            case IsUpdateProvideNode of
                true ->
                    Inserted = ets:insert(?PROVIDER_NODE_LIST_TABLE, Connection),
                    logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, Inserted]);
                false ->
                    ok
            end
        end,
        ConnectionList).
%% @doc Returns the host flags of the connections recorded for
%% `Interface', with duplicates removed by dubbo_lists_util:del_duplicate/1.
%% Returns `[]' when nothing is recorded for the interface.
get_interface_provider_node(Interface) ->
    case ets:lookup(?INTERFCE_LIST_TABLE, Interface) of
        [] ->
            [];
        Entries ->
            HostFlags = [Entry#interface_list.connection_info#connection_info.host_flag
                || Entry <- Entries],
            dubbo_lists_util:del_duplicate(HostFlags)
    end.

%% @doc Returns `{ok, Connections}' with the connection_info of every
%% entry recorded for `Interface', or `{error, none}' when there is none.
select_connection(Interface) ->
    case ets:lookup(?INTERFCE_LIST_TABLE, Interface) of
        [] ->
            {error, none};
        Entries ->
            {ok, [Entry#interface_list.connection_info || Entry <- Entries]}
    end.

%% @doc Sets the `readonly' field of the connection info in every
%% interface list entry that belongs to `ConnectionPid'.
%% Returns `{ok, Count}', where `Count' is the number of entries updated.
-spec(update_connection_readonly(pid(), boolean()) -> {ok, non_neg_integer()}).
update_connection_readonly(ConnectionPid, Readonly) ->
    Pattern = #interface_list{pid = ConnectionPid, _ = '_'},
    Objects = ets:match_object(?INTERFCE_LIST_TABLE, Pattern),
    lists:foreach(
        fun(#interface_list{interface = Interface, pid = Pid, connection_info = ConnectionInfo} = OldEntry) ->
            logger:debug("[dubbo] update interface ~p ~p readonly", [Interface, Pid]),
            NewConnectionInfo = ConnectionInfo#connection_info{readonly = Readonly},
            NewEntry = OldEntry#interface_list{connection_info = NewConnectionInfo},
            %% The table is a bag, so the old object has to be removed
            %% explicitly before the updated one is inserted.
            ets:delete_object(?INTERFCE_LIST_TABLE, OldEntry),
            ets:insert(?INTERFCE_LIST_TABLE, NewEntry)
        end,
        Objects),
    {ok, length(Objects)}.

%% @doc For every host flag in the list, looks up its connections in the
%% provider node table and cleans them up via clean_connection_info/1.
%% Host flags without any entry are skipped. Always returns `ok'.
%% NOTE(review): the provider node table entries themselves are not
%% removed here - confirm whether that is intended.
clean_invalid_provider([]) ->
    ok;
clean_invalid_provider([HostFlag | DeleteProverList]) ->
    case ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag) of
        [] ->
            ok;
        ProviderNodeConnections ->
            UniqueConnections = dubbo_lists_util:del_duplicate(ProviderNodeConnections),
            clean_connection_info(UniqueConnections)
    end,
    clean_invalid_provider(DeleteProverList).

%% @private
%% For each connection: removes every interface list entry that refers to
%% the connection pid, then asks dubbo_transport_pool_sup to stop that pid.
clean_connection_info(ProviderNodeConnections) ->
    lists:foreach(
        fun(#connection_info{pid = Pid}) ->
            Pattern = #interface_list{pid = Pid, _ = '_'},
            %% match_delete/2 treats '_' as a wildcard. delete_object/2
            %% compares the term literally, so a pattern would never match
            %% a stored entry and nothing would be removed.
            ets:match_delete(?INTERFCE_LIST_TABLE, Pattern),
            dubbo_transport_pool_sup:stop_children(Pid)
        end,
        ProviderNodeConnections),
    ok.