Add to_directory method to relevant object classes
[oam.git] / code / network-generator / network_generation / model / python / o_ran_du.py
index 96baab1..98e8c1a 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-#!/usr/bin/python
+# !/usr/bin/python
 
 """
 A Class representing an O-RAN distributed unit (ORanDu)
 """
-from typing import overload
-from network_generation.model.python.o_ran_object import IORanObject
-from network_generation.model.python.o_ran_node import ORanNode
-from network_generation.model.python.o_ran_termination_point import ORanTerminationPoint
 import xml.etree.ElementTree as ET
+import os
+from typing import Any, cast
+
+from network_generation.model.python.o_ran_node import IORanNode, ORanNode
+from network_generation.model.python.o_ran_termination_point import (
+    ORanTerminationPoint,
+)
 
 
 # Define the "IORanDu" interface
-class IORanDu(IORanObject):
-    def __init__(self, o_ran_ru_count: int, **kwargs):
-        super().__init__(**kwargs)
-        self._o_ran_ru_count = o_ran_ru_count
+class IORanDu(IORanNode):
+    o_ran_ru_count: int
+
+
+default_value: IORanDu = cast(
+    IORanDu,
+    {
+        **ORanNode.default(),
+        **{"oRanRuCount": 1},
+    },
+)
 
 
 # Define an abstract O-RAN Node class
-class ORanDu(ORanNode, IORanDu):
-    def __init__(self, o_ran_du_data: IORanDu = None, **kwargs):
-        super().__init__(o_ran_du_data, **kwargs)
+class ORanDu(ORanNode):
+    def __init__(
+        self,
+        data: dict[str, Any] = cast(dict[str, Any], default_value),
+        **kwargs: dict[str, Any]
+    ) -> None:
+        o_ran_du_data: IORanDu = self._to_o_ran_du_data(data)
+        super().__init__(cast(dict[str, Any], o_ran_du_data), **kwargs)
         self._o_ran_ru_count = (
-            o_ran_du_data["oRanRuCount"] if o_ran_du_data and "oRanRuCount" in o_ran_du_data else 1
+            o_ran_du_data["oRanRuCount"]
+            if o_ran_du_data and "oRanRuCount" in o_ran_du_data
+            else 1
         )
+        self.type = "ntsim-ng-o-du"
+
+    def _to_o_ran_du_data(self, data: dict[str, Any]) -> IORanDu:
+        result: IORanDu = default_value
+        for key, key_type in IORanDu.__annotations__.items():
+            if key in data:
+                result[key] = data[key]  # type: ignore
+        return result
 
-    @property
     def termination_points(self) -> list[ORanTerminationPoint]:
-        result: list[ORanTerminationPoint] = super().termination_points
+        result: list[ORanTerminationPoint] = super().termination_points()
         phy_tp: str = "-".join([self.name, "phy".upper()])
         result.append(ORanTerminationPoint({"id": phy_tp, "name": phy_tp}))
-        for interface in ["e2", "o1", "ofhm", "ofhc", "ofhu","ofhs"]:
-            id:str = "-".join([self.name, interface.upper()])
-            result.append(ORanTerminationPoint({"id": id, "name":id, "supporter": phy_tp, "parent":self}))
+        for interface in ["e2", "o1", "ofhm", "ofhc", "ofhu", "ofhs"]:
+            id: str = "-".join([self.name, interface.upper()])
+            result.append(
+                ORanTerminationPoint(
+                    {"id": id, "name": id, "supporter": phy_tp, "parent": self}
+                )
+            )
         return result
 
-    def to_topology_nodes(self) -> list[dict[str, dict]]:
-        result: list[dict[str, dict]] = super().to_topology_nodes()
+    def to_topology_nodes(self) -> list[dict[str, Any]]:
+        result: list[dict[str, Any]] = super().to_topology_nodes()
         return result
 
-    def to_topology_links(self) -> list[dict[str, dict]]:
-        result: list[dict[str, dict]] = super().to_topology_links()
+    def to_topology_links(self) -> list[dict[str, Any]]:
+        result: list[dict[str, Any]] = super().to_topology_links()
         for interface in ["e2", "o1"]:
-            link_id: str = "".join([interface, ":", self.name, "<->", self.parent.name])
+            link_id: str = "".join(
+                [interface, ":", self.name, "<->", self.parent.name]
+            )
             source_tp: str = "-".join([self.name, interface.upper()])
             dest_tp: str = "-".join([self.parent.name, interface.upper()])
             result.append(
                 {
                     "link-id": link_id,
-                    "source": {"source-node": self.name, "source-tp": source_tp},
-                    "destination": {"dest-node": self.parent.name, "dest-tp": dest_tp},
+                    "source": {
+                        "source-node": self.name,
+                        "source-tp": source_tp,
+                    },
+                    "destination": {
+                        "dest-node": self.parent.name,
+                        "dest-tp": dest_tp,
+                    },
                 }
             )
         return result
-    
+
     def toKml(self) -> ET.Element:
         o_ran_du: ET.Element = ET.Element("Folder")
         open: ET.Element = ET.SubElement(o_ran_du, "open")
         open.text = "1"
         name: ET.Element = ET.SubElement(o_ran_du, "name")
         name.text = self.name
-        for tower in self.towers:
-            o_ran_du.append(tower.toKml())
+        for tower in self.towers:
+            o_ran_du.append(tower.toKml())
         return o_ran_du
 
-    def toSvg(self) -> None:
-        return None
+    def toSvg(self) -> ET.Element:
+        return ET.Element("to-be-implemented")
+
+    def to_directory(self, parent_dir: str) -> None:
+        parent_path = os.path.join(parent_dir, self.type)
+        path = os.path.join(parent_path, self.name)
+        if not os.path.exists(parent_path):
+            os.makedirs(parent_path, exist_ok=True)
+        if not os.path.exists(path):
+            os.mkdir(path)