Creating dymanic endpoints(Eps) and distributing routes to those endpoints on upgrad...
[ric-plt/rtmgr.git] / pkg / rpe / rpe.go
index 5dd8f4d..4097ae0 100644 (file)
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
+
+
+   This source code is part of the near-RT RIC (RAN Intelligent Controller)
+   platform project (RICP).
+
 ==================================================================================
 */
 /*
 ==================================================================================
 */
 /*
@@ -26,20 +31,16 @@ package rpe
 
 import (
        "errors"
 
 import (
        "errors"
+       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "routing-manager/pkg/rtmgr"
        "routing-manager/pkg/rtmgr"
+       "routing-manager/pkg/sbi"
+       "runtime"
        "strconv"
 )
 
 var (
        "strconv"
 )
 
 var (
-       SupportedRpes = []*RpeEngineConfig{
-               &RpeEngineConfig{
-                       Name:        "rmrpub",
-                       Version:     "pubsub",
-                       Protocol:    "rmruta",
-                       Instance:    NewRmrPub(),
-                       IsAvailable: true,
-               },
-               &RpeEngineConfig{
+       SupportedRpes = []*EngineConfig{
+               {
                        Name:        "rmrpush",
                        Version:     "pubsush",
                        Protocol:    "rmruta",
                        Name:        "rmrpush",
                        Version:     "pubsush",
                        Protocol:    "rmruta",
@@ -49,7 +50,7 @@ var (
        }
 )
 
        }
 )
 
-func GetRpe(rpeName string) (RpeEngine, error) {
+func GetRpe(rpeName string) (Engine, error) {
        for _, rpe := range SupportedRpes {
                if rpe.Name == rpeName && rpe.IsAvailable {
                        return rpe.Instance, nil
        for _, rpe := range SupportedRpes {
                if rpe.Name == rpeName && rpe.IsAvailable {
                        return rpe.Instance, nil
@@ -61,173 +62,274 @@ func GetRpe(rpeName string) (RpeEngine, error) {
 type Rpe struct {
 }
 
 type Rpe struct {
 }
 
-/*
-Gets the raw xApp list and generates the list of sender endpoints and receiver endpoint groups
-Returns the Tx EndpointList map where the key is the messge type and also returns the nested map of Rx EndpointList's map where keys are message type and xapp type
-Endpoint object's message type already transcoded to integer id
-*/
-
-func (r *Rpe) getRouteRxTxLists(eps rtmgr.Endpoints) (*map[string]rtmgr.EndpointList, *map[string]map[string]rtmgr.EndpointList) {
-       txlist := make(map[string]rtmgr.EndpointList)
-       rxgroups := make(map[string]map[string]rtmgr.EndpointList)
-       for _, ep := range eps {
-               for _, message := range ep.RxMessages {
-                       messageid := rtmgr.MESSAGETYPES[message]
-                       if _, ok := rxgroups[messageid]; !ok {
-                               rxgroups[messageid] = make(map[string]rtmgr.EndpointList)
-                       }
-                       rxgroups[messageid][ep.XAppType] = append(rxgroups[messageid][ep.XAppType], (*ep))
-               }
-               for _, message := range ep.TxMessages {
-                       messageid := rtmgr.MESSAGETYPES[message]
-                       txlist[messageid] = append(txlist[messageid], (*ep))
+func getEndpointByName(eps *rtmgr.Endpoints, name string) *rtmgr.Endpoint {
+       for _, ep := range *eps {
+               if ep.Name == name {
+                       xapp.Logger.Debug("name: %s", ep.Name)
+                       xapp.Logger.Debug("ep: %v", ep)
+                       return ep
                }
        }
                }
        }
-       return &txlist, &rxgroups
+       return nil
 }
 
 }
 
