1 From: Alexey Lebedeff <alebedev@mirantis.com>
2 Date: Wed, 9 Mar 2016 14:55:02 +0300
3 Subject: [PATCH] Avoid RPC roundtrips while listing items
5 - Emit info about particular items in parallel on every node, with
6 results delivered directly to a `rabbitmqctl` instance.
7 - `rabbit_control_misc:wait_for_info_messages/5` can wait for results of
8 more than one emitting map.
9 - Stop passing around InfoItemKeys in
10 `rabbit_control_misc:wait_for_info_messages/5`, the same information
11 could be directly encoded in DisplayFun closure.
12 - Add `emit` to function names, to avoid confusion with regular ones
13 which return result directly.
15 Part of https://github.com/rabbitmq/rabbitmq-server/pull/683
17 diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
18 index 27b352a..e09e02c 100644
19 --- a/src/rabbit_amqqueue.erl
20 +++ b/src/rabbit_amqqueue.erl
22 check_exclusive_access/2, with_exclusive_access_or_die/3,
23 stat/1, deliver/2, requeue/3, ack/3, reject/4]).
24 -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
25 - info_all/6, info_local/1]).
26 + emit_info_all/5, list_local/1, info_local/1]).
27 -export([list_down/1]).
28 -export([force_event_refresh/1, notify_policy_changed/1]).
29 --export([consumers/1, consumers_all/1, consumers_all/3, consumer_info_keys/0]).
30 +-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
31 -export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
32 -export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
33 -export([notify_down_all/2, notify_down_all/3, activate_limit_all/2, credit/5]).
37 -export([internal_declare/2, internal_delete/1, run_backing_queue/3,
38 - set_ram_duration_target/2, set_maximum_since_use/2]).
39 + set_ram_duration_target/2, set_maximum_since_use/2,
40 + emit_info_local/4, emit_info_down/4, emit_consumers_local/3]).
42 -include("rabbit.hrl").
43 -include_lib("stdlib/include/qlc.hrl").
45 -spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()].
46 -spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) ->
47 [rabbit_types:infos()].
49 - (rabbit_types:vhost(), rabbit_types:info_keys(), boolean(), boolean(),
50 - reference(), pid()) ->
52 -spec force_event_refresh(reference()) -> 'ok'.
53 -spec notify_policy_changed(rabbit_types:amqqueue()) -> 'ok'.
54 -spec consumers(rabbit_types:amqqueue()) ->
56 -spec consumers_all(rabbit_types:vhost()) ->
57 [{name(), pid(), rabbit_types:ctag(), boolean(),
58 non_neg_integer(), rabbit_framing:amqp_table()}].
59 --spec consumers_all(rabbit_types:vhost(), reference(), pid()) -> 'ok'.
60 -spec stat(rabbit_types:amqqueue()) ->
61 {'ok', non_neg_integer(), non_neg_integer()}.
62 -spec delete_immediately(qpids()) -> 'ok'.
63 @@ -627,16 +623,18 @@ info_all(VHostPath, Items) ->
64 map(list(VHostPath), fun (Q) -> info(Q, Items) end) ++
65 map(list_down(VHostPath), fun (Q) -> info_down(Q, Items, down) end).
67 -info_all(VHostPath, Items, NeedOnline, NeedOffline, Ref, AggregatorPid) ->
68 - NeedOnline andalso rabbit_control_misc:emitting_map_with_exit_handler(
69 - AggregatorPid, Ref, fun(Q) -> info(Q, Items) end, list(VHostPath),
71 - NeedOffline andalso rabbit_control_misc:emitting_map_with_exit_handler(
72 - AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end,
73 - list_down(VHostPath),
75 - %% Previous maps are incomplete, finalize emission
76 - rabbit_control_misc:emitting_map(AggregatorPid, Ref, fun(_) -> no_op end, []).
77 +emit_info_local(VHostPath, Items, Ref, AggregatorPid) ->
78 + rabbit_control_misc:emitting_map_with_exit_handler(
79 + AggregatorPid, Ref, fun(Q) -> info(Q, Items) end, list_local(VHostPath)).
81 +emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) ->
82 + Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, AggregatorPid]) || Node <- Nodes ],
83 + rabbit_control_misc:await_emitters_termination(Pids).
85 +emit_info_down(VHostPath, Items, Ref, AggregatorPid) ->
86 + rabbit_control_misc:emitting_map_with_exit_handler(
87 + AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end,
88 + list_down(VHostPath)).
90 info_local(VHostPath) ->
91 map(list_local(VHostPath), fun (Q) -> info(Q, [name]) end).
92 @@ -664,12 +662,17 @@ consumers_all(VHostPath) ->
94 fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end)).
96 -consumers_all(VHostPath, Ref, AggregatorPid) ->
97 +emit_consumers_all(Nodes, VHostPath, Ref, AggregatorPid) ->
98 + Pids = [ spawn_link(Node, rabbit_amqqueue, emit_consumers_local, [VHostPath, Ref, AggregatorPid]) || Node <- Nodes ],
99 + rabbit_control_misc:await_emitters_termination(Pids),
102 +emit_consumers_local(VHostPath, Ref, AggregatorPid) ->
103 ConsumerInfoKeys = consumer_info_keys(),
104 rabbit_control_misc:emitting_map(
106 fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end,
108 + list_local(VHostPath)).
110 get_queue_consumer_info(Q, ConsumerInfoKeys) ->
111 [lists:zip(ConsumerInfoKeys,
112 diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
113 index ab7d38d..837a892 100644
114 --- a/src/rabbit_channel.erl
115 +++ b/src/rabbit_channel.erl
117 -export([send_command/2, deliver/4, deliver_reply/2,
118 send_credit_reply/2, send_drained/2]).
119 -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1,
120 - info_all/3, info_local/1]).
121 + emit_info_all/4, info_local/1]).
122 -export([refresh_config_local/0, ready_for_close/1]).
123 -export([force_event_refresh/1]).
126 handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
127 prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
129 --export([list_local/0, deliver_reply_local/3]).
130 +-export([list_local/0, emit_info_local/3, deliver_reply_local/3]).
131 -export([get_vhost/1, get_user/1]).
135 -spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos().
136 -spec info_all() -> [rabbit_types:infos()].
137 -spec info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()].
138 --spec info_all(rabbit_types:info_keys(), reference(), pid()) -> 'ok'.
139 -spec refresh_config_local() -> 'ok'.
140 -spec ready_for_close(pid()) -> 'ok'.
141 -spec force_event_refresh(reference()) -> 'ok'.
142 @@ -329,9 +328,16 @@ info_all(Items) ->
144 rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list_local()).
146 -info_all(Items, Ref, AggregatorPid) ->
147 +emit_info_all(Nodes, Items, Ref, AggregatorPid) ->
148 + Pids = [ spawn_link(Node, rabbit_channel, emit_info_local, [Items, Ref, AggregatorPid]) || Node <- Nodes ],
149 + rabbit_control_misc:await_emitters_termination(Pids).
151 +emit_info_local(Items, Ref, AggregatorPid) ->
152 + emit_info(list_local(), Items, Ref, AggregatorPid).
154 +emit_info(PidList, InfoItems, Ref, AggregatorPid) ->
155 rabbit_control_misc:emitting_map_with_exit_handler(
156 - AggregatorPid, Ref, fun(C) -> info(C, Items) end, list()).
157 + AggregatorPid, Ref, fun(C) -> info(C, InfoItems) end, PidList).
159 refresh_config_local() ->
161 diff --git a/src/rabbit_control_misc.erl b/src/rabbit_control_misc.erl
162 index 2e1f6cc..3b0c60b 100644
163 --- a/src/rabbit_control_misc.erl
164 +++ b/src/rabbit_control_misc.erl
166 -module(rabbit_control_misc).
168 -export([emitting_map/4, emitting_map/5, emitting_map_with_exit_handler/4,
169 - emitting_map_with_exit_handler/5, wait_for_info_messages/5,
170 + emitting_map_with_exit_handler/5, wait_for_info_messages/6,
171 + spawn_emitter_caller/7, await_emitters_termination/1,
172 print_cmd_result/2]).
174 -spec emitting_map(pid(), reference(), fun(), list()) -> 'ok'.
176 -spec emitting_map_with_exit_handler
177 (pid(), reference(), fun(), list()) -> 'ok'.
178 -spec emitting_map_with_exit_handler
179 - (pid(), reference(), fun(), list(), atom()) -> 'ok'.
180 + (pid(), reference(), fun(), list(), 'continue') -> 'ok'.
182 +-type fold_fun() :: fun ((term(), term()) -> term()).
184 +-spec wait_for_info_messages (pid(), reference(), fold_fun(), term(), timeout(), non_neg_integer()) -> {'ok', term()} | {'error', term()}.
185 +-spec spawn_emitter_caller (node(), module(), atom(), [term()], reference(), pid(), timeout()) -> 'ok'.
186 +-spec await_emitters_termination ([pid()]) -> 'ok'.
188 -spec print_cmd_result(atom(), term()) -> 'ok'.
190 emitting_map(AggregatorPid, Ref, Fun, List) ->
191 @@ -65,27 +73,108 @@ step_with_exit_handler(AggregatorPid, Ref, Fun, Item) ->
195 -wait_for_info_messages(Pid, Ref, ArgAtoms, DisplayFun, Timeout) ->
196 - _ = notify_if_timeout(Pid, Ref, Timeout),
197 - wait_for_info_messages(Ref, ArgAtoms, DisplayFun).
198 +%% Invokes RPC for async info collection in a separate (but linked to
199 +%% the caller) process. The separate process waits for the RPC to finish
200 +%% and in case of errors sends them in wait_for_info_messages/6-compatible
201 +%% form to the aggregator process. The calling process is then expected to
202 +%% do a blocking call of wait_for_info_messages/6.
204 +%% Remote function MUST use calls to emitting_map/4 (and other
205 +%% emitting_map's) to properly deliver requested information to an
206 +%% aggregator process.
208 +%% If for performance reasons several parallel emitting_map's need to
209 +%% be run, the remote function MUST NOT return until all these
210 +%% emitting_map's are done. And during all this time the remote RPC
211 +%% process MUST be linked to the emitting
212 +%% processes. The await_emitters_termination/1 helper can be used as the
213 +%% last statement of the remote function to ensure this behaviour.
214 +spawn_emitter_caller(Node, Mod, Fun, Args, Ref, Pid, Timeout) ->
217 + case rpc_call_emitter(Node, Mod, Fun, Args, Ref, Pid, Timeout) of
218 + {error, _} = Error ->
219 + Pid ! {Ref, error, Error};
220 + {bad_argument, _} = Error ->
221 + Pid ! {Ref, error, Error};
222 + {badrpc, _} = Error ->
223 + Pid ! {Ref, error, Error};
230 +rpc_call_emitter(Node, Mod, Fun, Args, Ref, Pid, Timeout) ->
231 + rabbit_misc:rpc_call(Node, Mod, Fun, Args++[Ref, Pid], Timeout).
233 +%% Aggregator process expects the correct number of explicit ACKs about
234 +%% finished emission processes. While everything is linked, we still
235 +%% need somehow to wait for termination of all emitters before
236 +%% returning from RPC call - otherwise links will be just broken with
237 +%% reason 'normal' and we can miss some errors, and subsequently
239 +await_emitters_termination(Pids) ->
240 + Monitors = [erlang:monitor(process, Pid) || Pid <- Pids],
241 + collect_monitors(Monitors).
243 -wait_for_info_messages(Ref, InfoItemKeys, DisplayFun) when is_reference(Ref) ->
244 +collect_monitors([]) ->
246 +collect_monitors([Monitor|Rest]) ->
250 - {Ref, {timeout, T}} ->
251 + {'DOWN', Monitor, process, _Pid, normal} -> %% 'DOWN' messages are 5-tuples
252 + collect_monitors(Rest);
253 + {'DOWN', Monitor, process, _Pid, noproc} ->
254 + %% There is a link and a monitor to a process. Matching
255 + %% this clause means that process has gracefully
256 + %% terminated even before we've started monitoring.
257 + collect_monitors(Rest);
258 + {'DOWN', _, process, Pid, Reason} ->
259 + exit({emitter_exit, Pid, Reason})
262 +%% Wait for result of one or more calls to emitting_map-family
265 +%% Number of expected acknowledgments is specified by ChunkCount
266 +%% argument. Most common usage will be with ChunkCount equal to the
267 +%% number of live nodes, but it's not mandatory - thus the more generic
268 +%% name of 'ChunkCount' was chosen.
269 +wait_for_info_messages(Pid, Ref, Fun, Acc0, Timeout, ChunkCount) ->
270 + notify_if_timeout(Pid, Ref, Timeout),
271 + wait_for_info_messages(Ref, Fun, Acc0, ChunkCount).
273 +wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft) ->
275 + {Ref, finished} when ChunksLeft =:= 1 ->
278 + wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft - 1);
279 + {Ref, {timeout, T}} ->
280 exit({error, {timeout, (T / 1000)}});
282 - wait_for_info_messages(Ref, InfoItemKeys, DisplayFun);
283 - {Ref, Result, continue} ->
284 - DisplayFun(Result, InfoItemKeys),
285 - wait_for_info_messages(Ref, InfoItemKeys, DisplayFun);
289 - wait_for_info_messages(Ref, InfoItemKeys, DisplayFun)
291 + wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft);
292 + {Ref, Result, continue} ->
293 + wait_for_info_messages(Ref, Fun, Fun(Result, Acc0), ChunksLeft);
294 + {Ref, error, Error} ->
295 + {error, simplify_emission_error(Error)};
296 + {'DOWN', _MRef, process, _Pid, normal} ->
297 + wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft);
298 + {'DOWN', _MRef, process, _Pid, Reason} ->
299 + {error, simplify_emission_error(Reason)};
301 + wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft)
304 +simplify_emission_error({badrpc, {'EXIT', {{nocatch, EmissionError}, _Stacktrace}}}) ->
306 +simplify_emission_error({{nocatch, EmissionError}, _Stacktrace}) ->
308 +simplify_emission_error(Anything) ->
311 +notify_if_timeout(_, _, infinity) ->
313 notify_if_timeout(Pid, Ref, Timeout) ->
314 timer:send_after(Timeout, Pid, {Ref, {timeout, Timeout}}).
316 diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
317 index 8965c59..9341ea9 100644
318 --- a/src/rabbit_misc.erl
319 +++ b/src/rabbit_misc.erl
321 -export([get_env/3]).
322 -export([get_channel_operation_timeout/0]).
324 --export([rpc_call/4, rpc_call/5, rpc_call/7]).
325 +-export([rpc_call/4, rpc_call/5]).
326 -export([report_default_thread_pool_size/0]).
327 -export([get_gc_info/1]).
330 -spec random(non_neg_integer()) -> non_neg_integer().
331 -spec rpc_call(node(), atom(), atom(), [any()]) -> any().
332 -spec rpc_call(node(), atom(), atom(), [any()], number()) -> any().
334 - (node(), atom(), atom(), [any()], reference(), pid(), number()) -> any().
335 -spec report_default_thread_pool_size() -> 'ok'.
336 -spec get_gc_info(pid()) -> integer().
338 @@ -1184,9 +1182,6 @@ rpc_call(Node, Mod, Fun, Args, Timeout) ->
339 rpc:call(Node, Mod, Fun, Args, Timeout)
342 -rpc_call(Node, Mod, Fun, Args, Ref, Pid, Timeout) ->
343 - rpc_call(Node, Mod, Fun, Args++[Ref, Pid], Timeout).
345 guess_number_of_cpu_cores() ->
346 case erlang:system_info(logical_processors_available) of
347 unknown -> % Happens on Mac OS X.
348 diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
349 index 5bf30ff..63e3ed0 100644
350 --- a/src/rabbit_networking.erl
351 +++ b/src/rabbit_networking.erl
353 node_listeners/1, register_connection/1, unregister_connection/1,
354 connections/0, connection_info_keys/0,
355 connection_info/1, connection_info/2,
356 - connection_info_all/0, connection_info_all/1, connection_info_all/3,
357 + connection_info_all/0, connection_info_all/1,
358 + emit_connection_info_all/4, emit_connection_info_local/3,
359 close_connection/2, force_connection_event_refresh/1, tcp_host/1]).
361 %% Used by TCP-based transports, e.g. STOMP adapter
363 -spec connection_info_all() -> [rabbit_types:infos()].
364 -spec connection_info_all(rabbit_types:info_keys()) ->
365 [rabbit_types:infos()].
366 --spec connection_info_all(rabbit_types:info_keys(), reference(), pid()) ->
368 -spec close_connection(pid(), string()) -> 'ok'.
369 -spec force_connection_event_refresh(reference()) -> 'ok'.
371 @@ -365,10 +364,15 @@ connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items).
372 connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end).
373 connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end).
375 -connection_info_all(Items, Ref, AggregatorPid) ->
376 +emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
377 + Pids = [ spawn_link(Node, rabbit_networking, emit_connection_info_local, [Items, Ref, AggregatorPid]) || Node <- Nodes ],
378 + rabbit_control_misc:await_emitters_termination(Pids),
381 +emit_connection_info_local(Items, Ref, AggregatorPid) ->
382 rabbit_control_misc:emitting_map_with_exit_handler(
383 AggregatorPid, Ref, fun(Q) -> connection_info(Q, Items) end,
385 + connections_local()).
387 close_connection(Pid, Explanation) ->
388 rabbit_log:info("Closing connection ~p because ~p~n", [Pid, Explanation]),