Supporting of reading subscriptions from subscription manager while restarting rtmgr
[ric-plt/rtmgr.git] / pkg / rpe / rpe.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17
18
19    This source code is part of the near-RT RIC (RAN Intelligent Controller)
20    platform project (RICP).
21
22 ==================================================================================
23 */
24 /*
25   Mnemonic:     rpe.go
26   Abstract:     Contains RPE (Route Policy Engine) module definitions and generic RPE components
27   Date:         16 March 2019
28 */
29
30 package rpe
31
32 import (
33         "errors"
34         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
35         "routing-manager/pkg/rtmgr"
36         "routing-manager/pkg/sbi"
37         "runtime"
38         "strconv"
39 )
40
41 var (
42         SupportedRpes = []*EngineConfig{
43                 {
44                         Name:        "rmrpush",
45                         Version:     "pubsush",
46                         Protocol:    "rmruta",
47                         Instance:    NewRmrPush(),
48                         IsAvailable: true,
49                 },
50         }
51 )
52
53 func GetRpe(rpeName string) (Engine, error) {
54         for _, rpe := range SupportedRpes {
55                 if rpe.Name == rpeName && rpe.IsAvailable {
56                         return rpe.Instance, nil
57                 }
58         }
59         return nil, errors.New("SBI:" + rpeName + " is not supported or still not a available")
60 }
61
62 type Rpe struct {
63 }
64
65 func getEndpointByName(eps *rtmgr.Endpoints, name string) *rtmgr.Endpoint {
66         for _, ep := range *eps {
67                 if ep.Name == name {
68                         xapp.Logger.Debug("name: %s", ep.Name)
69                         xapp.Logger.Debug("ep: %v", ep)
70                         return ep
71                 }
72         }
73         return nil
74 }
75
76 func getEndpointListByName(eps *rtmgr.Endpoints, name string) []rtmgr.Endpoint {
77         var eplist []rtmgr.Endpoint
78
79         for _, ep := range *eps {
80                 if ep.Name == name {
81                         xapp.Logger.Debug("name: %s", ep.Name)
82                         xapp.Logger.Debug("ep: %v", ep)
83                         eplist = append(eplist, *ep)
84                 }
85         }
86         return eplist
87 }
88
89 func getEndpointByUuid(uuid string) *rtmgr.Endpoint {
90         endPoints := rtmgr.Eps
91         for _, ep := range endPoints {
92                 if ep.Uuid == uuid {
93                         xapp.Logger.Debug("name: %s", ep.Uuid)
94                         xapp.Logger.Debug("ep: %v", ep)
95                         return ep
96                 }
97         }
98         return nil
99 }
100
101 func (r *Rpe) addRoute(messageType string, tx *rtmgr.Endpoint, rx *rtmgr.Endpoint, routeTable *rtmgr.RouteTable, subId int32, routeType string) {
102         txList := rtmgr.EndpointList{}
103         rxList := []rtmgr.EndpointList{}
104
105         if tx == nil && rx == nil {
106                 pc, _, _, ok := runtime.Caller(1)
107                 details := runtime.FuncForPC(pc)
108                 if ok && details != nil {
109                         xapp.Logger.Error("Route addition skipped: Either TX or RX endpoint not present. Caller function is %s", details.Name())
110                 }
111         } else {
112                 if tx != nil {
113                         txList = rtmgr.EndpointList{*tx}
114                 }
115                 if rx != nil {
116                         rxList = []rtmgr.EndpointList{[]rtmgr.Endpoint{*rx}}
117                 }
118                 messageId := strconv.Itoa(xapp.RICMessageTypes[messageType])
119                 route := rtmgr.RouteTableEntry{
120                         MessageType: messageId,
121                         TxList:      txList,
122                         RxGroups:    rxList,
123                         SubID:       subId,
124                         RouteType:   routeType}
125                 *routeTable = append(*routeTable, route)
126                 //              xapp.Logger.Debug("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx.Uuid, rx.Uuid, subId)
127                 //              xapp.Logger.Trace("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx, rx, subId)
128         }
129 }
130
131 func (r *Rpe) addRoute_rx_list(messageType string, tx *rtmgr.Endpoint, rx []rtmgr.Endpoint, routeTable *rtmgr.RouteTable, subId int32, routeType string) {
132         txList := rtmgr.EndpointList{}
133         rxList := []rtmgr.EndpointList{}
134
135         if tx != nil {
136                 txList = rtmgr.EndpointList{*tx}
137         }
138
139         if rx != nil {
140                 for _, item := range rx {
141                         ep := []rtmgr.Endpoint{item}
142                         rxList = append(rxList, ep)
143                 }
144         }
145
146         messageId := strconv.Itoa(xapp.RICMessageTypes[messageType])
147         route := rtmgr.RouteTableEntry{
148                 MessageType: messageId,
149                 TxList:      txList,
150                 RxGroups:    rxList,
151                 SubID:       subId,
152                 RouteType:   routeType}
153         *routeTable = append(*routeTable, route)
154         //      xapp.Logger.Debug("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx.Uuid, rx.Uuid, subId)
155         //      xapp.Logger.Trace("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx, rx, subId)
156 }
157
158 func (r *Rpe) generateXappRoutes(xAppEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
159         xapp.Logger.Debug("rpe.generateXappRoutes invoked")
160         xapp.Logger.Debug("Endpoint: %v, xAppType: %v", xAppEp.Name, xAppEp.XAppType)
161         if xAppEp.XAppType != sbi.PlatformType && (len(xAppEp.TxMessages) > 0 || len(xAppEp.RxMessages) > 0) {
162                 /// TODO ---
163                 //xApp -> Subscription Manager
164                 r.addRoute("RIC_SUB_REQ", xAppEp, subManEp, routeTable, -1, "")
165                 r.addRoute("RIC_SUB_DEL_REQ", xAppEp, subManEp, routeTable, -1, "")
166                 //xApp -> E2 Termination
167                 //              r.addRoute("RIC_CONTROL_REQ", xAppEp, e2TermEp, routeTable, -1, "")
168                 r.addRoute("RIC_CONTROL_REQ", xAppEp, nil, routeTable, -1, "%meid")
169                 //E2 Termination -> xApp
170                 ///             r.addRoute("RIC_CONTROL_ACK", e2TermEp, xAppEp, routeTable, -1, "")
171                 ///             r.addRoute("RIC_CONTROL_FAILURE", e2TermEp, xAppEp, routeTable, -1, "")
172                 r.addRoute("RIC_CONTROL_ACK", nil, xAppEp, routeTable, -1, "")
173                 r.addRoute("RIC_CONTROL_FAILURE", nil, xAppEp, routeTable, -1, "")
174         }
175         //xApp->A1Mediator
176         if xAppEp.XAppType != sbi.PlatformType && len(xAppEp.Policies) > 0 {
177                 xapp.Logger.Debug("rpe.generateXappRoutes found policies section")
178                 for _, policy := range xAppEp.Policies {
179                         r.addRoute("A1_POLICY_REQ", nil, xAppEp, routeTable, policy, "")
180                 }
181         }
182
183 }
184
185
186 func (r *Rpe) generateXappToXappRoutes(RecvxAppEp *rtmgr.Endpoint, endPointList rtmgr.Endpoints, routeTable *rtmgr.RouteTable) {
187         xapp.Logger.Debug("rpe.generateXappToXappRoutes invoked")
188
189         for _, rxmsg := range RecvxAppEp.RxMessages {
190
191                 var src_present bool
192                 xapp.Logger.Debug("RecvxAppEp.RxMessages Endpoint: %v, xAppType: %v and rxmsg: %v ", RecvxAppEp.Name, RecvxAppEp.XAppType, rxmsg)
193                 if (rxmsg != "RIC_SUB_RESP" && rxmsg != "RIC_SUB_FAILURE" && rxmsg != "RIC_SUB_DEL_RESP" && rxmsg != "RIC_SUB_DEL_FAILURE" && rxmsg != "RIC_INDICATION" && rxmsg != "RIC_CONTROL_ACK" && rxmsg != "RIC_CONTROL_FAILURE" && rxmsg != "A1_POLICY_REQ") {
194                         for _, SrcxAppEp := range endPointList {
195                                 if SrcxAppEp.XAppType != sbi.PlatformType && (len(SrcxAppEp.TxMessages) > 0) && SrcxAppEp.Name != RecvxAppEp.Name {
196                                         for _, txmsg := range SrcxAppEp.TxMessages {
197                                                         if (rxmsg == txmsg) {
198                                                                 r.addRoute(rxmsg, SrcxAppEp, RecvxAppEp, routeTable, -1, "")
199                                                                 src_present = true
200                                                                 break
201                                                         }
202                                         }
203                                 }
204                         }
205                         if src_present == false {
206                                 r.addRoute(rxmsg, nil, RecvxAppEp, routeTable, -1, "")
207                         }
208                 }
209
210         }
211 }
212
213 func (r *Rpe) generateSubscriptionRoutes(selectedxAppEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
214         xapp.Logger.Debug("rpe.addSubscriptionRoutes invoked")
215         subscriptionList := &rtmgr.Subs
216         for _, subscription := range *subscriptionList {
217                 xapp.Logger.Debug("Subscription: %v", subscription)
218                 xAppUuid := subscription.Fqdn + ":" + strconv.Itoa(int(subscription.Port))
219                 xapp.Logger.Debug("xApp UUID: %v", xAppUuid)
220                 xAppEp := getEndpointByUuid(xAppUuid)
221                 if xAppEp != nil {
222                         if xAppEp.Uuid == selectedxAppEp.Uuid {
223                                 xapp.Logger.Debug("xApp UUID is matched for selected xApp.UUID: %v and xApp.Name: %v", selectedxAppEp.Uuid, selectedxAppEp.Name)
224                                 /// TODO
225                                 //Subscription Manager -> xApp
226                                 r.addRoute("RIC_SUB_RESP", subManEp, xAppEp, routeTable, subscription.SubID, "")
227                                 r.addRoute("RIC_SUB_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID, "")
228                                 r.addRoute("RIC_SUB_DEL_RESP", subManEp, xAppEp, routeTable, subscription.SubID, "")
229                                 r.addRoute("RIC_SUB_DEL_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID, "")
230                                 //E2 Termination -> xApp
231                                 r.addRoute("RIC_INDICATION", nil, xAppEp, routeTable, subscription.SubID, "")
232                                 r.addRoute("RIC_CONTROL_ACK", nil, xAppEp, routeTable, subscription.SubID, "")
233                                 r.addRoute("RIC_CONTROL_FAILURE", nil, xAppEp, routeTable, subscription.SubID, "")
234                         }
235                 } else {
236                                 xapp.Logger.Error("generateSubscriptionRoutes xAppEp is nil, xApp UUID: %v", xAppUuid)
237                 }
238         }
239 }
240
241 func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.Endpoint, e2ManEp *rtmgr.Endpoint, rsmEp *rtmgr.Endpoint, a1mediatorEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
242         xapp.Logger.Debug("rpe.generatePlatformRoutes invoked")
243         //Platform Routes --- Subscription Routes
244         //Subscription Manager -> E2 Termination
245         for _, routes := range *rtmgr.PrsCfg {
246                 var sendEp *rtmgr.Endpoint
247                 var Ep *rtmgr.Endpoint
248                 switch routes.SenderEndPoint {
249                 case "SUBMAN":
250                         sendEp = subManEp
251                 case "E2MAN":
252                         sendEp = e2ManEp
253                 //case "UEMAN":
254                 //      sendEp = ueManEp
255                 case "RSM":
256                         sendEp = rsmEp
257                 case "A1MEDIATOR":
258                         sendEp = a1mediatorEp
259                 }
260                 switch routes.EndPoint {
261                 case "SUBMAN":
262                         Ep = subManEp
263                 case "E2MAN":
264                         Ep = e2ManEp
265                 //case "UEMAN":
266                 //      Ep = ueManEp
267                 case "RSM":
268                         Ep = rsmEp
269                 case "A1MEDIATOR":
270                         Ep = a1mediatorEp
271                 }
272
273                 r.addRoute(routes.MessageType, sendEp, Ep, routeTable, routes.SubscriptionId, routes.Meid)
274         }
275
276         if len(e2TermEp) > 0 {
277                 r.addRoute_rx_list("RIC_SCTP_CLEAR_ALL", e2ManEp, e2TermEp, routeTable, -1, "")
278                 r.addRoute_rx_list("E2_TERM_KEEP_ALIVE_REQ", e2ManEp, e2TermEp, routeTable, -1, "")
279                 r.addRoute_rx_list("RIC_E2_SETUP_RESP", e2ManEp, e2TermEp, routeTable, -1, "")
280                 r.addRoute_rx_list("RIC_E2_SETUP_FAILURE", e2ManEp, e2TermEp, routeTable, -1, "")
281         }
282 }
283
284 func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable {
285         xapp.Logger.Debug("rpe.generateRouteTable invoked")
286         xapp.Logger.Debug("Endpoint List:  %v", endPointList)
287         routeTable := &rtmgr.RouteTable{}
288         /*e2TermEp := getEndpointByName(&endPointList, "E2TERM")
289         if e2TermEp == nil {
290                 xapp.Logger.Error("Platform component not found: %v", "E2 Termination")
291                 xapp.Logger.Debug("Endpoints: %v", endPointList)
292         }*/
293         subManEp := getEndpointByName(&endPointList, "SUBMAN")
294         if subManEp == nil {
295                 xapp.Logger.Error("Platform component not found: %v", "Subscription Manager")
296                 xapp.Logger.Debug("Endpoints: %v", endPointList)
297         }
298         e2ManEp := getEndpointByName(&endPointList, "E2MAN")
299         if e2ManEp == nil {
300                 xapp.Logger.Error("Platform component not found: %v", "E2 Manager")
301                 xapp.Logger.Debug("Endpoints: %v", endPointList)
302         }
303         /*ueManEp := getEndpointByName(&endPointList, "UEMAN")
304         if ueManEp == nil {
305                 xapp.Logger.Error("Platform component not found: %v", "UE Manger")
306                 xapp.Logger.Debug("Endpoints: %v", endPointList)
307         }*/
308         rsmEp := getEndpointByName(&endPointList, "RSM")
309         if rsmEp == nil {
310                 xapp.Logger.Error("Platform component not found: %v", "Resource Status Manager")
311                 xapp.Logger.Debug("Endpoints: %v", endPointList)
312         }
313         A1MediatorEp := getEndpointByName(&endPointList, "A1MEDIATOR")
314         if A1MediatorEp == nil {
315                 xapp.Logger.Error("Platform component not found: %v", "A1Mediator")
316                 xapp.Logger.Debug("Endpoints: %v", endPointList)
317         }
318
319         e2TermListEp := getEndpointListByName(&endPointList, "E2TERMINST")
320         if len(e2TermListEp) == 0 {
321                 xapp.Logger.Error("Platform component not found: %v", "E2 Termination List")
322                 xapp.Logger.Debug("Endpoints: %v", endPointList)
323         }
324         r.generatePlatformRoutes(e2TermListEp, subManEp, e2ManEp, rsmEp, A1MediatorEp, routeTable)
325
326         for _, endPoint := range endPointList {
327                 xapp.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType)
328                 if endPoint.XAppType != sbi.PlatformType && (len(endPoint.TxMessages) > 0 || len(endPoint.RxMessages) > 0) {
329                         r.generateXappRoutes(endPoint, subManEp, routeTable)
330                         r.generateSubscriptionRoutes(endPoint, subManEp, routeTable)
331                         r.generateXappToXappRoutes(endPoint, endPointList, routeTable)
332
333                 }
334         }
335         return routeTable
336 }