Add fix for policy change transaction key already tracked
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26         "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks"
27         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28         httptransport "github.com/go-openapi/runtime/client"
29         "github.com/go-openapi/strfmt"
30         "github.com/spf13/viper"
31         "time"
32 )
33
34 //-----------------------------------------------------------------------------
35 //
36 //-----------------------------------------------------------------------------
37
// idstring builds the one-line identification string used in log messages.
//
// The String() output of every entry is emitted in argument order, separated
// by single spaces. When err is non-nil it is appended last, rendered as
// "err(<message>)". With no entries and a nil err the result is "".
func idstring(err error, entries ...fmt.Stringer) string {
	parts := make([]string, 0, len(entries)+1)
	for _, e := range entries {
		parts = append(parts, e.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}

	// Join by hand so the block needs nothing beyond fmt.
	out := ""
	for i, p := range parts {
		if i > 0 {
			out += " "
		}
		out += p
	}
	return out
}
52
53 //-----------------------------------------------------------------------------
54 //
55 //-----------------------------------------------------------------------------
56
// Timing and retry limits for the message exchange with E2T. They are
// variables rather than constants, so they can be reassigned at run time.
var (
	// How long one subscription (delete) request attempt waits for an answer.
	e2tSubReqTimeout = 5 * time.Second
	e2tSubDelReqTime = 5 * time.Second

	// Total number of send attempts: initial try + retry.
	e2tMaxSubReqTryCount    uint64 = 2
	e2tMaxSubDelReqTryCount uint64 = 2

	// Timeout used when handing a received E2T message to the transaction
	// that is waiting for it.
	e2tRecvMsgTimeout = 5 * time.Second
)
63
64 type Control struct {
65         xapptweaks.XappWrapper
66         e2ap     *E2ap
67         registry *Registry
68         tracker  *Tracker
69         timerMap *TimerMap
70 }
71
// RMRMeid groups three string identifiers. Judging by the field names they
// are the PLMN id, eNB id and RAN name of a managed entity — TODO confirm
// against the users of this type; nothing in this file reads them.
type RMRMeid struct {
	PlmnID, EnbID, RanName string
}
77
78 func init() {
79         xapp.Logger.Info("SUBMGR")
80         viper.AutomaticEnv()
81         viper.SetEnvPrefix("submgr")
82         viper.AllowEmptyEnv(true)
83 }
84
85 func NewControl() *Control {
86
87         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
88         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
89
90         registry := new(Registry)
91         registry.Initialize()
92         registry.rtmgrClient = &rtmgrClient
93
94         tracker := new(Tracker)
95         tracker.Init()
96
97         timerMap := new(TimerMap)
98         timerMap.Init()
99
100         c := &Control{e2ap: new(E2ap),
101                 registry: registry,
102                 tracker:  tracker,
103                 timerMap: timerMap,
104         }
105         c.XappWrapper.Init("")
106         return c
107 }
108
109 func (c *Control) ReadyCB(data interface{}) {
110         if c.Rmr == nil {
111                 c.Rmr = xapp.Rmr
112         }
113 }
114
115 func (c *Control) Run() {
116         xapp.SetReadyCB(c.ReadyCB, nil)
117         xapp.Run(c)
118 }
119
120 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
121         params := xapptweaks.NewParams(nil)
122         params.Mtype = trans.GetMtype()
123         params.SubId = int(subs.GetReqId().Seq)
124         params.Xid = ""
125         params.Meid = subs.GetMeid()
126         params.Src = ""
127         params.PayloadLen = len(trans.Payload.Buf)
128         params.Payload = trans.Payload.Buf
129         params.Mbuf = nil
130         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
131         return c.RmrSend(params)
132 }
133
134 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
135
136         params := xapptweaks.NewParams(nil)
137         params.Mtype = trans.GetMtype()
138         params.SubId = int(subs.GetReqId().Seq)
139         params.Xid = trans.GetXid()
140         params.Meid = trans.GetMeid()
141         params.Src = ""
142         params.PayloadLen = len(trans.Payload.Buf)
143         params.Payload = trans.Payload.Buf
144         params.Mbuf = nil
145         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
146         return c.RmrSend(params)
147 }
148
149 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
150         msg := xapptweaks.NewParams(params)
151         if c.Rmr == nil {
152                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
153                 xapp.Logger.Error("%s", err.Error())
154                 return
155         }
156         c.CntRecvMsg++
157
158         defer c.Rmr.Free(msg.Mbuf)
159
160         switch msg.Mtype {
161         case xapp.RICMessageTypes["RIC_SUB_REQ"]:
162                 go c.handleXAPPSubscriptionRequest(msg)
163         case xapp.RICMessageTypes["RIC_SUB_RESP"]:
164                 go c.handleE2TSubscriptionResponse(msg)
165         case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
166                 go c.handleE2TSubscriptionFailure(msg)
167         case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
168                 go c.handleXAPPSubscriptionDeleteRequest(msg)
169         case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
170                 go c.handleE2TSubscriptionDeleteResponse(msg)
171         case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
172                 go c.handleE2TSubscriptionDeleteFailure(msg)
173         default:
174                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
175         }
176         return
177 }
178
179 //-------------------------------------------------------------------
180 // handle from XAPP Subscription Request
181 //------------------------------------------------------------------
182 func (c *Control) handleXAPPSubscriptionRequest(params *xapptweaks.RMRParams) {
183         xapp.Logger.Info("MSG from XAPP: %s", params.String())
184
185         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
186         if err != nil {
187                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
188                 return
189         }
190
191         trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subReqMsg.RequestId}, params.Meid)
192         if trans == nil {
193                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
194                 return
195         }
196         defer trans.Release()
197
198         err = c.tracker.Track(trans)
199         if err != nil {
200                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
201                 return
202         }
203
204         subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
205         if err != nil {
206                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
207                 return
208         }
209
210         //
211         // Wake subs request
212         //
213         go c.handleSubscriptionCreate(subs, trans)
214         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
215
216         err = nil
217         if event != nil {
218                 switch themsg := event.(type) {
219                 case *e2ap.E2APSubscriptionResponse:
220                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
221                         if err == nil {
222                                 trans.Release()
223                                 c.rmrSendToXapp("", subs, trans)
224                                 return
225                         }
226                 case *e2ap.E2APSubscriptionFailure:
227                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
228                         if err == nil {
229                                 c.rmrSendToXapp("", subs, trans)
230                         }
231                 default:
232                         break
233                 }
234         }
235         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
236         c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
237 }
238
239 //-------------------------------------------------------------------
240 // handle from XAPP Subscription Delete Request
241 //------------------------------------------------------------------
242 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapptweaks.RMRParams) {
243         xapp.Logger.Info("MSG from XAPP: %s", params.String())
244
245         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
246         if err != nil {
247                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
248                 return
249         }
250
251         trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subDelReqMsg.RequestId}, params.Meid)
252         if trans == nil {
253                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
254                 return
255         }
256         defer trans.Release()
257
258         err = c.tracker.Track(trans)
259         if err != nil {
260                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
261                 return
262         }
263
264         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelReqMsg.RequestId.Seq})
265         if err != nil {
266                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
267                 return
268         }
269
270         //
271         // Wake subs delete
272         //
273         go c.handleSubscriptionDelete(subs, trans)
274         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
275
276         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
277
278         // Whatever is received send ok delete response
279         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
280         subDelRespMsg.RequestId = subs.SubReqMsg.RequestId
281         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
282         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
283         if err == nil {
284                 c.rmrSendToXapp("", subs, trans)
285         }
286
287         c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
288 }
289
290 //-------------------------------------------------------------------
291 // SUBS CREATE Handling
292 //-------------------------------------------------------------------
293 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
294
295         trans := c.tracker.NewSubsTransaction(subs)
296         subs.WaitTransactionTurn(trans)
297         defer subs.ReleaseTransactionTurn(trans)
298         defer trans.Release()
299
300         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
301
302         subRfMsg, valid := subs.GetCachedResponse()
303         if subRfMsg == nil && valid == true {
304                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
305                 switch event.(type) {
306                 case *e2ap.E2APSubscriptionResponse:
307                         subRfMsg, valid = subs.SetCachedResponse(event, true)
308                 case *e2ap.E2APSubscriptionFailure:
309                         subRfMsg, valid = subs.SetCachedResponse(event, false)
310                 default:
311                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
312                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
313                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
314                 }
315                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
316         } else {
317                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
318         }
319
320         parentTrans.SendEvent(subRfMsg, 0)
321 }
322
323 //-------------------------------------------------------------------
324 // SUBS DELETE Handling
325 //-------------------------------------------------------------------
326
327 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
328
329         trans := c.tracker.NewSubsTransaction(subs)
330         subs.WaitTransactionTurn(trans)
331         defer subs.ReleaseTransactionTurn(trans)
332         defer trans.Release()
333
334         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
335
336         subs.mutex.Lock()
337         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
338                 subs.valid = false
339                 subs.mutex.Unlock()
340                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
341         } else {
342                 subs.mutex.Unlock()
343         }
344
345         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
346         subDelRespMsg.RequestId = subs.SubReqMsg.RequestId
347         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
348         parentTrans.SendEvent(subDelRespMsg, 0)
349 }
350
351 //-------------------------------------------------------------------
352 // send to E2T Subscription Request
353 //-------------------------------------------------------------------
354 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
355         var err error
356         var event interface{} = nil
357         var timedOut bool = false
358
359         subReqMsg := subs.SubReqMsg
360         subReqMsg.RequestId = subs.GetReqId().RequestId
361         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
362         if err != nil {
363                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
364                 return event
365         }
366
367         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
368                 desc := fmt.Sprintf("(retry %d)", retries)
369                 c.rmrSendToE2T(desc, subs, trans)
370                 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
371                 if timedOut {
372                         continue
373                 }
374                 break
375         }
376         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
377         return event
378 }
379
380 //-------------------------------------------------------------------
381 // send to E2T Subscription Delete Request
382 //-------------------------------------------------------------------
383
384 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
385         var err error
386         var event interface{}
387         var timedOut bool
388
389         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
390         subDelReqMsg.RequestId = subs.GetReqId().RequestId
391         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
392         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
393         if err != nil {
394                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
395                 return event
396         }
397
398         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
399                 desc := fmt.Sprintf("(retry %d)", retries)
400                 c.rmrSendToE2T(desc, subs, trans)
401                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
402                 if timedOut {
403                         continue
404                 }
405                 break
406         }
407         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
408         return event
409 }
410
411 //-------------------------------------------------------------------
// handle from E2T Subscription Response
413 //-------------------------------------------------------------------
414 func (c *Control) handleE2TSubscriptionResponse(params *xapptweaks.RMRParams) {
415         xapp.Logger.Info("MSG from E2T: %s", params.String())
416         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
417         if err != nil {
418                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
419                 return
420         }
421         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.Seq})
422         if err != nil {
423                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
424                 return
425         }
426         trans := subs.GetTransaction()
427         if trans == nil {
428                 err = fmt.Errorf("Ongoing transaction not found")
429                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
430                 return
431         }
432         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
433         if sendOk == false {
434                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
435                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
436         }
437         return
438 }
439
440 //-------------------------------------------------------------------
441 // handle from E2T Subscription Failure
442 //-------------------------------------------------------------------
443 func (c *Control) handleE2TSubscriptionFailure(params *xapptweaks.RMRParams) {
444         xapp.Logger.Info("MSG from E2T: %s", params.String())
445         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
446         if err != nil {
447                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
448                 return
449         }
450         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.Seq})
451         if err != nil {
452                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
453                 return
454         }
455         trans := subs.GetTransaction()
456         if trans == nil {
457                 err = fmt.Errorf("Ongoing transaction not found")
458                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
459                 return
460         }
461         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
462         if sendOk == false {
463                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
464                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
465         }
466         return
467 }
468
469 //-------------------------------------------------------------------
470 // handle from E2T Subscription Delete Response
471 //-------------------------------------------------------------------
472 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapptweaks.RMRParams) (err error) {
473         xapp.Logger.Info("MSG from E2T: %s", params.String())
474         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
475         if err != nil {
476                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
477                 return
478         }
479         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.Seq})
480         if err != nil {
481                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
482                 return
483         }
484         trans := subs.GetTransaction()
485         if trans == nil {
486                 err = fmt.Errorf("Ongoing transaction not found")
487                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
488                 return
489         }
490         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
491         if sendOk == false {
492                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
493                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
494         }
495         return
496 }
497
498 //-------------------------------------------------------------------
499 // handle from E2T Subscription Delete Failure
500 //-------------------------------------------------------------------
501 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapptweaks.RMRParams) {
502         xapp.Logger.Info("MSG from E2T: %s", params.String())
503         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
504         if err != nil {
505                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
506                 return
507         }
508         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.Seq})
509         if err != nil {
510                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
511                 return
512         }
513         trans := subs.GetTransaction()
514         if trans == nil {
515                 err = fmt.Errorf("Ongoing transaction not found")
516                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
517                 return
518         }
519         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
520         if sendOk == false {
521                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
522                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
523         }
524         return
525 }
526
527 //-------------------------------------------------------------------
528 //
529 //-------------------------------------------------------------------
530 func typeofSubsMessage(v interface{}) string {
531         if v == nil {
532                 return "NIL"
533         }
534         switch v.(type) {
535         case *e2ap.E2APSubscriptionRequest:
536                 return "SubReq"
537         case *e2ap.E2APSubscriptionResponse:
538                 return "SubResp"
539         case *e2ap.E2APSubscriptionFailure:
540                 return "SubFail"
541         case *e2ap.E2APSubscriptionDeleteRequest:
542                 return "SubDelReq"
543         case *e2ap.E2APSubscriptionDeleteResponse:
544                 return "SubDelResp"
545         case *e2ap.E2APSubscriptionDeleteFailure:
546                 return "SubDelFail"
547         default:
548                 return "Unknown"
549         }
550 }