Taking old commit for releasing image
[ric-plt/rtmgr.git] / pkg / rpe / rpe.go
index 5dd8f4d..b5c6961 100644 (file)
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
+
+
+   This source code is part of the near-RT RIC (RAN Intelligent Controller)
+   platform project (RICP).
+
 ==================================================================================
 */
 /*
 ==================================================================================
 */
 /*
@@ -26,20 +31,16 @@ package rpe
 
 import (
        "errors"
 
 import (
        "errors"
+       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "routing-manager/pkg/rtmgr"
        "routing-manager/pkg/rtmgr"
+       "routing-manager/pkg/sbi"
+       "runtime"
        "strconv"
 )
 
 var (
        "strconv"
 )
 
 var (
-       SupportedRpes = []*RpeEngineConfig{
-               &RpeEngineConfig{
-                       Name:        "rmrpub",
-                       Version:     "pubsub",
-                       Protocol:    "rmruta",
-                       Instance:    NewRmrPub(),
-                       IsAvailable: true,
-               },
-               &RpeEngineConfig{
+       SupportedRpes = []*EngineConfig{
+               {
                        Name:        "rmrpush",
                        Version:     "pubsush",
                        Protocol:    "rmruta",
                        Name:        "rmrpush",
                        Version:     "pubsush",
                        Protocol:    "rmruta",
@@ -49,7 +50,7 @@ var (
        }
 )
 
        }
 )
 
-func GetRpe(rpeName string) (RpeEngine, error) {
+func GetRpe(rpeName string) (Engine, error) {
        for _, rpe := range SupportedRpes {
                if rpe.Name == rpeName && rpe.IsAvailable {
                        return rpe.Instance, nil
        for _, rpe := range SupportedRpes {
                if rpe.Name == rpeName && rpe.IsAvailable {
                        return rpe.Instance, nil
@@ -61,173 +62,239 @@ func GetRpe(rpeName string) (RpeEngine, error) {
 type Rpe struct {
 }
 
 type Rpe struct {
 }
 
-/*
-Gets the raw xApp list and generates the list of sender endpoints and receiver endpoint groups
-Returns the Tx EndpointList map where the key is the messge type and also returns the nested map of Rx EndpointList's map where keys are message type and xapp type
-Endpoint object's message type already transcoded to integer id
-*/
-
-func (r *Rpe) getRouteRxTxLists(eps rtmgr.Endpoints) (*map[string]rtmgr.EndpointList, *map[string]map[string]rtmgr.EndpointList) {
-       txlist := make(map[string]rtmgr.EndpointList)
-       rxgroups := make(map[string]map[string]rtmgr.EndpointList)
-       for _, ep := range eps {
-               for _, message := range ep.RxMessages {
-                       messageid := rtmgr.MESSAGETYPES[message]
-                       if _, ok := rxgroups[messageid]; !ok {
-                               rxgroups[messageid] = make(map[string]rtmgr.EndpointList)
-                       }
-                       rxgroups[messageid][ep.XAppType] = append(rxgroups[messageid][ep.XAppType], (*ep))
+func getEndpointByName(eps *rtmgr.Endpoints, name string) *rtmgr.Endpoint {
+       for _, ep := range *eps {
+               if ep.Name == name {
+                       xapp.Logger.Debug("name: %s", ep.Name)
+                       xapp.Logger.Debug("ep: %v", ep)
+                       return ep
                }
                }
-               for _, message := range ep.TxMessages {
-                       messageid := rtmgr.MESSAGETYPES[message]
-                       txlist[messageid] = append(txlist[messageid], (*ep))
+       }
+       return nil
+}
+
+func getEndpointListByName(eps *rtmgr.Endpoints, name string) []rtmgr.Endpoint {
+       var eplist []rtmgr.Endpoint
+
+       for _, ep := range *eps {
+               if ep.Name == name {
+                       xapp.Logger.Debug("name: %s", ep.Name)
+                       xapp.Logger.Debug("ep: %v", ep)
+                       eplist = append(eplist, *ep)
                }
        }
                }
        }
-       return &txlist, &rxgroups
+       return eplist
 }
 
 }
 
