Add initial meta-stx to support StarlingX build
[pti/rtp.git] / meta-stx / recipes-extended / rabbitmq / files / rabbitmq-common-0001-Avoid-RPC-roundtrips-while-listing-items.patch
diff --git a/meta-stx/recipes-extended/rabbitmq/files/rabbitmq-common-0001-Avoid-RPC-roundtrips-while-listing-items.patch b/meta-stx/recipes-extended/rabbitmq/files/rabbitmq-common-0001-Avoid-RPC-roundtrips-while-listing-items.patch
new file mode 100644 (file)
index 0000000..e6c6fe9
--- /dev/null
@@ -0,0 +1,388 @@
+From: Alexey Lebedeff <alebedev@mirantis.com>
+Date: Wed, 9 Mar 2016 14:55:02 +0300
+Subject: [PATCH] Avoid RPC roundtrips while listing items
+
+- Emit info about particular items in parallel on every node, with
+  results delivered directly to a `rabbitmqctl` instance.
+- `rabbit_control_misc:wait_for_info_messages/5` can wait for results of
+  more than one emitting map.
+- Stop passing arround InfoItemKeys in
+  `rabbit_control_misc:wait_for_info_messages/5`, the same information
+  could be directly encoded in DisplayFun closure.
+- Add `emit` to function names, to avoid confusion with regular ones
+  which return result directly.
+
+Part of https://github.com/rabbitmq/rabbitmq-server/pull/683
+
+diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
+index 27b352a..e09e02c 100644
+--- a/src/rabbit_amqqueue.erl
++++ b/src/rabbit_amqqueue.erl
+@@ -25,10 +25,10 @@
+          check_exclusive_access/2, with_exclusive_access_or_die/3,
+          stat/1, deliver/2, requeue/3, ack/3, reject/4]).
+ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
+-         info_all/6, info_local/1]).
++         emit_info_all/5, list_local/1, info_local/1]).
+ -export([list_down/1]).
+ -export([force_event_refresh/1, notify_policy_changed/1]).
+--export([consumers/1, consumers_all/1,  consumers_all/3, consumer_info_keys/0]).
++-export([consumers/1, consumers_all/1,  emit_consumers_all/4, consumer_info_keys/0]).
+ -export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
+ -export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
+ -export([notify_down_all/2, notify_down_all/3, activate_limit_all/2, credit/5]).
+@@ -41,7 +41,8 @@
+ %% internal
+ -export([internal_declare/2, internal_delete/1, run_backing_queue/3,
+-         set_ram_duration_target/2, set_maximum_since_use/2]).
++         set_ram_duration_target/2, set_maximum_since_use/2,
++         emit_info_local/4, emit_info_down/4, emit_consumers_local/3]).
+ -include("rabbit.hrl").
+ -include_lib("stdlib/include/qlc.hrl").
+@@ -117,10 +118,6 @@
+ -spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()].
+ -spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) ->
+           [rabbit_types:infos()].
+--spec info_all
+-        (rabbit_types:vhost(), rabbit_types:info_keys(), boolean(), boolean(),
+-         reference(), pid()) ->
+-            'ok'.
+ -spec force_event_refresh(reference()) -> 'ok'.
+ -spec notify_policy_changed(rabbit_types:amqqueue()) -> 'ok'.
+ -spec consumers(rabbit_types:amqqueue()) ->
+@@ -130,7 +127,6 @@
+ -spec consumers_all(rabbit_types:vhost()) ->
+           [{name(), pid(), rabbit_types:ctag(), boolean(),
+             non_neg_integer(), rabbit_framing:amqp_table()}].
+--spec consumers_all(rabbit_types:vhost(), reference(), pid()) -> 'ok'.
+ -spec stat(rabbit_types:amqqueue()) ->
+           {'ok', non_neg_integer(), non_neg_integer()}.
+ -spec delete_immediately(qpids()) -> 'ok'.
+@@ -627,16 +623,18 @@ info_all(VHostPath, Items) ->
+     map(list(VHostPath), fun (Q) -> info(Q, Items) end) ++
+         map(list_down(VHostPath), fun (Q) -> info_down(Q, Items, down) end).
+-info_all(VHostPath, Items, NeedOnline, NeedOffline, Ref, AggregatorPid) ->
+-    NeedOnline andalso rabbit_control_misc:emitting_map_with_exit_handler(
+-                         AggregatorPid, Ref, fun(Q) -> info(Q, Items) end, list(VHostPath),
+-                         continue),
+-    NeedOffline andalso rabbit_control_misc:emitting_map_with_exit_handler(
+-                          AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end,
+-                          list_down(VHostPath),
+-                          continue),
+-    %% Previous maps are incomplete, finalize emission
+-    rabbit_control_misc:emitting_map(AggregatorPid, Ref, fun(_) -> no_op end, []).
++emit_info_local(VHostPath, Items, Ref, AggregatorPid) ->
++    rabbit_control_misc:emitting_map_with_exit_handler(
++      AggregatorPid, Ref, fun(Q) -> info(Q, Items) end, list_local(VHostPath)).
++
++emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) ->
++    Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, AggregatorPid]) || Node <- Nodes ],
++    rabbit_control_misc:await_emitters_termination(Pids).
++
++emit_info_down(VHostPath, Items, Ref, AggregatorPid) ->
++    rabbit_control_misc:emitting_map_with_exit_handler(
++      AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end,
++      list_down(VHostPath)).
+ info_local(VHostPath) ->
+     map(list_local(VHostPath), fun (Q) -> info(Q, [name]) end).
+@@ -664,12 +662,17 @@ consumers_all(VHostPath) ->
+       map(list(VHostPath),
+           fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end)).
+-consumers_all(VHostPath, Ref, AggregatorPid) ->
++emit_consumers_all(Nodes, VHostPath, Ref, AggregatorPid) ->
++    Pids = [ spawn_link(Node, rabbit_amqqueue, emit_consumers_local, [VHostPath, Ref, AggregatorPid]) || Node <- Nodes ],
++    rabbit_control_misc:await_emitters_termination(Pids),
++    ok.
++
++emit_consumers_local(VHostPath, Ref, AggregatorPid) ->
+     ConsumerInfoKeys = consumer_info_keys(),
+     rabbit_control_misc:emitting_map(
+       AggregatorPid, Ref,
+       fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end,
+-      list(VHostPath)).
++      list_local(VHostPath)).
+ get_queue_consumer_info(Q, ConsumerInfoKeys) ->
+     [lists:zip(ConsumerInfoKeys,
+diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
+index ab7d38d..837a892 100644
+--- a/src/rabbit_channel.erl
++++ b/src/rabbit_channel.erl
+@@ -56,7 +56,7 @@
+ -export([send_command/2, deliver/4, deliver_reply/2,
+          send_credit_reply/2, send_drained/2]).
+ -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1,
+-         info_all/3, info_local/1]).
++         emit_info_all/4, info_local/1]).
+ -export([refresh_config_local/0, ready_for_close/1]).
+ -export([force_event_refresh/1]).
+@@ -64,7 +64,7 @@
+          handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
+          prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
+ %% Internal
+--export([list_local/0, deliver_reply_local/3]).
++-export([list_local/0, emit_info_local/3, deliver_reply_local/3]).
+ -export([get_vhost/1, get_user/1]).
+ -record(ch, {
+@@ -220,7 +220,6 @@
+ -spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos().
+ -spec info_all() -> [rabbit_types:infos()].
+ -spec info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()].
+--spec info_all(rabbit_types:info_keys(), reference(), pid()) -> 'ok'.
+ -spec refresh_config_local() -> 'ok'.
+ -spec ready_for_close(pid()) -> 'ok'.
+ -spec force_event_refresh(reference()) -> 'ok'.
+@@ -329,9 +328,16 @@ info_all(Items) ->
+ info_local(Items) ->
+     rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list_local()).
+-info_all(Items, Ref, AggregatorPid) ->
++emit_info_all(Nodes, Items, Ref, AggregatorPid) ->
++    Pids = [ spawn_link(Node, rabbit_channel, emit_info_local, [Items, Ref, AggregatorPid]) || Node <- Nodes ],
++    rabbit_control_misc:await_emitters_termination(Pids).
++
++emit_info_local(Items, Ref, AggregatorPid) ->
++    emit_info(list_local(), Items, Ref, AggregatorPid).
++
++emit_info(PidList, InfoItems, Ref, AggregatorPid) ->
+     rabbit_control_misc:emitting_map_with_exit_handler(
+-      AggregatorPid, Ref, fun(C) -> info(C, Items) end, list()).
++      AggregatorPid, Ref, fun(C) -> info(C, InfoItems) end, PidList).
+ refresh_config_local() ->
+     rabbit_misc:upmap(
+diff --git a/src/rabbit_control_misc.erl b/src/rabbit_control_misc.erl
+index 2e1f6cc..3b0c60b 100644
+--- a/src/rabbit_control_misc.erl
++++ b/src/rabbit_control_misc.erl
+@@ -17,7 +17,8 @@
+ -module(rabbit_control_misc).
+ -export([emitting_map/4, emitting_map/5, emitting_map_with_exit_handler/4,
+-         emitting_map_with_exit_handler/5, wait_for_info_messages/5,
++         emitting_map_with_exit_handler/5, wait_for_info_messages/6,
++         spawn_emitter_caller/7, await_emitters_termination/1,
+          print_cmd_result/2]).
+ -spec emitting_map(pid(), reference(), fun(), list()) -> 'ok'.
+@@ -25,7 +26,14 @@
+ -spec emitting_map_with_exit_handler
+         (pid(), reference(), fun(), list()) -> 'ok'.
+ -spec emitting_map_with_exit_handler
+-        (pid(), reference(), fun(), list(), atom()) -> 'ok'.
++        (pid(), reference(), fun(), list(), 'continue') -> 'ok'.
++
++-type fold_fun() :: fun ((term(), term()) -> term()).
++
++-spec wait_for_info_messages (pid(), reference(), fold_fun(), term(), timeout(), non_neg_integer()) -> {'ok', term()} | {'error', term()}.
++-spec spawn_emitter_caller (node(), module(), atom(), [term()], reference(), pid(), timeout()) -> 'ok'.
++-spec await_emitters_termination ([pid()]) -> 'ok'.
++
+ -spec print_cmd_result(atom(), term()) -> 'ok'.
+ emitting_map(AggregatorPid, Ref, Fun, List) ->
+@@ -65,27 +73,108 @@ step_with_exit_handler(AggregatorPid, Ref, Fun, Item) ->
+             ok
+     end.
+-wait_for_info_messages(Pid, Ref, ArgAtoms, DisplayFun, Timeout) ->
+-    _ = notify_if_timeout(Pid, Ref, Timeout),
+-    wait_for_info_messages(Ref, ArgAtoms, DisplayFun).
++%% Invokes RPC for async info collection in separate (but linked to
++%% the caller) process. Separate process waits for RPC to finish and
++%% in case of errors sends them in wait_for_info_messages/5-compatible
++%% form to aggregator process. Calling process is then expected to
++%% do blocking call of wait_for_info_messages/5.
++%%
++%% Remote function MUST use calls to emitting_map/4 (and other
++%% emitting_map's) to properly deliver requested information to an
++%% aggregator process.
++%%
++%% If for performance reasons several parallel emitting_map's need to
++%% be run, remote function MUST NOT return until all this
++%% emitting_map's are done. And during all this time remote RPC
++%% process MUST be linked to emitting
++%% processes. await_emitters_termination/1 helper can be used as a
++%% last statement of remote function to ensure this behaviour.
++spawn_emitter_caller(Node, Mod, Fun, Args, Ref, Pid, Timeout) ->
++    spawn_monitor(
++      fun () ->
++              case rpc_call_emitter(Node, Mod, Fun, Args, Ref, Pid, Timeout) of
++                  {error, _} = Error        ->
++                      Pid ! {Ref, error, Error};
++                  {bad_argument, _} = Error ->
++                      Pid ! {Ref, error, Error};
++                  {badrpc, _} = Error       ->
++                      Pid ! {Ref, error, Error};
++                  _                         ->
++                      ok
++              end
++      end),
++    ok.
++
++rpc_call_emitter(Node, Mod, Fun, Args, Ref, Pid, Timeout) ->
++    rabbit_misc:rpc_call(Node, Mod, Fun, Args++[Ref, Pid], Timeout).
++
++%% Agregator process expects correct numbers of explicits ACKs about
++%% finished emission process. While everything is linked, we still
++%% need somehow to wait for termination of all emitters before
++%% returning from RPC call - otherwise links will be just broken with
++%% reason 'normal' and we can miss some errors, and subsequentially
++%% hang.
++await_emitters_termination(Pids) ->
++    Monitors = [erlang:monitor(process, Pid) || Pid <- Pids],
++    collect_monitors(Monitors).
+-wait_for_info_messages(Ref, InfoItemKeys, DisplayFun) when is_reference(Ref) ->
++collect_monitors([]) ->
++    ok;
++collect_monitors([Monitor|Rest]) ->
+     receive
+-        {Ref,  finished}         ->
+-            ok;
+-        {Ref,  {timeout, T}}     ->
++        {'DOWN', Monitor, _Pid, normal} ->
++            collect_monitors(Rest);
++        {'DOWN', Monitor, _Pid, noproc} ->
++            %% There is a link and a monitor to a process. Matching
++            %% this clause means that process has gracefully
++            %% terminated even before we've started monitoring.
++            collect_monitors(Rest);
++        {'DOWN', _, Pid, Reason} ->
++            exit({emitter_exit, Pid, Reason})
++    end.
++
++%% Wait for result of one or more calls to emitting_map-family
++%% functions.
++%%
++%% Number of expected acknowledgments is specified by ChunkCount
++%% argument. Most common usage will be with ChunkCount equals to
++%% number of live nodes, but it's not mandatory - thus more generic
++%% name of 'ChunkCount' was chosen.
++wait_for_info_messages(Pid, Ref, Fun, Acc0, Timeout, ChunkCount) ->
++    notify_if_timeout(Pid, Ref, Timeout),
++    wait_for_info_messages(Ref, Fun, Acc0, ChunkCount).
++
++wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft) ->
++    receive
++        {Ref, finished} when ChunksLeft =:= 1 ->
++            {ok, Acc0};
++        {Ref, finished} ->
++            wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft - 1);
++        {Ref, {timeout, T}} ->
+             exit({error, {timeout, (T / 1000)}});
+-        {Ref,  []}               ->
+-            wait_for_info_messages(Ref, InfoItemKeys, DisplayFun);
+-        {Ref,  Result, continue} ->
+-            DisplayFun(Result, InfoItemKeys),
+-            wait_for_info_messages(Ref, InfoItemKeys, DisplayFun);
+-        {error, Error}           ->
+-            Error;
+-        _                        ->
+-            wait_for_info_messages(Ref, InfoItemKeys, DisplayFun)
++        {Ref, []} ->
++            wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft);
++        {Ref, Result, continue} ->
++            wait_for_info_messages(Ref, Fun, Fun(Result, Acc0), ChunksLeft);
++        {Ref, error, Error} ->
++            {error, simplify_emission_error(Error)};
++        {'DOWN', _MRef, process, _Pid, normal} ->
++            wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft);
++        {'DOWN', _MRef, process, _Pid, Reason} ->
++            {error, simplify_emission_error(Reason)};
++        _Msg ->
++            wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft)
+     end.
++simplify_emission_error({badrpc, {'EXIT', {{nocatch, EmissionError}, _Stacktrace}}}) ->
++    EmissionError;
++simplify_emission_error({{nocatch, EmissionError}, _Stacktrace}) ->
++    EmissionError;
++simplify_emission_error(Anything) ->
++    {error, Anything}.
++
++notify_if_timeout(_, _, infinity) ->
++    ok;
+ notify_if_timeout(Pid, Ref, Timeout) ->
+     timer:send_after(Timeout, Pid, {Ref, {timeout, Timeout}}).
+diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
+index 8965c59..9341ea9 100644
+--- a/src/rabbit_misc.erl
++++ b/src/rabbit_misc.erl
+@@ -75,7 +75,7 @@
+ -export([get_env/3]).
+ -export([get_channel_operation_timeout/0]).
+ -export([random/1]).
+--export([rpc_call/4, rpc_call/5, rpc_call/7]).
++-export([rpc_call/4, rpc_call/5]).
+ -export([report_default_thread_pool_size/0]).
+ -export([get_gc_info/1]).
+@@ -264,8 +264,6 @@
+ -spec random(non_neg_integer()) -> non_neg_integer().
+ -spec rpc_call(node(), atom(), atom(), [any()]) -> any().
+ -spec rpc_call(node(), atom(), atom(), [any()], number()) -> any().
+--spec rpc_call
+-        (node(), atom(), atom(), [any()], reference(), pid(), number()) -> any().
+ -spec report_default_thread_pool_size() -> 'ok'.
+ -spec get_gc_info(pid()) -> integer().
+@@ -1184,9 +1182,6 @@ rpc_call(Node, Mod, Fun, Args, Timeout) ->
+                            rpc:call(Node, Mod, Fun, Args, Timeout)
+     end.
+-rpc_call(Node, Mod, Fun, Args, Ref, Pid, Timeout) ->
+-    rpc_call(Node, Mod, Fun, Args++[Ref, Pid], Timeout).
+-
+ guess_number_of_cpu_cores() ->
+     case erlang:system_info(logical_processors_available) of
+         unknown -> % Happens on Mac OS X.
+diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
+index 5bf30ff..63e3ed0 100644
+--- a/src/rabbit_networking.erl
++++ b/src/rabbit_networking.erl
+@@ -33,7 +33,8 @@
+          node_listeners/1, register_connection/1, unregister_connection/1,
+          connections/0, connection_info_keys/0,
+          connection_info/1, connection_info/2,
+-         connection_info_all/0, connection_info_all/1, connection_info_all/3,
++         connection_info_all/0, connection_info_all/1,
++         emit_connection_info_all/4, emit_connection_info_local/3,
+          close_connection/2, force_connection_event_refresh/1, tcp_host/1]).
+ %% Used by TCP-based transports, e.g. STOMP adapter
+@@ -89,8 +90,6 @@
+ -spec connection_info_all() -> [rabbit_types:infos()].
+ -spec connection_info_all(rabbit_types:info_keys()) ->
+           [rabbit_types:infos()].
+--spec connection_info_all(rabbit_types:info_keys(), reference(), pid()) ->
+-          'ok'.
+ -spec close_connection(pid(), string()) -> 'ok'.
+ -spec force_connection_event_refresh(reference()) -> 'ok'.
+@@ -365,10 +364,15 @@ connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items).
+ connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end).
+ connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end).
+-connection_info_all(Items, Ref, AggregatorPid) ->
++emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
++    Pids = [ spawn_link(Node, rabbit_networking, emit_connection_info_local, [Items, Ref, AggregatorPid]) || Node <- Nodes ],
++    rabbit_control_misc:await_emitters_termination(Pids),
++    ok.
++
++emit_connection_info_local(Items, Ref, AggregatorPid) ->
+     rabbit_control_misc:emitting_map_with_exit_handler(
+       AggregatorPid, Ref, fun(Q) -> connection_info(Q, Items) end,
+-      connections()).
++      connections_local()).
+ close_connection(Pid, Explanation) ->
+     rabbit_log:info("Closing connection ~p because ~p~n", [Pid, Explanation]),