Appending Receiver endpoint in routes if RX message type is same
[ric-plt/rtmgr.git] / pkg / rpe / rpe.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17
18
19    This source code is part of the near-RT RIC (RAN Intelligent Controller)
20    platform project (RICP).
21
22 ==================================================================================
23 */
24 /*
25   Mnemonic:     rpe.go
26   Abstract:     Contains RPE (Route Policy Engine) module definitions and generic RPE components
27   Date:         16 March 2019
28 */
29
30 package rpe
31
32 import (
33         "errors"
34         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
35         "routing-manager/pkg/rtmgr"
36         "routing-manager/pkg/sbi"
37         "runtime"
38         "strconv"
39 )
40
41 var (
42         SupportedRpes = []*EngineConfig{
43                 {
44                         Name:        "rmrpush",
45                         Version:     "pubsush",
46                         Protocol:    "rmruta",
47                         Instance:    NewRmrPush(),
48                         IsAvailable: true,
49                 },
50         }
51 )
52
53 func GetRpe(rpeName string) (Engine, error) {
54         for _, rpe := range SupportedRpes {
55                 if rpe.Name == rpeName && rpe.IsAvailable {
56                         return rpe.Instance, nil
57                 }
58         }
59         return nil, errors.New("SBI:" + rpeName + " is not supported or still not a available")
60 }
61
62 type Rpe struct {
63 }
64
65 func getEndpointByName(eps *rtmgr.Endpoints, name string) *rtmgr.Endpoint {
66         for _, ep := range *eps {
67                 if ep.Name == name {
68                         xapp.Logger.Debug("name: %s", ep.Name)
69                         xapp.Logger.Debug("ep: %v", ep)
70                         return ep
71                 }
72         }
73         return nil
74 }
75
76 func getEndpointListByName(eps *rtmgr.Endpoints, name string) []rtmgr.Endpoint {
77         var eplist []rtmgr.Endpoint
78
79         for _, ep := range *eps {
80                 if ep.Name == name {
81                         xapp.Logger.Debug("name: %s", ep.Name)
82                         xapp.Logger.Debug("ep: %v", ep)
83                         eplist = append(eplist, *ep)
84                 }
85         }
86         return eplist
87 }
88
89 func getEndpointByUuid(uuid string) *rtmgr.Endpoint {
90         endPoints := rtmgr.Eps
91         for _, ep := range endPoints {
92                 if ep.Uuid == uuid {
93                         xapp.Logger.Debug("name: %s", ep.Uuid)
94                         xapp.Logger.Debug("ep: %v", ep)
95                         return ep
96                 }
97         }
98         return nil
99 }
100
101 func (r *Rpe) addRoute(messageType string, tx *rtmgr.Endpoint, rx *rtmgr.Endpoint, routeTable *rtmgr.RouteTable, subId int32, routeType string) {
102         txList := rtmgr.EndpointList{}
103         rxList := []rtmgr.EndpointList{}
104
105         if tx == nil && rx == nil {
106                 pc, _, _, ok := runtime.Caller(1)
107                 details := runtime.FuncForPC(pc)
108                 if ok && details != nil {
109                         xapp.Logger.Error("Route addition skipped: Either TX or RX endpoint not present. Caller function is %s", details.Name())
110                 }
111         } else {
112                 if tx != nil {
113                         txList = rtmgr.EndpointList{*tx}
114                 }
115                 if rx != nil {
116                         rxList = []rtmgr.EndpointList{[]rtmgr.Endpoint{*rx}}
117                 }
118                 //messageId := strconv.Itoa(xapp.RICMessageTypes[messageType])
119                 messageId := rtmgr.Mtype[messageType]
120                 route := rtmgr.RouteTableEntry{
121                         MessageType: messageId,
122                         TxList:      txList,
123                         RxGroups:    rxList,
124                         SubID:       subId,
125                         RouteType:   routeType}
126                 *routeTable = append(*routeTable, route)
127                 //              xapp.Logger.Debug("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx.Uuid, rx.Uuid, subId)
128                 //              xapp.Logger.Trace("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx, rx, subId)
129         }
130 }
131
132 func (r *Rpe) addRoute_rx_list(messageType string, tx *rtmgr.Endpoint, rx []rtmgr.Endpoint, routeTable *rtmgr.RouteTable, subId int32, routeType string) {
133         txList := rtmgr.EndpointList{}
134         rxList := []rtmgr.EndpointList{}
135
136         if tx != nil {
137                 txList = rtmgr.EndpointList{*tx}
138         }
139
140         if rx != nil {
141                 for _, item := range rx {
142                         ep := []rtmgr.Endpoint{item}
143                         rxList = append(rxList, ep)
144                 }
145         }
146
147         //messageId := strconv.Itoa(xapp.RICMessageTypes[messageType])
148         messageId := rtmgr.Mtype[messageType]
149         route := rtmgr.RouteTableEntry{
150                 MessageType: messageId,
151                 TxList:      txList,
152                 RxGroups:    rxList,
153                 SubID:       subId,
154                 RouteType:   routeType}
155         *routeTable = append(*routeTable, route)
156         //      xapp.Logger.Debug("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx.Uuid, rx.Uuid, subId)
157         //      xapp.Logger.Trace("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx, rx, subId)
158 }
159
160 func (r *Rpe) generateXappRoutes(xAppEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
161         xapp.Logger.Debug("rpe.generateXappRoutes invoked")
162         xapp.Logger.Debug("Endpoint: %v, xAppType: %v", xAppEp.Name, xAppEp.XAppType)
163         if xAppEp.XAppType != sbi.PlatformType && (len(xAppEp.TxMessages) > 0 || len(xAppEp.RxMessages) > 0) {
164                 /// TODO ---
165                 //xApp -> Subscription Manager
166                 r.addRoute("RIC_SUB_REQ", xAppEp, subManEp, routeTable, -1, "")
167                 r.addRoute("RIC_SUB_DEL_REQ", xAppEp, subManEp, routeTable, -1, "")
168                 //xApp -> E2 Termination
169                 //              r.addRoute("RIC_CONTROL_REQ", xAppEp, e2TermEp, routeTable, -1, "")
170                 r.addRoute("RIC_CONTROL_REQ", xAppEp, nil, routeTable, -1, "%meid")
171                 //E2 Termination -> xApp
172                 ///             r.addRoute("RIC_CONTROL_ACK", e2TermEp, xAppEp, routeTable, -1, "")
173                 ///             r.addRoute("RIC_CONTROL_FAILURE", e2TermEp, xAppEp, routeTable, -1, "")
174                 r.addRoute("RIC_CONTROL_ACK", nil, xAppEp, routeTable, -1, "")
175                 r.addRoute("RIC_CONTROL_FAILURE", nil, xAppEp, routeTable, -1, "")
176         }
177         //xApp->A1Mediator
178         if xAppEp.XAppType != sbi.PlatformType && len(xAppEp.Policies) > 0 {
179                 xapp.Logger.Debug("rpe.generateXappRoutes found policies section")
180                 for _, policy := range xAppEp.Policies {
181                         r.addRoute("A1_POLICY_REQ", nil, xAppEp, routeTable, policy, "")
182                 }
183         }
184
185 }
186
187 func (r *Rpe) generateXappToXappRoutes(RecvxAppEp *rtmgr.Endpoint, endPointList rtmgr.Endpoints, routeTable *rtmgr.RouteTable) {
188         xapp.Logger.Debug("rpe.generateXappToXappRoutes invoked")
189
190         for _, rxmsg := range RecvxAppEp.RxMessages {
191
192                 var src_present bool
193                 identicalMsg := false
194                 var RxGrp []rtmgr.Endpoint
195                 xapp.Logger.Debug("RecvxAppEp.RxMessages Endpoint: %v, xAppType: %v and rxmsg: %v ", RecvxAppEp.Name, RecvxAppEp.XAppType, rxmsg)
196                 if rxmsg != "RIC_SUB_RESP" && rxmsg != "RIC_SUB_FAILURE" && rxmsg != "RIC_SUB_DEL_RESP" && rxmsg != "RIC_SUB_DEL_FAILURE" && rxmsg != "RIC_INDICATION" && rxmsg != "RIC_CONTROL_ACK" && rxmsg != "RIC_CONTROL_FAILURE" && rxmsg != "A1_POLICY_REQ" {
197                         for _, SrcxAppEp := range endPointList {
198                                 if SrcxAppEp.XAppType != sbi.PlatformType && (len(SrcxAppEp.TxMessages) > 0) && SrcxAppEp.Name != RecvxAppEp.Name {
199                                         for _, txmsg := range SrcxAppEp.TxMessages {
200                                                 if rxmsg == txmsg {
201                                                         r.addRoute(rxmsg, SrcxAppEp, RecvxAppEp, routeTable, -1, "")
202                                                         src_present = true
203                                                         break
204                                                 }
205                                         }
206                                 }
207                         }
208                         for _, SrcxAppEp := range endPointList {
209
210                                 if SrcxAppEp.XAppType != sbi.PlatformType && (len(SrcxAppEp.RxMessages) > 0) && SrcxAppEp.Name != RecvxAppEp.Name {
211                                         for _, newrxmsg := range SrcxAppEp.RxMessages {
212                                                 if newrxmsg == rxmsg {
213                                                         RxGrp = append(RxGrp, *SrcxAppEp)
214                                                         identicalMsg = true
215                                                 }
216                                         }
217                                 }
218                         }
219                         if src_present == false && identicalMsg == false {
220                                 xapp.Logger.Debug("Message type %v,for SrcxAppEp.Name %v", rxmsg, RecvxAppEp)
221                                 r.addRoute(rxmsg, nil, RecvxAppEp, routeTable, -1, "")
222                         }
223                         if identicalMsg == true {
224                                 xapp.Logger.Debug("Appending Message type %v,for SrcxAppEp.Name %v", rxmsg, RecvxAppEp)
225                                 RxGrp = append(RxGrp, *RecvxAppEp)
226                                 r.addRoute_rx_list(rxmsg, nil, RxGrp, routeTable, -1, "")
227                                 return
228                         }
229                 }
230         }
231 }
232
233 func (r *Rpe) generateSubscriptionRoutes(selectedxAppEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
234         xapp.Logger.Debug("rpe.addSubscriptionRoutes invoked")
235         subscriptionList := &rtmgr.Subs
236         for _, subscription := range *subscriptionList {
237                 xapp.Logger.Debug("Subscription: %v", subscription)
238                 xAppUuid := subscription.Fqdn + ":" + strconv.Itoa(int(subscription.Port))
239                 xapp.Logger.Debug("xApp UUID: %v", xAppUuid)
240                 xAppEp := getEndpointByUuid(xAppUuid)
241                 if xAppEp != nil {
242                         if xAppEp.Uuid == selectedxAppEp.Uuid {
243                                 xapp.Logger.Debug("xApp UUID is matched for selected xApp.UUID: %v and xApp.Name: %v", selectedxAppEp.Uuid, selectedxAppEp.Name)
244                                 /// TODO
245                                 //Subscription Manager -> xApp
246                                 r.addRoute("RIC_SUB_RESP", subManEp, xAppEp, routeTable, subscription.SubID, "")
247                                 r.addRoute("RIC_SUB_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID, "")
248                                 r.addRoute("RIC_SUB_DEL_RESP", subManEp, xAppEp, routeTable, subscription.SubID, "")
249                                 r.addRoute("RIC_SUB_DEL_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID, "")
250                                 //E2 Termination -> xApp
251                                 r.addRoute("RIC_INDICATION", nil, xAppEp, routeTable, subscription.SubID, "")
252                                 r.addRoute("RIC_CONTROL_ACK", nil, xAppEp, routeTable, subscription.SubID, "")
253                                 r.addRoute("RIC_CONTROL_FAILURE", nil, xAppEp, routeTable, subscription.SubID, "")
254                         }
255                 } else {
256                         xapp.Logger.Error("generateSubscriptionRoutes xAppEp is nil, xApp UUID: %v", xAppUuid)
257                 }
258         }
259 }
260
261 func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.Endpoint, e2ManEp *rtmgr.Endpoint, rsmEp *rtmgr.Endpoint, a1mediatorEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
262         xapp.Logger.Debug("rpe.generatePlatformRoutes invoked")
263         //Platform Routes --- Subscription Routes
264         //Subscription Manager -> E2 Termination
265         if rtmgr.PrsCfg == nil {
266                 xapp.Logger.Info("No static routes configuration")
267                 return
268         }
269         for _, routes := range *rtmgr.PrsCfg {
270                 var sendEp *rtmgr.Endpoint
271                 var Ep *rtmgr.Endpoint
272                 switch routes.SenderEndPoint {
273                 case "SUBMAN":
274                         sendEp = subManEp
275                 case "E2MAN":
276                         sendEp = e2ManEp
277                 case "RSM":
278                         sendEp = rsmEp
279                 case "A1MEDIATOR":
280                         sendEp = a1mediatorEp
281                 }
282                 switch routes.EndPoint {
283                 case "SUBMAN":
284                         Ep = subManEp
285                 case "E2MAN":
286                         Ep = e2ManEp
287                 //case "UEMAN":
288                 //      Ep = ueManEp
289                 case "RSM":
290                         Ep = rsmEp
291                 case "A1MEDIATOR":
292                         Ep = a1mediatorEp
293                 }
294
295                 r.addRoute(routes.MessageType, sendEp, Ep, routeTable, routes.SubscriptionId, routes.Meid)
296         }
297
298         if len(e2TermEp) > 0 {
299                 r.addRoute_rx_list("RIC_SCTP_CLEAR_ALL", e2ManEp, e2TermEp, routeTable, -1, "")
300                 r.addRoute_rx_list("E2_TERM_KEEP_ALIVE_REQ", e2ManEp, e2TermEp, routeTable, -1, "")
301                 r.addRoute_rx_list("RIC_E2_SETUP_RESP", e2ManEp, e2TermEp, routeTable, -1, "")
302                 r.addRoute_rx_list("RIC_E2_SETUP_FAILURE", e2ManEp, e2TermEp, routeTable, -1, "")
303         }
304 }
305
306 func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable {
307         xapp.Logger.Debug("rpe.generateRouteTable invoked")
308         xapp.Logger.Debug("Endpoint List:  %v", endPointList)
309         routeTable := &rtmgr.RouteTable{}
310         /*e2TermEp := getEndpointByName(&endPointList, "E2TERM")
311         if e2TermEp == nil {
312                 xapp.Logger.Error("Platform component not found: %v", "E2 Termination")
313                 xapp.Logger.Debug("Endpoints: %v", endPointList)
314         }*/
315         subManEp := getEndpointByName(&endPointList, "SUBMAN")
316         if subManEp == nil {
317                 xapp.Logger.Error("Platform component not found: %v", "Subscription Manager")
318                 xapp.Logger.Debug("Endpoints: %v", endPointList)
319         }
320         e2ManEp := getEndpointByName(&endPointList, "E2MAN")
321         if e2ManEp == nil {
322                 xapp.Logger.Error("Platform component not found: %v", "E2 Manager")
323                 xapp.Logger.Debug("Endpoints: %v", endPointList)
324         }
325         rsmEp := getEndpointByName(&endPointList, "RSM")
326         if rsmEp == nil {
327                 xapp.Logger.Error("Platform component not found: %v", "Resource Status Manager")
328                 xapp.Logger.Debug("Endpoints: %v", endPointList)
329         }
330         A1MediatorEp := getEndpointByName(&endPointList, "A1MEDIATOR")
331         if A1MediatorEp == nil {
332                 xapp.Logger.Error("Platform component not found: %v", "A1Mediator")
333                 xapp.Logger.Debug("Endpoints: %v", endPointList)
334         }
335
336         e2TermListEp := getEndpointListByName(&endPointList, "E2TERMINST")
337         if len(e2TermListEp) == 0 {
338                 xapp.Logger.Error("Platform component not found: %v", "E2 Termination List")
339                 xapp.Logger.Debug("Endpoints: %v", endPointList)
340         }
341         r.generatePlatformRoutes(e2TermListEp, subManEp, e2ManEp, rsmEp, A1MediatorEp, routeTable)
342
343         for _, endPoint := range endPointList {
344                 xapp.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType)
345                 if endPoint.XAppType != sbi.PlatformType && (len(endPoint.TxMessages) > 0 || len(endPoint.RxMessages) > 0) {
346                         r.generateXappRoutes(endPoint, subManEp, routeTable)
347                         r.generateSubscriptionRoutes(endPoint, subManEp, routeTable)
348                         r.generateXappToXappRoutes(endPoint, endPointList, routeTable)
349
350                 }
351         }
352         return routeTable
353 }