-/*
-Gets the raw xapp list and creates a route table for
-Returns the array of route table entries
-*/
-func (r *Rpe) getRouteTable(eps rtmgr.Endpoints) *rtmgr.RouteTable {
-       tx, rx := r.getRouteRxTxLists(eps)
-       var rt rtmgr.RouteTable
-       for _, messagetype := range rtmgr.MESSAGETYPES {
-               /*if _, ok := (*tx)[messagetype]; !ok {
-                       continue
+func getEndpointByUuid(uuid string) *rtmgr.Endpoint {
+       endPoints := rtmgr.Eps
+       for _, ep := range endPoints {
+               if ep.Uuid == uuid {
+                       xapp.Logger.Debug("name: %s", ep.Uuid)
+                       xapp.Logger.Debug("ep: %v", ep)
+                       return ep
                }
                }
-               if _, ok := (*rx)[messagetype]; !ok {
-                       continue
-               }*/
-               txList, ok := (*tx)[messagetype]
-               if !ok {
-                       txList = rtmgr.EndpointList{}
+       }
+       return nil
+}
+
+func (r *Rpe) addRoute(messageType string, tx *rtmgr.Endpoint, rx *rtmgr.Endpoint, routeTable *rtmgr.RouteTable, subId int32, routeType string) {
+       txList := rtmgr.EndpointList{}
+       rxList := []rtmgr.EndpointList{}
+
+       if tx == nil && rx == nil {
+               pc, _, _, ok := runtime.Caller(1)
+               details := runtime.FuncForPC(pc)
+               if ok && details != nil {
+                       xapp.Logger.Error("Route addition skipped: Either TX or RX endpoint not present. Caller function is %s", details.Name())
                }
                }
-               var rxgroups []rtmgr.EndpointList
-               for _, endpointlist := range (*rx)[messagetype] {
-                       rxgroups = append(rxgroups, endpointlist)
+       } else {
+               if tx != nil {
+                       txList = rtmgr.EndpointList{*tx}
                }
                }
-               if len(txList) > 0 || len(rxgroups) > 0 {
-                       rte := rtmgr.RouteTableEntry{
-                               messagetype,
-                               txList,
-                               rxgroups,
-                               -1,
-                       }
-                       rt = append(rt, rte)
+               if rx != nil {
+                       rxList = []rtmgr.EndpointList{[]rtmgr.Endpoint{*rx}}
                }
                }
+               messageId := strconv.Itoa(xapp.RICMessageTypes[messageType])
+               route := rtmgr.RouteTableEntry{
+                       MessageType: messageId,
+                       TxList:      txList,
+                       RxGroups:    rxList,
+                       SubID:       subId,
+                       RouteType:   routeType}
+               *routeTable = append(*routeTable, route)
+               //              xapp.Logger.Debug("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx.Uuid, rx.Uuid, subId)
+               //              xapp.Logger.Trace("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx, rx, subId)
        }
        }
-       r.addStaticRoutes(eps, &rt)
-       r.addSubscriptionRoutes(eps, &rt, &rtmgr.Subs)
-       return &rt
 }
 
 }
 
-/*
-Adds specific static routes to the route table
-which cannot be calculated with endpoint tx/rx message types.
-*/
-func (r *Rpe) addStaticRoutes(eps rtmgr.Endpoints, rt *rtmgr.RouteTable) {
-       var uemanep, submanep *rtmgr.Endpoint
-       for _, ep := range eps {
-               if ep.Name == "UEMAN" {
-                       uemanep = ep
-               }
-               if ep.Name == "SUBMAN" {
-                       submanep = ep
-               }
+func (r *Rpe) addRoute_rx_list(messageType string, tx *rtmgr.Endpoint, rx []rtmgr.Endpoint, routeTable *rtmgr.RouteTable, subId int32, routeType string) {
+       txList := rtmgr.EndpointList{}
+       rxList := []rtmgr.EndpointList{}
+
+       if tx != nil {
+               txList = rtmgr.EndpointList{*tx}
        }
 
        }
 
-       if uemanep != nil && submanep != nil {
-               txlist := rtmgr.EndpointList{*uemanep}
-               rxlist := []rtmgr.EndpointList{[]rtmgr.Endpoint{*submanep}}
-               rte1 := rtmgr.RouteTableEntry{
-                       rtmgr.MESSAGETYPES["RIC_SUB_REQ"],
-                       txlist,
-                       rxlist,
-                       -1,
+       if rx != nil {
+               for _, item := range rx {
+                       ep := []rtmgr.Endpoint{item}
+                       rxList = append(rxList, ep)
                }
                }
-               rte2 := rtmgr.RouteTableEntry{
-                       rtmgr.MESSAGETYPES["RIC_SUB_DEL_REQ"],
-                       txlist,
-                       rxlist,
-                       -1,
-               }
-               *rt = append(*rt, rte1)
-               *rt = append(*rt, rte2)
-       } else {
-               rtmgr.Logger.Warn("Cannot get the static route details of the platform components UEMAN/SUBMAN")
        }
        }
+
+       messageId := strconv.Itoa(xapp.RICMessageTypes[messageType])
+       route := rtmgr.RouteTableEntry{
+               MessageType: messageId,
+               TxList:      txList,
+               RxGroups:    rxList,
+               SubID:       subId,
+               RouteType:   routeType}
+       *routeTable = append(*routeTable, route)
+       //      xapp.Logger.Debug("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx.Uuid, rx.Uuid, subId)
+       //      xapp.Logger.Trace("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx, rx, subId)
 }
 
 }
 
