1d64f3c64bbf17c77fd2be1235800275b732bbd2
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
27         httptransport "github.com/go-openapi/runtime/client"
28         "github.com/go-openapi/strfmt"
29         "github.com/spf13/viper"
30         "sync"
31         "time"
32 )
33
34 //-----------------------------------------------------------------------------
35 //
36 //-----------------------------------------------------------------------------
37
38 var e2tSubReqTimeout time.Duration = 5 * time.Second
39 var e2tSubDelReqTime time.Duration = 5 * time.Second
40 var e2tMaxSubReqTryCount uint64 = 2    // Initial try + retry
41 var e2tMaxSubDelReqTryCount uint64 = 2 // Initial try + retry
42
43 var e2tRecvMsgTimeout time.Duration = 5 * time.Second
44
45 type Control struct {
46         e2ap         *E2ap
47         registry     *Registry
48         tracker      *Tracker
49         timerMap     *TimerMap
50         rmrSendMutex sync.Mutex
51         msgCounter   uint64
52 }
53
54 type RMRMeid struct {
55         PlmnID  string
56         EnbID   string
57         RanName string
58 }
59
60 func init() {
61         xapp.Logger.Info("SUBMGR")
62         viper.AutomaticEnv()
63         viper.SetEnvPrefix("submgr")
64         viper.AllowEmptyEnv(true)
65 }
66
67 func NewControl() *Control {
68
69         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
70         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
71
72         registry := new(Registry)
73         registry.Initialize()
74         registry.rtmgrClient = &rtmgrClient
75
76         tracker := new(Tracker)
77         tracker.Init()
78
79         timerMap := new(TimerMap)
80         timerMap.Init()
81
82         return &Control{e2ap: new(E2ap),
83                 registry:   registry,
84                 tracker:    tracker,
85                 timerMap:   timerMap,
86                 msgCounter: 0,
87         }
88 }
89
90 func (c *Control) Run() {
91         xapp.Run(c)
92 }
93
94 func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) {
95
96         xapp.Logger.Info("%s: %s", desc, params.String())
97         status := false
98         i := 1
99         for ; i <= 10 && status == false; i++ {
100                 c.rmrSendMutex.Lock()
101                 status = xapp.Rmr.Send(params.RMRParams, false)
102                 c.rmrSendMutex.Unlock()
103                 if status == false {
104                         xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String())
105                         time.Sleep(500 * time.Millisecond)
106                 }
107         }
108         if status == false {
109                 err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String())
110                 xapp.Logger.Error("%s: %s", desc, err.Error())
111                 xapp.Rmr.Free(params.Mbuf)
112         }
113         return
114 }
115
116 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
117         params := &RMRParams{&xapp.RMRParams{}}
118         params.Mtype = trans.GetMtype()
119         params.SubId = int(subs.GetReqId().Seq)
120         params.Xid = ""
121         params.Meid = subs.GetMeid()
122         params.Src = ""
123         params.PayloadLen = len(trans.Payload.Buf)
124         params.Payload = trans.Payload.Buf
125         params.Mbuf = nil
126
127         return c.rmrSendRaw("MSG to E2T:"+desc+":"+trans.String(), params)
128 }
129
130 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
131
132         params := &RMRParams{&xapp.RMRParams{}}
133         params.Mtype = trans.GetMtype()
134         params.SubId = int(subs.GetReqId().Seq)
135         params.Xid = trans.GetXid()
136         params.Meid = trans.GetMeid()
137         params.Src = ""
138         params.PayloadLen = len(trans.Payload.Buf)
139         params.Payload = trans.Payload.Buf
140         params.Mbuf = nil
141
142         return c.rmrSendRaw("MSG to XAPP:"+desc+":"+trans.String(), params)
143 }
144
145 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
146         xapp.Rmr.Free(params.Mbuf)
147         params.Mbuf = nil
148         msg := &RMRParams{params}
149         c.msgCounter++
150
151         switch msg.Mtype {
152         case xapp.RICMessageTypes["RIC_SUB_REQ"]:
153                 go c.handleXAPPSubscriptionRequest(msg)
154         case xapp.RICMessageTypes["RIC_SUB_RESP"]:
155                 go c.handleE2TSubscriptionResponse(msg)
156         case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
157                 go c.handleE2TSubscriptionFailure(msg)
158         case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
159                 go c.handleXAPPSubscriptionDeleteRequest(msg)
160         case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
161                 go c.handleE2TSubscriptionDeleteResponse(msg)
162         case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
163                 go c.handleE2TSubscriptionDeleteFailure(msg)
164         default:
165                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
166         }
167
168         return nil
169 }
170
171 func idstring(err error, entries ...fmt.Stringer) string {
172         var retval string = ""
173         var filler string = ""
174         for _, entry := range entries {
175                 retval += filler + entry.String()
176                 filler = " "
177         }
178         if err != nil {
179                 retval += filler + "err(" + err.Error() + ")"
180                 filler = " "
181
182         }
183         return retval
184 }
185
186 //-------------------------------------------------------------------
187 // handle from XAPP Subscription Request
188 //------------------------------------------------------------------
189 func (c *Control) handleXAPPSubscriptionRequest(params *RMRParams) {
190         xapp.Logger.Info("MSG from XAPP: %s", params.String())
191
192         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
193         if err != nil {
194                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
195                 return
196         }
197
198         trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subReqMsg.RequestId}, params.Meid)
199         if trans == nil {
200                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
201                 return
202         }
203         defer trans.Release()
204
205         err = c.tracker.Track(trans)
206         if err != nil {
207                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
208                 return
209         }
210
211         subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
212         if err != nil {
213                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
214                 return
215         }
216
217         //
218         // Wake subs request
219         //
220         go c.handleSubscriptionCreate(subs, trans)
221         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
222
223         err = nil
224         if event != nil {
225                 switch themsg := event.(type) {
226                 case *e2ap.E2APSubscriptionResponse:
227                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
228                         if err == nil {
229                                 c.rmrSendToXapp("", subs, trans)
230                                 return
231                         }
232                 case *e2ap.E2APSubscriptionFailure:
233                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
234                         if err == nil {
235                                 c.rmrSendToXapp("", subs, trans)
236                         }
237                 default:
238                         break
239                 }
240         }
241         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
242         go c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
243 }
244
245 //-------------------------------------------------------------------
246 // handle from XAPP Subscription Delete Request
247 //------------------------------------------------------------------
248 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *RMRParams) {
249         xapp.Logger.Info("MSG from XAPP: %s", params.String())
250
251         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
252         if err != nil {
253                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
254                 return
255         }
256
257         trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subDelReqMsg.RequestId}, params.Meid)
258         if trans == nil {
259                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
260                 return
261         }
262         defer trans.Release()
263
264         err = c.tracker.Track(trans)
265         if err != nil {
266                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
267                 return
268         }
269
270         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelReqMsg.RequestId.Seq})
271         if err != nil {
272                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
273                 return
274         }
275
276         //
277         // Wake subs delete
278         //
279         go c.handleSubscriptionDelete(subs, trans)
280         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
281
282         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
283
284         // Whatever is received send ok delete response
285         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
286         subDelRespMsg.RequestId = subs.SubReqMsg.RequestId
287         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
288         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
289         if err == nil {
290                 c.rmrSendToXapp("", subs, trans)
291         }
292
293         go c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
294 }
295
296 //-------------------------------------------------------------------
297 // SUBS CREATE Handling
298 //-------------------------------------------------------------------
299 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
300
301         trans := c.tracker.NewSubsTransaction(subs)
302         subs.WaitTransactionTurn(trans)
303         defer subs.ReleaseTransactionTurn(trans)
304         defer trans.Release()
305
306         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
307
308         subRfMsg, valid := subs.GetCachedResponse()
309         if subRfMsg == nil && valid == true {
310                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
311                 switch event.(type) {
312                 case *e2ap.E2APSubscriptionResponse:
313                         subRfMsg, valid = subs.SetCachedResponse(event, true)
314                 case *e2ap.E2APSubscriptionFailure:
315                         subRfMsg, valid = subs.SetCachedResponse(event, false)
316                 default:
317                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
318                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
319                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
320                 }
321                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
322         } else {
323                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
324         }
325
326         parentTrans.SendEvent(subRfMsg, 0)
327 }
328
329 //-------------------------------------------------------------------
330 // SUBS DELETE Handling
331 //-------------------------------------------------------------------
332
333 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
334
335         trans := c.tracker.NewSubsTransaction(subs)
336         subs.WaitTransactionTurn(trans)
337         defer subs.ReleaseTransactionTurn(trans)
338         defer trans.Release()
339
340         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
341
342         subs.mutex.Lock()
343         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
344                 subs.valid = false
345                 subs.mutex.Unlock()
346                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
347         } else {
348                 subs.mutex.Unlock()
349         }
350
351         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
352         subDelRespMsg.RequestId = subs.SubReqMsg.RequestId
353         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
354         parentTrans.SendEvent(subDelRespMsg, 0)
355 }
356
357 //-------------------------------------------------------------------
358 // send to E2T Subscription Request
359 //-------------------------------------------------------------------
360 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
361         var err error
362         var event interface{} = nil
363         var timedOut bool = false
364
365         subReqMsg := subs.SubReqMsg
366         subReqMsg.RequestId = subs.GetReqId().RequestId
367         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
368         if err != nil {
369                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
370                 return event
371         }
372
373         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
374                 desc := fmt.Sprintf("(retry %d)", retries)
375                 c.rmrSendToE2T(desc, subs, trans)
376                 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
377                 if timedOut {
378                         continue
379                 }
380                 break
381         }
382         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
383         return event
384 }
385
386 //-------------------------------------------------------------------
387 // send to E2T Subscription Delete Request
388 //-------------------------------------------------------------------
389
390 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
391         var err error
392         var event interface{}
393         var timedOut bool
394
395         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
396         subDelReqMsg.RequestId = subs.GetReqId().RequestId
397         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
398         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
399         if err != nil {
400                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
401                 return event
402         }
403
404         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
405                 desc := fmt.Sprintf("(retry %d)", retries)
406                 c.rmrSendToE2T(desc, subs, trans)
407                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
408                 if timedOut {
409                         continue
410                 }
411                 break
412         }
413         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
414         return event
415 }
416
417 //-------------------------------------------------------------------
418 // handle from E2T Subscription Reponse
419 //-------------------------------------------------------------------
420 func (c *Control) handleE2TSubscriptionResponse(params *RMRParams) {
421         xapp.Logger.Info("MSG from E2T: %s", params.String())
422         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
423         if err != nil {
424                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
425                 return
426         }
427         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.Seq})
428         if err != nil {
429                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
430                 return
431         }
432         trans := subs.GetTransaction()
433         if trans == nil {
434                 err = fmt.Errorf("Ongoing transaction not found")
435                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
436                 return
437         }
438         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
439         if sendOk == false {
440                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
441                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
442         }
443         return
444 }
445
446 //-------------------------------------------------------------------
447 // handle from E2T Subscription Failure
448 //-------------------------------------------------------------------
449 func (c *Control) handleE2TSubscriptionFailure(params *RMRParams) {
450         xapp.Logger.Info("MSG from E2T: %s", params.String())
451         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
452         if err != nil {
453                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
454                 return
455         }
456         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.Seq})
457         if err != nil {
458                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
459                 return
460         }
461         trans := subs.GetTransaction()
462         if trans == nil {
463                 err = fmt.Errorf("Ongoing transaction not found")
464                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
465                 return
466         }
467         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
468         if sendOk == false {
469                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
470                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
471         }
472         return
473 }
474
475 //-------------------------------------------------------------------
476 // handle from E2T Subscription Delete Response
477 //-------------------------------------------------------------------
478 func (c *Control) handleE2TSubscriptionDeleteResponse(params *RMRParams) (err error) {
479         xapp.Logger.Info("MSG from E2T: %s", params.String())
480         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
481         if err != nil {
482                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
483                 return
484         }
485         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.Seq})
486         if err != nil {
487                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
488                 return
489         }
490         trans := subs.GetTransaction()
491         if trans == nil {
492                 err = fmt.Errorf("Ongoing transaction not found")
493                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
494                 return
495         }
496         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
497         if sendOk == false {
498                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
499                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
500         }
501         return
502 }
503
504 //-------------------------------------------------------------------
505 // handle from E2T Subscription Delete Failure
506 //-------------------------------------------------------------------
507 func (c *Control) handleE2TSubscriptionDeleteFailure(params *RMRParams) {
508         xapp.Logger.Info("MSG from E2T: %s", params.String())
509         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
510         if err != nil {
511                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
512                 return
513         }
514         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.Seq})
515         if err != nil {
516                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
517                 return
518         }
519         trans := subs.GetTransaction()
520         if trans == nil {
521                 err = fmt.Errorf("Ongoing transaction not found")
522                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
523                 return
524         }
525         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
526         if sendOk == false {
527                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
528                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
529         }
530         return
531 }
532
533 //-------------------------------------------------------------------
534 //
535 //-------------------------------------------------------------------
536 func typeofSubsMessage(v interface{}) string {
537         if v == nil {
538                 return "NIL"
539         }
540         switch v.(type) {
541         case *e2ap.E2APSubscriptionRequest:
542                 return "SubReq"
543         case *e2ap.E2APSubscriptionResponse:
544                 return "SubResp"
545         case *e2ap.E2APSubscriptionFailure:
546                 return "SubFail"
547         case *e2ap.E2APSubscriptionDeleteRequest:
548                 return "SubDelReq"
549         case *e2ap.E2APSubscriptionDeleteResponse:
550                 return "SubDelResp"
551         case *e2ap.E2APSubscriptionDeleteFailure:
552                 return "SubDelFail"
553         default:
554                 return "Unknown"
555         }
556 }