-/*
-Gets the raw xapp list and creates a route table for
-Returns the array of route table entries
-*/
-func (r *Rpe) getRouteTable(eps rtmgr.Endpoints) *rtmgr.RouteTable {
-       tx, rx := r.getRouteRxTxLists(eps)
-       var rt rtmgr.RouteTable
-       for _, messagetype := range rtmgr.MESSAGETYPES {
-               /*if _, ok := (*tx)[messagetype]; !ok {
-                       continue
-               }
-               if _, ok := (*rx)[messagetype]; !ok {
-                       continue
-               }*/
-               txList, ok := (*tx)[messagetype]
-               if !ok {
-                       txList = rtmgr.EndpointList{}
-               }
-               var rxgroups []rtmgr.EndpointList
-               for _, endpointlist := range (*rx)[messagetype] {
-                       rxgroups = append(rxgroups, endpointlist)
-               }
-               if len(txList) > 0 || len(rxgroups) > 0 {
-                       rte := rtmgr.RouteTableEntry{
-                               messagetype,
-                               txList,
-                               rxgroups,
-                               -1,
-                       }
-                       rt = append(rt, rte)
+func getEndpointListByName(eps *rtmgr.Endpoints, name string) []rtmgr.Endpoint {
+       var eplist []rtmgr.Endpoint
+
+       for _, ep := range *eps {
+               if ep.Name == name {
+                       xapp.Logger.Debug("name: %s", ep.Name)
+                       xapp.Logger.Debug("ep: %v", ep)
+                       eplist = append(eplist, *ep)
                }
        }
                }
        }
-       r.addStaticRoutes(eps, &rt)
-       r.addSubscriptionRoutes(eps, &rt, &rtmgr.Subs)
-       return &rt
+       return eplist
 }
 
 }
 
-/*
-Adds specific static routes to the route table
-which cannot be calculated with endpoint tx/rx message types.
-*/
-func (r *Rpe) addStaticRoutes(eps rtmgr.Endpoints, rt *rtmgr.RouteTable) {
-       var uemanep, submanep *rtmgr.Endpoint
-       for _, ep := range eps {
-               if ep.Name == "UEMAN" {
-                       uemanep = ep
-               }
-               if ep.Name == "SUBMAN" {
-                       submanep = ep
+func getEndpointByUuid(uuid string) *rtmgr.Endpoint {
+       endPoints := rtmgr.Eps
+       for _, ep := range endPoints {
+               if ep.Uuid == uuid {
+                       xapp.Logger.Debug("name: %s", ep.Uuid)
+                       xapp.Logger.Debug("ep: %v", ep)
+                       return ep
                }
        }
                }
        }
+       return nil
+}
 
 
-       if uemanep != nil && submanep != nil {
-               txlist := rtmgr.EndpointList{*uemanep}
-               rxlist := []rtmgr.EndpointList{[]rtmgr.Endpoint{*submanep}}
-               rte1 := rtmgr.RouteTableEntry{
-                       rtmgr.MESSAGETYPES["RIC_SUB_REQ"],
-                       txlist,
-                       rxlist,
-                       -1,
-               }
-               rte2 := rtmgr.RouteTableEntry{
-                       rtmgr.MESSAGETYPES["RIC_SUB_DEL_REQ"],
-                       txlist,
-                       rxlist,
-                       -1,
+func (r *Rpe) addRoute(messageType string, tx *rtmgr.Endpoint, rx *rtmgr.Endpoint, routeTable *rtmgr.RouteTable, subId int32, routeType string) {
+       txList := rtmgr.EndpointList{}
+       rxList := []rtmgr.EndpointList{}
+
+       if tx == nil && rx == nil {
+               pc, _, _, ok := runtime.Caller(1)
+               details := runtime.FuncForPC(pc)
+               if ok && details != nil {
+                       xapp.Logger.Error("Route addition skipped: Either TX or RX endpoint not present. Caller function is %s", details.Name())
                }
                }
-               *rt = append(*rt, rte1)
-               *rt = append(*rt, rte2)
        } else {
        } else {
-               rtmgr.Logger.Warn("Cannot get the static route details of the platform components UEMAN/SUBMAN")
+               if tx != nil {
+                       txList = rtmgr.EndpointList{*tx}
+               }
+               if rx != nil {
+                       rxList = []rtmgr.EndpointList{[]rtmgr.Endpoint{*rx}}
+               }
+               //messageId := strconv.Itoa(xapp.RICMessageTypes[messageType])
+               messageId := rtmgr.Mtype[messageType]
+               route := rtmgr.RouteTableEntry{
+                       MessageType: messageId,
+                       TxList:      txList,
+                       RxGroups:    rxList,
+                       SubID:       subId,
+                       RouteType:   routeType}
+               *routeTable = append(*routeTable, route)
+               //              xapp.Logger.Debug("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx.Uuid, rx.Uuid, subId)
+               //              xapp.Logger.Trace("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx, rx, subId)
        }
 }
 
        }
 }
 
