Generalized unit-test stubs so the frame can also be used in other apps
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26         "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks"
27         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28         httptransport "github.com/go-openapi/runtime/client"
29         "github.com/go-openapi/strfmt"
30         "github.com/spf13/viper"
31         "time"
32 )
33
34 //-----------------------------------------------------------------------------
35 //
36 //-----------------------------------------------------------------------------
37
// idstring builds a one-line identification string for log messages.
//
// The String() output of each entry is appended in the given order, with a
// single space between consecutive elements. When err is non-nil,
// "err(<err.Error()>)" is appended as the final element. With no entries and
// a nil err the result is the empty string.
//
// Entries are dereferenced unconditionally, so a nil fmt.Stringer interface
// value panics.
func idstring(err error, entries ...fmt.Stringer) string {
	out := ""
	for i, entry := range entries {
		if i > 0 {
			out += " "
		}
		out += entry.String()
	}
	if err == nil {
		return out
	}
	// The error part is separated only when something precedes it.
	if len(entries) > 0 {
		out += " "
	}
	return out + "err(" + err.Error() + ")"
}
52
53 //-----------------------------------------------------------------------------
54 //
55 //-----------------------------------------------------------------------------
56
// Timing and retry settings used by the E2T message exchange in this file.
var (
	// e2tSubReqTimeout is how long one subscription request attempt waits
	// for an event before the next attempt is made.
	e2tSubReqTimeout = 5 * time.Second

	// e2tSubDelReqTime is how long one subscription delete request attempt
	// waits for an event before the next attempt is made.
	e2tSubDelReqTime = 5 * time.Second

	// e2tMaxSubReqTryCount is the total number of subscription request
	// attempts: initial try + retry.
	e2tMaxSubReqTryCount uint64 = 2

	// e2tMaxSubDelReqTryCount is the total number of subscription delete
	// request attempts: initial try + retry.
	e2tMaxSubDelReqTryCount uint64 = 2

	// e2tRecvMsgTimeout is the timeout passed to SendEvent when a message
	// received from E2T is handed over to the ongoing transaction.
	e2tRecvMsgTimeout = 5 * time.Second
)
63
// Control is the receiver for the subscription message handlers in this file.
// It bundles the E2AP codec, the subscription registry, the transaction
// tracker and a timer map, on top of an embedded xapptweaks.XappWrapper.
type Control struct {
	// NOTE(review): Rmr, RmrSend and CntRecvMsg used by the methods in this
	// file presumably come from this embedded wrapper - confirm.
	xapptweaks.XappWrapper
	// e2ap packs and unpacks the subscription messages.
	e2ap     *E2ap
	// registry assigns, looks up and removes subscriptions.
	registry *Registry
	// tracker creates and tracks xApp and subscription transactions.
	tracker  *Tracker
	// timerMap is initialised in NewControl; it is not otherwise used in
	// this file.
	timerMap *TimerMap
}
71
// RMRMeid groups three string identifiers. It is not referenced elsewhere in
// this file.
// NOTE(review): presumably describes the RMR managed-entity id (PLMN id, eNB
// id and RAN name) - confirm against the users of this type.
type RMRMeid struct {
	PlmnID  string
	EnbID   string
	RanName string
}
77
// init logs the "SUBMGR" banner and configures the global viper instance to
// read configuration from environment variables using the "submgr" prefix.
func init() {
	xapp.Logger.Info("SUBMGR")
	// Let viper consult the environment for every key it is asked for.
	viper.AutomaticEnv()
	viper.SetEnvPrefix("submgr")
	// Treat environment variables that are set but empty as set.
	viper.AllowEmptyEnv(true)
}
84
85 func NewControl() *Control {
86
87         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
88         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
89
90         registry := new(Registry)
91         registry.Initialize()
92         registry.rtmgrClient = &rtmgrClient
93
94         tracker := new(Tracker)
95         tracker.Init()
96
97         timerMap := new(TimerMap)
98         timerMap.Init()
99
100         c := &Control{e2ap: new(E2ap),
101                 registry: registry,
102                 tracker:  tracker,
103                 timerMap: timerMap,
104         }
105         c.XappWrapper.Init("")
106         return c
107 }
108
109 func (c *Control) ReadyCB(data interface{}) {
110         if c.Rmr == nil {
111                 c.Rmr = xapp.Rmr
112         }
113 }
114
// Run registers ReadyCB as the xapp ready callback and then hands this
// Control to xapp.Run.
func (c *Control) Run() {
	xapp.SetReadyCB(c.ReadyCB, nil)
	xapp.Run(c)
}
119
120 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
121         params := xapptweaks.NewParams(nil)
122         params.Mtype = trans.GetMtype()
123         params.SubId = int(subs.GetReqId().Seq)
124         params.Xid = ""
125         params.Meid = subs.GetMeid()
126         params.Src = ""
127         params.PayloadLen = len(trans.Payload.Buf)
128         params.Payload = trans.Payload.Buf
129         params.Mbuf = nil
130         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
131         return c.RmrSend(params)
132 }
133
134 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
135
136         params := xapptweaks.NewParams(nil)
137         params.Mtype = trans.GetMtype()
138         params.SubId = int(subs.GetReqId().Seq)
139         params.Xid = trans.GetXid()
140         params.Meid = trans.GetMeid()
141         params.Src = ""
142         params.PayloadLen = len(trans.Payload.Buf)
143         params.Payload = trans.Payload.Buf
144         params.Mbuf = nil
145         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
146         return c.RmrSend(params)
147 }
148
149 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
150         msg := xapptweaks.NewParams(params)
151         if c.Rmr == nil {
152                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
153                 xapp.Logger.Error("%s", err.Error())
154                 return
155         }
156         c.CntRecvMsg++
157
158         defer c.Rmr.Free(msg.Mbuf)
159
160         switch msg.Mtype {
161         case xapp.RICMessageTypes["RIC_SUB_REQ"]:
162                 go c.handleXAPPSubscriptionRequest(msg)
163         case xapp.RICMessageTypes["RIC_SUB_RESP"]:
164                 go c.handleE2TSubscriptionResponse(msg)
165         case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
166                 go c.handleE2TSubscriptionFailure(msg)
167         case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
168                 go c.handleXAPPSubscriptionDeleteRequest(msg)
169         case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
170                 go c.handleE2TSubscriptionDeleteResponse(msg)
171         case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
172                 go c.handleE2TSubscriptionDeleteFailure(msg)
173         default:
174                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
175         }
176         return
177 }
178
179 //-------------------------------------------------------------------
180 // handle from XAPP Subscription Request
181 //------------------------------------------------------------------
182 func (c *Control) handleXAPPSubscriptionRequest(params *xapptweaks.RMRParams) {
183         xapp.Logger.Info("MSG from XAPP: %s", params.String())
184
185         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
186         if err != nil {
187                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
188                 return
189         }
190
191         trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subReqMsg.RequestId}, params.Meid)
192         if trans == nil {
193                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
194                 return
195         }
196         defer trans.Release()
197
198         err = c.tracker.Track(trans)
199         if err != nil {
200                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
201                 return
202         }
203
204         subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
205         if err != nil {
206                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
207                 return
208         }
209
210         //
211         // Wake subs request
212         //
213         go c.handleSubscriptionCreate(subs, trans)
214         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
215
216         err = nil
217         if event != nil {
218                 switch themsg := event.(type) {
219                 case *e2ap.E2APSubscriptionResponse:
220                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
221                         if err == nil {
222                                 c.rmrSendToXapp("", subs, trans)
223                                 return
224                         }
225                 case *e2ap.E2APSubscriptionFailure:
226                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
227                         if err == nil {
228                                 c.rmrSendToXapp("", subs, trans)
229                         }
230                 default:
231                         break
232                 }
233         }
234         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
235         go c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
236 }
237
238 //-------------------------------------------------------------------
239 // handle from XAPP Subscription Delete Request
240 //------------------------------------------------------------------
241 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapptweaks.RMRParams) {
242         xapp.Logger.Info("MSG from XAPP: %s", params.String())
243
244         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
245         if err != nil {
246                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
247                 return
248         }
249
250         trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subDelReqMsg.RequestId}, params.Meid)
251         if trans == nil {
252                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
253                 return
254         }
255         defer trans.Release()
256
257         err = c.tracker.Track(trans)
258         if err != nil {
259                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
260                 return
261         }
262
263         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelReqMsg.RequestId.Seq})
264         if err != nil {
265                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
266                 return
267         }
268
269         //
270         // Wake subs delete
271         //
272         go c.handleSubscriptionDelete(subs, trans)
273         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
274
275         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
276
277         // Whatever is received send ok delete response
278         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
279         subDelRespMsg.RequestId = subs.SubReqMsg.RequestId
280         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
281         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
282         if err == nil {
283                 c.rmrSendToXapp("", subs, trans)
284         }
285
286         go c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
287 }
288
289 //-------------------------------------------------------------------
290 // SUBS CREATE Handling
291 //-------------------------------------------------------------------
292 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
293
294         trans := c.tracker.NewSubsTransaction(subs)
295         subs.WaitTransactionTurn(trans)
296         defer subs.ReleaseTransactionTurn(trans)
297         defer trans.Release()
298
299         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
300
301         subRfMsg, valid := subs.GetCachedResponse()
302         if subRfMsg == nil && valid == true {
303                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
304                 switch event.(type) {
305                 case *e2ap.E2APSubscriptionResponse:
306                         subRfMsg, valid = subs.SetCachedResponse(event, true)
307                 case *e2ap.E2APSubscriptionFailure:
308                         subRfMsg, valid = subs.SetCachedResponse(event, false)
309                 default:
310                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
311                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
312                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
313                 }
314                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
315         } else {
316                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
317         }
318
319         parentTrans.SendEvent(subRfMsg, 0)
320 }
321
322 //-------------------------------------------------------------------
323 // SUBS DELETE Handling
324 //-------------------------------------------------------------------
325
326 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
327
328         trans := c.tracker.NewSubsTransaction(subs)
329         subs.WaitTransactionTurn(trans)
330         defer subs.ReleaseTransactionTurn(trans)
331         defer trans.Release()
332
333         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
334
335         subs.mutex.Lock()
336         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
337                 subs.valid = false
338                 subs.mutex.Unlock()
339                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
340         } else {
341                 subs.mutex.Unlock()
342         }
343
344         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
345         subDelRespMsg.RequestId = subs.SubReqMsg.RequestId
346         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
347         parentTrans.SendEvent(subDelRespMsg, 0)
348 }
349
350 //-------------------------------------------------------------------
351 // send to E2T Subscription Request
352 //-------------------------------------------------------------------
353 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
354         var err error
355         var event interface{} = nil
356         var timedOut bool = false
357
358         subReqMsg := subs.SubReqMsg
359         subReqMsg.RequestId = subs.GetReqId().RequestId
360         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
361         if err != nil {
362                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
363                 return event
364         }
365
366         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
367                 desc := fmt.Sprintf("(retry %d)", retries)
368                 c.rmrSendToE2T(desc, subs, trans)
369                 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
370                 if timedOut {
371                         continue
372                 }
373                 break
374         }
375         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
376         return event
377 }
378
379 //-------------------------------------------------------------------
380 // send to E2T Subscription Delete Request
381 //-------------------------------------------------------------------
382
383 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
384         var err error
385         var event interface{}
386         var timedOut bool
387
388         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
389         subDelReqMsg.RequestId = subs.GetReqId().RequestId
390         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
391         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
392         if err != nil {
393                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
394                 return event
395         }
396
397         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
398                 desc := fmt.Sprintf("(retry %d)", retries)
399                 c.rmrSendToE2T(desc, subs, trans)
400                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
401                 if timedOut {
402                         continue
403                 }
404                 break
405         }
406         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
407         return event
408 }
409
410 //-------------------------------------------------------------------
411 // handle from E2T Subscription Reponse
412 //-------------------------------------------------------------------
413 func (c *Control) handleE2TSubscriptionResponse(params *xapptweaks.RMRParams) {
414         xapp.Logger.Info("MSG from E2T: %s", params.String())
415         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
416         if err != nil {
417                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
418                 return
419         }
420         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.Seq})
421         if err != nil {
422                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
423                 return
424         }
425         trans := subs.GetTransaction()
426         if trans == nil {
427                 err = fmt.Errorf("Ongoing transaction not found")
428                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
429                 return
430         }
431         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
432         if sendOk == false {
433                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
434                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
435         }
436         return
437 }
438
439 //-------------------------------------------------------------------
440 // handle from E2T Subscription Failure
441 //-------------------------------------------------------------------
442 func (c *Control) handleE2TSubscriptionFailure(params *xapptweaks.RMRParams) {
443         xapp.Logger.Info("MSG from E2T: %s", params.String())
444         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
445         if err != nil {
446                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
447                 return
448         }
449         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.Seq})
450         if err != nil {
451                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
452                 return
453         }
454         trans := subs.GetTransaction()
455         if trans == nil {
456                 err = fmt.Errorf("Ongoing transaction not found")
457                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
458                 return
459         }
460         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
461         if sendOk == false {
462                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
463                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
464         }
465         return
466 }
467
468 //-------------------------------------------------------------------
469 // handle from E2T Subscription Delete Response
470 //-------------------------------------------------------------------
471 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapptweaks.RMRParams) (err error) {
472         xapp.Logger.Info("MSG from E2T: %s", params.String())
473         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
474         if err != nil {
475                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
476                 return
477         }
478         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.Seq})
479         if err != nil {
480                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
481                 return
482         }
483         trans := subs.GetTransaction()
484         if trans == nil {
485                 err = fmt.Errorf("Ongoing transaction not found")
486                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
487                 return
488         }
489         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
490         if sendOk == false {
491                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
492                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
493         }
494         return
495 }
496
497 //-------------------------------------------------------------------
498 // handle from E2T Subscription Delete Failure
499 //-------------------------------------------------------------------
500 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapptweaks.RMRParams) {
501         xapp.Logger.Info("MSG from E2T: %s", params.String())
502         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
503         if err != nil {
504                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
505                 return
506         }
507         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.Seq})
508         if err != nil {
509                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
510                 return
511         }
512         trans := subs.GetTransaction()
513         if trans == nil {
514                 err = fmt.Errorf("Ongoing transaction not found")
515                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
516                 return
517         }
518         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
519         if sendOk == false {
520                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
521                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
522         }
523         return
524 }
525
526 //-------------------------------------------------------------------
527 //
528 //-------------------------------------------------------------------
529 func typeofSubsMessage(v interface{}) string {
530         if v == nil {
531                 return "NIL"
532         }
533         switch v.(type) {
534         case *e2ap.E2APSubscriptionRequest:
535                 return "SubReq"
536         case *e2ap.E2APSubscriptionResponse:
537                 return "SubResp"
538         case *e2ap.E2APSubscriptionFailure:
539                 return "SubFail"
540         case *e2ap.E2APSubscriptionDeleteRequest:
541                 return "SubDelReq"
542         case *e2ap.E2APSubscriptionDeleteResponse:
543                 return "SubDelResp"
544         case *e2ap.E2APSubscriptionDeleteFailure:
545                 return "SubDelFail"
546         default:
547                 return "Unknown"
548         }
549 }