Revert "Revert "oran-shell-release: release image for F""
[pti/rtp.git] / meta-starlingx / meta-stx-cloud / recipes-extended / rabbitmq / files / rabbitmq-server-0005-Avoid-RPC-roundtrips-in-list-commands.patch
1 From: Alexey Lebedeff <alebedev@mirantis.com>
2 Date: Wed, 9 Mar 2016 18:09:04 +0300
3 Subject: [PATCH] Avoid RPC roundtrips in list commands
4
5 Current implementation of various `list_XXX` commands require cross-node
6 roundtrip for every processed item - because `rabbitmqctl` target node
7 is responsible for gathering global list of all items of
8 interest (channels etc.) and then processing them one by one.
9
10 For example, listing 10000 channels evenly distributed across 3 nodes
11 where network has 1ms delay takes more than 10 seconds on my
12 machine. And with the proposed change listing will take almost the same
13 time as it'll take to gather this info locally. E.g. in the case above
14 listing now takes 0.7 second on the same machine with same 1ms delay.
15
16 It works by invoking emitting_map on every node, where it should send
17 info about only local items to aggregator, in an async fashion - as no
18 reply from aggregator is needed.
19
20 diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
21 index ea9d6a2..e6b168a 100644
22 --- a/src/rabbit_control_main.erl
23 +++ b/src/rabbit_control_main.erl
24 @@ -23,7 +23,7 @@
25           sync_queue/1, cancel_sync_queue/1, become/1,
26           purge_queue/1]).
27  
28 --import(rabbit_misc, [rpc_call/4, rpc_call/5, rpc_call/7]).
29 +-import(rabbit_misc, [rpc_call/4, rpc_call/5]).
30  
31  -define(EXTERNAL_CHECK_INTERVAL, 1000).
32  
33 @@ -595,56 +595,74 @@ action(purge_queue, Node, [Q], Opts, Inform, Timeout) ->
34  
35  action(list_users, Node, [], _Opts, Inform, Timeout) ->
36      Inform("Listing users", []),
37 -    call(Node, {rabbit_auth_backend_internal, list_users, []},
38 -         rabbit_auth_backend_internal:user_info_keys(), true, Timeout);
39 +    call_emitter(Node, {rabbit_auth_backend_internal, list_users, []},
40 +                 rabbit_auth_backend_internal:user_info_keys(),
41 +                 [{timeout, Timeout}, to_bin_utf8]);
42  
43  action(list_permissions, Node, [], Opts, Inform, Timeout) ->
44      VHost = proplists:get_value(?VHOST_OPT, Opts),
45      Inform("Listing permissions in vhost \"~s\"", [VHost]),
46 -    call(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]},
47 -         rabbit_auth_backend_internal:vhost_perms_info_keys(), true, Timeout,
48 -         true);
49 +    call_emitter(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]},
50 +                 rabbit_auth_backend_internal:vhost_perms_info_keys(),
51 +                 [{timeout, Timeout}, to_bin_utf8, is_escaped]);
52  
53  action(list_parameters, Node, [], Opts, Inform, Timeout) ->
54      VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
55      Inform("Listing runtime parameters", []),
56 -    call(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]},
57 -         rabbit_runtime_parameters:info_keys(), Timeout);
58 +    call_emitter(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]},
59 +                 rabbit_runtime_parameters:info_keys(),
60 +                 [{timeout, Timeout}]);
61  
62  action(list_policies, Node, [], Opts, Inform, Timeout) ->
63      VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
64      Inform("Listing policies", []),
65 -    call(Node, {rabbit_policy, list_formatted, [VHostArg]},
66 -         rabbit_policy:info_keys(), Timeout);
67 +    call_emitter(Node, {rabbit_policy, list_formatted, [VHostArg]},
68 +                 rabbit_policy:info_keys(),
69 +                 [{timeout, Timeout}]);
70  
71  action(list_vhosts, Node, Args, _Opts, Inform, Timeout) ->
72      Inform("Listing vhosts", []),
73      ArgAtoms = default_if_empty(Args, [name]),
74 -    call(Node, {rabbit_vhost, info_all, []}, ArgAtoms, true, Timeout);
75 +    call_emitter(Node, {rabbit_vhost, info_all, []}, ArgAtoms,
76 +                 [{timeout, Timeout}, to_bin_utf8]);
77  
78  action(list_user_permissions, _Node, _Args = [], _Opts, _Inform, _Timeout) ->
79      {error_string,
80       "list_user_permissions expects a username argument, but none provided."};
81  action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout) ->
82      Inform("Listing permissions for user ~p", Args),
83 -    call(Node, {rabbit_auth_backend_internal, list_user_permissions, Args},
84 -         rabbit_auth_backend_internal:user_perms_info_keys(), true, Timeout,
85 -         true);
86 +    call_emitter(Node, {rabbit_auth_backend_internal, list_user_permissions, Args},
87 +                 rabbit_auth_backend_internal:user_perms_info_keys(),
88 +                 [{timeout, Timeout}, to_bin_utf8, is_escaped]);
89  
90  action(list_queues, Node, Args, Opts, Inform, Timeout) ->
91 -    [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]),
92      Inform("Listing queues", []),
93 +    %% User options
94 +    [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]),
95      VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
96      ArgAtoms = default_if_empty(Args, [name, messages]),
97 -    call(Node, {rabbit_amqqueue, info_all, [VHostArg, ArgAtoms, Online, Offline]},
98 -         ArgAtoms, Timeout);
99 +
100 +    %% Data for emission
101 +    Nodes = nodes_in_cluster(Node, Timeout),
102 +    OnlineChunks = if Online -> length(Nodes); true -> 0 end,
103 +    OfflineChunks = if Offline -> 1; true -> 0 end,
104 +    ChunksOpt = {chunks, OnlineChunks + OfflineChunks},
105 +    TimeoutOpt = {timeout, Timeout},
106 +    EmissionRef = make_ref(),
107 +    EmissionRefOpt = {ref, EmissionRef},
108 +
109 +    _ = Online andalso start_emission(Node, {rabbit_amqqueue, emit_info_all, [Nodes, VHostArg, ArgAtoms]},
110 +                                      [TimeoutOpt, EmissionRefOpt]),
111 +    _ = Offline andalso start_emission(Node, {rabbit_amqqueue, emit_info_down, [VHostArg, ArgAtoms]},
112 +                                       [TimeoutOpt, EmissionRefOpt]),
113 +    display_emission_result(EmissionRef, ArgAtoms, [ChunksOpt, TimeoutOpt]);
114  
115  action(list_exchanges, Node, Args, Opts, Inform, Timeout) ->
116      Inform("Listing exchanges", []),
117      VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
118      ArgAtoms = default_if_empty(Args, [name, type]),
119 -    call(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]},
120 -         ArgAtoms, Timeout);
121 +    call_emitter(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]},
122 +                 ArgAtoms, [{timeout, Timeout}]);
123  
124  action(list_bindings, Node, Args, Opts, Inform, Timeout) ->
125      Inform("Listing bindings", []),
126 @@ -652,27 +670,31 @@ action(list_bindings, Node, Args, Opts, Inform, Timeout) ->
127      ArgAtoms = default_if_empty(Args, [source_name, source_kind,
128                                         destination_name, destination_kind,
129                                         routing_key, arguments]),
130 -    call(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]},
131 -         ArgAtoms, Timeout);
132 +    call_emitter(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]},
133 +                 ArgAtoms, [{timeout, Timeout}]);
134  
135  action(list_connections, Node, Args, _Opts, Inform, Timeout) ->
136      Inform("Listing connections", []),
137      ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]),
138 -    call(Node, {rabbit_networking, connection_info_all, [ArgAtoms]},
139 -         ArgAtoms, Timeout);
140 +    Nodes = nodes_in_cluster(Node, Timeout),
141 +    call_emitter(Node, {rabbit_networking, emit_connection_info_all, [Nodes, ArgAtoms]},
142 +                 ArgAtoms, [{timeout, Timeout}, {chunks, length(Nodes)}]);
143  
144  action(list_channels, Node, Args, _Opts, Inform, Timeout) ->
145      Inform("Listing channels", []),
146      ArgAtoms = default_if_empty(Args, [pid, user, consumer_count,
147                                         messages_unacknowledged]),
148 -    call(Node, {rabbit_channel, info_all, [ArgAtoms]},
149 -         ArgAtoms, Timeout);
150 +    Nodes = nodes_in_cluster(Node, Timeout),
151 +    call_emitter(Node, {rabbit_channel, emit_info_all, [Nodes, ArgAtoms]}, ArgAtoms,
152 +                 [{timeout, Timeout}, {chunks, length(Nodes)}]);
153  
154  action(list_consumers, Node, _Args, Opts, Inform, Timeout) ->
155      Inform("Listing consumers", []),
156      VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
157 -    call(Node, {rabbit_amqqueue, consumers_all, [VHostArg]},
158 -         rabbit_amqqueue:consumer_info_keys(), Timeout);
159 +    Nodes = nodes_in_cluster(Node, Timeout),
160 +    call_emitter(Node, {rabbit_amqqueue, emit_consumers_all, [Nodes, VHostArg]},
161 +                 rabbit_amqqueue:consumer_info_keys(),
162 +                 [{timeout, Timeout}, {chunks, length(Nodes)}]);
163  
164  action(node_health_check, Node, _Args, _Opts, Inform, Timeout) ->
165      Inform("Checking health of node ~p", [Node]),
166 @@ -788,17 +810,18 @@ display_info_message_row(IsEscaped, Result, InfoItemKeys) ->
167                         {X, Value} -> Value
168                     end, IsEscaped) || X <- InfoItemKeys]).
169  
170 -display_info_message(IsEscaped) ->
171 +display_info_message(IsEscaped, InfoItemKeys) ->
172      fun ([], _) ->
173              ok;
174 -        ([FirstResult|_] = List, InfoItemKeys) when is_list(FirstResult) ->
175 +        ([FirstResult|_] = List, _) when is_list(FirstResult) ->
176              lists:foreach(fun(Result) ->
177                                    display_info_message_row(IsEscaped, Result, InfoItemKeys)
178                            end,
179                            List),
180              ok;
181 -        (Result, InfoItemKeys) ->
182 -            display_info_message_row(IsEscaped, Result, InfoItemKeys)
183 +        (Result, _) ->
184 +            display_info_message_row(IsEscaped, Result, InfoItemKeys),
185 +            ok
186      end.
187  
188  display_info_list(Results, InfoItemKeys) when is_list(Results) ->
189 @@ -855,7 +878,10 @@ display_call_result(Node, MFA) ->
190      end.
191  
192  unsafe_rpc(Node, Mod, Fun, Args) ->
193 -    case rpc_call(Node, Mod, Fun, Args) of
194 +    unsafe_rpc(Node, Mod, Fun, Args, ?RPC_TIMEOUT).
195 +
196 +unsafe_rpc(Node, Mod, Fun, Args, Timeout) ->
197 +    case rpc_call(Node, Mod, Fun, Args, Timeout) of
198          {badrpc, _} = Res -> throw(Res);
199          Normal            -> Normal
200      end.
201 @@ -874,33 +900,42 @@ ensure_app_running(Node) ->
202  call(Node, {Mod, Fun, Args}) ->
203      rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)).
204  
205 -call(Node, {Mod, Fun, Args}, InfoKeys, Timeout) ->
206 -    call(Node, {Mod, Fun, Args}, InfoKeys, false, Timeout, false).
207 +call_emitter(Node, {Mod, Fun, Args}, InfoKeys, Opts) ->
208 +    Ref = start_emission(Node, {Mod, Fun, Args}, Opts),
209 +    display_emission_result(Ref, InfoKeys, Opts).
210 +
211 +start_emission(Node, {Mod, Fun, Args}, Opts) ->
212 +    ToBinUtf8 = proplists:get_value(to_bin_utf8, Opts, false),
213 +    Timeout = proplists:get_value(timeout, Opts, infinity),
214 +    Ref = proplists:get_value(ref, Opts, make_ref()),
215 +    rabbit_control_misc:spawn_emitter_caller(
216 +      Node, Mod, Fun, prepare_call_args(Args, ToBinUtf8),
217 +      Ref, self(), Timeout),
218 +    Ref.
219 +
220 +display_emission_result(Ref, InfoKeys, Opts) ->
221 +    IsEscaped = proplists:get_value(is_escaped, Opts, false),
222 +    Chunks = proplists:get_value(chunks, Opts, 1),
223 +    Timeout = proplists:get_value(timeout, Opts, infinity),
224 +    EmissionStatus = rabbit_control_misc:wait_for_info_messages(
225 +                       self(), Ref, display_info_message(IsEscaped, InfoKeys), ok, Timeout, Chunks),
226 +    emission_to_action_result(EmissionStatus).
227 +
228 +%% Convert rabbit_control_misc:wait_for_info_messages/6 return value
229 +%% into form expected by rabbit_cli:main/3.
230 +emission_to_action_result({ok, ok}) ->
231 +    ok;
232 +emission_to_action_result({error, Error}) ->
233 +    Error.
234  
235 -call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout) ->
236 -    call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, false).
237 +prepare_call_args(Args, ToBinUtf8) ->
238 +    case ToBinUtf8 of
239 +        true  -> valid_utf8_args(Args);
240 +        false -> Args
241 +    end.
242  
243 -call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, IsEscaped) ->
244 -    Args0 = case ToBinUtf8 of
245 -                true  -> lists:map(fun list_to_binary_utf8/1, Args);
246 -                false -> Args
247 -            end,
248 -    Ref = make_ref(),
249 -    Pid = self(),
250 -    spawn_link(
251 -      fun () ->
252 -              case rabbit_cli:rpc_call(Node, Mod, Fun, Args0,
253 -                                       Ref, Pid, Timeout) of
254 -                  {error, _} = Error        ->
255 -                      Pid ! {error, Error};
256 -                  {bad_argument, _} = Error ->
257 -                      Pid ! {error, Error};
258 -                  _                         ->
259 -                      ok
260 -              end
261 -      end),
262 -    rabbit_control_misc:wait_for_info_messages(
263 -      Pid, Ref, InfoKeys, display_info_message(IsEscaped), Timeout).
264 +valid_utf8_args(Args) ->
265 +    lists:map(fun list_to_binary_utf8/1, Args).
266  
267  list_to_binary_utf8(L) ->
268      B = list_to_binary(L),
269 @@ -950,7 +985,10 @@ split_list([_])        -> exit(even_list_needed);
270  split_list([A, B | T]) -> [{A, B} | split_list(T)].
271  
272  nodes_in_cluster(Node) ->
273 -    unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]).
274 +    unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], ?RPC_TIMEOUT).
275 +
276 +nodes_in_cluster(Node, Timeout) ->
277 +    unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], Timeout).
278  
279  alarms_by_node(Name) ->
280      case rpc_call(Name, rabbit, status, []) of