1 # Copyright 2023 highstreet technologies GmbH
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 A Class representing an O-RAN radio unit (ORanRu)
20 import xml.etree.ElementTree as ET
22 from typing import Any, cast
24 from network_generation.model.python.nr_cell_du import NrCellDu
25 from network_generation.model.python.o_ran_du import ORanDu
26 from network_generation.model.python.o_ran_node import IORanNode, ORanNode
27 from network_generation.model.python.o_ran_termination_point import (
32 # Define the "IORanRu" interface
33 class IORanRu(IORanNode):
39 default_value: IORanRu = cast(
43 **{"cellCount": 1, "ruAngle": 120, "ruAzimuth": 0},
# Define the O-RAN radio unit (ORanRu) class
49 class ORanRu(ORanNode):
52 data: dict[str, Any] = cast(dict[str, Any], default_value),
53 **kwargs: dict[str, Any]
55 o_ran_ru_data: IORanRu = self._to_o_ran_ru_data(data)
56 super().__init__(cast(dict[str, Any], o_ran_ru_data), **kwargs)
57 self._cell_count: int = (
58 int(str(o_ran_ru_data["cellCount"]))
59 if o_ran_ru_data and "cellCount" in o_ran_ru_data
62 self._ru_angle: int = (
63 int(str(o_ran_ru_data["ruAngle"]))
64 if o_ran_ru_data and "ruAngle" in o_ran_ru_data
67 self._ru_azimuth: int = (
68 int(str(o_ran_ru_data["ruAzimuth"]))
69 if o_ran_ru_data and "ruAzimuth" in o_ran_ru_data
72 self._cells: list[NrCellDu] = self._create_cells()
73 name: str = self.name.replace("RU", "DU")
74 self.type = "ntsim-ng-o-ru"
76 o_ran_du_data: dict[str, Any] = {
78 "geoLocation": self.parent.geo_location,
79 "position": self.parent.position,
80 "layout": self.layout,
81 "parent": self.parent.parent.parent,
83 self._oRanDu: ORanDu = ORanDu(o_ran_du_data)
def _to_o_ran_ru_data(self, data: dict[str, Any]) -> IORanRu:
    """
    Build the IORanRu settings used to initialise this radio unit.

    Starts from a copy of the module-level `default_value` and overrides
    every key listed in `IORanRu.__annotations__` that is present in
    `data`.

    :param data: caller-supplied settings; keys that are not listed in
        `IORanRu.__annotations__` are ignored.
    :return: a new mapping holding the defaults merged with `data`.
    """
    # Work on a copy: assigning into `default_value` itself would leak the
    # settings of one instance into every instance created afterwards.
    result: IORanRu = cast(IORanRu, dict(default_value))
    for key in IORanRu.__annotations__:
        if key in data:
            result[key] = data[key]  # type: ignore
    return result
def _create_cells(self) -> list[NrCellDu]:
    """
    Create the NR cells of this radio unit.

    The cell settings are read from the "pattern" / "nrCellDu" section of
    the `configuration` found six `parent` levels above this node. One
    NrCellDu is created per index in `range(self._cell_count)`; the
    azimuth of a cell is `index * cellAngle` plus the azimuth of the
    radio unit.

    :return: the created cells, ordered by cell index.
    """
    settings: dict[str, Any] = (
        self.parent.parent.parent.parent.parent.parent
        .configuration["pattern"]["nrCellDu"]
    )
    angle: int = settings["cellAngle"]
    scale_factor: int = settings["cellScaleFactorForHandoverArea"]
    reach: int = settings["maxReach"]

    return [
        NrCellDu(
            {
                # cell name: RU name with "RU" replaced by "NRCellDu",
                # followed by the last two digits of the padded index
                "name": "-".join(
                    [
                        self.name.replace("RU", "NRCellDu"),
                        ("00" + str(index))[-2:],
                    ]
                ),
                "geoLocation": self.geo_location,
                "position": self.position,
                "layout": self.layout,
                "parent": self,
                "cellAngle": angle,
                "cellScaleFactorForHandoverArea": scale_factor,
                "maxReach": reach,
                "azimuth": index * angle + self._ru_azimuth,
            }
        )
        for index in range(self._cell_count)
    ]
127 def cells(self) -> list[NrCellDu]:
131 def oRanDu(self) -> ORanDu:
def termination_points(self) -> list[ORanTerminationPoint]:
    """
    Collect the termination points of this radio unit.

    The list holds, in this order: the termination points of the super
    class, one "<name>-PHY" point, one point per interface name (OFHM,
    OFHC, OFHU, OFHS) which names the PHY point as its supporter, and
    the termination points of every cell.

    :return: the list of termination points.
    """
    tps: list[ORanTerminationPoint] = super().termination_points()

    phy_tp_id: str = f"{self.name}-PHY"
    tps.append(ORanTerminationPoint({"id": phy_tp_id, "name": phy_tp_id}))

    interface_tp_ids = (
        f"{self.name}-{interface.upper()}"
        for interface in ("ofhm", "ofhc", "ofhu", "ofhs")
    )
    tps.extend(
        ORanTerminationPoint(
            {
                "id": tp_id,
                "name": tp_id,
                "supporter": phy_tp_id,
                "parent": self,
            }
        )
        for tp_id in interface_tp_ids
    )

    for cell in self.cells:
        tps.extend(cell.termination_points())
    return tps
def to_topology_nodes(self) -> list[dict[str, Any]]:
    """
    Return the topology nodes of this radio unit.

    The nodes reported by the super class come first, followed by the
    nodes reported by the O-RAN DU created for this radio unit.

    :return: the list of topology node dictionaries.
    """
    nodes: list[dict[str, Any]] = super().to_topology_nodes()
    du_nodes: list[dict[str, Any]] = self.oRanDu.to_topology_nodes()
    nodes.extend(du_nodes)
    return nodes
def to_topology_links(self) -> list[dict[str, Any]]:
    """
    Return the topology links of this radio unit.

    The links reported by the super class and by the O-RAN DU come
    first. They are followed by one link per interface name (phy, ofhm,
    ofhc, ofhu, ofhs) from this radio unit to its O-RAN DU; the
    termination point ids are "<node name>-<INTERFACE>" on both ends.

    :return: the list of topology link dictionaries.
    """
    links: list[dict[str, Any]] = super().to_topology_links()
    links.extend(self.oRanDu.to_topology_links())

    ru_name: str = self.name
    du_name: str = self.oRanDu.name
    for interface in ("phy", "ofhm", "ofhc", "ofhu", "ofhs"):
        tp_suffix: str = interface.upper()
        links.append(
            {
                "link-id": f"{interface}:{ru_name}<->{du_name}",
                "source": {
                    "source-node": ru_name,
                    "source-tp": f"{ru_name}-{tp_suffix}",
                },
                "destination": {
                    "dest-node": du_name,
                    "dest-tp": f"{du_name}-{tp_suffix}",
                },
            }
        )
    return links
def toKml(self) -> ET.Element:
    """
    Return the KML representation of this radio unit.

    The result is a "Folder" element with an "open" child set to "1", a
    "name" child carrying the name of the radio unit and, after those,
    the KML element of every cell.

    :return: the "Folder" element.
    """
    folder: ET.Element = ET.Element("Folder")
    ET.SubElement(folder, "open").text = "1"
    ET.SubElement(folder, "name").text = self.name
    folder.extend([cell.toKml() for cell in self.cells])
    return folder
def toSvg(self) -> ET.Element:
    """
    Return the SVG representation of this radio unit.

    Not implemented yet: a placeholder element with the tag
    "to-be-implemented" is returned.

    :return: the placeholder element.
    """
    placeholder: ET.Element = ET.Element("to-be-implemented")
    return placeholder
191 def to_directory(self, parent_dir: str) -> None:
192 self.oRanDu.to_directory(parent_dir)
193 parent_path = os.path.join(parent_dir, self.type)
194 path = os.path.join(parent_path, self.name)
195 if not os.path.exists(parent_path):
196 os.makedirs(parent_path, exist_ok=True)
197 if not os.path.exists(path):