1 From: Alexey Lebedeff <alebedev@mirantis.com>
2 Date: Wed, 9 Mar 2016 18:09:04 +0300
3 Subject: [PATCH] Avoid RPC roundtrips in list commands
5 The current implementation of the various `list_XXX` commands requires a
6 cross-node roundtrip for every processed item, because the `rabbitmqctl`
7 target node is responsible for gathering a global list of all items of
8 interest (channels etc.) and then processing them one by one.
10 For example, listing 10000 channels evenly distributed across 3 nodes
11 where the network has a 1ms delay takes more than 10 seconds on my
12 machine. With the proposed change, listing takes almost the same
13 time as gathering this info locally would. E.g. in the case above,
14 listing now takes 0.7 seconds on the same machine with the same 1ms delay.
16 It works by invoking emitting_map on every node, where each node sends
17 info about only its local items to the aggregator, in an async fashion -
18 as no reply from the aggregator is needed.
20 diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
21 index ea9d6a2..e6b168a 100644
22 --- a/src/rabbit_control_main.erl
23 +++ b/src/rabbit_control_main.erl
25 sync_queue/1, cancel_sync_queue/1, become/1,
28 --import(rabbit_misc, [rpc_call/4, rpc_call/5, rpc_call/7]).
29 +-import(rabbit_misc, [rpc_call/4, rpc_call/5]).
31 -define(EXTERNAL_CHECK_INTERVAL, 1000).
33 @@ -595,56 +595,74 @@ action(purge_queue, Node, [Q], Opts, Inform, Timeout) ->
35 action(list_users, Node, [], _Opts, Inform, Timeout) ->
36 Inform("Listing users", []),
37 - call(Node, {rabbit_auth_backend_internal, list_users, []},
38 - rabbit_auth_backend_internal:user_info_keys(), true, Timeout);
39 + call_emitter(Node, {rabbit_auth_backend_internal, list_users, []},
40 + rabbit_auth_backend_internal:user_info_keys(),
41 + [{timeout, Timeout}, to_bin_utf8]);
43 action(list_permissions, Node, [], Opts, Inform, Timeout) ->
44 VHost = proplists:get_value(?VHOST_OPT, Opts),
45 Inform("Listing permissions in vhost \"~s\"", [VHost]),
46 - call(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]},
47 - rabbit_auth_backend_internal:vhost_perms_info_keys(), true, Timeout,
49 + call_emitter(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]},
50 + rabbit_auth_backend_internal:vhost_perms_info_keys(),
51 + [{timeout, Timeout}, to_bin_utf8, is_escaped]);
53 action(list_parameters, Node, [], Opts, Inform, Timeout) ->
54 VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
55 Inform("Listing runtime parameters", []),
56 - call(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]},
57 - rabbit_runtime_parameters:info_keys(), Timeout);
58 + call_emitter(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]},
59 + rabbit_runtime_parameters:info_keys(),
60 + [{timeout, Timeout}]);
62 action(list_policies, Node, [], Opts, Inform, Timeout) ->
63 VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
64 Inform("Listing policies", []),
65 - call(Node, {rabbit_policy, list_formatted, [VHostArg]},
66 - rabbit_policy:info_keys(), Timeout);
67 + call_emitter(Node, {rabbit_policy, list_formatted, [VHostArg]},
68 + rabbit_policy:info_keys(),
69 + [{timeout, Timeout}]);
71 action(list_vhosts, Node, Args, _Opts, Inform, Timeout) ->
72 Inform("Listing vhosts", []),
73 ArgAtoms = default_if_empty(Args, [name]),
74 - call(Node, {rabbit_vhost, info_all, []}, ArgAtoms, true, Timeout);
75 + call_emitter(Node, {rabbit_vhost, info_all, []}, ArgAtoms,
76 + [{timeout, Timeout}, to_bin_utf8]);
78 action(list_user_permissions, _Node, _Args = [], _Opts, _Inform, _Timeout) ->
80 "list_user_permissions expects a username argument, but none provided."};
81 action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout) ->
82 Inform("Listing permissions for user ~p", Args),
83 - call(Node, {rabbit_auth_backend_internal, list_user_permissions, Args},
84 - rabbit_auth_backend_internal:user_perms_info_keys(), true, Timeout,
86 + call_emitter(Node, {rabbit_auth_backend_internal, list_user_permissions, Args},
87 + rabbit_auth_backend_internal:user_perms_info_keys(),
88 + [{timeout, Timeout}, to_bin_utf8, is_escaped]);
90 action(list_queues, Node, Args, Opts, Inform, Timeout) ->
91 - [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]),
92 Inform("Listing queues", []),
94 + [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]),
95 VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
96 ArgAtoms = default_if_empty(Args, [name, messages]),
97 - call(Node, {rabbit_amqqueue, info_all, [VHostArg, ArgAtoms, Online, Offline]},
100 + %% Data for emission
101 + Nodes = nodes_in_cluster(Node, Timeout),
102 + OnlineChunks = if Online -> length(Nodes); true -> 0 end,
103 + OfflineChunks = if Offline -> 1; true -> 0 end,
104 + ChunksOpt = {chunks, OnlineChunks + OfflineChunks},
105 + TimeoutOpt = {timeout, Timeout},
106 + EmissionRef = make_ref(),
107 + EmissionRefOpt = {ref, EmissionRef},
109 + _ = Online andalso start_emission(Node, {rabbit_amqqueue, emit_info_all, [Nodes, VHostArg, ArgAtoms]},
110 + [TimeoutOpt, EmissionRefOpt]),
111 + _ = Offline andalso start_emission(Node, {rabbit_amqqueue, emit_info_down, [VHostArg, ArgAtoms]},
112 + [TimeoutOpt, EmissionRefOpt]),
113 + display_emission_result(EmissionRef, ArgAtoms, [ChunksOpt, TimeoutOpt]);
115 action(list_exchanges, Node, Args, Opts, Inform, Timeout) ->
116 Inform("Listing exchanges", []),
117 VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
118 ArgAtoms = default_if_empty(Args, [name, type]),
119 - call(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]},
120 - ArgAtoms, Timeout);
121 + call_emitter(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]},
122 + ArgAtoms, [{timeout, Timeout}]);
124 action(list_bindings, Node, Args, Opts, Inform, Timeout) ->
125 Inform("Listing bindings", []),
126 @@ -652,27 +670,31 @@ action(list_bindings, Node, Args, Opts, Inform, Timeout) ->
127 ArgAtoms = default_if_empty(Args, [source_name, source_kind,
128 destination_name, destination_kind,
129 routing_key, arguments]),
130 - call(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]},
131 - ArgAtoms, Timeout);
132 + call_emitter(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]},
133 + ArgAtoms, [{timeout, Timeout}]);
135 action(list_connections, Node, Args, _Opts, Inform, Timeout) ->
136 Inform("Listing connections", []),
137 ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]),
138 - call(Node, {rabbit_networking, connection_info_all, [ArgAtoms]},
139 - ArgAtoms, Timeout);
140 + Nodes = nodes_in_cluster(Node, Timeout),
141 + call_emitter(Node, {rabbit_networking, emit_connection_info_all, [Nodes, ArgAtoms]},
142 + ArgAtoms, [{timeout, Timeout}, {chunks, length(Nodes)}]);
144 action(list_channels, Node, Args, _Opts, Inform, Timeout) ->
145 Inform("Listing channels", []),
146 ArgAtoms = default_if_empty(Args, [pid, user, consumer_count,
147 messages_unacknowledged]),
148 - call(Node, {rabbit_channel, info_all, [ArgAtoms]},
149 - ArgAtoms, Timeout);
150 + Nodes = nodes_in_cluster(Node, Timeout),
151 + call_emitter(Node, {rabbit_channel, emit_info_all, [Nodes, ArgAtoms]}, ArgAtoms,
152 + [{timeout, Timeout}, {chunks, length(Nodes)}]);
154 action(list_consumers, Node, _Args, Opts, Inform, Timeout) ->
155 Inform("Listing consumers", []),
156 VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
157 - call(Node, {rabbit_amqqueue, consumers_all, [VHostArg]},
158 - rabbit_amqqueue:consumer_info_keys(), Timeout);
159 + Nodes = nodes_in_cluster(Node, Timeout),
160 + call_emitter(Node, {rabbit_amqqueue, emit_consumers_all, [Nodes, VHostArg]},
161 + rabbit_amqqueue:consumer_info_keys(),
162 + [{timeout, Timeout}, {chunks, length(Nodes)}]);
164 action(node_health_check, Node, _Args, _Opts, Inform, Timeout) ->
165 Inform("Checking health of node ~p", [Node]),
166 @@ -788,17 +810,18 @@ display_info_message_row(IsEscaped, Result, InfoItemKeys) ->
168 end, IsEscaped) || X <- InfoItemKeys]).
170 -display_info_message(IsEscaped) ->
171 +display_info_message(IsEscaped, InfoItemKeys) ->
174 - ([FirstResult|_] = List, InfoItemKeys) when is_list(FirstResult) ->
175 + ([FirstResult|_] = List, _) when is_list(FirstResult) ->
176 lists:foreach(fun(Result) ->
177 display_info_message_row(IsEscaped, Result, InfoItemKeys)
181 - (Result, InfoItemKeys) ->
182 - display_info_message_row(IsEscaped, Result, InfoItemKeys)
184 + display_info_message_row(IsEscaped, Result, InfoItemKeys),
188 display_info_list(Results, InfoItemKeys) when is_list(Results) ->
189 @@ -855,7 +878,10 @@ display_call_result(Node, MFA) ->
192 unsafe_rpc(Node, Mod, Fun, Args) ->
193 - case rpc_call(Node, Mod, Fun, Args) of
194 + unsafe_rpc(Node, Mod, Fun, Args, ?RPC_TIMEOUT).
196 +unsafe_rpc(Node, Mod, Fun, Args, Timeout) ->
197 + case rpc_call(Node, Mod, Fun, Args, Timeout) of
198 {badrpc, _} = Res -> throw(Res);
201 @@ -874,33 +900,42 @@ ensure_app_running(Node) ->
202 call(Node, {Mod, Fun, Args}) ->
203 rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)).
205 -call(Node, {Mod, Fun, Args}, InfoKeys, Timeout) ->
206 - call(Node, {Mod, Fun, Args}, InfoKeys, false, Timeout, false).
207 +call_emitter(Node, {Mod, Fun, Args}, InfoKeys, Opts) ->
208 + Ref = start_emission(Node, {Mod, Fun, Args}, Opts),
209 + display_emission_result(Ref, InfoKeys, Opts).
211 +start_emission(Node, {Mod, Fun, Args}, Opts) ->
212 + ToBinUtf8 = proplists:get_value(to_bin_utf8, Opts, false),
213 + Timeout = proplists:get_value(timeout, Opts, infinity),
214 + Ref = proplists:get_value(ref, Opts, make_ref()),
215 + rabbit_control_misc:spawn_emitter_caller(
216 + Node, Mod, Fun, prepare_call_args(Args, ToBinUtf8),
217 + Ref, self(), Timeout),
220 +display_emission_result(Ref, InfoKeys, Opts) ->
221 + IsEscaped = proplists:get_value(is_escaped, Opts, false),
222 + Chunks = proplists:get_value(chunks, Opts, 1),
223 + Timeout = proplists:get_value(timeout, Opts, infinity),
224 + EmissionStatus = rabbit_control_misc:wait_for_info_messages(
225 + self(), Ref, display_info_message(IsEscaped, InfoKeys), ok, Timeout, Chunks),
226 + emission_to_action_result(EmissionStatus).
228 +%% Convert rabbit_control_misc:wait_for_info_messages/6 return value
229 +%% into the form expected by rabbit_cli:main/3.
230 +emission_to_action_result({ok, ok}) ->
232 +emission_to_action_result({error, Error}) ->
235 -call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout) ->
236 - call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, false).
237 +prepare_call_args(Args, ToBinUtf8) ->
239 + true -> valid_utf8_args(Args);
243 -call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, IsEscaped) ->
244 - Args0 = case ToBinUtf8 of
245 - true -> lists:map(fun list_to_binary_utf8/1, Args);
252 - case rabbit_cli:rpc_call(Node, Mod, Fun, Args0,
253 - Ref, Pid, Timeout) of
254 - {error, _} = Error ->
255 - Pid ! {error, Error};
256 - {bad_argument, _} = Error ->
257 - Pid ! {error, Error};
262 - rabbit_control_misc:wait_for_info_messages(
263 - Pid, Ref, InfoKeys, display_info_message(IsEscaped), Timeout).
264 +valid_utf8_args(Args) ->
265 + lists:map(fun list_to_binary_utf8/1, Args).
267 list_to_binary_utf8(L) ->
268 B = list_to_binary(L),
269 @@ -950,7 +985,10 @@ split_list([_]) -> exit(even_list_needed);
270 split_list([A, B | T]) -> [{A, B} | split_list(T)].
272 nodes_in_cluster(Node) ->
273 - unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]).
274 + unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], ?RPC_TIMEOUT).
276 +nodes_in_cluster(Node, Timeout) ->
277 + unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], Timeout).
279 alarms_by_node(Name) ->
280 case rpc_call(Name, rabbit, status, []) of