2d7d535e7e099937667a25334fef95c1143bebe5
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26         "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks"
27         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28         httptransport "github.com/go-openapi/runtime/client"
29         "github.com/go-openapi/strfmt"
30         "github.com/spf13/viper"
31         "time"
32 )
33
34 //-----------------------------------------------------------------------------
35 //
36 //-----------------------------------------------------------------------------
37
38 func idstring(err error, entries ...fmt.Stringer) string {
39         var retval string = ""
40         var filler string = ""
41         for _, entry := range entries {
42                 retval += filler + entry.String()
43                 filler = " "
44         }
45         if err != nil {
46                 retval += filler + "err(" + err.Error() + ")"
47                 filler = " "
48
49         }
50         return retval
51 }
52
53 //-----------------------------------------------------------------------------
54 //
55 //-----------------------------------------------------------------------------
56
57 var e2tSubReqTimeout time.Duration = 5 * time.Second
58 var e2tSubDelReqTime time.Duration = 5 * time.Second
59 var e2tMaxSubReqTryCount uint64 = 2    // Initial try + retry
60 var e2tMaxSubDelReqTryCount uint64 = 2 // Initial try + retry
61
62 var e2tRecvMsgTimeout time.Duration = 5 * time.Second
63
64 type Control struct {
65         xapptweaks.XappWrapper
66         e2ap     *E2ap
67         registry *Registry
68         tracker  *Tracker
69         timerMap *TimerMap
70 }
71
72 type RMRMeid struct {
73         PlmnID  string
74         EnbID   string
75         RanName string
76 }
77
78 func init() {
79         xapp.Logger.Info("SUBMGR")
80         viper.AutomaticEnv()
81         viper.SetEnvPrefix("submgr")
82         viper.AllowEmptyEnv(true)
83 }
84
85 func NewControl() *Control {
86
87         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
88         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
89
90         registry := new(Registry)
91         registry.Initialize()
92         registry.rtmgrClient = &rtmgrClient
93
94         tracker := new(Tracker)
95         tracker.Init()
96
97         timerMap := new(TimerMap)
98         timerMap.Init()
99
100         c := &Control{e2ap: new(E2ap),
101                 registry: registry,
102                 tracker:  tracker,
103                 timerMap: timerMap,
104         }
105         c.XappWrapper.Init("")
106         return c
107 }
108
109 func (c *Control) ReadyCB(data interface{}) {
110         if c.Rmr == nil {
111                 c.Rmr = xapp.Rmr
112         }
113 }
114
115 func (c *Control) Run() {
116         xapp.SetReadyCB(c.ReadyCB, nil)
117         xapp.Run(c)
118 }
119
120 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
121         params := xapptweaks.NewParams(nil)
122         params.Mtype = trans.GetMtype()
123         params.SubId = int(subs.GetReqId().Seq)
124         params.Xid = ""
125         params.Meid = subs.GetMeid()
126         params.Src = ""
127         params.PayloadLen = len(trans.Payload.Buf)
128         params.Payload = trans.Payload.Buf
129         params.Mbuf = nil
130         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
131         return c.RmrSend(params)
132 }
133
134 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
135
136         params := xapptweaks.NewParams(nil)
137         params.Mtype = trans.GetMtype()
138         params.SubId = int(subs.GetReqId().Seq)
139         params.Xid = trans.GetXid()
140         params.Meid = trans.GetMeid()
141         params.Src = ""
142         params.PayloadLen = len(trans.Payload.Buf)
143         params.Payload = trans.Payload.Buf
144         params.Mbuf = nil
145         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
146         return c.RmrSend(params)
147 }
148
149 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
150         msg := xapptweaks.NewParams(params)
151         if c.Rmr == nil {
152                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
153                 xapp.Logger.Error("%s", err.Error())
154                 return
155         }
156         c.CntRecvMsg++
157
158         defer c.Rmr.Free(msg.Mbuf)
159
160         switch msg.Mtype {
161         case xapp.RICMessageTypes["RIC_SUB_REQ"]:
162                 go c.handleXAPPSubscriptionRequest(msg)
163         case xapp.RICMessageTypes["RIC_SUB_RESP"]:
164                 go c.handleE2TSubscriptionResponse(msg)
165         case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
166                 go c.handleE2TSubscriptionFailure(msg)
167         case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
168                 go c.handleXAPPSubscriptionDeleteRequest(msg)
169         case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
170                 go c.handleE2TSubscriptionDeleteResponse(msg)
171         case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
172                 go c.handleE2TSubscriptionDeleteFailure(msg)
173         default:
174                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
175         }
176         return
177 }
178
179 //-------------------------------------------------------------------
180 // handle from XAPP Subscription Request
181 //------------------------------------------------------------------
182 func (c *Control) handleXAPPSubscriptionRequest(params *xapptweaks.RMRParams) {
183         xapp.Logger.Info("MSG from XAPP: %s", params.String())
184
185         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
186         if err != nil {
187                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
188                 return
189         }
190
191         trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.Seq, params.Meid)
192         if trans == nil {
193                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
194                 return
195         }
196         defer trans.Release()
197
198         err = c.tracker.Track(trans)
199         if err != nil {
200                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
201                 return
202         }
203
204         subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
205         if err != nil {
206                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
207                 return
208         }
209
210         //
211         // Wake subs request
212         //
213         go c.handleSubscriptionCreate(subs, trans)
214         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
215
216         err = nil
217         if event != nil {
218                 switch themsg := event.(type) {
219                 case *e2ap.E2APSubscriptionResponse:
220                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
221                         if err == nil {
222                                 trans.Release()
223                                 c.rmrSendToXapp("", subs, trans)
224                                 return
225                         }
226                 case *e2ap.E2APSubscriptionFailure:
227                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
228                         if err == nil {
229                                 c.rmrSendToXapp("", subs, trans)
230                         }
231                 default:
232                         break
233                 }
234         }
235         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
236         c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
237 }
238
239 //-------------------------------------------------------------------
240 // handle from XAPP Subscription Delete Request
241 //------------------------------------------------------------------
242 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapptweaks.RMRParams) {
243         xapp.Logger.Info("MSG from XAPP: %s", params.String())
244
245         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
246         if err != nil {
247                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
248                 return
249         }
250
251         trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.Seq, params.Meid)
252         if trans == nil {
253                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
254                 return
255         }
256         defer trans.Release()
257
258         err = c.tracker.Track(trans)
259         if err != nil {
260                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
261                 return
262         }
263
264         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
265         if err != nil {
266                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
267                 return
268         }
269
270         //
271         // Wake subs delete
272         //
273         go c.handleSubscriptionDelete(subs, trans)
274         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
275
276         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
277
278         // Whatever is received send ok delete response
279         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
280         subDelRespMsg.RequestId = subs.GetReqId().RequestId
281         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
282         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
283         if err == nil {
284                 c.rmrSendToXapp("", subs, trans)
285         }
286
287         c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
288 }
289
290 //-------------------------------------------------------------------
291 // SUBS CREATE Handling
292 //-------------------------------------------------------------------
293 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
294
295         trans := c.tracker.NewSubsTransaction(subs)
296         subs.WaitTransactionTurn(trans)
297         defer subs.ReleaseTransactionTurn(trans)
298         defer trans.Release()
299
300         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
301
302         subRfMsg, valid := subs.GetCachedResponse()
303         if subRfMsg == nil && valid == true {
304                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
305                 switch event.(type) {
306                 case *e2ap.E2APSubscriptionResponse:
307                         subRfMsg, valid = subs.SetCachedResponse(event, true)
308                 case *e2ap.E2APSubscriptionFailure:
309                         subRfMsg, valid = subs.SetCachedResponse(event, false)
310                 default:
311                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
312                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
313                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
314                 }
315                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
316         } else {
317                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
318         }
319
320         parentTrans.SendEvent(subRfMsg, 0)
321 }
322
323 //-------------------------------------------------------------------
324 // SUBS DELETE Handling
325 //-------------------------------------------------------------------
326
327 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
328
329         trans := c.tracker.NewSubsTransaction(subs)
330         subs.WaitTransactionTurn(trans)
331         defer subs.ReleaseTransactionTurn(trans)
332         defer trans.Release()
333
334         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
335
336         subs.mutex.Lock()
337         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
338                 subs.valid = false
339                 subs.mutex.Unlock()
340                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
341         } else {
342                 subs.mutex.Unlock()
343         }
344
345         parentTrans.SendEvent(nil, 0)
346 }
347
348 //-------------------------------------------------------------------
349 // send to E2T Subscription Request
350 //-------------------------------------------------------------------
351 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
352         var err error
353         var event interface{} = nil
354         var timedOut bool = false
355
356         subReqMsg := subs.SubReqMsg
357         subReqMsg.RequestId = subs.GetReqId().RequestId
358         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
359         if err != nil {
360                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
361                 return event
362         }
363
364         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
365                 desc := fmt.Sprintf("(retry %d)", retries)
366                 c.rmrSendToE2T(desc, subs, trans)
367                 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
368                 if timedOut {
369                         continue
370                 }
371                 break
372         }
373         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
374         return event
375 }
376
377 //-------------------------------------------------------------------
378 // send to E2T Subscription Delete Request
379 //-------------------------------------------------------------------
380
381 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
382         var err error
383         var event interface{}
384         var timedOut bool
385
386         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
387         subDelReqMsg.RequestId = subs.GetReqId().RequestId
388         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
389         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
390         if err != nil {
391                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
392                 return event
393         }
394
395         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
396                 desc := fmt.Sprintf("(retry %d)", retries)
397                 c.rmrSendToE2T(desc, subs, trans)
398                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
399                 if timedOut {
400                         continue
401                 }
402                 break
403         }
404         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
405         return event
406 }
407
408 //-------------------------------------------------------------------
409 // handle from E2T Subscription Reponse
410 //-------------------------------------------------------------------
411 func (c *Control) handleE2TSubscriptionResponse(params *xapptweaks.RMRParams) {
412         xapp.Logger.Info("MSG from E2T: %s", params.String())
413         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
414         if err != nil {
415                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
416                 return
417         }
418         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.Seq})
419         if err != nil {
420                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
421                 return
422         }
423         trans := subs.GetTransaction()
424         if trans == nil {
425                 err = fmt.Errorf("Ongoing transaction not found")
426                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
427                 return
428         }
429         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
430         if sendOk == false {
431                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
432                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
433         }
434         return
435 }
436
437 //-------------------------------------------------------------------
438 // handle from E2T Subscription Failure
439 //-------------------------------------------------------------------
440 func (c *Control) handleE2TSubscriptionFailure(params *xapptweaks.RMRParams) {
441         xapp.Logger.Info("MSG from E2T: %s", params.String())
442         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
443         if err != nil {
444                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
445                 return
446         }
447         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.Seq})
448         if err != nil {
449                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
450                 return
451         }
452         trans := subs.GetTransaction()
453         if trans == nil {
454                 err = fmt.Errorf("Ongoing transaction not found")
455                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
456                 return
457         }
458         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
459         if sendOk == false {
460                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
461                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
462         }
463         return
464 }
465
466 //-------------------------------------------------------------------
467 // handle from E2T Subscription Delete Response
468 //-------------------------------------------------------------------
469 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapptweaks.RMRParams) (err error) {
470         xapp.Logger.Info("MSG from E2T: %s", params.String())
471         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
472         if err != nil {
473                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
474                 return
475         }
476         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.Seq})
477         if err != nil {
478                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
479                 return
480         }
481         trans := subs.GetTransaction()
482         if trans == nil {
483                 err = fmt.Errorf("Ongoing transaction not found")
484                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
485                 return
486         }
487         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
488         if sendOk == false {
489                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
490                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
491         }
492         return
493 }
494
495 //-------------------------------------------------------------------
496 // handle from E2T Subscription Delete Failure
497 //-------------------------------------------------------------------
498 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapptweaks.RMRParams) {
499         xapp.Logger.Info("MSG from E2T: %s", params.String())
500         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
501         if err != nil {
502                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
503                 return
504         }
505         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.Seq})
506         if err != nil {
507                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
508                 return
509         }
510         trans := subs.GetTransaction()
511         if trans == nil {
512                 err = fmt.Errorf("Ongoing transaction not found")
513                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
514                 return
515         }
516         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
517         if sendOk == false {
518                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
519                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
520         }
521         return
522 }
523
524 //-------------------------------------------------------------------
525 //
526 //-------------------------------------------------------------------
527 func typeofSubsMessage(v interface{}) string {
528         if v == nil {
529                 return "NIL"
530         }
531         switch v.(type) {
532         case *e2ap.E2APSubscriptionRequest:
533                 return "SubReq"
534         case *e2ap.E2APSubscriptionResponse:
535                 return "SubResp"
536         case *e2ap.E2APSubscriptionFailure:
537                 return "SubFail"
538         case *e2ap.E2APSubscriptionDeleteRequest:
539                 return "SubDelReq"
540         case *e2ap.E2APSubscriptionDeleteResponse:
541                 return "SubDelResp"
542         case *e2ap.E2APSubscriptionDeleteFailure:
543                 return "SubDelFail"
544         default:
545                 return "Unknown"
546         }
547 }