-func getEndpointByName(eps *rtmgr.Endpoints, name string) *rtmgr.Endpoint {
-       for _, ep := range *eps {
-               if ep.Name == name {
-                       rtmgr.Logger.Debug("name: %s", ep.Name)
-                       rtmgr.Logger.Debug("ep: %v", ep)
-                       return ep
+func (r *Rpe) addRoute_rx_list(messageType string, tx *rtmgr.Endpoint, rx []rtmgr.Endpoint, routeTable *rtmgr.RouteTable, subId int32, routeType string) {
+       txList := rtmgr.EndpointList{}
+       rxList := []rtmgr.EndpointList{}
+
+       if tx != nil {
+               txList = rtmgr.EndpointList{*tx}
+       }
+
+       if rx != nil {
+               for _, item := range rx {
+                       ep := []rtmgr.Endpoint{item}
+                       rxList = append(rxList, ep)
                }
        }
                }
        }
-       return nil
+
+       //messageId := strconv.Itoa(xapp.RICMessageTypes[messageType])
+       messageId := rtmgr.Mtype[messageType]
+       route := rtmgr.RouteTableEntry{
+               MessageType: messageId,
+               TxList:      txList,
+               RxGroups:    rxList,
+               SubID:       subId,
+               RouteType:   routeType}
+       *routeTable = append(*routeTable, route)
+       //      xapp.Logger.Debug("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx.Uuid, rx.Uuid, subId)
+       //      xapp.Logger.Trace("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx, rx, subId)
 }
 
 }
 
-func getEndpointByUuid(eps *rtmgr.Endpoints, uuid string) *rtmgr.Endpoint {
-       for _, ep := range *eps {
-               if ep.Uuid == uuid {
-                       rtmgr.Logger.Debug("name: %s", ep.Uuid)
-                       rtmgr.Logger.Debug("ep: %v", ep)
-                       return ep
+func (r *Rpe) generateXappRoutes(xAppEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+       xapp.Logger.Debug("rpe.generateXappRoutes invoked")
+       xapp.Logger.Debug("Endpoint: %v, xAppType: %v", xAppEp.Name, xAppEp.XAppType)
+       if xAppEp.XAppType != sbi.PlatformType && (len(xAppEp.TxMessages) > 0 || len(xAppEp.RxMessages) > 0) {
+               /// TODO ---
+               //xApp -> Subscription Manager
+               r.addRoute("RIC_SUB_REQ", xAppEp, subManEp, routeTable, -1, "")
+               r.addRoute("RIC_SUB_DEL_REQ", xAppEp, subManEp, routeTable, -1, "")
+               //xApp -> E2 Termination
+               //              r.addRoute("RIC_CONTROL_REQ", xAppEp, e2TermEp, routeTable, -1, "")
+               r.addRoute("RIC_CONTROL_REQ", xAppEp, nil, routeTable, -1, "%meid")
+               //E2 Termination -> xApp
+               ///             r.addRoute("RIC_CONTROL_ACK", e2TermEp, xAppEp, routeTable, -1, "")
+               ///             r.addRoute("RIC_CONTROL_FAILURE", e2TermEp, xAppEp, routeTable, -1, "")
+               r.addRoute("RIC_CONTROL_ACK", nil, xAppEp, routeTable, -1, "")
+               r.addRoute("RIC_CONTROL_FAILURE", nil, xAppEp, routeTable, -1, "")
+       }
+       //xApp->A1Mediator
+       if xAppEp.XAppType != sbi.PlatformType && len(xAppEp.Policies) > 0 {
+               xapp.Logger.Debug("rpe.generateXappRoutes found policies section")
+               for _, policy := range xAppEp.Policies {
+                       r.addRoute("A1_POLICY_REQ", nil, xAppEp, routeTable, policy, "")
                }
        }
                }
        }
-       return nil
+
 }
 }
