3 # ============LICENSE_START=======================================================
4 # ORAN SMO PACKAGE - PYTHONSDK TESTS
5 # ================================================================================
6 # Copyright (C) 2021-2022 AT&T Intellectual Property. All rights
8 # ================================================================================
9 # Licensed under the Apache License, Version 2.0 (the "License");
10 # you may not use this file except in compliance with the License.
11 # You may obtain a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 # ============LICENSE_END============================================
21 # ===================================================================
def read_json_topology_file(filename):
    """Load the JSON document stored in ``filename`` and return it parsed.

    Args:
        filename: Path of the JSON file to read.

    Returns:
        The decoded JSON content (whatever ``json.load`` produces for the
        document, e.g. a ``dict`` for a JSON object).

    Raises:
        OSError: If the file cannot be opened or read.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # The context manager closes the handle deterministically; the explicit
    # encoding avoids depending on the platform's locale default (JSON
    # interchange text is UTF-8).
    with open(filename, "r", encoding="utf-8") as topology_file:
        return json.load(topology_file)
def save_helm_override_file(helm_file, filename):
    """Write ``helm_file`` to ``filename`` in YAML form.

    Args:
        helm_file: The object handed to ``yaml.dump`` for serialization.
        filename: Path of the file to create or overwrite.

    Returns:
        Whatever ``yaml.dump`` returns when it is given an output stream
        (PyYAML documents this as ``None``).
    """
    with open(filename, 'w') as out_stream:
        dump_result = yaml.dump(helm_file, out_stream)
    return dump_result
def get_name_of_node(node):
    """Return the topology node name recorded in ``node``.

    The entries of ``node["name"]`` are scanned in order and the first one
    whose ``"value-name"`` equals ``"topology-node-name"`` is used.

    Args:
        node: A mapping with a ``"name"`` list of name entries.

    Returns:
        The ``"value"`` of the matching entry, or ``None`` when no entry is
        tagged ``"topology-node-name"``.

    Raises:
        KeyError: If ``node`` has no ``"name"`` key or an entry lacks
            ``"value-name"``.
    """
    for entry in node["name"]:
        if entry["value-name"] == "topology-node-name":
            # NOTE(review): the statement under this condition was missing
            # from the reviewed text; returning the entry's "value" assumes
            # the usual value-name/value pair layout -- confirm upstream.
            return entry["value"]
    # No matching entry: made explicit instead of falling off the end.
    return None
def search_all_topology_nodes(topology_json):
    """Group the nodes of the first topology by their O-RAN function.

    Only ``topology_json["tapi-common:context"]
    ["tapi-topology:topology-context"]["topology"][0]["node"]`` is inspected.
    Each node is filed under the bucket that matches its
    ``"o-ran-topology:function"`` value, keyed by ``get_name_of_node(node)``;
    a later node with the same name replaces an earlier one. Nodes whose
    function is not one of the six listed below are ignored.

    Args:
        topology_json: Parsed topology document (nested dicts/lists).

    Returns:
        A dict with the keys ``"ru_nodes"``, ``"du_nodes"``, ``"ue_nodes"``,
        ``"cu_nodes"``, ``"near_rt_ric_nodes"`` and ``"smo_nodes"``, each
        mapping node name -> node dict.

    Raises:
        KeyError, IndexError: If the expected document structure is missing,
            or a node has no ``"o-ran-topology:function"`` entry.
    """
    # One table instead of a six-way if/elif chain: function identity ->
    # result bucket. Note that only the o-cu-up function feeds "cu_nodes".
    bucket_by_function = {
        "o-ran-common-identity-refs:o-ru-function": "ru_nodes",
        "o-ran-common-identity-refs:o-du-function": "du_nodes",
        "o-ran-common-identity-refs:user-equipment-function": "ue_nodes",
        "o-ran-common-identity-refs:o-cu-up-function": "cu_nodes",
        "o-ran-common-identity-refs:near-rt-ric-function": "near_rt_ric_nodes",
        "o-ran-common-identity-refs:smo-function": "smo_nodes",
    }
    # Every bucket exists in the result even when no node lands in it.
    grouped = {bucket: {} for bucket in bucket_by_function.values()}

    topology_context = topology_json["tapi-common:context"]["tapi-topology:topology-context"]
    for node in topology_context["topology"][0]["node"]:
        bucket = bucket_by_function.get(node["o-ran-topology:function"])
        if bucket is not None:
            grouped[bucket][get_name_of_node(node)] = node
    return grouped
def generate_ru_node(ru_nodes):
    """Build the ``ru_simulator`` helm override section for the given RUs.

    Args:
        ru_nodes: Iterable of RU names; when a dict is passed only its keys
            are used, in iteration order.

    Returns:
        ``{"ru_simulator": {"rus": [...]}}`` where each list element is
        ``{"name": <ru name>, "simulatedFaults": []}``. Every entry gets its
        own empty ``simulatedFaults`` list.
    """
    rus = [{"name": ru_key, "simulatedFaults": []} for ru_key in ru_nodes]
    return {"ru_simulator": {"rus": rus}}
72 def generate_ru_faults ():
# Script body: argv[1] is the topology JSON file to read, argv[2] is the helm
# override YAML file to write.
all_nodes = search_all_topology_nodes(read_json_topology_file(sys.argv[1]))

# NOTE(review): the statement that first assigns helm_override was missing
# from the reviewed text; seeding it with an empty dict is assumed -- confirm.
helm_override = {}
helm_override = {**helm_override, **generate_ru_node(all_nodes["ru_nodes"])}

# Per-category summary; labels and ordering are part of the printed output.
for label, bucket in (
    ("#RU:", "ru_nodes"),
    ("#DU:", "du_nodes"),
    ("#UE:", "ue_nodes"),
    ("#CU:", "cu_nodes"),
    ("#NEAR_RTRIC:", "near_rt_ric_nodes"),
    ("#SMO:", "smo_nodes"),
):
    print(label + str(len(all_nodes[bucket])))

# all_nodes holds exactly the six buckets printed above, so summing over its
# values equals adding the six individual counts.
number_of_nodes = sum(len(nodes) for nodes in all_nodes.values())
print("#Nodes(total):" + str(number_of_nodes))

# NOTE(review): this prints the return value of save_helm_override_file, i.e.
# the result of yaml.dump writing to a stream (documented as None). Kept so
# the script's stdout stays unchanged; drop the print() if "None" is unwanted.
print(save_helm_override_file(helm_override, sys.argv[2]))