Revert "Revert "oran-shell-release: release image for F""
[pti/rtp.git] / meta-starlingx / meta-stx-cloud / recipes-extended / rabbitmq / files / rabbitmq-common-0001-Avoid-RPC-roundtrips-while-listing-items.patch
1 From: Alexey Lebedeff <alebedev@mirantis.com>
2 Date: Wed, 9 Mar 2016 14:55:02 +0300
3 Subject: [PATCH] Avoid RPC roundtrips while listing items
4
5 - Emit info about particular items in parallel on every node, with
6   results delivered directly to a `rabbitmqctl` instance.
7 - `rabbit_control_misc:wait_for_info_messages/5` can wait for results of
8   more than one emitting map.
9 - Stop passing arround InfoItemKeys in
10   `rabbit_control_misc:wait_for_info_messages/5`, the same information
11   could be directly encoded in DisplayFun closure.
12 - Add `emit` to function names, to avoid confusion with regular ones
13   which return result directly.
14
15 Part of https://github.com/rabbitmq/rabbitmq-server/pull/683
16
17 diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
18 index 27b352a..e09e02c 100644
19 --- a/src/rabbit_amqqueue.erl
20 +++ b/src/rabbit_amqqueue.erl
21 @@ -25,10 +25,10 @@
22           check_exclusive_access/2, with_exclusive_access_or_die/3,
23           stat/1, deliver/2, requeue/3, ack/3, reject/4]).
24  -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
25 -         info_all/6, info_local/1]).
26 +         emit_info_all/5, list_local/1, info_local/1]).
27  -export([list_down/1]).
28  -export([force_event_refresh/1, notify_policy_changed/1]).
29 --export([consumers/1, consumers_all/1,  consumers_all/3, consumer_info_keys/0]).
30 +-export([consumers/1, consumers_all/1,  emit_consumers_all/4, consumer_info_keys/0]).
31  -export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
32  -export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
33  -export([notify_down_all/2, notify_down_all/3, activate_limit_all/2, credit/5]).
34 @@ -41,7 +41,8 @@
35  
36  %% internal
37  -export([internal_declare/2, internal_delete/1, run_backing_queue/3,
38 -         set_ram_duration_target/2, set_maximum_since_use/2]).
39 +         set_ram_duration_target/2, set_maximum_since_use/2,
40 +         emit_info_local/4, emit_info_down/4, emit_consumers_local/3]).
41  
42  -include("rabbit.hrl").
43  -include_lib("stdlib/include/qlc.hrl").
44 @@ -117,10 +118,6 @@
45  -spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()].
46  -spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) ->
47            [rabbit_types:infos()].
48 --spec info_all
49 -        (rabbit_types:vhost(), rabbit_types:info_keys(), boolean(), boolean(),
50 -         reference(), pid()) ->
51 -            'ok'.
52  -spec force_event_refresh(reference()) -> 'ok'.
53  -spec notify_policy_changed(rabbit_types:amqqueue()) -> 'ok'.
54  -spec consumers(rabbit_types:amqqueue()) ->
55 @@ -130,7 +127,6 @@
56  -spec consumers_all(rabbit_types:vhost()) ->
57            [{name(), pid(), rabbit_types:ctag(), boolean(),
58              non_neg_integer(), rabbit_framing:amqp_table()}].
59 --spec consumers_all(rabbit_types:vhost(), reference(), pid()) -> 'ok'.
60  -spec stat(rabbit_types:amqqueue()) ->
61            {'ok', non_neg_integer(), non_neg_integer()}.
62  -spec delete_immediately(qpids()) -> 'ok'.
63 @@ -627,16 +623,18 @@ info_all(VHostPath, Items) ->
64      map(list(VHostPath), fun (Q) -> info(Q, Items) end) ++
65          map(list_down(VHostPath), fun (Q) -> info_down(Q, Items, down) end).
66  
67 -info_all(VHostPath, Items, NeedOnline, NeedOffline, Ref, AggregatorPid) ->
68 -    NeedOnline andalso rabbit_control_misc:emitting_map_with_exit_handler(
69 -                         AggregatorPid, Ref, fun(Q) -> info(Q, Items) end, list(VHostPath),
70 -                         continue),
71 -    NeedOffline andalso rabbit_control_misc:emitting_map_with_exit_handler(
72 -                          AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end,
73 -                          list_down(VHostPath),
74 -                          continue),
75 -    %% Previous maps are incomplete, finalize emission
76 -    rabbit_control_misc:emitting_map(AggregatorPid, Ref, fun(_) -> no_op end, []).
77 +emit_info_local(VHostPath, Items, Ref, AggregatorPid) ->
78 +    rabbit_control_misc:emitting_map_with_exit_handler(
79 +      AggregatorPid, Ref, fun(Q) -> info(Q, Items) end, list_local(VHostPath)).
80 +
81 +emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) ->
82 +    Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, AggregatorPid]) || Node <- Nodes ],
83 +    rabbit_control_misc:await_emitters_termination(Pids).
84 +
85 +emit_info_down(VHostPath, Items, Ref, AggregatorPid) ->
86 +    rabbit_control_misc:emitting_map_with_exit_handler(
87 +      AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end,
88 +      list_down(VHostPath)).
89  
90  info_local(VHostPath) ->
91      map(list_local(VHostPath), fun (Q) -> info(Q, [name]) end).
92 @@ -664,12 +662,17 @@ consumers_all(VHostPath) ->
93        map(list(VHostPath),
94            fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end)).
95  
96 -consumers_all(VHostPath, Ref, AggregatorPid) ->
97 +emit_consumers_all(Nodes, VHostPath, Ref, AggregatorPid) ->
98 +    Pids = [ spawn_link(Node, rabbit_amqqueue, emit_consumers_local, [VHostPath, Ref, AggregatorPid]) || Node <- Nodes ],
99 +    rabbit_control_misc:await_emitters_termination(Pids),
100 +    ok.
101 +
102 +emit_consumers_local(VHostPath, Ref, AggregatorPid) ->
103      ConsumerInfoKeys = consumer_info_keys(),
104      rabbit_control_misc:emitting_map(
105        AggregatorPid, Ref,
106        fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end,
107 -      list(VHostPath)).
108 +      list_local(VHostPath)).
109  
110  get_queue_consumer_info(Q, ConsumerInfoKeys) ->
111      [lists:zip(ConsumerInfoKeys,
112 diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
113 index ab7d38d..837a892 100644
114 --- a/src/rabbit_channel.erl
115 +++ b/src/rabbit_channel.erl
116 @@ -56,7 +56,7 @@
117  -export([send_command/2, deliver/4, deliver_reply/2,
118           send_credit_reply/2, send_drained/2]).
119  -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1,
120 -         info_all/3, info_local/1]).
121 +         emit_info_all/4, info_local/1]).
122  -export([refresh_config_local/0, ready_for_close/1]).
123  -export([force_event_refresh/1]).
124  
125 @@ -64,7 +64,7 @@
126           handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
127           prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
128  %% Internal
129 --export([list_local/0, deliver_reply_local/3]).
130 +-export([list_local/0, emit_info_local/3, deliver_reply_local/3]).
131  -export([get_vhost/1, get_user/1]).
132  
133  -record(ch, {
134 @@ -220,7 +220,6 @@
135  -spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos().
136  -spec info_all() -> [rabbit_types:infos()].
137  -spec info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()].
138 --spec info_all(rabbit_types:info_keys(), reference(), pid()) -> 'ok'.
139  -spec refresh_config_local() -> 'ok'.
140  -spec ready_for_close(pid()) -> 'ok'.
141  -spec force_event_refresh(reference()) -> 'ok'.
142 @@ -329,9 +328,16 @@ info_all(Items) ->
143  info_local(Items) ->
144      rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list_local()).
145  
146 -info_all(Items, Ref, AggregatorPid) ->
147 +emit_info_all(Nodes, Items, Ref, AggregatorPid) ->
148 +    Pids = [ spawn_link(Node, rabbit_channel, emit_info_local, [Items, Ref, AggregatorPid]) || Node <- Nodes ],
149 +    rabbit_control_misc:await_emitters_termination(Pids).
150 +
151 +emit_info_local(Items, Ref, AggregatorPid) ->
152 +    emit_info(list_local(), Items, Ref, AggregatorPid).
153 +
154 +emit_info(PidList, InfoItems, Ref, AggregatorPid) ->
155      rabbit_control_misc:emitting_map_with_exit_handler(
156 -      AggregatorPid, Ref, fun(C) -> info(C, Items) end, list()).
157 +      AggregatorPid, Ref, fun(C) -> info(C, InfoItems) end, PidList).
158  
159  refresh_config_local() ->
160      rabbit_misc:upmap(
161 diff --git a/src/rabbit_control_misc.erl b/src/rabbit_control_misc.erl
162 index 2e1f6cc..3b0c60b 100644
163 --- a/src/rabbit_control_misc.erl
164 +++ b/src/rabbit_control_misc.erl
165 @@ -17,7 +17,8 @@
166  -module(rabbit_control_misc).
167  
168  -export([emitting_map/4, emitting_map/5, emitting_map_with_exit_handler/4,
169 -         emitting_map_with_exit_handler/5, wait_for_info_messages/5,
170 +         emitting_map_with_exit_handler/5, wait_for_info_messages/6,
171 +         spawn_emitter_caller/7, await_emitters_termination/1,
172           print_cmd_result/2]).
173  
174  -spec emitting_map(pid(), reference(), fun(), list()) -> 'ok'.
175 @@ -25,7 +26,14 @@
176  -spec emitting_map_with_exit_handler
177          (pid(), reference(), fun(), list()) -> 'ok'.
178  -spec emitting_map_with_exit_handler
179 -        (pid(), reference(), fun(), list(), atom()) -> 'ok'.
180 +        (pid(), reference(), fun(), list(), 'continue') -> 'ok'.
181 +
182 +-type fold_fun() :: fun ((term(), term()) -> term()).
183 +
184 +-spec wait_for_info_messages (pid(), reference(), fold_fun(), term(), timeout(), non_neg_integer()) -> {'ok', term()} | {'error', term()}.
185 +-spec spawn_emitter_caller (node(), module(), atom(), [term()], reference(), pid(), timeout()) -> 'ok'.
186 +-spec await_emitters_termination ([pid()]) -> 'ok'.
187 +
188  -spec print_cmd_result(atom(), term()) -> 'ok'.
189  
190  emitting_map(AggregatorPid, Ref, Fun, List) ->
191 @@ -65,27 +73,108 @@ step_with_exit_handler(AggregatorPid, Ref, Fun, Item) ->
192              ok
193      end.
194  
195 -wait_for_info_messages(Pid, Ref, ArgAtoms, DisplayFun, Timeout) ->
196 -    _ = notify_if_timeout(Pid, Ref, Timeout),
197 -    wait_for_info_messages(Ref, ArgAtoms, DisplayFun).
198 +%% Invokes RPC for async info collection in separate (but linked to
199 +%% the caller) process. Separate process waits for RPC to finish and
200 +%% in case of errors sends them in wait_for_info_messages/5-compatible
201 +%% form to aggregator process. Calling process is then expected to
202 +%% do blocking call of wait_for_info_messages/5.
203 +%%
204 +%% Remote function MUST use calls to emitting_map/4 (and other
205 +%% emitting_map's) to properly deliver requested information to an
206 +%% aggregator process.
207 +%%
208 +%% If for performance reasons several parallel emitting_map's need to
209 +%% be run, remote function MUST NOT return until all this
210 +%% emitting_map's are done. And during all this time remote RPC
211 +%% process MUST be linked to emitting
212 +%% processes. await_emitters_termination/1 helper can be used as a
213 +%% last statement of remote function to ensure this behaviour.
214 +spawn_emitter_caller(Node, Mod, Fun, Args, Ref, Pid, Timeout) ->
215 +    spawn_monitor(
216 +      fun () ->
217 +              case rpc_call_emitter(Node, Mod, Fun, Args, Ref, Pid, Timeout) of
218 +                  {error, _} = Error        ->
219 +                      Pid ! {Ref, error, Error};
220 +                  {bad_argument, _} = Error ->
221 +                      Pid ! {Ref, error, Error};
222 +                  {badrpc, _} = Error       ->
223 +                      Pid ! {Ref, error, Error};
224 +                  _                         ->
225 +                      ok
226 +              end
227 +      end),
228 +    ok.
229 +
230 +rpc_call_emitter(Node, Mod, Fun, Args, Ref, Pid, Timeout) ->
231 +    rabbit_misc:rpc_call(Node, Mod, Fun, Args++[Ref, Pid], Timeout).
232 +
233 +%% Agregator process expects correct numbers of explicits ACKs about
234 +%% finished emission process. While everything is linked, we still
235 +%% need somehow to wait for termination of all emitters before
236 +%% returning from RPC call - otherwise links will be just broken with
237 +%% reason 'normal' and we can miss some errors, and subsequentially
238 +%% hang.
239 +await_emitters_termination(Pids) ->
240 +    Monitors = [erlang:monitor(process, Pid) || Pid <- Pids],
241 +    collect_monitors(Monitors).
242  
243 -wait_for_info_messages(Ref, InfoItemKeys, DisplayFun) when is_reference(Ref) ->
244 +collect_monitors([]) ->
245 +    ok;
246 +collect_monitors([Monitor|Rest]) ->
247      receive
248 -        {Ref,  finished}         ->
249 -            ok;
250 -        {Ref,  {timeout, T}}     ->
251 +        {'DOWN', Monitor, _Pid, normal} ->
252 +            collect_monitors(Rest);
253 +        {'DOWN', Monitor, _Pid, noproc} ->
254 +            %% There is a link and a monitor to a process. Matching
255 +            %% this clause means that process has gracefully
256 +            %% terminated even before we've started monitoring.
257 +            collect_monitors(Rest);
258 +        {'DOWN', _, Pid, Reason} ->
259 +            exit({emitter_exit, Pid, Reason})
260 +    end.
261 +
262 +%% Wait for result of one or more calls to emitting_map-family
263 +%% functions.
264 +%%
265 +%% Number of expected acknowledgments is specified by ChunkCount
266 +%% argument. Most common usage will be with ChunkCount equals to
267 +%% number of live nodes, but it's not mandatory - thus more generic
268 +%% name of 'ChunkCount' was chosen.
269 +wait_for_info_messages(Pid, Ref, Fun, Acc0, Timeout, ChunkCount) ->
270 +    notify_if_timeout(Pid, Ref, Timeout),
271 +    wait_for_info_messages(Ref, Fun, Acc0, ChunkCount).
272 +
273 +wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft) ->
274 +    receive
275 +        {Ref, finished} when ChunksLeft =:= 1 ->
276 +            {ok, Acc0};
277 +        {Ref, finished} ->
278 +            wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft - 1);
279 +        {Ref, {timeout, T}} ->
280              exit({error, {timeout, (T / 1000)}});
281 -        {Ref,  []}               ->
282 -            wait_for_info_messages(Ref, InfoItemKeys, DisplayFun);
283 -        {Ref,  Result, continue} ->
284 -            DisplayFun(Result, InfoItemKeys),
285 -            wait_for_info_messages(Ref, InfoItemKeys, DisplayFun);
286 -        {error, Error}           ->
287 -            Error;
288 -        _                        ->
289 -            wait_for_info_messages(Ref, InfoItemKeys, DisplayFun)
290 +        {Ref, []} ->
291 +            wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft);
292 +        {Ref, Result, continue} ->
293 +            wait_for_info_messages(Ref, Fun, Fun(Result, Acc0), ChunksLeft);
294 +        {Ref, error, Error} ->
295 +            {error, simplify_emission_error(Error)};
296 +        {'DOWN', _MRef, process, _Pid, normal} ->
297 +            wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft);
298 +        {'DOWN', _MRef, process, _Pid, Reason} ->
299 +            {error, simplify_emission_error(Reason)};
300 +        _Msg ->
301 +            wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft)
302      end.
303  
304 +simplify_emission_error({badrpc, {'EXIT', {{nocatch, EmissionError}, _Stacktrace}}}) ->
305 +    EmissionError;
306 +simplify_emission_error({{nocatch, EmissionError}, _Stacktrace}) ->
307 +    EmissionError;
308 +simplify_emission_error(Anything) ->
309 +    {error, Anything}.
310 +
311 +notify_if_timeout(_, _, infinity) ->
312 +    ok;
313  notify_if_timeout(Pid, Ref, Timeout) ->
314      timer:send_after(Timeout, Pid, {Ref, {timeout, Timeout}}).
315  
316 diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
317 index 8965c59..9341ea9 100644
318 --- a/src/rabbit_misc.erl
319 +++ b/src/rabbit_misc.erl
320 @@ -75,7 +75,7 @@
321  -export([get_env/3]).
322  -export([get_channel_operation_timeout/0]).
323  -export([random/1]).
324 --export([rpc_call/4, rpc_call/5, rpc_call/7]).
325 +-export([rpc_call/4, rpc_call/5]).
326  -export([report_default_thread_pool_size/0]).
327  -export([get_gc_info/1]).
328  
329 @@ -264,8 +264,6 @@
330  -spec random(non_neg_integer()) -> non_neg_integer().
331  -spec rpc_call(node(), atom(), atom(), [any()]) -> any().
332  -spec rpc_call(node(), atom(), atom(), [any()], number()) -> any().
333 --spec rpc_call
334 -        (node(), atom(), atom(), [any()], reference(), pid(), number()) -> any().
335  -spec report_default_thread_pool_size() -> 'ok'.
336  -spec get_gc_info(pid()) -> integer().
337  
338 @@ -1184,9 +1182,6 @@ rpc_call(Node, Mod, Fun, Args, Timeout) ->
339                             rpc:call(Node, Mod, Fun, Args, Timeout)
340      end.
341  
342 -rpc_call(Node, Mod, Fun, Args, Ref, Pid, Timeout) ->
343 -    rpc_call(Node, Mod, Fun, Args++[Ref, Pid], Timeout).
344 -
345  guess_number_of_cpu_cores() ->
346      case erlang:system_info(logical_processors_available) of
347          unknown -> % Happens on Mac OS X.
348 diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
349 index 5bf30ff..63e3ed0 100644
350 --- a/src/rabbit_networking.erl
351 +++ b/src/rabbit_networking.erl
352 @@ -33,7 +33,8 @@
353           node_listeners/1, register_connection/1, unregister_connection/1,
354           connections/0, connection_info_keys/0,
355           connection_info/1, connection_info/2,
356 -         connection_info_all/0, connection_info_all/1, connection_info_all/3,
357 +         connection_info_all/0, connection_info_all/1,
358 +         emit_connection_info_all/4, emit_connection_info_local/3,
359           close_connection/2, force_connection_event_refresh/1, tcp_host/1]).
360  
361  %% Used by TCP-based transports, e.g. STOMP adapter
362 @@ -89,8 +90,6 @@
363  -spec connection_info_all() -> [rabbit_types:infos()].
364  -spec connection_info_all(rabbit_types:info_keys()) ->
365            [rabbit_types:infos()].
366 --spec connection_info_all(rabbit_types:info_keys(), reference(), pid()) ->
367 -          'ok'.
368  -spec close_connection(pid(), string()) -> 'ok'.
369  -spec force_connection_event_refresh(reference()) -> 'ok'.
370  
371 @@ -365,10 +364,15 @@ connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items).
372  connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end).
373  connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end).
374  
375 -connection_info_all(Items, Ref, AggregatorPid) ->
376 +emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
377 +    Pids = [ spawn_link(Node, rabbit_networking, emit_connection_info_local, [Items, Ref, AggregatorPid]) || Node <- Nodes ],
378 +    rabbit_control_misc:await_emitters_termination(Pids),
379 +    ok.
380 +
381 +emit_connection_info_local(Items, Ref, AggregatorPid) ->
382      rabbit_control_misc:emitting_map_with_exit_handler(
383        AggregatorPid, Ref, fun(Q) -> connection_info(Q, Items) end,
384 -      connections()).
385 +      connections_local()).
386  
387  close_connection(Pid, Explanation) ->
388      rabbit_log:info("Closing connection ~p because ~p~n", [Pid, Explanation]),