RIC-79 initial implementation to fetch subscriptions via REST
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26         "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks"
27         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
28         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
29         httptransport "github.com/go-openapi/runtime/client"
30         "github.com/go-openapi/strfmt"
31         "github.com/spf13/viper"
32         "time"
33 )
34
35 //-----------------------------------------------------------------------------
36 //
37 //-----------------------------------------------------------------------------
38
// idstring renders the given entries as one log-friendly string: the String()
// form of every entry separated by single spaces, followed (when err is not
// nil) by "err(<message>)". With no entries and a nil err the result is "".
func idstring(err error, entries ...fmt.Stringer) string {
        text := ""
        for idx, item := range entries {
                if idx > 0 {
                        text += " "
                }
                text += item.String()
        }
        if err == nil {
                return text
        }
        // The error part is separated from the entries only when there are any.
        if len(entries) > 0 {
                text += " "
        }
        return text + "err(" + err.Error() + ")"
}
53
54 //-----------------------------------------------------------------------------
55 //
56 //-----------------------------------------------------------------------------
57
// Timing and retry settings used when talking to E2T.
var (
        // e2tSubReqTimeout is the wait applied to each subscription request attempt.
        e2tSubReqTimeout = 5 * time.Second
        // e2tSubDelReqTime is the wait applied to each subscription delete request attempt.
        e2tSubDelReqTime = 5 * time.Second

        // e2tMaxSubReqTryCount bounds the subscription request send loop.
        e2tMaxSubReqTryCount uint64 = 2 // Initial try + retry
        // e2tMaxSubDelReqTryCount bounds the subscription delete request send loop.
        e2tMaxSubDelReqTryCount uint64 = 2 // Initial try + retry

        // e2tRecvMsgTimeout is the timeout used when handing a received E2T
        // message over to the waiting transaction.
        e2tRecvMsgTimeout = 5 * time.Second
)
64
// Control is the subscription manager's main object. It embeds the
// xapptweaks.XappWrapper (which supplies Rmr, RmrSend and CntRecvMsg as used
// in this file) and ties together the message codec, the subscription
// registry and the transaction tracker.
type Control struct {
        xapptweaks.XappWrapper
        e2ap     *E2ap     // packs/unpacks the subscription messages handled below
        registry *Registry // subscription bookkeeping; also serves the REST query
        tracker  *Tracker  // creates and tracks xApp and subscription transactions
        //subscriber *xapp.Subscriber
}
72
// RMRMeid groups the identifiers of a managed entity.
// NOTE(review): not referenced by the code visible in this file; presumably
// kept for callers elsewhere in the package — confirm before removing.
type RMRMeid struct {
        PlmnID  string
        EnbID   string
        RanName string
}
78
// init logs a start-up marker and configures viper's environment handling:
// automatic environment lookup, the "submgr" environment prefix, and
// acceptance of empty environment values.
func init() {
        xapp.Logger.Info("SUBMGR")
        viper.AutomaticEnv()
        viper.SetEnvPrefix("submgr")
        viper.AllowEmptyEnv(true)
}
85
86 func NewControl() *Control {
87
88         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
89         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
90
91         registry := new(Registry)
92         registry.Initialize()
93         registry.rtmgrClient = &rtmgrClient
94
95         tracker := new(Tracker)
96         tracker.Init()
97
98         //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
99
100         c := &Control{e2ap: new(E2ap),
101                 registry: registry,
102                 tracker:  tracker,
103                 //subscriber: subscriber,
104         }
105         c.XappWrapper.Init("")
106         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler)
107         //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
108         return c
109 }
110
111 func (c *Control) ReadyCB(data interface{}) {
112         if c.Rmr == nil {
113                 c.Rmr = xapp.Rmr
114         }
115 }
116
// Run registers ReadyCB as the framework's ready callback and then hands
// this Control to xapp.Run as the message consumer.
func (c *Control) Run() {
        xapp.SetReadyCB(c.ReadyCB, nil)
        xapp.Run(c)
}
121
//-------------------------------------------------------------------
// REST subscription callback (passed to xapp.Subscription.Listen).
//
// Not implemented yet: every call returns an empty SubscriptionResult
// and an error. The commented-out block below is a sketch of the
// intended per-type handling.
//-------------------------------------------------------------------
func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (models.SubscriptionResult, error) {
        /*
           switch p := params.(type) {
           case *models.ReportParams:
               trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
               if trans == nil {
                     xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
                     return
               }
               defer trans.Release()
           case *models.ControlParams:
           case *models.PolicyParams:
           }
        */
        return models.SubscriptionResult{}, fmt.Errorf("Subscription rest interface not implemented")
}
141
// QueryHandler is the REST query callback (passed to
// xapp.Subscription.Listen); it delegates to the registry's QueryHandler.
func (c *Control) QueryHandler() (models.SubscriptionList, error) {
        return c.registry.QueryHandler()
}
145
146 //-------------------------------------------------------------------
147 //
148 //-------------------------------------------------------------------
149
150 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
151         params := xapptweaks.NewParams(nil)
152         params.Mtype = trans.GetMtype()
153         params.SubId = int(subs.GetReqId().Seq)
154         params.Xid = ""
155         params.Meid = subs.GetMeid()
156         params.Src = ""
157         params.PayloadLen = len(trans.Payload.Buf)
158         params.Payload = trans.Payload.Buf
159         params.Mbuf = nil
160         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
161         return c.RmrSend(params)
162 }
163
164 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
165
166         params := xapptweaks.NewParams(nil)
167         params.Mtype = trans.GetMtype()
168         params.SubId = int(subs.GetReqId().Seq)
169         params.Xid = trans.GetXid()
170         params.Meid = trans.GetMeid()
171         params.Src = ""
172         params.PayloadLen = len(trans.Payload.Buf)
173         params.Payload = trans.Payload.Buf
174         params.Mbuf = nil
175         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
176         return c.RmrSend(params)
177 }
178
179 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
180         msg := xapptweaks.NewParams(params)
181         if c.Rmr == nil {
182                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
183                 xapp.Logger.Error("%s", err.Error())
184                 return
185         }
186         c.CntRecvMsg++
187
188         defer c.Rmr.Free(msg.Mbuf)
189
190         switch msg.Mtype {
191         case xapp.RIC_SUB_REQ:
192                 go c.handleXAPPSubscriptionRequest(msg)
193         case xapp.RIC_SUB_RESP:
194                 go c.handleE2TSubscriptionResponse(msg)
195         case xapp.RIC_SUB_FAILURE:
196                 go c.handleE2TSubscriptionFailure(msg)
197         case xapp.RIC_SUB_DEL_REQ:
198                 go c.handleXAPPSubscriptionDeleteRequest(msg)
199         case xapp.RIC_SUB_DEL_RESP:
200                 go c.handleE2TSubscriptionDeleteResponse(msg)
201         case xapp.RIC_SUB_DEL_FAILURE:
202                 go c.handleE2TSubscriptionDeleteFailure(msg)
203         default:
204                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
205         }
206         return
207 }
208
//-------------------------------------------------------------------
// handle from XAPP Subscription Request
//
// Unpacks the request, creates and tracks an xApp transaction, attaches
// it to a subscription and then waits for the outcome produced by
// handleSubscriptionCreate. A subscription response or failure is
// packed and sent back to the xApp. Every outcome other than a
// successfully sent response ends with the transaction being removed
// from the subscription.
// Errors before the wait are only logged; nothing is sent to the xApp.
//------------------------------------------------------------------
func (c *Control) handleXAPPSubscriptionRequest(params *xapptweaks.RMRParams) {
        xapp.Logger.Info("MSG from XAPP: %s", params.String())

        subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
        if err != nil {
                xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
                return
        }

        trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.Seq, params.Meid)
        if trans == nil {
                xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
                return
        }
        // Released on every exit path below.
        defer trans.Release()

        err = c.tracker.Track(trans)
        if err != nil {
                xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
                return
        }

        subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
        if err != nil {
                xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
                return
        }

        //
        // Wake subs request
        //
        // The create handling runs in its own goroutine and reports back
        // through the transaction's event channel.
        go c.handleSubscriptionCreate(subs, trans)
        event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side

        err = nil
        if event != nil {
                switch themsg := event.(type) {
                case *e2ap.E2APSubscriptionResponse:
                        trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
                        if err == nil {
                                // Success path: the transaction is released before the
                                // response is sent; the deferred Release runs again on
                                // return.
                                // NOTE(review): presumably Release tolerates a second
                                // call — confirm.
                                trans.Release()
                                c.rmrSendToXapp("", subs, trans)
                                return
                        }
                case *e2ap.E2APSubscriptionFailure:
                        trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
                        if err == nil {
                                // The failure is forwarded, then execution continues to
                                // the common failure handling below.
                                c.rmrSendToXapp("", subs, trans)
                        }
                default:
                        break
                }
        }
        // Reached for: no event, unexpected event type, pack error, or a
        // forwarded failure.
        xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
        c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
}
268
269 //-------------------------------------------------------------------
270 // handle from XAPP Subscription Delete Request
271 //------------------------------------------------------------------
272 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapptweaks.RMRParams) {
273         xapp.Logger.Info("MSG from XAPP: %s", params.String())
274
275         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
276         if err != nil {
277                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
278                 return
279         }
280
281         trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.Seq, params.Meid)
282         if trans == nil {
283                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
284                 return
285         }
286         defer trans.Release()
287
288         err = c.tracker.Track(trans)
289         if err != nil {
290                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
291                 return
292         }
293
294         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
295         if err != nil {
296                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
297                 return
298         }
299
300         //
301         // Wake subs delete
302         //
303         go c.handleSubscriptionDelete(subs, trans)
304         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
305
306         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
307
308         // Whatever is received send ok delete response
309         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
310         subDelRespMsg.RequestId = subs.GetReqId().RequestId
311         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
312         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
313         if err == nil {
314                 c.rmrSendToXapp("", subs, trans)
315         }
316
317         c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
318 }
319
//-------------------------------------------------------------------
// SUBS CREATE Handling
//
// Runs in its own goroutine on behalf of parentTrans. After waiting for
// its turn on the subscription it either reuses the subscription's
// cached response or sends a subscription request to E2T and caches the
// result. The resulting message (possibly nil) is always passed to
// parentTrans as an event.
//-------------------------------------------------------------------
func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {

        trans := c.tracker.NewSubsTransaction(subs)
        subs.WaitTransactionTurn(trans)
        // Deferred calls run in reverse order: the transaction is released
        // first, then the turn on the subscription is given up.
        defer subs.ReleaseTransactionTurn(trans)
        defer trans.Release()

        xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))

        subRfMsg, valid := subs.GetCachedResponse()
        if subRfMsg == nil && valid == true {
                // Nothing cached and the subscription is still marked valid:
                // ask E2T.
                event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
                switch event.(type) {
                case *e2ap.E2APSubscriptionResponse:
                        subRfMsg, valid = subs.SetCachedResponse(event, true)
                case *e2ap.E2APSubscriptionFailure:
                        subRfMsg, valid = subs.SetCachedResponse(event, false)
                default:
                        // Neither a response nor a failure was received: cache
                        // "no response / not valid" and send a delete request
                        // to E2T.
                        xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
                        subRfMsg, valid = subs.SetCachedResponse(nil, false)
                        c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
                }
                xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
        } else {
                xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
        }

        // Wake the waiting xApp-side handler with the outcome.
        parentTrans.SendEvent(subRfMsg, 0)
}
352
353 //-------------------------------------------------------------------
354 // SUBS DELETE Handling
355 //-------------------------------------------------------------------
356
357 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
358
359         trans := c.tracker.NewSubsTransaction(subs)
360         subs.WaitTransactionTurn(trans)
361         defer subs.ReleaseTransactionTurn(trans)
362         defer trans.Release()
363
364         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
365
366         subs.mutex.Lock()
367         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
368                 subs.valid = false
369                 subs.mutex.Unlock()
370                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
371         } else {
372                 subs.mutex.Unlock()
373         }
374
375         parentTrans.SendEvent(nil, 0)
376 }
377
378 //-------------------------------------------------------------------
379 // send to E2T Subscription Request
380 //-------------------------------------------------------------------
381 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
382         var err error
383         var event interface{} = nil
384         var timedOut bool = false
385
386         subReqMsg := subs.SubReqMsg
387         subReqMsg.RequestId = subs.GetReqId().RequestId
388         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
389         if err != nil {
390                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
391                 return event
392         }
393
394         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
395                 desc := fmt.Sprintf("(retry %d)", retries)
396                 c.rmrSendToE2T(desc, subs, trans)
397                 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
398                 if timedOut {
399                         continue
400                 }
401                 break
402         }
403         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
404         return event
405 }
406
407 //-------------------------------------------------------------------
408 // send to E2T Subscription Delete Request
409 //-------------------------------------------------------------------
410
411 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
412         var err error
413         var event interface{}
414         var timedOut bool
415
416         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
417         subDelReqMsg.RequestId = subs.GetReqId().RequestId
418         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
419         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
420         if err != nil {
421                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
422                 return event
423         }
424
425         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
426                 desc := fmt.Sprintf("(retry %d)", retries)
427                 c.rmrSendToE2T(desc, subs, trans)
428                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
429                 if timedOut {
430                         continue
431                 }
432                 break
433         }
434         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
435         return event
436 }
437
438 //-------------------------------------------------------------------
439 // handle from E2T Subscription Reponse
440 //-------------------------------------------------------------------
441 func (c *Control) handleE2TSubscriptionResponse(params *xapptweaks.RMRParams) {
442         xapp.Logger.Info("MSG from E2T: %s", params.String())
443         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
444         if err != nil {
445                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
446                 return
447         }
448         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.Seq})
449         if err != nil {
450                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
451                 return
452         }
453         trans := subs.GetTransaction()
454         if trans == nil {
455                 err = fmt.Errorf("Ongoing transaction not found")
456                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
457                 return
458         }
459         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
460         if sendOk == false {
461                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
462                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
463         }
464         return
465 }
466
467 //-------------------------------------------------------------------
468 // handle from E2T Subscription Failure
469 //-------------------------------------------------------------------
470 func (c *Control) handleE2TSubscriptionFailure(params *xapptweaks.RMRParams) {
471         xapp.Logger.Info("MSG from E2T: %s", params.String())
472         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
473         if err != nil {
474                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
475                 return
476         }
477         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.Seq})
478         if err != nil {
479                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
480                 return
481         }
482         trans := subs.GetTransaction()
483         if trans == nil {
484                 err = fmt.Errorf("Ongoing transaction not found")
485                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
486                 return
487         }
488         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
489         if sendOk == false {
490                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
491                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
492         }
493         return
494 }
495
496 //-------------------------------------------------------------------
497 // handle from E2T Subscription Delete Response
498 //-------------------------------------------------------------------
499 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapptweaks.RMRParams) (err error) {
500         xapp.Logger.Info("MSG from E2T: %s", params.String())
501         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
502         if err != nil {
503                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
504                 return
505         }
506         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.Seq})
507         if err != nil {
508                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
509                 return
510         }
511         trans := subs.GetTransaction()
512         if trans == nil {
513                 err = fmt.Errorf("Ongoing transaction not found")
514                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
515                 return
516         }
517         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
518         if sendOk == false {
519                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
520                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
521         }
522         return
523 }
524
525 //-------------------------------------------------------------------
526 // handle from E2T Subscription Delete Failure
527 //-------------------------------------------------------------------
528 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapptweaks.RMRParams) {
529         xapp.Logger.Info("MSG from E2T: %s", params.String())
530         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
531         if err != nil {
532                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
533                 return
534         }
535         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.Seq})
536         if err != nil {
537                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
538                 return
539         }
540         trans := subs.GetTransaction()
541         if trans == nil {
542                 err = fmt.Errorf("Ongoing transaction not found")
543                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
544                 return
545         }
546         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
547         if sendOk == false {
548                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
549                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
550         }
551         return
552 }
553
554 //-------------------------------------------------------------------
555 //
556 //-------------------------------------------------------------------
557 func typeofSubsMessage(v interface{}) string {
558         if v == nil {
559                 return "NIL"
560         }
561         switch v.(type) {
562         case *e2ap.E2APSubscriptionRequest:
563                 return "SubReq"
564         case *e2ap.E2APSubscriptionResponse:
565                 return "SubResp"
566         case *e2ap.E2APSubscriptionFailure:
567                 return "SubFail"
568         case *e2ap.E2APSubscriptionDeleteRequest:
569                 return "SubDelReq"
570         case *e2ap.E2APSubscriptionDeleteResponse:
571                 return "SubDelResp"
572         case *e2ap.E2APSubscriptionDeleteFailure:
573                 return "SubDelFail"
574         default:
575                 return "Unknown"
576         }
577 }