Fixed function id handling and improved ut fail handling
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
27         httptransport "github.com/go-openapi/runtime/client"
28         "github.com/go-openapi/strfmt"
29         "github.com/spf13/viper"
30         "sync"
31         "time"
32 )
33
34 //-----------------------------------------------------------------------------
35 //
36 //-----------------------------------------------------------------------------
37
38 var e2tSubReqTimeout time.Duration = 5 * time.Second
39 var e2tSubDelReqTime time.Duration = 5 * time.Second
40 var e2tMaxSubReqTryCount uint64 = 2    // Initial try + retry
41 var e2tMaxSubDelReqTryCount uint64 = 2 // Initial try + retry
42
43 var e2tRecvMsgTimeout time.Duration = 5 * time.Second
44
45 type Control struct {
46         e2ap         *E2ap
47         registry     *Registry
48         tracker      *Tracker
49         timerMap     *TimerMap
50         rmrSendMutex sync.Mutex
51         msgCounter   uint64
52 }
53
54 type RMRMeid struct {
55         PlmnID  string
56         EnbID   string
57         RanName string
58 }
59
60 const (
61         CREATE Action = 0
62         UPDATE Action = 1
63         NONE   Action = 2
64         DELETE Action = 3
65 )
66
67 func init() {
68         xapp.Logger.Info("SUBMGR")
69         viper.AutomaticEnv()
70         viper.SetEnvPrefix("submgr")
71         viper.AllowEmptyEnv(true)
72 }
73
74 func NewControl() *Control {
75
76         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
77         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
78
79         registry := new(Registry)
80         registry.Initialize()
81         registry.rtmgrClient = &rtmgrClient
82
83         tracker := new(Tracker)
84         tracker.Init()
85
86         timerMap := new(TimerMap)
87         timerMap.Init()
88
89         return &Control{e2ap: new(E2ap),
90                 registry:   registry,
91                 tracker:    tracker,
92                 timerMap:   timerMap,
93                 msgCounter: 0,
94         }
95 }
96
97 func (c *Control) Run() {
98         xapp.Run(c)
99 }
100
101 func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) {
102
103         xapp.Logger.Info("%s: %s", desc, params.String())
104         status := false
105         i := 1
106         for ; i <= 10 && status == false; i++ {
107                 c.rmrSendMutex.Lock()
108                 status = xapp.Rmr.Send(params.RMRParams, false)
109                 c.rmrSendMutex.Unlock()
110                 if status == false {
111                         xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String())
112                         time.Sleep(500 * time.Millisecond)
113                 }
114         }
115         if status == false {
116                 err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String())
117                 xapp.Logger.Error("%s: %s", desc, err.Error())
118                 xapp.Rmr.Free(params.Mbuf)
119         }
120         return
121 }
122
123 func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction) (err error) {
124         params := &RMRParams{&xapp.RMRParams{}}
125         params.Mtype = trans.GetMtype()
126         params.SubId = int(subs.GetSubId())
127         params.Xid = ""
128         params.Meid = subs.GetMeid()
129         params.Src = ""
130         params.PayloadLen = len(trans.Payload.Buf)
131         params.Payload = trans.Payload.Buf
132         params.Mbuf = nil
133
134         return c.rmrSendRaw(desc, params)
135 }
136
137 func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction) (err error) {
138         params := &RMRParams{&xapp.RMRParams{}}
139         params.Mtype = trans.GetMtype()
140         params.SubId = int(subs.GetSubId())
141         params.Xid = trans.GetXid()
142         params.Meid = trans.GetMeid()
143         params.Src = ""
144         params.PayloadLen = len(trans.Payload.Buf)
145         params.Payload = trans.Payload.Buf
146         params.Mbuf = nil
147
148         return c.rmrSendRaw(desc, params)
149 }
150
151 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
152         xapp.Rmr.Free(params.Mbuf)
153         params.Mbuf = nil
154         msg := &RMRParams{params}
155         c.msgCounter++
156         switch msg.Mtype {
157         case xapp.RICMessageTypes["RIC_SUB_REQ"]:
158                 go c.handleXAPPSubscriptionRequest(msg)
159         case xapp.RICMessageTypes["RIC_SUB_RESP"]:
160                 go c.handleE2TSubscriptionResponse(msg)
161         case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
162                 go c.handleE2TSubscriptionFailure(msg)
163         case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
164                 go c.handleXAPPSubscriptionDeleteRequest(msg)
165         case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
166                 go c.handleE2TSubscriptionDeleteResponse(msg)
167         case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
168                 go c.handleE2TSubscriptionDeleteFailure(msg)
169         default:
170                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
171         }
172
173         return nil
174 }
175 func idstring(trans fmt.Stringer, subs fmt.Stringer, err error) string {
176         var retval string = ""
177         var filler string = ""
178         if trans != nil {
179                 retval += filler + trans.String()
180                 filler = " "
181         }
182         if subs != nil {
183                 retval += filler + subs.String()
184                 filler = " "
185         }
186         if err != nil {
187                 retval += filler + "err(" + err.Error() + ")"
188                 filler = " "
189
190         }
191         return retval
192 }
193
194 //-------------------------------------------------------------------
195 // handle from XAPP Subscription Request
196 //------------------------------------------------------------------
197 func (c *Control) handleXAPPSubscriptionRequest(params *RMRParams) {
198         xapp.Logger.Info("XAPP-SubReq from xapp: %s", params.String())
199
200         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
201         if err != nil {
202                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(params, nil, err))
203                 return
204         }
205
206         trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid)
207         if err != nil {
208                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(params, nil, err))
209                 return
210         }
211         defer trans.Release()
212
213         subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
214         if err != nil {
215                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(trans, nil, err))
216                 return
217         }
218
219         if subs.IsTransactionReserved() {
220                 err := fmt.Errorf("Currently parallel or queued transactions are not allowed")
221                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(trans, subs, err))
222                 return
223         }
224
225         //
226         // Wake subs request
227         //
228         go c.handleSubscriptionCreate(subs, trans)
229         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
230
231         err = nil
232         if event != nil {
233                 switch themsg := event.(type) {
234                 case *e2ap.E2APSubscriptionResponse:
235                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
236                         if err == nil {
237                                 c.rmrReplyToSender("XAPP-SubReq: SubResp to xapp", subs, trans)
238                                 return
239                         }
240                 case *e2ap.E2APSubscriptionFailure:
241                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
242                         if err == nil {
243                                 c.rmrReplyToSender("XAPP-SubReq: SubFail to xapp", subs, trans)
244                         }
245                         return
246                 default:
247                         break
248                 }
249         }
250         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(trans, subs, err))
251 }
252
253 //-------------------------------------------------------------------
254 // handle from XAPP Subscription Delete Request
255 //------------------------------------------------------------------
256 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *RMRParams) {
257         xapp.Logger.Info("XAPP-SubDelReq from xapp: %s", params.String())
258
259         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
260         if err != nil {
261                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(params, nil, err))
262                 return
263         }
264
265         trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid)
266         if err != nil {
267                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(params, nil, err))
268                 return
269         }
270         defer trans.Release()
271
272         subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelReqMsg.RequestId.Seq), uint16(params.SubId)})
273         if err != nil {
274                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(trans, nil, err))
275                 return
276         }
277
278         if subs.IsTransactionReserved() {
279                 err := fmt.Errorf("Currently parallel or queued transactions are not allowed")
280                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(trans, subs, err))
281                 return
282         }
283
284         //
285         // Wake subs delete
286         //
287         go c.handleSubscriptionDelete(subs, trans)
288         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
289
290         // Whatever is received send ok delete response
291         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
292         subDelRespMsg.RequestId.Id = subs.SubReqMsg.RequestId.Id
293         subDelRespMsg.RequestId.Seq = uint32(subs.GetSubId())
294         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
295         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
296         if err == nil {
297                 c.rmrReplyToSender("XAPP-SubDelReq: SubDelResp to xapp", subs, trans)
298         }
299 }
300
301 //-------------------------------------------------------------------
302 // SUBS CREATE Handling
303 //-------------------------------------------------------------------
304 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Transaction) {
305
306         trans := c.tracker.NewTransaction(subs.GetMeid())
307         subs.WaitTransactionTurn(trans)
308         defer subs.ReleaseTransactionTurn(trans)
309         defer trans.Release()
310
311         xapp.Logger.Debug("SUBS-SubReq: Handling %s parent %s", idstring(trans, subs, nil), parentTrans.String())
312
313         if subs.SubRespMsg != nil {
314                 xapp.Logger.Debug("SUBS-SubReq: Handling (immediate response) %s parent %s", idstring(nil, subs, nil), parentTrans.String())
315                 parentTrans.SendEvent(subs.SubRespMsg, 0)
316                 return
317         }
318
319         event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
320         switch themsg := event.(type) {
321         case *e2ap.E2APSubscriptionResponse:
322                 subs.SubRespMsg = themsg
323                 parentTrans.SendEvent(event, 0)
324                 return
325         case *e2ap.E2APSubscriptionFailure:
326                 //TODO: Possible delete and one retry for subs req
327                 parentTrans.SendEvent(event, 0)
328         default:
329                 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(trans, subs, nil))
330                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
331                 parentTrans.SendEvent(nil, 0)
332         }
333
334         go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
335 }
336
337 //-------------------------------------------------------------------
338 // SUBS DELETE Handling
339 //-------------------------------------------------------------------
340 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *Transaction) {
341
342         trans := c.tracker.NewTransaction(subs.GetMeid())
343         subs.WaitTransactionTurn(trans)
344         defer subs.ReleaseTransactionTurn(trans)
345         defer trans.Release()
346
347         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s parent %s", idstring(trans, subs, nil), parentTrans.String())
348
349         event := c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
350
351         parentTrans.SendEvent(event, 0)
352         go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
353 }
354
355 //-------------------------------------------------------------------
356 // send to E2T Subscription Request
357 //-------------------------------------------------------------------
358 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *Transaction, parentTrans *Transaction) interface{} {
359         var err error
360         var event interface{} = nil
361         var timedOut bool = false
362
363         subReqMsg := subs.SubReqMsg
364         subReqMsg.RequestId.Id = 123
365         subReqMsg.RequestId.Seq = uint32(subs.GetSubId())
366         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
367         if err != nil {
368                 xapp.Logger.Error("SUBS-SubReq: %s parent %s", idstring(trans, subs, err), parentTrans.String())
369                 return event
370         }
371
372         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
373                 desc := fmt.Sprintf("SUBS-SubReq: SubReq to E2T (retry %d)", retries)
374                 c.rmrSend(desc, subs, trans)
375                 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
376                 if timedOut {
377                         continue
378                 }
379                 break
380         }
381         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s parent %s", typeofSubsMessage(event), idstring(trans, subs, nil), parentTrans.String())
382         return event
383 }
384
385 //-------------------------------------------------------------------
386 // send to E2T Subscription Delete Request
387 //-------------------------------------------------------------------
388
389 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *Transaction, parentTrans *Transaction) interface{} {
390         var err error
391         var event interface{}
392         var timedOut bool
393
394         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
395         subDelReqMsg.RequestId.Id = 123
396         subDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
397         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
398         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
399         if err != nil {
400                 xapp.Logger.Error("SUBS-SubDelReq: %s parent %s", idstring(trans, subs, err), parentTrans.String())
401                 return event
402         }
403
404         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
405                 desc := fmt.Sprintf("SUBS-SubDelReq: SubDelReq to E2T (retry %d)", retries)
406                 c.rmrSend(desc, subs, trans)
407                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
408                 if timedOut {
409                         continue
410                 }
411                 break
412         }
413         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s parent %s", typeofSubsMessage(event), idstring(trans, subs, nil), parentTrans.String())
414         return event
415 }
416
417 //-------------------------------------------------------------------
418 // handle from E2T Subscription Reponse
419 //-------------------------------------------------------------------
420 func (c *Control) handleE2TSubscriptionResponse(params *RMRParams) {
421         xapp.Logger.Info("MSG-SubResp from E2T: %s", params.String())
422         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
423         if err != nil {
424                 xapp.Logger.Error("MSG-SubResp %s", idstring(params, nil, err))
425                 return
426         }
427         subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subRespMsg.RequestId.Seq), uint16(params.SubId)})
428         if err != nil {
429                 xapp.Logger.Error("MSG-SubResp: %s", idstring(params, nil, err))
430                 return
431         }
432         trans := subs.GetTransaction()
433         if trans == nil {
434                 err = fmt.Errorf("Ongoing transaction not found")
435                 xapp.Logger.Error("MSG-SubResp: %s", idstring(params, subs, err))
436                 return
437         }
438         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
439         if sendOk == false {
440                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
441                 xapp.Logger.Error("MSG-SubResp: %s", idstring(trans, subs, err))
442         }
443         return
444 }
445
446 //-------------------------------------------------------------------
447 // handle from E2T Subscription Failure
448 //-------------------------------------------------------------------
449 func (c *Control) handleE2TSubscriptionFailure(params *RMRParams) {
450         xapp.Logger.Info("MSG-SubFail from E2T: %s", params.String())
451         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
452         if err != nil {
453                 xapp.Logger.Error("MSG-SubFail %s", idstring(params, nil, err))
454                 return
455         }
456         subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subFailMsg.RequestId.Seq), uint16(params.SubId)})
457         if err != nil {
458                 xapp.Logger.Error("MSG-SubFail: %s", idstring(params, nil, err))
459                 return
460         }
461         trans := subs.GetTransaction()
462         if trans == nil {
463                 err = fmt.Errorf("Ongoing transaction not found")
464                 xapp.Logger.Error("MSG-SubFail: %s", idstring(params, subs, err))
465                 return
466         }
467         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
468         if sendOk == false {
469                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
470                 xapp.Logger.Error("MSG-SubFail: %s", idstring(trans, subs, err))
471         }
472         return
473 }
474
475 //-------------------------------------------------------------------
476 // handle from E2T Subscription Delete Response
477 //-------------------------------------------------------------------
478 func (c *Control) handleE2TSubscriptionDeleteResponse(params *RMRParams) (err error) {
479         xapp.Logger.Info("SUBS-SubDelResp from E2T:%s", params.String())
480         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
481         if err != nil {
482                 xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, nil, err))
483                 return
484         }
485         subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelRespMsg.RequestId.Seq), uint16(params.SubId)})
486         if err != nil {
487                 xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, nil, err))
488                 return
489         }
490         trans := subs.GetTransaction()
491         if trans == nil {
492                 err = fmt.Errorf("Ongoing transaction not found")
493                 xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, subs, err))
494                 return
495         }
496         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
497         if sendOk == false {
498                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
499                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(trans, subs, err))
500         }
501         return
502 }
503
504 //-------------------------------------------------------------------
505 // handle from E2T Subscription Delete Failure
506 //-------------------------------------------------------------------
507 func (c *Control) handleE2TSubscriptionDeleteFailure(params *RMRParams) {
508         xapp.Logger.Info("MSG-SubDelFail from E2T:%s", params.String())
509         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
510         if err != nil {
511                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, nil, err))
512                 return
513         }
514         subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelFailMsg.RequestId.Seq), uint16(params.SubId)})
515         if err != nil {
516                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, nil, err))
517                 return
518         }
519         trans := subs.GetTransaction()
520         if trans == nil {
521                 err = fmt.Errorf("Ongoing transaction not found")
522                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, subs, err))
523                 return
524         }
525         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
526         if sendOk == false {
527                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
528                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(trans, subs, err))
529         }
530         return
531 }
532
533 //-------------------------------------------------------------------
534 //
535 //-------------------------------------------------------------------
536 func typeofSubsMessage(v interface{}) string {
537         if v == nil {
538                 return "NIL"
539         }
540         switch v.(type) {
541         case *e2ap.E2APSubscriptionRequest:
542                 return "SubReq"
543         case *e2ap.E2APSubscriptionResponse:
544                 return "SubResp"
545         case *e2ap.E2APSubscriptionFailure:
546                 return "SubFail"
547         case *e2ap.E2APSubscriptionDeleteRequest:
548                 return "SubDelReq"
549         case *e2ap.E2APSubscriptionDeleteResponse:
550                 return "SubDelResp"
551         case *e2ap.E2APSubscriptionDeleteFailure:
552                 return "SubDelFail"
553         default:
554                 return "Unknown"
555         }
556 }