-func (r *Rpe) addSubscriptionRoutes(eps rtmgr.Endpoints, rt *rtmgr.RouteTable, subs *rtmgr.SubscriptionList) {
-       rtmgr.Logger.Debug("rpe.addSubscriptionRoutes invoked")
-       rtmgr.Logger.Debug("params: %v", eps)
-       var e2termep, submanep, xappEp *rtmgr.Endpoint
-       var xappName string
-       e2termep = getEndpointByName(&eps, "E2TERM")
-       submanep = getEndpointByName(&eps, "SUBMAN")
-       if e2termep != nil && submanep != nil {
-               // looping through the subscription list, add routes one by one
-               for _, sub := range *subs {
-                       // SubMan -> XApp
-                       xappName = sub.Fqdn + ":" + strconv.Itoa(int(sub.Port))
-                       xappEp = getEndpointByUuid(&eps, xappName)
-                       if xappEp == nil {
-                               rtmgr.Logger.Error("XApp not found: %s", xappName)
-                               rtmgr.Logger.Debug("Endpoints: %v", eps)
-                       } else {
-                               txlist := rtmgr.EndpointList{*submanep}
-                               rxlist := []rtmgr.EndpointList{[]rtmgr.Endpoint{*xappEp}}
-                               subManMsgs := []string{"RIC_SUB_RESP", "RIC_SUB_FAILURE", "RIC_SUB_DEL_RESP", "RIC_SUB_DEL_FAILURE"}
-                               for _, entry := range subManMsgs {
-                                       rte := rtmgr.RouteTableEntry{
-                                               rtmgr.MESSAGETYPES[entry],
-                                               txlist,
-                                               rxlist,
-                                               sub.SubID,
-                                       }
-                                       *rt = append(*rt, rte)
-                               }
-                               // E2Term -> XApp
-                               txlist = rtmgr.EndpointList{*e2termep}
-                               rxlist = []rtmgr.EndpointList{[]rtmgr.Endpoint{*xappEp}}
-                               e2apMsgs := []string{"RIC_CONTROL_ACK", "RIC_CONTROL_FAILURE", "RIC_INDICATION"}
-                               for _, entry := range e2apMsgs {
-                                       rte := rtmgr.RouteTableEntry{
-                                               rtmgr.MESSAGETYPES[entry],
-                                               txlist,
-                                               rxlist,
-                                               sub.SubID,
+
+
+func (r *Rpe) generateXappToXappRoutes(RecvxAppEp *rtmgr.Endpoint, endPointList rtmgr.Endpoints, routeTable *rtmgr.RouteTable) {
+       xapp.Logger.Debug("rpe.generateXappToXappRoutes invoked")
+
+       for _, rxmsg := range RecvxAppEp.RxMessages {
+
+               var src_present bool
+               xapp.Logger.Debug("RecvxAppEp.RxMessages Endpoint: %v, xAppType: %v and rxmsg: %v ", RecvxAppEp.Name, RecvxAppEp.XAppType, rxmsg)
+               if (rxmsg != "RIC_SUB_RESP" && rxmsg != "RIC_SUB_FAILURE" && rxmsg != "RIC_SUB_DEL_RESP" && rxmsg != "RIC_SUB_DEL_FAILURE" && rxmsg != "RIC_INDICATION" && rxmsg != "RIC_CONTROL_ACK" && rxmsg != "RIC_CONTROL_FAILURE" && rxmsg != "A1_POLICY_REQ") {
+                       for _, SrcxAppEp := range endPointList {
+                               if SrcxAppEp.XAppType != sbi.PlatformType && (len(SrcxAppEp.TxMessages) > 0) && SrcxAppEp.Name != RecvxAppEp.Name {
+                                       for _, txmsg := range SrcxAppEp.TxMessages {
+                                                       if (rxmsg == txmsg) {
+                                                               r.addRoute(rxmsg, SrcxAppEp, RecvxAppEp, routeTable, -1, "")
+                                                               src_present = true
+                                                               break
+                                                       }
                                        }
                                        }
-                                       *rt = append(*rt, rte)
                                }
                        }
                                }
                        }
+                       if src_present == false {
+                               r.addRoute(rxmsg, nil, RecvxAppEp, routeTable, -1, "")
+                       }
                }
                }
-               rtmgr.Logger.Debug("addSubscriptionRoutes eps: %v", eps)
-       } else {
-               rtmgr.Logger.Warn("Subscription route update failure: Cannot get the static route details of the platform components E2TERM/SUBMAN")
+
+       }
+}
+
+func (r *Rpe) generateSubscriptionRoutes(selectedxAppEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+       xapp.Logger.Debug("rpe.addSubscriptionRoutes invoked")
+       subscriptionList := &rtmgr.Subs
+       for _, subscription := range *subscriptionList {
+               xapp.Logger.Debug("Subscription: %v", subscription)
+               xAppUuid := subscription.Fqdn + ":" + strconv.Itoa(int(subscription.Port))
+               xapp.Logger.Debug("xApp UUID: %v", xAppUuid)
+               xAppEp := getEndpointByUuid(xAppUuid)
+               if xAppEp != nil {
+                       if xAppEp.Uuid == selectedxAppEp.Uuid {
+                               xapp.Logger.Debug("xApp UUID is matched for selected xApp.UUID: %v and xApp.Name: %v", selectedxAppEp.Uuid, selectedxAppEp.Name)
+                               /// TODO
+                               //Subscription Manager -> xApp
+                               r.addRoute("RIC_SUB_RESP", subManEp, xAppEp, routeTable, subscription.SubID, "")
+                               r.addRoute("RIC_SUB_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID, "")
+                               r.addRoute("RIC_SUB_DEL_RESP", subManEp, xAppEp, routeTable, subscription.SubID, "")
+                               r.addRoute("RIC_SUB_DEL_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID, "")
+                               //E2 Termination -> xApp
+                               r.addRoute("RIC_INDICATION", nil, xAppEp, routeTable, subscription.SubID, "")
+                               r.addRoute("RIC_CONTROL_ACK", nil, xAppEp, routeTable, subscription.SubID, "")
+                               r.addRoute("RIC_CONTROL_FAILURE", nil, xAppEp, routeTable, subscription.SubID, "")
+                       }
+               } else {
+                               xapp.Logger.Error("generateSubscriptionRoutes xAppEp is nil, xApp UUID: %v", xAppUuid)
+               }
+       }
+}
+
+func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.Endpoint, e2ManEp *rtmgr.Endpoint, rsmEp *rtmgr.Endpoint, a1mediatorEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+       xapp.Logger.Debug("rpe.generatePlatformRoutes invoked")
+       //Platform Routes --- Subscription Routes
+       //Subscription Manager -> E2 Termination
+       if rtmgr.PrsCfg == nil {
+               xapp.Logger.Info("No static routes configuration")
+               return
        }
        }
+       for _, routes := range *rtmgr.PrsCfg {
+               var sendEp *rtmgr.Endpoint
+               var Ep *rtmgr.Endpoint
+               switch routes.SenderEndPoint {
+               case "SUBMAN":
+                       sendEp = subManEp
+               case "E2MAN":
+                       sendEp = e2ManEp
+               case "RSM":
+                       sendEp = rsmEp
+               case "A1MEDIATOR":
+                       sendEp = a1mediatorEp
+               }
+               switch routes.EndPoint {
+               case "SUBMAN":
+                       Ep = subManEp
+               case "E2MAN":
+                       Ep = e2ManEp
+               //case "UEMAN":
+               //      Ep = ueManEp
+               case "RSM":
+                       Ep = rsmEp
+               case "A1MEDIATOR":
+                       Ep = a1mediatorEp
+               }
+
+               r.addRoute(routes.MessageType, sendEp, Ep, routeTable, routes.SubscriptionId, routes.Meid)
+       }
+
+       if len(e2TermEp) > 0 {
+               r.addRoute_rx_list("RIC_SCTP_CLEAR_ALL", e2ManEp, e2TermEp, routeTable, -1, "")
+               r.addRoute_rx_list("E2_TERM_KEEP_ALIVE_REQ", e2ManEp, e2TermEp, routeTable, -1, "")
+               r.addRoute_rx_list("RIC_E2_SETUP_RESP", e2ManEp, e2TermEp, routeTable, -1, "")
+               r.addRoute_rx_list("RIC_E2_SETUP_FAILURE", e2ManEp, e2TermEp, routeTable, -1, "")
+       }
+}
 
 
+func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable {
+       xapp.Logger.Debug("rpe.generateRouteTable invoked")
+       xapp.Logger.Debug("Endpoint List:  %v", endPointList)
+       routeTable := &rtmgr.RouteTable{}
+       /*e2TermEp := getEndpointByName(&endPointList, "E2TERM")
+       if e2TermEp == nil {
+               xapp.Logger.Error("Platform component not found: %v", "E2 Termination")
+               xapp.Logger.Debug("Endpoints: %v", endPointList)
+       }*/
+       subManEp := getEndpointByName(&endPointList, "SUBMAN")
+       if subManEp == nil {
+               xapp.Logger.Error("Platform component not found: %v", "Subscription Manager")
+               xapp.Logger.Debug("Endpoints: %v", endPointList)
+       }
+       e2ManEp := getEndpointByName(&endPointList, "E2MAN")
+       if e2ManEp == nil {
+               xapp.Logger.Error("Platform component not found: %v", "E2 Manager")
+               xapp.Logger.Debug("Endpoints: %v", endPointList)
+       }
+       rsmEp := getEndpointByName(&endPointList, "RSM")
+       if rsmEp == nil {
+               xapp.Logger.Error("Platform component not found: %v", "Resource Status Manager")
+               xapp.Logger.Debug("Endpoints: %v", endPointList)
+       }
+       A1MediatorEp := getEndpointByName(&endPointList, "A1MEDIATOR")
+       if A1MediatorEp == nil {
+               xapp.Logger.Error("Platform component not found: %v", "A1Mediator")
+               xapp.Logger.Debug("Endpoints: %v", endPointList)
+       }
+
+       e2TermListEp := getEndpointListByName(&endPointList, "E2TERMINST")
+       if len(e2TermListEp) == 0 {
+               xapp.Logger.Error("Platform component not found: %v", "E2 Termination List")
+               xapp.Logger.Debug("Endpoints: %v", endPointList)
+       }
+       r.generatePlatformRoutes(e2TermListEp, subManEp, e2ManEp, rsmEp, A1MediatorEp, routeTable)
+
+       for _, endPoint := range endPointList {
+               xapp.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType)
+               if endPoint.XAppType != sbi.PlatformType && (len(endPoint.TxMessages) > 0 || len(endPoint.RxMessages) > 0) {
+                       r.generateXappRoutes(endPoint, subManEp, routeTable)
+                       r.generateSubscriptionRoutes(endPoint, subManEp, routeTable)
+                       r.generateXappToXappRoutes(endPoint, endPointList, routeTable)
+
+               }
+       }
+       return routeTable
 }
 }