-func getEndpointByName(eps *rtmgr.Endpoints, name string) *rtmgr.Endpoint {
-       for _, ep := range *eps {
-               if ep.Name == name {
-                       rtmgr.Logger.Debug("name: %s", ep.Name)
-                       rtmgr.Logger.Debug("ep: %v", ep)
-                       return ep
+func (r *Rpe) generateXappRoutes(xAppEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+       xapp.Logger.Debug("rpe.generateXappRoutes invoked")
+       xapp.Logger.Debug("Endpoint: %v, xAppType: %v", xAppEp.Name, xAppEp.XAppType)
+       if xAppEp.XAppType != sbi.PlatformType && (len(xAppEp.TxMessages) > 0 || len(xAppEp.RxMessages) > 0) {
+               /// TODO ---
+               //xApp -> Subscription Manager
+               r.addRoute("RIC_SUB_REQ", xAppEp, subManEp, routeTable, -1, "")
+               r.addRoute("RIC_SUB_DEL_REQ", xAppEp, subManEp, routeTable, -1, "")
+               //xApp -> E2 Termination
+               //              r.addRoute("RIC_CONTROL_REQ", xAppEp, e2TermEp, routeTable, -1, "")
+               r.addRoute("RIC_CONTROL_REQ", xAppEp, nil, routeTable, -1, "%meid")
+               //E2 Termination -> xApp
+               ///             r.addRoute("RIC_CONTROL_ACK", e2TermEp, xAppEp, routeTable, -1, "")
+               ///             r.addRoute("RIC_CONTROL_FAILURE", e2TermEp, xAppEp, routeTable, -1, "")
+               r.addRoute("RIC_CONTROL_ACK", nil, xAppEp, routeTable, -1, "")
+               r.addRoute("RIC_CONTROL_FAILURE", nil, xAppEp, routeTable, -1, "")
+       }
+       //xApp->A1Mediator
+       if xAppEp.XAppType != sbi.PlatformType && len(xAppEp.Policies) > 0 {
+               xapp.Logger.Debug("rpe.generateXappRoutes found policies section")
+               for _, policy := range xAppEp.Policies {
+                       r.addRoute("A1_POLICY_REQ", nil, xAppEp, routeTable, policy, "")
                }
        }
                }
        }
-       return nil
+
 }
 
 }
 
