Rest interface specific counters defined and used: cRestSubReqFromXapp,cRestSubRespTo...
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/segmentio/ksuid"
38         "github.com/spf13/viper"
39 )
40
41 //-----------------------------------------------------------------------------
42 //
43 //-----------------------------------------------------------------------------
44
45 func idstring(err error, entries ...fmt.Stringer) string {
46         var retval string = ""
47         var filler string = ""
48         for _, entry := range entries {
49                 retval += filler + entry.String()
50                 filler = " "
51         }
52         if err != nil {
53                 retval += filler + "err(" + err.Error() + ")"
54                 filler = " "
55         }
56         return retval
57 }
58
59 //-----------------------------------------------------------------------------
60 //
61 //-----------------------------------------------------------------------------
62
63 var e2tSubReqTimeout time.Duration
64 var e2tSubDelReqTime time.Duration
65 var e2tRecvMsgTimeout time.Duration
66 var waitRouteCleanup_ms time.Duration
67 var e2tMaxSubReqTryCount uint64    // Initial try + retry
68 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
69 var readSubsFromDb string
70
71 type Control struct {
72         *xapp.RMRClient
73         e2ap     *E2ap
74         registry *Registry
75         tracker  *Tracker
76         db       Sdlnterface
77         //subscriber *xapp.Subscriber
78         CntRecvMsg    uint64
79         ResetTestFlag bool
80         Counters      map[string]xapp.Counter
81 }
82
83 type RMRMeid struct {
84         PlmnID  string
85         EnbID   string
86         RanName string
87 }
88
89 type SubmgrRestartTestEvent struct{}
90 type SubmgrRestartUpEvent struct{}
91
92 func init() {
93         xapp.Logger.Info("SUBMGR")
94         viper.AutomaticEnv()
95         viper.SetEnvPrefix("submgr")
96         viper.AllowEmptyEnv(true)
97 }
98
99 func NewControl() *Control {
100
101         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
102         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
103
104         registry := new(Registry)
105         registry.Initialize()
106         registry.rtmgrClient = &rtmgrClient
107
108         tracker := new(Tracker)
109         tracker.Init()
110
111         c := &Control{e2ap: new(E2ap),
112                 registry: registry,
113                 tracker:  tracker,
114                 db:       CreateSdl(),
115                 //subscriber: subscriber,
116                 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
117         }
118         c.ReadConfigParameters("")
119
120         // Register REST handler for testing support
121         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
122         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
123
124         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
125
126         if readSubsFromDb == "false" {
127                 return c
128         }
129
130         // Read subscriptions from db
131         xapp.Logger.Info("Reading subscriptions from db")
132         subIds, register, err := c.ReadAllSubscriptionsFromSdl()
133         if err != nil {
134                 xapp.Logger.Error("%v", err)
135         } else {
136                 c.registry.subIds = subIds
137                 c.registry.register = register
138                 c.HandleUncompletedSubscriptions(register)
139         }
140         return c
141 }
142
143 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
144         subscriptions, _ := c.registry.QueryHandler()
145         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
146 }
147
148 //-------------------------------------------------------------------
149 //
150 //-------------------------------------------------------------------
151 func (c *Control) ReadConfigParameters(f string) {
152
153         // viper.GetDuration returns nanoseconds
154         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
155         if e2tSubReqTimeout == 0 {
156                 e2tSubReqTimeout = 2000 * 1000000
157         }
158         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
159         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
160         if e2tSubDelReqTime == 0 {
161                 e2tSubDelReqTime = 2000 * 1000000
162         }
163         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
164         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
165         if e2tRecvMsgTimeout == 0 {
166                 e2tRecvMsgTimeout = 2000 * 1000000
167         }
168         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
169
170         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
171         // value 100ms used currently only in unittests.
172         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
173         if waitRouteCleanup_ms == 0 {
174                 waitRouteCleanup_ms = 5000 * 1000000
175         }
176         xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
177
178         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
179         if e2tMaxSubReqTryCount == 0 {
180                 e2tMaxSubReqTryCount = 1
181         }
182         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
183
184         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
185         if e2tMaxSubDelReqTryCount == 0 {
186                 e2tMaxSubDelReqTryCount = 1
187         }
188         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
189
190         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
191         if readSubsFromDb == "" {
192                 readSubsFromDb = "true"
193         }
194         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
195 }
196
197 //-------------------------------------------------------------------
198 //
199 //-------------------------------------------------------------------
200 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
201
202         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
203         for subId, subs := range register {
204                 if subs.SubRespRcvd == false {
205                         subs.NoRespToXapp = true
206                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
207                         c.SendSubscriptionDeleteReq(subs)
208                 }
209         }
210 }
211
212 func (c *Control) ReadyCB(data interface{}) {
213         if c.RMRClient == nil {
214                 c.RMRClient = xapp.Rmr
215         }
216 }
217
218 func (c *Control) Run() {
219         xapp.SetReadyCB(c.ReadyCB, nil)
220         xapp.AddConfigChangeListener(c.ReadConfigParameters)
221         xapp.Run(c)
222 }
223
224 //-------------------------------------------------------------------
225 //
226 //-------------------------------------------------------------------
227 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
228
229         restSubId := ksuid.New().String()
230         subResp := models.SubscriptionResponse{}
231         subResp.SubscriptionID = &restSubId
232         p := params.(*models.SubscriptionParams)
233
234         c.CntRecvMsg++
235
236         c.UpdateCounter(cRestSubReqFromXapp)
237
238         if p.ClientEndpoint == nil {
239                 xapp.Logger.Error("ClientEndpoint == nil")
240                 return nil, fmt.Errorf("")
241         }
242
243         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
244         if err != nil {
245                 xapp.Logger.Error("%s", err.Error())
246                 return nil, err
247         }
248
249         restSubscription, err := c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
250         if err != nil {
251                 xapp.Logger.Error("%s", err.Error())
252                 return nil, err
253         }
254
255         subReqList := e2ap.SubscriptionRequestList{}
256         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
257         if err != nil {
258                 xapp.Logger.Error("%s", err.Error())
259                 c.registry.DeleteRESTSubscription(&restSubId)
260                 return nil, err
261         }
262
263         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId)
264
265         return &subResp, nil
266
267 }
268
269 //-------------------------------------------------------------------
270 //
271 //-------------------------------------------------------------------
272
273 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
274         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string) {
275
276         xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
277
278         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*clientEndpoint)
279         if err != nil {
280                 xapp.Logger.Error("%s", err.Error())
281                 return
282         }
283
284         var requestorID int64
285         var instanceId int64
286         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
287                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
288
289                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
290                 if trans == nil {
291                         c.registry.DeleteRESTSubscription(restSubId)
292                         xapp.Logger.Error("XAPP-SubReq transaction not created. RESTSubId=%s, EndPoint=%s, Meid=%s", *restSubId, xAppRmrEndpoint, *meid)
293                         return
294                 }
295
296                 defer trans.Release()
297                 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
298                 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
299                 if err != nil {
300                         // Send notification to xApp that prosessing of a Subscription Request has failed. Currently it is not possible
301                         // to indicate error. Such possibility should be added. As a workaround requestorID and instanceId are set to zero value
302                         requestorID = (int64)(0)
303                         instanceId = (int64)(0)
304                         resp := &models.SubscriptionResponse{
305                                 SubscriptionID: restSubId,
306                                 SubscriptionInstances: []*models.SubscriptionInstance{
307                                         &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
308                                 },
309                         }
310                         // Mark REST subscription request processed.
311                         restSubscription.SetProcessed()
312                         xapp.Logger.Info("Sending unsuccessful REST notification to endpoint=%v:%v, InstanceId=%v, %s", clientEndpoint.Host, clientEndpoint.HTTPPort, instanceId, idstring(nil, trans))
313                         xapp.Subscription.Notify(resp, *clientEndpoint)
314                 } else {
315                         xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v, InstanceId=%v, %s", index, *clientEndpoint, instanceId, idstring(nil, trans))
316
317                         // Store successfully processed InstanceId for deletion
318                         restSubscription.AddInstanceId(subRespMsg.RequestId.InstanceId)
319
320                         // Send notification to xApp that a Subscription Request has been processed.
321                         requestorID = (int64)(subRespMsg.RequestId.Id)
322                         instanceId = (int64)(subRespMsg.RequestId.InstanceId)
323                         resp := &models.SubscriptionResponse{
324                                 SubscriptionID: restSubId,
325                                 SubscriptionInstances: []*models.SubscriptionInstance{
326                                         &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
327                                 },
328                         }
329                         // Mark REST subscription request processesd.
330                         restSubscription.SetProcessed()
331                         xapp.Logger.Info("Sending successful REST notification to endpoint=%v, InstanceId=%v, %s", *clientEndpoint, instanceId, idstring(nil, trans))
332                         xapp.Subscription.Notify(resp, *clientEndpoint)
333
334                 }
335                 c.UpdateCounter(cRestSubRespToXapp)
336         }
337 }
338
339 //-------------------------------------------------------------------
340 //
341 //------------------------------------------------------------------
342 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
343         restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
344
345         err := c.tracker.Track(trans)
346         if err != nil {
347                 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
348                 xapp.Logger.Error("%s", err.Error())
349                 return nil, err
350         }
351
352         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
353         if err != nil {
354                 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
355                 xapp.Logger.Error("%s", err.Error())
356                 return nil, err
357         }
358
359         //
360         // Wake subs request
361         //
362         go c.handleSubscriptionCreate(subs, trans)
363         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
364
365         err = nil
366         if event != nil {
367                 switch themsg := event.(type) {
368                 case *e2ap.E2APSubscriptionResponse:
369                         trans.Release()
370                         return themsg, nil
371                 case *e2ap.E2APSubscriptionFailure:
372                         err = fmt.Errorf("SubscriptionFailure received")
373                         return nil, err
374                 default:
375                         break
376                 }
377         }
378         err = fmt.Errorf("XAPP-SubReq: failed %s", idstring(err, trans, subs))
379         xapp.Logger.Error("%s", err.Error())
380         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
381         return nil, err
382 }
383
384 //-------------------------------------------------------------------
385 //
386 //-------------------------------------------------------------------
387 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
388
389         c.CntRecvMsg++
390         c.UpdateCounter(cRestSubDelReqFromXapp)
391
392         xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
393
394         restSubscription, err := c.registry.GetRESTSubscription(restSubId)
395         if err != nil {
396                 xapp.Logger.Error("%s", err.Error())
397                 if restSubscription == nil {
398                         // Subscription was not found
399                         return nil
400                 } else {
401                         if restSubscription.SubReqOngoing == true {
402                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
403                                 xapp.Logger.Error("%s", err.Error())
404                                 return err
405                         } else if restSubscription.SubDelReqOngoing == true {
406                                 // Previous request for same restSubId still ongoing
407                                 return nil
408                         }
409                 }
410         }
411
412         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
413         go func() {
414                 for _, instanceId := range restSubscription.InstanceIds {
415                         err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
416                         if err != nil {
417                                 xapp.Logger.Error("%s", err.Error())
418                                 //return err
419                         }
420                         xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
421                         restSubscription.DeleteInstanceId(instanceId)
422                 }
423                 c.registry.DeleteRESTSubscription(&restSubId)
424         }()
425
426         c.UpdateCounter(cRestSubDelRespToXapp)
427
428         return nil
429 }
430
431 //-------------------------------------------------------------------
432 //
433 //-------------------------------------------------------------------
434 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) error {
435
436         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{0, 0}, &xapp.RMRMeid{RanName: *meid})
437         if trans == nil {
438                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
439                 xapp.Logger.Error("%s", err.Error())
440         }
441         defer trans.Release()
442
443         err := c.tracker.Track(trans)
444         if err != nil {
445                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
446                 xapp.Logger.Error("%s", err.Error())
447                 return &time.ParseError{}
448         }
449
450         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
451         if err != nil {
452                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
453                 xapp.Logger.Error("%s", err.Error())
454                 return err
455         }
456         //
457         // Wake subs delete
458         //
459         go c.handleSubscriptionDelete(subs, trans)
460         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
461
462         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
463
464         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
465
466         return nil
467 }
468
469 //-------------------------------------------------------------------
470 //
471 //-------------------------------------------------------------------
472 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
473         xapp.Logger.Info("QueryHandler() called")
474
475         c.CntRecvMsg++
476
477         return c.registry.QueryHandler()
478 }
479
480 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
481         xapp.Logger.Info("TestRestHandler() called")
482
483         pathParams := mux.Vars(r)
484         s := pathParams["testId"]
485
486         // This can be used to delete single subscription from db
487         if contains := strings.Contains(s, "deletesubid="); contains == true {
488                 var splits = strings.Split(s, "=")
489                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
490                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
491                         c.RemoveSubscriptionFromSdl(uint32(subId))
492                         return
493                 }
494         }
495
496         // This can be used to remove all subscriptions db from
497         if s == "emptydb" {
498                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
499                 c.RemoveAllSubscriptionsFromSdl()
500                 return
501         }
502
503         // This is meant to cause submgr's restart in testing
504         if s == "restart" {
505                 xapp.Logger.Info("os.Exit(1) called")
506                 os.Exit(1)
507         }
508
509         xapp.Logger.Info("Unsupported rest command received %s", s)
510 }
511
512 //-------------------------------------------------------------------
513 //
514 //-------------------------------------------------------------------
515
516 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
517         params := &xapp.RMRParams{}
518         params.Mtype = trans.GetMtype()
519         params.SubId = int(subs.GetReqId().InstanceId)
520         params.Xid = ""
521         params.Meid = subs.GetMeid()
522         params.Src = ""
523         params.PayloadLen = len(trans.Payload.Buf)
524         params.Payload = trans.Payload.Buf
525         params.Mbuf = nil
526         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
527         err = c.SendWithRetry(params, false, 5)
528         if err != nil {
529                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
530         }
531         return err
532 }
533
534 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
535
536         params := &xapp.RMRParams{}
537         params.Mtype = trans.GetMtype()
538         params.SubId = int(subs.GetReqId().InstanceId)
539         params.Xid = trans.GetXid()
540         params.Meid = trans.GetMeid()
541         params.Src = ""
542         params.PayloadLen = len(trans.Payload.Buf)
543         params.Payload = trans.Payload.Buf
544         params.Mbuf = nil
545         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
546         err = c.SendWithRetry(params, false, 5)
547         if err != nil {
548                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
549         }
550         return err
551 }
552
553 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
554         if c.RMRClient == nil {
555                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
556                 xapp.Logger.Error("%s", err.Error())
557                 return
558         }
559         c.CntRecvMsg++
560
561         defer c.RMRClient.Free(msg.Mbuf)
562
563         // xapp-frame might use direct access to c buffer and
564         // when msg.Mbuf is freed, someone might take it into use
565         // and payload data might be invalid inside message handle function
566         //
567         // subscriptions won't load system a lot so there is no
568         // real performance hit by cloning buffer into new go byte slice
569         cPay := append(msg.Payload[:0:0], msg.Payload...)
570         msg.Payload = cPay
571         msg.PayloadLen = len(cPay)
572
573         switch msg.Mtype {
574         case xapp.RIC_SUB_REQ:
575                 go c.handleXAPPSubscriptionRequest(msg)
576         case xapp.RIC_SUB_RESP:
577                 go c.handleE2TSubscriptionResponse(msg)
578         case xapp.RIC_SUB_FAILURE:
579                 go c.handleE2TSubscriptionFailure(msg)
580         case xapp.RIC_SUB_DEL_REQ:
581                 go c.handleXAPPSubscriptionDeleteRequest(msg)
582         case xapp.RIC_SUB_DEL_RESP:
583                 go c.handleE2TSubscriptionDeleteResponse(msg)
584         case xapp.RIC_SUB_DEL_FAILURE:
585                 go c.handleE2TSubscriptionDeleteFailure(msg)
586         default:
587                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
588         }
589         return
590 }
591
592 //-------------------------------------------------------------------
593 // handle from XAPP Subscription Request
594 //------------------------------------------------------------------
595 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
596         xapp.Logger.Info("MSG from XAPP: %s", params.String())
597         c.UpdateCounter(cSubReqFromXapp)
598
599         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
600         if err != nil {
601                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
602                 return
603         }
604
605         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
606         if trans == nil {
607                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
608                 return
609         }
610         defer trans.Release()
611
612         if err = c.tracker.Track(trans); err != nil {
613                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
614                 return
615         }
616
617         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
618         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
619         if err != nil {
620                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
621                 return
622         }
623
624         c.wakeSubscriptionRequest(subs, trans)
625 }
626
627 //-------------------------------------------------------------------
628 // Wake Subscription Request to E2node
629 //------------------------------------------------------------------
630 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
631
632         go c.handleSubscriptionCreate(subs, trans)
633         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
634         var err error
635         if event != nil {
636                 switch themsg := event.(type) {
637                 case *e2ap.E2APSubscriptionResponse:
638                         themsg.RequestId.Id = trans.RequestId.Id
639                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
640                         if err == nil {
641                                 trans.Release()
642                                 c.UpdateCounter(cSubRespToXapp)
643                                 c.rmrSendToXapp("", subs, trans)
644                                 return
645                         }
646                 case *e2ap.E2APSubscriptionFailure:
647                         themsg.RequestId.Id = trans.RequestId.Id
648                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
649                         if err == nil {
650                                 c.UpdateCounter(cSubFailToXapp)
651                                 c.rmrSendToXapp("", subs, trans)
652                         }
653                 default:
654                         break
655                 }
656         }
657         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
658         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
659 }
660
661 //-------------------------------------------------------------------
662 // handle from XAPP Subscription Delete Request
663 //------------------------------------------------------------------
664 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
665         xapp.Logger.Info("MSG from XAPP: %s", params.String())
666         c.UpdateCounter(cSubDelReqFromXapp)
667
668         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
669         if err != nil {
670                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
671                 return
672         }
673
674         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
675         if trans == nil {
676                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
677                 return
678         }
679         defer trans.Release()
680
681         err = c.tracker.Track(trans)
682         if err != nil {
683                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
684                 return
685         }
686
687         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
688         if err != nil {
689                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
690                 return
691         }
692
693         //
694         // Wake subs delete
695         //
696         go c.handleSubscriptionDelete(subs, trans)
697         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
698
699         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
700
701         if subs.NoRespToXapp == true {
702                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
703                 return
704         }
705
706         // Whatever is received success, fail or timeout, send successful delete response
707         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
708         subDelRespMsg.RequestId.Id = trans.RequestId.Id
709         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
710         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
711         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
712         if err == nil {
713                 c.UpdateCounter(cSubDelRespToXapp)
714                 c.rmrSendToXapp("", subs, trans)
715         }
716
717         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
718         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
719 }
720
721 //-------------------------------------------------------------------
722 // SUBS CREATE Handling
723 //-------------------------------------------------------------------
724 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
725
726         var removeSubscriptionFromDb bool = false
727         trans := c.tracker.NewSubsTransaction(subs)
728         subs.WaitTransactionTurn(trans)
729         defer subs.ReleaseTransactionTurn(trans)
730         defer trans.Release()
731
732         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
733
734         subRfMsg, valid := subs.GetCachedResponse()
735         if subRfMsg == nil && valid == true {
736                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
737                 switch event.(type) {
738                 case *e2ap.E2APSubscriptionResponse:
739                         subRfMsg, valid = subs.SetCachedResponse(event, true)
740                         subs.SubRespRcvd = true
741                 case *e2ap.E2APSubscriptionFailure:
742                         removeSubscriptionFromDb = true
743                         subRfMsg, valid = subs.SetCachedResponse(event, false)
744                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
745                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
746                 case *SubmgrRestartTestEvent:
747                         // This simulates that no response has been received and after restart subscriptions are restored from db
748                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
749                         return
750                 default:
751                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
752                         removeSubscriptionFromDb = true
753                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
754                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
755                 }
756                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
757         } else {
758                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
759         }
760
761         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
762         if valid == false {
763                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
764         }
765
766         c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
767         parentTrans.SendEvent(subRfMsg, 0)
768 }
769
770 //-------------------------------------------------------------------
771 // SUBS DELETE Handling
772 //-------------------------------------------------------------------
773
774 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
775
776         trans := c.tracker.NewSubsTransaction(subs)
777         subs.WaitTransactionTurn(trans)
778         defer subs.ReleaseTransactionTurn(trans)
779         defer trans.Release()
780
781         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
782
783         subs.mutex.Lock()
784
785         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
786                 subs.valid = false
787                 subs.mutex.Unlock()
788                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
789         } else {
790                 subs.mutex.Unlock()
791         }
792         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
793         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
794         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
795         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
796         c.registry.UpdateSubscriptionToDb(subs, c)
797         parentTrans.SendEvent(nil, 0)
798 }
799
800 //-------------------------------------------------------------------
801 // send to E2T Subscription Request
802 //-------------------------------------------------------------------
803 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
804         var err error
805         var event interface{} = nil
806         var timedOut bool = false
807
808         subReqMsg := subs.SubReqMsg
809         subReqMsg.RequestId = subs.GetReqId().RequestId
810         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
811         if err != nil {
812                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
813                 return event
814         }
815
816         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
817         c.WriteSubscriptionToDb(subs)
818
819         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
820                 desc := fmt.Sprintf("(retry %d)", retries)
821                 if retries == 0 {
822                         c.UpdateCounter(cSubReqToE2)
823                 } else {
824                         c.UpdateCounter(cSubReReqToE2)
825                 }
826                 c.rmrSendToE2T(desc, subs, trans)
827                 if subs.DoNotWaitSubResp == false {
828                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
829                         if timedOut {
830                                 c.UpdateCounter(cSubReqTimerExpiry)
831                                 continue
832                         }
833                 } else {
834                         // Simulating case where subscrition request has been sent but response has not been received before restart
835                         event = &SubmgrRestartTestEvent{}
836                 }
837                 break
838         }
839         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
840         return event
841 }
842
843 //-------------------------------------------------------------------
844 // send to E2T Subscription Delete Request
845 //-------------------------------------------------------------------
846
847 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
848         var err error
849         var event interface{}
850         var timedOut bool
851
852         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
853         subDelReqMsg.RequestId = subs.GetReqId().RequestId
854         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
855         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
856         if err != nil {
857                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
858                 return event
859         }
860
861         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
862                 desc := fmt.Sprintf("(retry %d)", retries)
863                 if retries == 0 {
864                         c.UpdateCounter(cSubDelReqToE2)
865                 } else {
866                         c.UpdateCounter(cSubDelReReqToE2)
867                 }
868                 c.rmrSendToE2T(desc, subs, trans)
869                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
870                 if timedOut {
871                         c.UpdateCounter(cSubDelReqTimerExpiry)
872                         continue
873                 }
874                 break
875         }
876         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
877         return event
878 }
879
880 //-------------------------------------------------------------------
881 // handle from E2T Subscription Response
882 //-------------------------------------------------------------------
883 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
884         xapp.Logger.Info("MSG from E2T: %s", params.String())
885         c.UpdateCounter(cSubRespFromE2)
886
887         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
888         if err != nil {
889                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
890                 return
891         }
892         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
893         if err != nil {
894                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
895                 return
896         }
897         trans := subs.GetTransaction()
898         if trans == nil {
899                 err = fmt.Errorf("Ongoing transaction not found")
900                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
901                 return
902         }
903         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
904         if sendOk == false {
905                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
906                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
907         }
908         return
909 }
910
911 //-------------------------------------------------------------------
912 // handle from E2T Subscription Failure
913 //-------------------------------------------------------------------
914 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
915         xapp.Logger.Info("MSG from E2T: %s", params.String())
916         c.UpdateCounter(cSubFailFromE2)
917         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
918         if err != nil {
919                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
920                 return
921         }
922         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
923         if err != nil {
924                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
925                 return
926         }
927         trans := subs.GetTransaction()
928         if trans == nil {
929                 err = fmt.Errorf("Ongoing transaction not found")
930                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
931                 return
932         }
933         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
934         if sendOk == false {
935                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
936                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
937         }
938         return
939 }
940
941 //-------------------------------------------------------------------
942 // handle from E2T Subscription Delete Response
943 //-------------------------------------------------------------------
944 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
945         xapp.Logger.Info("MSG from E2T: %s", params.String())
946         c.UpdateCounter(cSubDelRespFromE2)
947         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
948         if err != nil {
949                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
950                 return
951         }
952         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
953         if err != nil {
954                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
955                 return
956         }
957         trans := subs.GetTransaction()
958         if trans == nil {
959                 err = fmt.Errorf("Ongoing transaction not found")
960                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
961                 return
962         }
963         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
964         if sendOk == false {
965                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
966                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
967         }
968         return
969 }
970
971 //-------------------------------------------------------------------
972 // handle from E2T Subscription Delete Failure
973 //-------------------------------------------------------------------
974 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
975         xapp.Logger.Info("MSG from E2T: %s", params.String())
976         c.UpdateCounter(cSubDelFailFromE2)
977         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
978         if err != nil {
979                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
980                 return
981         }
982         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
983         if err != nil {
984                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
985                 return
986         }
987         trans := subs.GetTransaction()
988         if trans == nil {
989                 err = fmt.Errorf("Ongoing transaction not found")
990                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
991                 return
992         }
993         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
994         if sendOk == false {
995                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
996                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
997         }
998         return
999 }
1000
1001 //-------------------------------------------------------------------
1002 //
1003 //-------------------------------------------------------------------
1004 func typeofSubsMessage(v interface{}) string {
1005         if v == nil {
1006                 return "NIL"
1007         }
1008         switch v.(type) {
1009         //case *e2ap.E2APSubscriptionRequest:
1010         //      return "SubReq"
1011         case *e2ap.E2APSubscriptionResponse:
1012                 return "SubResp"
1013         case *e2ap.E2APSubscriptionFailure:
1014                 return "SubFail"
1015         //case *e2ap.E2APSubscriptionDeleteRequest:
1016         //      return "SubDelReq"
1017         case *e2ap.E2APSubscriptionDeleteResponse:
1018                 return "SubDelResp"
1019         case *e2ap.E2APSubscriptionDeleteFailure:
1020                 return "SubDelFail"
1021         default:
1022                 return "Unknown"
1023         }
1024 }
1025
1026 //-------------------------------------------------------------------
1027 //
1028 //-------------------------------------------------------------------
1029 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1030         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1031         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1032         if err != nil {
1033                 xapp.Logger.Error("%v", err)
1034         }
1035 }
1036
1037 //-------------------------------------------------------------------
1038 //
1039 //-------------------------------------------------------------------
1040 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1041
1042         if removeSubscriptionFromDb == true {
1043                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1044                 c.RemoveSubscriptionFromDb(subs)
1045         } else {
1046                 // Update is needed for successful response and merge case here
1047                 if subs.RetryFromXapp == false {
1048                         c.WriteSubscriptionToDb(subs)
1049                 }
1050         }
1051         subs.RetryFromXapp = false
1052 }
1053
1054 //-------------------------------------------------------------------
1055 //
1056 //-------------------------------------------------------------------
1057 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1058         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1059         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1060         if err != nil {
1061                 xapp.Logger.Error("%v", err)
1062         }
1063 }
1064
1065 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1066
1067         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1068
1069         // Send delete for every endpoint in the subscription
1070         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1071         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1072         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1073         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1074         if err != nil {
1075                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1076                 return
1077         }
1078         for _, endPoint := range subs.EpList.Endpoints {
1079                 params := &xapp.RMRParams{}
1080                 params.Mtype = mType
1081                 params.SubId = int(subs.GetReqId().InstanceId)
1082                 params.Xid = ""
1083                 params.Meid = subs.Meid
1084                 params.Src = endPoint.String()
1085                 params.PayloadLen = len(payload.Buf)
1086                 params.Payload = payload.Buf
1087                 params.Mbuf = nil
1088                 subs.DeleteFromDb = true
1089                 c.handleXAPPSubscriptionDeleteRequest(params)
1090         }
1091 }