src/dubbo_registry_zookeeper.erl (326 lines of code) (raw):

%%------------------------------------------------------------------------------ %% Licensed to the Apache Software Foundation (ASF) under one or more %% contributor license agreements. See the NOTICE file distributed with %% this work for additional information regarding copyright ownership. %% The ASF licenses this file to You under the Apache License, Version 2.0 %% (the "License"); you may not use this file except in compliance with %% the License. You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%------------------------------------------------------------------------------ -module(dubbo_registry_zookeeper). -behaviour(gen_server). -behaviour(dubbo_registry). -include("dubbo.hrl"). -include("dubboerl.hrl"). %% API -export([start_link/0, register_provider/1, provider_watcher/1]). -export([start/1, register/1, unregister/1, subscribe/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(SERVER, ?MODULE). -define(CALL_TIMEOUT,20000). -record(state, {zk_pid, provider_notify_fun}). %%%=================================================================== %%% API %%%=================================================================== %%-------------------------------------------------------------------- %% @doc %% Starts the server %% %% @end %%-------------------------------------------------------------------- -spec(start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %%%=================================================================== %%% gen_server callbacks %%%=================================================================== %%-------------------------------------------------------------------- %% @private %% @doc %% Initializes the server %% %% @spec init(Args) -> {ok, State} | %% {ok, State, Timeout} | %% ignore | %% {stop, Reason} %% @end %%-------------------------------------------------------------------- -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). init([]) -> {ok, Pid} = connection(), {ok, #state{zk_pid = Pid}}. %%-------------------------------------------------------------------- %% @private %% @doc %% Handling call messages %% %% @end %%-------------------------------------------------------------------- -spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, State :: #state{}) -> {reply, Reply :: term(), NewState :: #state{}} | {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). handle_call({do_register, Url}, _From, State) -> Result = do_register(State#state.zk_pid, Url), {reply, Result, State}; handle_call({do_unregister, Url}, _From, State) -> Result = do_unregister(State#state.zk_pid, Url), {reply, Result, State}; handle_call({subscribe_provider, InterfaceName, NotifyFun}, _From, #state{zk_pid = ZkPid} = State) -> logger:debug("subscribe provider ~p notify fun ~p", [InterfaceName, NotifyFun]), NewState = State#state{provider_notify_fun = NotifyFun}, List = get_provider_list(InterfaceName, ZkPid), notify_provider_change(NotifyFun, InterfaceName, List), {reply, ok, NewState}; handle_call(_Request, _From, State) -> {reply, ok, State}. %%-------------------------------------------------------------------- %% @private %% @doc %% Handling cast messages %% %% @end %%-------------------------------------------------------------------- -spec(handle_cast(Request :: term(), State :: #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). handle_cast({provider_node_change, Interface, Path}, #state{zk_pid = Pid, provider_notify_fun = NotifyFun} = State) -> ProviderList = get_provider_and_start(Pid, Interface, Path), notify_provider_change(NotifyFun, Interface, ProviderList), {noreply, State}; handle_cast(_Request, State) -> {noreply, State}. %%-------------------------------------------------------------------- %% @private %% @doc %% Handling all non call/cast messages %% %% @spec handle_info(Info, State) -> {noreply, State} | %% {noreply, State, Timeout} | %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). handle_info(_Info, State) -> {noreply, State}. %%-------------------------------------------------------------------- %% @private %% @doc %% This function is called by a gen_server when it is about to %% terminate. It should be the opposite of Module:init/1 and do any %% necessary cleaning up. When it returns, the gen_server terminates %% with Reason. The return value is ignored. %% %% @spec terminate(Reason, State) -> void() %% @end %%-------------------------------------------------------------------- -spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), State :: #state{}) -> term()). terminate(_Reason, _State) -> ok. %%-------------------------------------------------------------------- %% @private %% @doc %% Convert process state when code is changed %% %% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} %% @end %%-------------------------------------------------------------------- -spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, Extra :: term()) -> {ok, NewState :: #state{}} | {error, Reason :: term()}). code_change(_OldVsn, State, _Extra) -> {ok, State}. %%---------------------------------------------- %% dubbo_registry %%---------------------------------------------- start(_Url) -> dubbo_registry_sup:start_child(?MODULE, {?MODULE, start_link, []}, ?MODULE), ok. -spec(register(binary()) -> {ok, Result :: binary()} | {error, any()}). register(Url) -> try gen_server:call(?SERVER, {do_register, Url}, ?CALL_TIMEOUT) of Result -> Result catch exit:{timeout, _} -> {error, timeout}; exit:{badrpc, _} -> {error, badrpc}; _:Reason -> {error, Reason} end. -spec(unregister(binary()) -> {ok, Result :: binary()} | {error, any()}). unregister(Url) -> try gen_server:call(?SERVER, {do_unregister, Url}, ?CALL_TIMEOUT) of Result -> Result catch exit:{timeout, _} -> {error, timeout}; exit:{badrpc, _} -> {error, badrpc}; _:Reason -> {error, Reason} end. do_register(Pid, Url) -> case dubbo_common_fun:parse_url(Url) of {ok, UrlInfo} -> CreateNodeList = [{get_register_node(Item, UrlInfo), p} || Item <- [root, service, category]], UrlNode = {list_to_binary(edoc_lib:escape_uri(binary_to_list(Url))), get_dynamic(UrlInfo)}, CreateNodeList2 = CreateNodeList ++ [UrlNode], RetFullNode = check_and_create_path(Pid, <<"">>, CreateNodeList2), {ok, RetFullNode}; Reason -> logger:error("zk parse url fail reason ~p", [Reason]), {error, Reason} end. do_unregister(Pid, Url) -> case dubbo_common_fun:parse_url(Url) of {ok, UrlInfo} -> CreateNodeList = [get_register_node(Item, UrlInfo) || Item <- [root, service, category]], UrlNode = list_to_binary(edoc_lib:escape_uri(binary_to_list(Url))), CreateNodeList2 = CreateNodeList ++ [UrlNode], Path = dubbo_common_fun:binary_list_join(CreateNodeList2, <<"/">>), FullPath = <<<<"/">>/binary, Path/binary>>, del_path(Pid, FullPath); Reason -> logger:error("zk parse url fail reason ~p", [Reason]), {error, Reason} end. get_dynamic(UrlInfo) -> case maps:get(<<"dynamic">>, UrlInfo#dubbo_url.parameters, <<"true">>) of <<"true">> -> e; _ -> p end. get_register_node(root, _UrlInfo) -> <<"dubbo">>; get_register_node(service, UrlInfo) -> maps:get(<<"interface">>, UrlInfo#dubbo_url.parameters); get_register_node(category, UrlInfo) -> maps:get(<<"category">>, UrlInfo#dubbo_url.parameters, <<"providers">>). subscribe(SubcribeUrl, NotifyFun) -> {ok, UrlInfo} = dubbo_common_fun:parse_url(SubcribeUrl), InterfaceName = maps:get(<<"interface">>, UrlInfo#dubbo_url.parameters), try gen_server:call(?SERVER, {subscribe_provider, InterfaceName, NotifyFun}, 5000) of ok -> ok catch _Error:Reason -> {error, Reason} end. register_provider(Provider) -> gen_server:call(?SERVER, {add_provider, Provider}), ok. %%%=================================================================== %%% Internal functions %%%=================================================================== connection() -> {ok, List} = application:get_env(dubboerl, zookeeper_list), {ok, Pid} = erlzk:connect(List, 30000, [ {chroot, "/"}, {monitor, self()}]), {ok, Pid}. get_provider_list(InterfaceName, ZkPid) -> InterfacePath = <<<<"/dubbo/">>/binary, InterfaceName/binary, <<"/providers">>/binary>>, ChildList = get_provider_and_start(ZkPid, InterfaceName, InterfacePath), ChildList. get_provider_and_start(Pid, Interface, Path) -> case erlzk:get_children(Pid, Path, spawn(dubbo_registry_zookeeper, provider_watcher, [Interface])) of {ok, ChildList} -> logger:debug("get provider list ~p", [ChildList]), ChildList; {error, no_node} -> logger:warning("interface ~p provide zk node unexist", [Interface]), check_and_create_path(Pid, <<"">>, [{<<"dubbo">>, p}, {Interface, p}, {<<"providers">>, p}]), get_provider_and_start(Pid, Interface, Path); {error, R1} -> logger:debug("[add_consumer] get_provider_list error ~p", [R1]), [] end. provider_watcher(Interface) -> receive {node_children_changed, Path} -> gen_server:cast(?SERVER, {provider_node_change, Interface, Path}), logger:debug("provider_watcher get event ~p ~p", [node_children_changed, Path]); {Event, Path} -> logger:debug("provider_watcher get event ~p ~p", [Event, Path]) end, ok. notify_provider_change(Fun, Interface, []) -> UrlInfo = #dubbo_url{ scheme = <<"empty">>, host = <<"127.0.0.1">>, path = Interface, port = 80, parameters = #{ <<"interface">> => Interface } }, UrlInfoBin = dubbo_common_fun:url_to_binary(UrlInfo), logger:debug("notify provider change fun ~p", [Fun]), Fun(Interface, [UrlInfoBin]), ok; notify_provider_change(Fun, Interface, List) -> List2 = [http_uri:decode(Item) || Item <- List], logger:debug("notify provider change fun ~p", [Fun]), Fun(Interface, List2), ok. del_path(Pid, Path) -> case erlzk:delete(Pid, Path) of ok -> ok; {error, Reason} -> logger:warning("zookeeper registry del path error ~p path ~p", [Reason, Path]), {error, Reason} end. create_path(Pid, Path, CreateType) -> case erlzk:create(Pid, Path, CreateType) of {ok, ActualPath} -> logger:debug("create zk path success ~p", [ActualPath]), ok; {error, R1} -> logger:debug("create zk path error ~p ~p", [Path, R1]) end, ok. check_and_create_path(_Pid, RootPath, []) -> RootPath; check_and_create_path(Pid, RootPath, [{Item, CreateType} | Rst]) -> CheckPath = <<RootPath/binary, <<"/">>/binary, Item/binary>>, case erlzk:exists(Pid, CheckPath) of {ok, _Stat} -> check_and_create_path(Pid, CheckPath, Rst); {error, no_node} -> logger:debug("check_and_create_path unexist no_node ~p", [CheckPath]), create_path(Pid, CheckPath, CreateType), check_and_create_path(Pid, CheckPath, Rst); {error, R1} -> logger:error("check_and_create_path error ~p", [R1]), check_and_create_path(Pid, CheckPath, Rst) end.