-func getEndpointByUuid(eps *rtmgr.Endpoints, uuid string) *rtmgr.Endpoint {
-       for _, ep := range *eps {
-               if ep.Uuid == uuid {
-                       rtmgr.Logger.Debug("name: %s", ep.Uuid)
-                       rtmgr.Logger.Debug("ep: %v", ep)
-                       return ep
+func (r *Rpe) generateSubscriptionRoutes(selectedxAppEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+       xapp.Logger.Debug("rpe.addSubscriptionRoutes invoked")
+       subscriptionList := &rtmgr.Subs
+       for _, subscription := range *subscriptionList {
+               xapp.Logger.Debug("Subscription: %v", subscription)
+               xAppUuid := subscription.Fqdn + ":" + strconv.Itoa(int(subscription.Port))
+               xapp.Logger.Debug("xApp UUID: %v", xAppUuid)
+               xAppEp := getEndpointByUuid(xAppUuid)
+               if xAppEp.Uuid == selectedxAppEp.Uuid {
+                       xapp.Logger.Debug("xApp UUID is matched for selected xApp.UUID: %v and xApp.Name: %v", selectedxAppEp.Uuid, selectedxAppEp.Name)
+                       /// TODO
+                       //Subscription Manager -> xApp
+                       r.addRoute("RIC_SUB_RESP", subManEp, xAppEp, routeTable, subscription.SubID, "")
+                       r.addRoute("RIC_SUB_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID, "")
+                       r.addRoute("RIC_SUB_DEL_RESP", subManEp, xAppEp, routeTable, subscription.SubID, "")
+                       r.addRoute("RIC_SUB_DEL_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID, "")
+                       //E2 Termination -> xApp
+                       r.addRoute("RIC_INDICATION", nil, xAppEp, routeTable, subscription.SubID, "")
+                       r.addRoute("RIC_CONTROL_ACK", nil, xAppEp, routeTable, subscription.SubID, "")
+                       r.addRoute("RIC_CONTROL_FAILURE", nil, xAppEp, routeTable, subscription.SubID, "")
                }
        }
                }
        }
-       return nil
 }
 }
-func (r *Rpe) addSubscriptionRoutes(eps rtmgr.Endpoints, rt *rtmgr.RouteTable, subs *rtmgr.SubscriptionList) {
-       rtmgr.Logger.Debug("rpe.addSubscriptionRoutes invoked")
-       rtmgr.Logger.Debug("params: %v", eps)
-       var e2termep, submanep, xappEp *rtmgr.Endpoint
-       var xappName string
-       e2termep = getEndpointByName(&eps, "E2TERM")
-       submanep = getEndpointByName(&eps, "SUBMAN")
-       if e2termep != nil && submanep != nil {
-               // looping through the subscription list, add routes one by one
-               for _, sub := range *subs {
-                       // SubMan -> XApp
-                       xappName = sub.Fqdn + ":" + strconv.Itoa(int(sub.Port))
-                       xappEp = getEndpointByUuid(&eps, xappName)
-                       if xappEp == nil {
-                               rtmgr.Logger.Error("XApp not found: %s", xappName)
-                               rtmgr.Logger.Debug("Endpoints: %v", eps)
-                       } else {
-                               txlist := rtmgr.EndpointList{*submanep}
-                               rxlist := []rtmgr.EndpointList{[]rtmgr.Endpoint{*xappEp}}
-                               subManMsgs := []string{"RIC_SUB_RESP", "RIC_SUB_FAILURE", "RIC_SUB_DEL_RESP", "RIC_SUB_DEL_FAILURE"}
-                               for _, entry := range subManMsgs {
-                                       rte := rtmgr.RouteTableEntry{
-                                               rtmgr.MESSAGETYPES[entry],
-                                               txlist,
-                                               rxlist,
-                                               sub.SubID,
-                                       }
-                                       *rt = append(*rt, rte)
-                               }
-                               // E2Term -> XApp
-                               txlist = rtmgr.EndpointList{*e2termep}
-                               rxlist = []rtmgr.EndpointList{[]rtmgr.Endpoint{*xappEp}}
-                               e2apMsgs := []string{"RIC_CONTROL_ACK", "RIC_CONTROL_FAILURE", "RIC_INDICATION"}
-                               for _, entry := range e2apMsgs {
-                                       rte := rtmgr.RouteTableEntry{
-                                               rtmgr.MESSAGETYPES[entry],
-                                               txlist,
-                                               rxlist,
-                                               sub.SubID,
-                                       }
-                                       *rt = append(*rt, rte)
-                               }
-                       }
+
+func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.Endpoint, e2ManEp *rtmgr.Endpoint, rsmEp *rtmgr.Endpoint, a1mediatorEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+       xapp.Logger.Debug("rpe.generatePlatformRoutes invoked")
+       //Platform Routes --- Subscription Routes
+       //Subscription Manager -> E2 Termination
+       for _, routes := range *rtmgr.PrsCfg {
+               var sendEp *rtmgr.Endpoint
+               var Ep *rtmgr.Endpoint
+               switch routes.SenderEndPoint {
+               case "SUBMAN":
+                       sendEp = subManEp
+               case "E2MAN":
+                       sendEp = e2ManEp
+               //case "UEMAN":
+               //      sendEp = ueManEp
+               case "RSM":
+                       sendEp = rsmEp
+               case "A1MEDIATOR":
+                       sendEp = a1mediatorEp
                }
                }
-               rtmgr.Logger.Debug("addSubscriptionRoutes eps: %v", eps)
-       } else {
-               rtmgr.Logger.Warn("Subscription route update failure: Cannot get the static route details of the platform components E2TERM/SUBMAN")
+               switch routes.EndPoint {
+               case "SUBMAN":
+                       Ep = subManEp
+               case "E2MAN":
+                       Ep = e2ManEp
+               //case "UEMAN":
+               //      Ep = ueManEp
+               case "RSM":
+                       Ep = rsmEp
+               case "A1MEDIATOR":
+                       Ep = a1mediatorEp
+               }
+
+               r.addRoute(routes.MessageType, sendEp, Ep, routeTable, routes.SubscriptionId, routes.Meid)
+       }
+
+       if len(e2TermEp) > 0 {
+               r.addRoute_rx_list("RIC_SCTP_CLEAR_ALL", e2ManEp, e2TermEp, routeTable, -1, "")
+               r.addRoute_rx_list("E2_TERM_KEEP_ALIVE_REQ", e2ManEp, e2TermEp, routeTable, -1, "")
+       }
+}
+
+func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable {
+       xapp.Logger.Debug("rpe.generateRouteTable invoked")
+       xapp.Logger.Debug("Endpoint List:  %v", endPointList)
+       routeTable := &rtmgr.RouteTable{}
+       /*e2TermEp := getEndpointByName(&endPointList, "E2TERM")
+       if e2TermEp == nil {
+               xapp.Logger.Error("Platform component not found: %v", "E2 Termination")
+               xapp.Logger.Debug("Endpoints: %v", endPointList)
+       }*/
+       subManEp := getEndpointByName(&endPointList, "SUBMAN")
+       if subManEp == nil {
+               xapp.Logger.Error("Platform component not found: %v", "Subscription Manager")
+               xapp.Logger.Debug("Endpoints: %v", endPointList)
+       }
+       e2ManEp := getEndpointByName(&endPointList, "E2MAN")
+       if e2ManEp == nil {
+               xapp.Logger.Error("Platform component not found: %v", "E2 Manager")
+               xapp.Logger.Debug("Endpoints: %v", endPointList)
+       }
+       /*ueManEp := getEndpointByName(&endPointList, "UEMAN")
+       if ueManEp == nil {
+               xapp.Logger.Error("Platform component not found: %v", "UE Manger")
+               xapp.Logger.Debug("Endpoints: %v", endPointList)
+       }*/
+       rsmEp := getEndpointByName(&endPointList, "RSM")
+       if rsmEp == nil {
+               xapp.Logger.Error("Platform component not found: %v", "Resource Status Manager")
+               xapp.Logger.Debug("Endpoints: %v", endPointList)
+       }
+       A1MediatorEp := getEndpointByName(&endPointList, "A1MEDIATOR")
+       if A1MediatorEp == nil {
+               xapp.Logger.Error("Platform component not found: %v", "A1Mediator")
+               xapp.Logger.Debug("Endpoints: %v", endPointList)
        }
 
        }
 
+       e2TermListEp := getEndpointListByName(&endPointList, "E2TERMINST")
+       if len(e2TermListEp) == 0 {
+               xapp.Logger.Error("Platform component not found: %v", "E2 Termination List")
+               xapp.Logger.Debug("Endpoints: %v", endPointList)
+       }
+       r.generatePlatformRoutes(e2TermListEp, subManEp, e2ManEp, rsmEp, A1MediatorEp, routeTable)
+
+       for _, endPoint := range endPointList {
+               xapp.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType)
+               if endPoint.XAppType != sbi.PlatformType && (len(endPoint.TxMessages) > 0 || len(endPoint.RxMessages) > 0) {
+                       r.generateXappRoutes(endPoint, subManEp, routeTable)
+                       r.generateSubscriptionRoutes(endPoint, subManEp, routeTable)
+               }
+       }
+       return routeTable
 }
 }