X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=meta-starlingx%2Fmeta-stx-cloud%2Frecipes-extended%2Frabbitmq%2Ffiles%2Frabbitmq-server-0005-Avoid-RPC-roundtrips-in-list-commands.patch;fp=meta-starlingx%2Fmeta-stx-cloud%2Frecipes-extended%2Frabbitmq%2Ffiles%2Frabbitmq-server-0005-Avoid-RPC-roundtrips-in-list-commands.patch;h=0000000000000000000000000000000000000000;hb=6fc6934434f70595536a387ece31bc30141cafb5;hp=238d3d57040d594d61ee4e8c536bf87fe90d09a2;hpb=eb1e26510491ba49de693ab3b0498edcb06be6c5;p=pti%2Frtp.git diff --git a/meta-starlingx/meta-stx-cloud/recipes-extended/rabbitmq/files/rabbitmq-server-0005-Avoid-RPC-roundtrips-in-list-commands.patch b/meta-starlingx/meta-stx-cloud/recipes-extended/rabbitmq/files/rabbitmq-server-0005-Avoid-RPC-roundtrips-in-list-commands.patch deleted file mode 100644 index 238d3d5..0000000 --- a/meta-starlingx/meta-stx-cloud/recipes-extended/rabbitmq/files/rabbitmq-server-0005-Avoid-RPC-roundtrips-in-list-commands.patch +++ /dev/null @@ -1,280 +0,0 @@ -From: Alexey Lebedeff -Date: Wed, 9 Mar 2016 18:09:04 +0300 -Subject: [PATCH] Avoid RPC roundtrips in list commands - -Current implementation of various `list_XXX` commands require cross-node -roundtrip for every processed item - because `rabbitmqctl` target node -is responsible for gathering global list of all items of -interest (channels etc.) and then processing them one by one. - -For example, listing 10000 channels evenly distributed across 3 nodes -where network has 1ms delay takes more than 10 seconds on my -machine. And with the proposed change listing will take almost the same -time as it'll take to gather this info locally. E.g. in the case above -listing now takes 0.7 second on the same machine with same 1ms delay. - -It works by invoking emitting_map on every node, where it should send -info about only local items to aggregator, in an async fashion - as no -reply from aggregator is needed. - -diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl -index ea9d6a2..e6b168a 100644 ---- a/src/rabbit_control_main.erl -+++ b/src/rabbit_control_main.erl -@@ -23,7 +23,7 @@ - sync_queue/1, cancel_sync_queue/1, become/1, - purge_queue/1]). - ---import(rabbit_misc, [rpc_call/4, rpc_call/5, rpc_call/7]). -+-import(rabbit_misc, [rpc_call/4, rpc_call/5]). - - -define(EXTERNAL_CHECK_INTERVAL, 1000). - -@@ -595,56 +595,74 @@ action(purge_queue, Node, [Q], Opts, Inform, Timeout) -> - - action(list_users, Node, [], _Opts, Inform, Timeout) -> - Inform("Listing users", []), -- call(Node, {rabbit_auth_backend_internal, list_users, []}, -- rabbit_auth_backend_internal:user_info_keys(), true, Timeout); -+ call_emitter(Node, {rabbit_auth_backend_internal, list_users, []}, -+ rabbit_auth_backend_internal:user_info_keys(), -+ [{timeout, Timeout}, to_bin_utf8]); - - action(list_permissions, Node, [], Opts, Inform, Timeout) -> - VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Listing permissions in vhost \"~s\"", [VHost]), -- call(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]}, -- rabbit_auth_backend_internal:vhost_perms_info_keys(), true, Timeout, -- true); -+ call_emitter(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]}, -+ rabbit_auth_backend_internal:vhost_perms_info_keys(), -+ [{timeout, Timeout}, to_bin_utf8, is_escaped]); - - action(list_parameters, Node, [], Opts, Inform, Timeout) -> - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - Inform("Listing runtime parameters", []), -- call(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]}, -- rabbit_runtime_parameters:info_keys(), Timeout); -+ call_emitter(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]}, -+ rabbit_runtime_parameters:info_keys(), -+ [{timeout, Timeout}]); - - action(list_policies, Node, [], Opts, Inform, Timeout) -> - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - Inform("Listing policies", []), -- call(Node, {rabbit_policy, list_formatted, [VHostArg]}, -- rabbit_policy:info_keys(), Timeout); -+ call_emitter(Node, {rabbit_policy, list_formatted, [VHostArg]}, -+ rabbit_policy:info_keys(), -+ [{timeout, Timeout}]); - - action(list_vhosts, Node, Args, _Opts, Inform, Timeout) -> - Inform("Listing vhosts", []), - ArgAtoms = default_if_empty(Args, [name]), -- call(Node, {rabbit_vhost, info_all, []}, ArgAtoms, true, Timeout); -+ call_emitter(Node, {rabbit_vhost, info_all, []}, ArgAtoms, -+ [{timeout, Timeout}, to_bin_utf8]); - - action(list_user_permissions, _Node, _Args = [], _Opts, _Inform, _Timeout) -> - {error_string, - "list_user_permissions expects a username argument, but none provided."}; - action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout) -> - Inform("Listing permissions for user ~p", Args), -- call(Node, {rabbit_auth_backend_internal, list_user_permissions, Args}, -- rabbit_auth_backend_internal:user_perms_info_keys(), true, Timeout, -- true); -+ call_emitter(Node, {rabbit_auth_backend_internal, list_user_permissions, Args}, -+ rabbit_auth_backend_internal:user_perms_info_keys(), -+ [{timeout, Timeout}, to_bin_utf8, is_escaped]); - - action(list_queues, Node, Args, Opts, Inform, Timeout) -> -- [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]), - Inform("Listing queues", []), -+ %% User options -+ [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]), - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - ArgAtoms = default_if_empty(Args, [name, messages]), -- call(Node, {rabbit_amqqueue, info_all, [VHostArg, ArgAtoms, Online, Offline]}, -- ArgAtoms, Timeout); -+ -+ %% Data for emission -+ Nodes = nodes_in_cluster(Node, Timeout), -+ OnlineChunks = if Online -> length(Nodes); true -> 0 end, -+ OfflineChunks = if Offline -> 1; true -> 0 end, -+ ChunksOpt = {chunks, OnlineChunks + OfflineChunks}, -+ TimeoutOpt = {timeout, Timeout}, -+ EmissionRef = make_ref(), -+ EmissionRefOpt = {ref, EmissionRef}, -+ -+ _ = Online andalso start_emission(Node, {rabbit_amqqueue, emit_info_all, [Nodes, VHostArg, ArgAtoms]}, -+ [TimeoutOpt, EmissionRefOpt]), -+ _ = Offline andalso start_emission(Node, {rabbit_amqqueue, emit_info_down, [VHostArg, ArgAtoms]}, -+ [TimeoutOpt, EmissionRefOpt]), -+ display_emission_result(EmissionRef, ArgAtoms, [ChunksOpt, TimeoutOpt]); - - action(list_exchanges, Node, Args, Opts, Inform, Timeout) -> - Inform("Listing exchanges", []), - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - ArgAtoms = default_if_empty(Args, [name, type]), -- call(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]}, -- ArgAtoms, Timeout); -+ call_emitter(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]}, -+ ArgAtoms, [{timeout, Timeout}]); - - action(list_bindings, Node, Args, Opts, Inform, Timeout) -> - Inform("Listing bindings", []), -@@ -652,27 +670,31 @@ action(list_bindings, Node, Args, Opts, Inform, Timeout) -> - ArgAtoms = default_if_empty(Args, [source_name, source_kind, - destination_name, destination_kind, - routing_key, arguments]), -- call(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]}, -- ArgAtoms, Timeout); -+ call_emitter(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]}, -+ ArgAtoms, [{timeout, Timeout}]); - - action(list_connections, Node, Args, _Opts, Inform, Timeout) -> - Inform("Listing connections", []), - ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]), -- call(Node, {rabbit_networking, connection_info_all, [ArgAtoms]}, -- ArgAtoms, Timeout); -+ Nodes = nodes_in_cluster(Node, Timeout), -+ call_emitter(Node, {rabbit_networking, emit_connection_info_all, [Nodes, ArgAtoms]}, -+ ArgAtoms, [{timeout, Timeout}, {chunks, length(Nodes)}]); - - action(list_channels, Node, Args, _Opts, Inform, Timeout) -> - Inform("Listing channels", []), - ArgAtoms = default_if_empty(Args, [pid, user, consumer_count, - messages_unacknowledged]), -- call(Node, {rabbit_channel, info_all, [ArgAtoms]}, -- ArgAtoms, Timeout); -+ Nodes = nodes_in_cluster(Node, Timeout), -+ call_emitter(Node, {rabbit_channel, emit_info_all, [Nodes, ArgAtoms]}, ArgAtoms, -+ [{timeout, Timeout}, {chunks, length(Nodes)}]); - - action(list_consumers, Node, _Args, Opts, Inform, Timeout) -> - Inform("Listing consumers", []), - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), -- call(Node, {rabbit_amqqueue, consumers_all, [VHostArg]}, -- rabbit_amqqueue:consumer_info_keys(), Timeout); -+ Nodes = nodes_in_cluster(Node, Timeout), -+ call_emitter(Node, {rabbit_amqqueue, emit_consumers_all, [Nodes, VHostArg]}, -+ rabbit_amqqueue:consumer_info_keys(), -+ [{timeout, Timeout}, {chunks, length(Nodes)}]); - - action(node_health_check, Node, _Args, _Opts, Inform, Timeout) -> - Inform("Checking health of node ~p", [Node]), -@@ -788,17 +810,18 @@ display_info_message_row(IsEscaped, Result, InfoItemKeys) -> - {X, Value} -> Value - end, IsEscaped) || X <- InfoItemKeys]). - --display_info_message(IsEscaped) -> -+display_info_message(IsEscaped, InfoItemKeys) -> - fun ([], _) -> - ok; -- ([FirstResult|_] = List, InfoItemKeys) when is_list(FirstResult) -> -+ ([FirstResult|_] = List, _) when is_list(FirstResult) -> - lists:foreach(fun(Result) -> - display_info_message_row(IsEscaped, Result, InfoItemKeys) - end, - List), - ok; -- (Result, InfoItemKeys) -> -- display_info_message_row(IsEscaped, Result, InfoItemKeys) -+ (Result, _) -> -+ display_info_message_row(IsEscaped, Result, InfoItemKeys), -+ ok - end. - - display_info_list(Results, InfoItemKeys) when is_list(Results) -> -@@ -855,7 +878,10 @@ display_call_result(Node, MFA) -> - end. - - unsafe_rpc(Node, Mod, Fun, Args) -> -- case rpc_call(Node, Mod, Fun, Args) of -+ unsafe_rpc(Node, Mod, Fun, Args, ?RPC_TIMEOUT). -+ -+unsafe_rpc(Node, Mod, Fun, Args, Timeout) -> -+ case rpc_call(Node, Mod, Fun, Args, Timeout) of - {badrpc, _} = Res -> throw(Res); - Normal -> Normal - end. -@@ -874,33 +900,42 @@ ensure_app_running(Node) -> - call(Node, {Mod, Fun, Args}) -> - rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)). - --call(Node, {Mod, Fun, Args}, InfoKeys, Timeout) -> -- call(Node, {Mod, Fun, Args}, InfoKeys, false, Timeout, false). -+call_emitter(Node, {Mod, Fun, Args}, InfoKeys, Opts) -> -+ Ref = start_emission(Node, {Mod, Fun, Args}, Opts), -+ display_emission_result(Ref, InfoKeys, Opts). -+ -+start_emission(Node, {Mod, Fun, Args}, Opts) -> -+ ToBinUtf8 = proplists:get_value(to_bin_utf8, Opts, false), -+ Timeout = proplists:get_value(timeout, Opts, infinity), -+ Ref = proplists:get_value(ref, Opts, make_ref()), -+ rabbit_control_misc:spawn_emitter_caller( -+ Node, Mod, Fun, prepare_call_args(Args, ToBinUtf8), -+ Ref, self(), Timeout), -+ Ref. -+ -+display_emission_result(Ref, InfoKeys, Opts) -> -+ IsEscaped = proplists:get_value(is_escaped, Opts, false), -+ Chunks = proplists:get_value(chunks, Opts, 1), -+ Timeout = proplists:get_value(timeout, Opts, infinity), -+ EmissionStatus = rabbit_control_misc:wait_for_info_messages( -+ self(), Ref, display_info_message(IsEscaped, InfoKeys), ok, Timeout, Chunks), -+ emission_to_action_result(EmissionStatus). -+ -+%% Convert rabbit_control_misc:wait_for_info_messages/6 return value -+%% into form expected by rabbit_cli:main/3. -+emission_to_action_result({ok, ok}) -> -+ ok; -+emission_to_action_result({error, Error}) -> -+ Error. - --call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout) -> -- call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, false). -+prepare_call_args(Args, ToBinUtf8) -> -+ case ToBinUtf8 of -+ true -> valid_utf8_args(Args); -+ false -> Args -+ end. - --call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, IsEscaped) -> -- Args0 = case ToBinUtf8 of -- true -> lists:map(fun list_to_binary_utf8/1, Args); -- false -> Args -- end, -- Ref = make_ref(), -- Pid = self(), -- spawn_link( -- fun () -> -- case rabbit_cli:rpc_call(Node, Mod, Fun, Args0, -- Ref, Pid, Timeout) of -- {error, _} = Error -> -- Pid ! {error, Error}; -- {bad_argument, _} = Error -> -- Pid ! {error, Error}; -- _ -> -- ok -- end -- end), -- rabbit_control_misc:wait_for_info_messages( -- Pid, Ref, InfoKeys, display_info_message(IsEscaped), Timeout). -+valid_utf8_args(Args) -> -+ lists:map(fun list_to_binary_utf8/1, Args). - - list_to_binary_utf8(L) -> - B = list_to_binary(L), -@@ -950,7 +985,10 @@ split_list([_]) -> exit(even_list_needed); - split_list([A, B | T]) -> [{A, B} | split_list(T)]. - - nodes_in_cluster(Node) -> -- unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]). -+ unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], ?RPC_TIMEOUT). -+ -+nodes_in_cluster(Node, Timeout) -> -+ unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], Timeout). - - alarms_by_node(Name) -> - case rpc_call(Name, rabbit, status, []) of