Add new tests to validate O1
[it/dep.git] / smo-install / test / topology-generator / network-topology-converter.py
1 #!/usr/bin/env python3
2 ###
3 # ============LICENSE_START=======================================================
4 # ORAN SMO PACKAGE - PYTHONSDK TESTS
5 # ================================================================================
6 # Copyright (C) 2021-2022 AT&T Intellectual Property. All rights
7 #                             reserved.
8 # ================================================================================
9 # Licensed under the Apache License, Version 2.0 (the "License");
10 # you may not use this file except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 # http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 # ============LICENSE_END============================================
21 # ===================================================================
22 #
23 ###
24
25 import json
26 import sys
27 import yaml
28
29 def read_json_topology_file (filename):
30         return json.loads(open(filename, "r").read())
31
32 def save_helm_override_file (helm_file, filename):
33         with open(filename, 'w') as file:
34                 return yaml.dump(helm_file, file)
35
36 def get_name_of_node (node):
37         for name in node["name"]:
38                 if name["value-name"] == "topology-node-name":
39                         return name["value"]
40
41 def search_all_topology_nodes (topology_json):
42         ru_nodes = {}
43         du_nodes = {}
44         ue_nodes = {}
45         cu_nodes = {}
46         near_rt_ric_nodes = {}
47         smo_nodes = {}
48
49         for node in topology_json["tapi-common:context"]["tapi-topology:topology-context"]["topology"][0]["node"]:
50
51                 if node["o-ran-topology:function"] == "o-ran-common-identity-refs:o-ru-function":
52                         ru_nodes[get_name_of_node(node)]=node
53
54                 elif node["o-ran-topology:function"] == "o-ran-common-identity-refs:o-du-function":
55                         du_nodes[get_name_of_node(node)]=node
56                 elif node["o-ran-topology:function"] == "o-ran-common-identity-refs:user-equipment-function":
57                         ue_nodes[get_name_of_node(node)]=node
58                 elif node["o-ran-topology:function"] == "o-ran-common-identity-refs:o-cu-up-function":
59                         cu_nodes[get_name_of_node(node)]=node
60                 elif node["o-ran-topology:function"] == "o-ran-common-identity-refs:near-rt-ric-function":
61                         near_rt_ric_nodes[get_name_of_node(node)]=node
62                 elif node["o-ran-topology:function"] == "o-ran-common-identity-refs:smo-function":
63                         smo_nodes[get_name_of_node(node)]=node
64         return {"ru_nodes":ru_nodes, "du_nodes": du_nodes, "ue_nodes": ue_nodes, "cu_nodes": cu_nodes, "near_rt_ric_nodes":near_rt_ric_nodes, "smo_nodes":smo_nodes}
65
66 def generate_ru_node (ru_nodes):
67         rus=[]
68         for ru_key in ru_nodes:
69                 rus.append({"name":ru_key, "simulatedFaults":[]})
70         return {"ru_simulator":{"rus":rus}}
71
72 def generate_ru_faults ():
73         return {}
74
75 all_nodes=search_all_topology_nodes(read_json_topology_file (sys.argv[1]))
76 helm_override={}
77 helm_override = {**helm_override, **generate_ru_node(all_nodes["ru_nodes"])}
78
79
80 print ("#RU:"+str(len(all_nodes["ru_nodes"])))
81 print ("#DU:"+str(len(all_nodes["du_nodes"])))
82 print ("#UE:"+str(len(all_nodes["ue_nodes"])))
83 print ("#CU:"+str(len(all_nodes["cu_nodes"])))
84 print ("#NEAR_RTRIC:"+str(len(all_nodes["near_rt_ric_nodes"])))
85 print ("#SMO:"+str(len(all_nodes["smo_nodes"])))
86 number_of_nodes=len(all_nodes["ru_nodes"])+len(all_nodes["du_nodes"])+len(all_nodes["ue_nodes"])+len(all_nodes["cu_nodes"])+len(all_nodes["near_rt_ric_nodes"])+len(all_nodes["smo_nodes"])
87 print ("#Nodes(total):"+str(number_of_nodes))
88
89 print(save_helm_override_file(helm_override, sys.argv[2]))