Restored UT cases + reduced UT run time
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/segmentio/ksuid"
38         "github.com/spf13/viper"
39 )
40
41 //-----------------------------------------------------------------------------
42 //
43 //-----------------------------------------------------------------------------
44
// idstring builds a compact identification string for log and error messages.
//
// The String() form of every entry is emitted in order, separated by single
// spaces. When err is non-nil it is appended last as "err(<message>)". With no
// entries and a nil err the result is the empty string.
func idstring(err error, entries ...fmt.Stringer) string {
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	return strings.Join(parts, " ")
}
58
59 //-----------------------------------------------------------------------------
60 //
61 //-----------------------------------------------------------------------------
62
// Package-level configuration values. They are (re)loaded from viper by
// Control.ReadConfigParameters, which also applies the defaults.
var (
	// Timeouts read from the controls.*_ms configuration keys.
	e2tSubReqTimeout  time.Duration
	e2tSubDelReqTime  time.Duration
	e2tRecvMsgTimeout time.Duration

	// Wait time handed to Registry.RemoveFromSubscription for route clean-up.
	waitRouteCleanup_ms time.Duration

	// Maximum number of attempts: initial try + retries.
	e2tMaxSubReqTryCount    uint64
	e2tMaxSubDelReqTryCount uint64

	// "false" makes NewControl skip restoring subscriptions from the database.
	readSubsFromDb string
)
70
// Control is the central object of the subscription manager. It embeds the
// xapp RMR client and holds the helpers used by the REST and RMR handlers in
// this file.
type Control struct {
	// Embedded RMR client; nil until ReadyCB assigns xapp.Rmr.
	*xapp.RMRClient
	// E2AP message helper (pack/unpack/fill of subscription messages).
	e2ap     *E2ap
	// Registry of REST subscriptions and E2 subscriptions.
	registry *Registry
	// Tracker that creates and tracks xApp and subscription transactions.
	tracker  *Tracker
	// Database handle created by CreateSdl() in NewControl.
	db       Sdlnterface
	//subscriber *xapp.Subscriber
	// Number of received messages/requests; incremented by the handlers.
	CntRecvMsg    uint64
	// Test flag passed on to Registry.AssignToSubscription.
	ResetTestFlag bool
	// Counter group registered under the "SUBMGR" namespace in NewControl.
	Counters      map[string]xapp.Counter
}
82
// RMRMeid groups three string identifiers: PLMN id, eNB id and RAN name.
//
// NOTE(review): this type is not referenced in the visible part of this file,
// which uses xapp.RMRMeid instead; presumably it mirrors that type - confirm
// whether it is still needed.
type RMRMeid struct {
	PlmnID  string
	EnbID   string
	RanName string
}
88
// SubmgrRestartTestEvent is an event used in restart testing: when
// handleSubscriptionCreate receives it, the transaction is dropped to simulate
// that no response was received before a restart.
type SubmgrRestartTestEvent struct{}

// SubmgrRestartUpEvent is an empty marker event type.
// NOTE(review): not used in the visible part of this file; presumably the
// counterpart of SubmgrRestartTestEvent - confirm against its users.
type SubmgrRestartUpEvent struct{}
91
92 func init() {
93         xapp.Logger.Info("SUBMGR")
94         viper.AutomaticEnv()
95         viper.SetEnvPrefix("submgr")
96         viper.AllowEmptyEnv(true)
97 }
98
99 func NewControl() *Control {
100
101         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
102         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
103
104         registry := new(Registry)
105         registry.Initialize()
106         registry.rtmgrClient = &rtmgrClient
107
108         tracker := new(Tracker)
109         tracker.Init()
110
111         c := &Control{e2ap: new(E2ap),
112                 registry: registry,
113                 tracker:  tracker,
114                 db:       CreateSdl(),
115                 //subscriber: subscriber,
116                 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
117         }
118         c.ReadConfigParameters("")
119
120         // Register REST handler for testing support
121         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
122         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
123
124         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
125
126         if readSubsFromDb == "false" {
127                 return c
128         }
129
130         // Read subscriptions from db
131         xapp.Logger.Info("Reading subscriptions from db")
132         subIds, register, err := c.ReadAllSubscriptionsFromSdl()
133         if err != nil {
134                 xapp.Logger.Error("%v", err)
135         } else {
136                 c.registry.subIds = subIds
137                 c.registry.register = register
138                 c.HandleUncompletedSubscriptions(register)
139         }
140         return c
141 }
142
143 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
144         subscriptions, _ := c.registry.QueryHandler()
145         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
146 }
147
148 //-------------------------------------------------------------------
149 //
150 //-------------------------------------------------------------------
151 func (c *Control) ReadConfigParameters(f string) {
152
153         // viper.GetDuration returns nanoseconds
154         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
155         if e2tSubReqTimeout == 0 {
156                 e2tSubReqTimeout = 2000 * 1000000
157         }
158         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
159         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
160         if e2tSubDelReqTime == 0 {
161                 e2tSubDelReqTime = 2000 * 1000000
162         }
163         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
164         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
165         if e2tRecvMsgTimeout == 0 {
166                 e2tRecvMsgTimeout = 2000 * 1000000
167         }
168         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
169
170         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
171         // value 100ms used currently only in unittests.
172         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
173         if waitRouteCleanup_ms == 0 {
174                 waitRouteCleanup_ms = 5000 * 1000000
175         }
176         xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
177
178         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
179         if e2tMaxSubReqTryCount == 0 {
180                 e2tMaxSubReqTryCount = 1
181         }
182         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
183
184         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
185         if e2tMaxSubDelReqTryCount == 0 {
186                 e2tMaxSubDelReqTryCount = 1
187         }
188         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
189
190         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
191         if readSubsFromDb == "" {
192                 readSubsFromDb = "true"
193         }
194         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
195 }
196
197 //-------------------------------------------------------------------
198 //
199 //-------------------------------------------------------------------
200 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
201
202         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
203         for subId, subs := range register {
204                 if subs.SubRespRcvd == false {
205                         subs.NoRespToXapp = true
206                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
207                         c.SendSubscriptionDeleteReq(subs)
208                 }
209         }
210 }
211
212 func (c *Control) ReadyCB(data interface{}) {
213         if c.RMRClient == nil {
214                 c.RMRClient = xapp.Rmr
215         }
216 }
217
// Run registers the ready callback and the configuration change listener and
// then hands the Control object over to the xapp framework via xapp.Run.
func (c *Control) Run() {
	xapp.SetReadyCB(c.ReadyCB, nil)
	// Re-read the configuration parameters whenever the configuration changes.
	xapp.AddConfigChangeListener(c.ReadConfigParameters)
	xapp.Run(c)
}
223
224 //-------------------------------------------------------------------
225 //
226 //-------------------------------------------------------------------
227 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
228
229         restSubId := ksuid.New().String()
230         subResp := models.SubscriptionResponse{}
231         subResp.SubscriptionID = &restSubId
232         p := params.(*models.SubscriptionParams)
233
234         c.CntRecvMsg++
235
236         c.UpdateCounter(cSubReqFromXapp)
237
238         if p.ClientEndpoint == nil {
239                 xapp.Logger.Error("ClientEndpoint == nil")
240                 return nil, fmt.Errorf("")
241         }
242
243         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
244         if err != nil {
245                 xapp.Logger.Error("%s", err.Error())
246                 return nil, err
247         }
248
249         restSubscription, err := c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
250         if err != nil {
251                 xapp.Logger.Error("%s", err.Error())
252                 return nil, err
253         }
254
255         subReqList := e2ap.SubscriptionRequestList{}
256         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
257         if err != nil {
258                 xapp.Logger.Error("%s", err.Error())
259                 c.registry.DeleteRESTSubscription(&restSubId)
260                 return nil, err
261         }
262
263         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId)
264
265         return &subResp, nil
266
267 }
268
269 //-------------------------------------------------------------------
270 //
271 //-------------------------------------------------------------------
272
273 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
274         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string) {
275
276         xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
277
278         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*clientEndpoint)
279         if err != nil {
280                 xapp.Logger.Error("%s", err.Error())
281                 return
282         }
283
284         var requestorID int64
285         var instanceId int64
286         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
287                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
288
289                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
290                 if trans == nil {
291                         c.registry.DeleteRESTSubscription(restSubId)
292                         xapp.Logger.Error("XAPP-SubReq transaction not created. RESTSubId=%s, EndPoint=%s, Meid=%s", *restSubId, xAppRmrEndpoint, *meid)
293                         return
294                 }
295
296                 defer trans.Release()
297                 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
298                 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
299                 if err != nil {
300                         // Send notification to xApp that prosessing of a Subscription Request has failed. Currently it is not possible
301                         // to indicate error. Such possibility should be added. As a workaround requestorID and instanceId are set to zero value
302                         requestorID = (int64)(0)
303                         instanceId = (int64)(0)
304                         resp := &models.SubscriptionResponse{
305                                 SubscriptionID: restSubId,
306                                 SubscriptionInstances: []*models.SubscriptionInstance{
307                                         &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
308                                 },
309                         }
310                         // Mark REST subscription request processed.
311                         restSubscription.SetProcessed()
312                         xapp.Logger.Info("Sending unsuccessful REST notification to endpoint=%v:%v, InstanceId=%v, %s", clientEndpoint.Host, clientEndpoint.HTTPPort, instanceId, idstring(nil, trans))
313                         xapp.Subscription.Notify(resp, *clientEndpoint)
314                 } else {
315                         xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v, InstanceId=%v, %s", index, *clientEndpoint, instanceId, idstring(nil, trans))
316
317                         // Store successfully processed InstanceId for deletion
318                         restSubscription.AddInstanceId(subRespMsg.RequestId.InstanceId)
319
320                         // Send notification to xApp that a Subscription Request has been processed.
321                         requestorID = (int64)(subRespMsg.RequestId.Id)
322                         instanceId = (int64)(subRespMsg.RequestId.InstanceId)
323                         resp := &models.SubscriptionResponse{
324                                 SubscriptionID: restSubId,
325                                 SubscriptionInstances: []*models.SubscriptionInstance{
326                                         &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
327                                 },
328                         }
329                         // Mark REST subscription request processesd.
330                         restSubscription.SetProcessed()
331                         xapp.Logger.Info("Sending successful REST notification to endpoint=%v, InstanceId=%v, %s", *clientEndpoint, instanceId, idstring(nil, trans))
332                         xapp.Subscription.Notify(resp, *clientEndpoint)
333                 }
334                 c.UpdateCounter(cSubRespToXapp)
335         }
336 }
337
338 //-------------------------------------------------------------------
339 //
340 //------------------------------------------------------------------
341 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
342         restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
343
344         err := c.tracker.Track(trans)
345         if err != nil {
346                 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
347                 xapp.Logger.Error("%s", err.Error())
348                 return nil, err
349         }
350
351         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
352         if err != nil {
353                 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
354                 xapp.Logger.Error("%s", err.Error())
355                 return nil, err
356         }
357
358         //
359         // Wake subs request
360         //
361         go c.handleSubscriptionCreate(subs, trans)
362         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
363
364         err = nil
365         if event != nil {
366                 switch themsg := event.(type) {
367                 case *e2ap.E2APSubscriptionResponse:
368                         trans.Release()
369                         return themsg, nil
370                 case *e2ap.E2APSubscriptionFailure:
371                         err = fmt.Errorf("SubscriptionFailure received")
372                         return nil, err
373                 default:
374                         break
375                 }
376         }
377         err = fmt.Errorf("XAPP-SubReq: failed %s", idstring(err, trans, subs))
378         xapp.Logger.Error("%s", err.Error())
379         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
380         return nil, err
381 }
382
383 //-------------------------------------------------------------------
384 //
385 //-------------------------------------------------------------------
386 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
387
388         c.CntRecvMsg++
389         c.UpdateCounter(cSubDelReqFromXapp)
390
391         xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
392
393         restSubscription, err := c.registry.GetRESTSubscription(restSubId)
394         if err != nil {
395                 xapp.Logger.Error("%s", err.Error())
396                 if restSubscription == nil {
397                         // Subscription was not found
398                         return nil
399                 } else {
400                         if restSubscription.SubReqOngoing == true {
401                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
402                                 xapp.Logger.Error("%s", err.Error())
403                                 return err
404                         } else if restSubscription.SubDelReqOngoing == true {
405                                 // Previous request for same restSubId still ongoing
406                                 return nil
407                         }
408                 }
409         }
410
411         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
412         go func() {
413                 for _, instanceId := range restSubscription.InstanceIds {
414                         err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
415                         if err != nil {
416                                 xapp.Logger.Error("%s", err.Error())
417                                 //return err
418                         }
419                         xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
420                         restSubscription.DeleteInstanceId(instanceId)
421                 }
422                 c.registry.DeleteRESTSubscription(&restSubId)
423         }()
424
425         c.UpdateCounter(cSubDelRespToXapp)
426
427         return nil
428 }
429
430 //-------------------------------------------------------------------
431 //
432 //-------------------------------------------------------------------
433 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) error {
434
435         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{0, 0}, &xapp.RMRMeid{RanName: *meid})
436         if trans == nil {
437                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
438                 xapp.Logger.Error("%s", err.Error())
439         }
440         defer trans.Release()
441
442         err := c.tracker.Track(trans)
443         if err != nil {
444                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
445                 xapp.Logger.Error("%s", err.Error())
446                 return &time.ParseError{}
447         }
448
449         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
450         if err != nil {
451                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
452                 xapp.Logger.Error("%s", err.Error())
453                 return err
454         }
455         //
456         // Wake subs delete
457         //
458         go c.handleSubscriptionDelete(subs, trans)
459         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
460
461         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
462
463         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
464
465         return nil
466 }
467
468 //-------------------------------------------------------------------
469 //
470 //-------------------------------------------------------------------
471 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
472         xapp.Logger.Info("QueryHandler() called")
473
474         c.CntRecvMsg++
475
476         return c.registry.QueryHandler()
477 }
478
479 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
480         xapp.Logger.Info("TestRestHandler() called")
481
482         pathParams := mux.Vars(r)
483         s := pathParams["testId"]
484
485         // This can be used to delete single subscription from db
486         if contains := strings.Contains(s, "deletesubid="); contains == true {
487                 var splits = strings.Split(s, "=")
488                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
489                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
490                         c.RemoveSubscriptionFromSdl(uint32(subId))
491                         return
492                 }
493         }
494
495         // This can be used to remove all subscriptions db from
496         if s == "emptydb" {
497                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
498                 c.RemoveAllSubscriptionsFromSdl()
499                 return
500         }
501
502         // This is meant to cause submgr's restart in testing
503         if s == "restart" {
504                 xapp.Logger.Info("os.Exit(1) called")
505                 os.Exit(1)
506         }
507
508         xapp.Logger.Info("Unsupported rest command received %s", s)
509 }
510
511 //-------------------------------------------------------------------
512 //
513 //-------------------------------------------------------------------
514
515 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
516         params := &xapp.RMRParams{}
517         params.Mtype = trans.GetMtype()
518         params.SubId = int(subs.GetReqId().InstanceId)
519         params.Xid = ""
520         params.Meid = subs.GetMeid()
521         params.Src = ""
522         params.PayloadLen = len(trans.Payload.Buf)
523         params.Payload = trans.Payload.Buf
524         params.Mbuf = nil
525         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
526         err = c.SendWithRetry(params, false, 5)
527         if err != nil {
528                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
529         }
530         return err
531 }
532
533 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
534
535         params := &xapp.RMRParams{}
536         params.Mtype = trans.GetMtype()
537         params.SubId = int(subs.GetReqId().InstanceId)
538         params.Xid = trans.GetXid()
539         params.Meid = trans.GetMeid()
540         params.Src = ""
541         params.PayloadLen = len(trans.Payload.Buf)
542         params.Payload = trans.Payload.Buf
543         params.Mbuf = nil
544         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
545         err = c.SendWithRetry(params, false, 5)
546         if err != nil {
547                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
548         }
549         return err
550 }
551
552 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
553         if c.RMRClient == nil {
554                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
555                 xapp.Logger.Error("%s", err.Error())
556                 return
557         }
558         c.CntRecvMsg++
559
560         defer c.RMRClient.Free(msg.Mbuf)
561
562         // xapp-frame might use direct access to c buffer and
563         // when msg.Mbuf is freed, someone might take it into use
564         // and payload data might be invalid inside message handle function
565         //
566         // subscriptions won't load system a lot so there is no
567         // real performance hit by cloning buffer into new go byte slice
568         cPay := append(msg.Payload[:0:0], msg.Payload...)
569         msg.Payload = cPay
570         msg.PayloadLen = len(cPay)
571
572         switch msg.Mtype {
573         case xapp.RIC_SUB_REQ:
574                 go c.handleXAPPSubscriptionRequest(msg)
575         case xapp.RIC_SUB_RESP:
576                 go c.handleE2TSubscriptionResponse(msg)
577         case xapp.RIC_SUB_FAILURE:
578                 go c.handleE2TSubscriptionFailure(msg)
579         case xapp.RIC_SUB_DEL_REQ:
580                 go c.handleXAPPSubscriptionDeleteRequest(msg)
581         case xapp.RIC_SUB_DEL_RESP:
582                 go c.handleE2TSubscriptionDeleteResponse(msg)
583         case xapp.RIC_SUB_DEL_FAILURE:
584                 go c.handleE2TSubscriptionDeleteFailure(msg)
585         default:
586                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
587         }
588         return
589 }
590
591 //-------------------------------------------------------------------
592 // handle from XAPP Subscription Request
593 //------------------------------------------------------------------
594 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
595         xapp.Logger.Info("MSG from XAPP: %s", params.String())
596         c.UpdateCounter(cSubReqFromXapp)
597
598         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
599         if err != nil {
600                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
601                 return
602         }
603
604         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
605         if trans == nil {
606                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
607                 return
608         }
609         defer trans.Release()
610
611         if err = c.tracker.Track(trans); err != nil {
612                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
613                 return
614         }
615
616         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
617         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
618         if err != nil {
619                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
620                 return
621         }
622
623         c.wakeSubscriptionRequest(subs, trans)
624 }
625
626 //-------------------------------------------------------------------
627 // Wake Subscription Request to E2node
628 //------------------------------------------------------------------
629 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
630
631         go c.handleSubscriptionCreate(subs, trans)
632         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
633         var err error
634         if event != nil {
635                 switch themsg := event.(type) {
636                 case *e2ap.E2APSubscriptionResponse:
637                         themsg.RequestId.Id = trans.RequestId.Id
638                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
639                         if err == nil {
640                                 trans.Release()
641                                 c.UpdateCounter(cSubRespToXapp)
642                                 c.rmrSendToXapp("", subs, trans)
643                                 return
644                         }
645                 case *e2ap.E2APSubscriptionFailure:
646                         themsg.RequestId.Id = trans.RequestId.Id
647                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
648                         if err == nil {
649                                 c.UpdateCounter(cSubFailToXapp)
650                                 c.rmrSendToXapp("", subs, trans)
651                         }
652                 default:
653                         break
654                 }
655         }
656         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
657         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
658 }
659
660 //-------------------------------------------------------------------
661 // handle from XAPP Subscription Delete Request
662 //------------------------------------------------------------------
663 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
664         xapp.Logger.Info("MSG from XAPP: %s", params.String())
665         c.UpdateCounter(cSubDelReqFromXapp)
666
667         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
668         if err != nil {
669                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
670                 return
671         }
672
673         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
674         if trans == nil {
675                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
676                 return
677         }
678         defer trans.Release()
679
680         err = c.tracker.Track(trans)
681         if err != nil {
682                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
683                 return
684         }
685
686         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
687         if err != nil {
688                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
689                 return
690         }
691
692         //
693         // Wake subs delete
694         //
695         go c.handleSubscriptionDelete(subs, trans)
696         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
697
698         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
699
700         if subs.NoRespToXapp == true {
701                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
702                 return
703         }
704
705         // Whatever is received success, fail or timeout, send successful delete response
706         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
707         subDelRespMsg.RequestId.Id = trans.RequestId.Id
708         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
709         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
710         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
711         if err == nil {
712                 c.UpdateCounter(cSubDelRespToXapp)
713                 c.rmrSendToXapp("", subs, trans)
714         }
715
716         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
717         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
718 }
719
720 //-------------------------------------------------------------------
721 // SUBS CREATE Handling
722 //-------------------------------------------------------------------
723 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
724
725         var removeSubscriptionFromDb bool = false
726         trans := c.tracker.NewSubsTransaction(subs)
727         subs.WaitTransactionTurn(trans)
728         defer subs.ReleaseTransactionTurn(trans)
729         defer trans.Release()
730
731         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
732
733         subRfMsg, valid := subs.GetCachedResponse()
734         if subRfMsg == nil && valid == true {
735                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
736                 switch event.(type) {
737                 case *e2ap.E2APSubscriptionResponse:
738                         subRfMsg, valid = subs.SetCachedResponse(event, true)
739                         subs.SubRespRcvd = true
740                 case *e2ap.E2APSubscriptionFailure:
741                         removeSubscriptionFromDb = true
742                         subRfMsg, valid = subs.SetCachedResponse(event, false)
743                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
744                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
745                 case *SubmgrRestartTestEvent:
746                         // This simulates that no response has been received and after restart subscriptions are restored from db
747                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
748                         return
749                 default:
750                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
751                         removeSubscriptionFromDb = true
752                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
753                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
754                 }
755                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
756         } else {
757                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
758         }
759
760         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
761         if valid == false {
762                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
763         }
764
765         c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
766         parentTrans.SendEvent(subRfMsg, 0)
767 }
768
769 //-------------------------------------------------------------------
770 // SUBS DELETE Handling
771 //-------------------------------------------------------------------
772
773 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
774
775         trans := c.tracker.NewSubsTransaction(subs)
776         subs.WaitTransactionTurn(trans)
777         defer subs.ReleaseTransactionTurn(trans)
778         defer trans.Release()
779
780         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
781
782         subs.mutex.Lock()
783
784         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
785                 subs.valid = false
786                 subs.mutex.Unlock()
787                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
788         } else {
789                 subs.mutex.Unlock()
790         }
791         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
792         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
793         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
794         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
795         c.registry.UpdateSubscriptionToDb(subs, c)
796         parentTrans.SendEvent(nil, 0)
797 }
798
799 //-------------------------------------------------------------------
800 // send to E2T Subscription Request
801 //-------------------------------------------------------------------
802 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
803         var err error
804         var event interface{} = nil
805         var timedOut bool = false
806
807         subReqMsg := subs.SubReqMsg
808         subReqMsg.RequestId = subs.GetReqId().RequestId
809         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
810         if err != nil {
811                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
812                 return event
813         }
814
815         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
816         c.WriteSubscriptionToDb(subs)
817
818         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
819                 desc := fmt.Sprintf("(retry %d)", retries)
820                 if retries == 0 {
821                         c.UpdateCounter(cSubReqToE2)
822                 } else {
823                         c.UpdateCounter(cSubReReqToE2)
824                 }
825                 c.rmrSendToE2T(desc, subs, trans)
826                 if subs.DoNotWaitSubResp == false {
827                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
828                         if timedOut {
829                                 c.UpdateCounter(cSubReqTimerExpiry)
830                                 continue
831                         }
832                 } else {
833                         // Simulating case where subscrition request has been sent but response has not been received before restart
834                         event = &SubmgrRestartTestEvent{}
835                 }
836                 break
837         }
838         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
839         return event
840 }
841
842 //-------------------------------------------------------------------
843 // send to E2T Subscription Delete Request
844 //-------------------------------------------------------------------
845
846 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
847         var err error
848         var event interface{}
849         var timedOut bool
850
851         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
852         subDelReqMsg.RequestId = subs.GetReqId().RequestId
853         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
854         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
855         if err != nil {
856                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
857                 return event
858         }
859
860         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
861                 desc := fmt.Sprintf("(retry %d)", retries)
862                 if retries == 0 {
863                         c.UpdateCounter(cSubDelReqToE2)
864                 } else {
865                         c.UpdateCounter(cSubDelReReqToE2)
866                 }
867                 c.rmrSendToE2T(desc, subs, trans)
868                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
869                 if timedOut {
870                         c.UpdateCounter(cSubDelReqTimerExpiry)
871                         continue
872                 }
873                 break
874         }
875         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
876         return event
877 }
878
879 //-------------------------------------------------------------------
880 // handle from E2T Subscription Response
881 //-------------------------------------------------------------------
882 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
883         xapp.Logger.Info("MSG from E2T: %s", params.String())
884         c.UpdateCounter(cSubRespFromE2)
885         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
886         if err != nil {
887                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
888                 return
889         }
890         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
891         if err != nil {
892                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
893                 return
894         }
895         trans := subs.GetTransaction()
896         if trans == nil {
897                 err = fmt.Errorf("Ongoing transaction not found")
898                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
899                 return
900         }
901         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
902         if sendOk == false {
903                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
904                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
905         }
906         return
907 }
908
909 //-------------------------------------------------------------------
910 // handle from E2T Subscription Failure
911 //-------------------------------------------------------------------
912 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
913         xapp.Logger.Info("MSG from E2T: %s", params.String())
914         c.UpdateCounter(cSubFailFromE2)
915         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
916         if err != nil {
917                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
918                 return
919         }
920         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
921         if err != nil {
922                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
923                 return
924         }
925         trans := subs.GetTransaction()
926         if trans == nil {
927                 err = fmt.Errorf("Ongoing transaction not found")
928                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
929                 return
930         }
931         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
932         if sendOk == false {
933                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
934                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
935         }
936         return
937 }
938
939 //-------------------------------------------------------------------
940 // handle from E2T Subscription Delete Response
941 //-------------------------------------------------------------------
942 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
943         xapp.Logger.Info("MSG from E2T: %s", params.String())
944         c.UpdateCounter(cSubDelRespFromE2)
945         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
946         if err != nil {
947                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
948                 return
949         }
950         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
951         if err != nil {
952                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
953                 return
954         }
955         trans := subs.GetTransaction()
956         if trans == nil {
957                 err = fmt.Errorf("Ongoing transaction not found")
958                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
959                 return
960         }
961         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
962         if sendOk == false {
963                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
964                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
965         }
966         return
967 }
968
969 //-------------------------------------------------------------------
970 // handle from E2T Subscription Delete Failure
971 //-------------------------------------------------------------------
972 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
973         xapp.Logger.Info("MSG from E2T: %s", params.String())
974         c.UpdateCounter(cSubDelFailFromE2)
975         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
976         if err != nil {
977                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
978                 return
979         }
980         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
981         if err != nil {
982                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
983                 return
984         }
985         trans := subs.GetTransaction()
986         if trans == nil {
987                 err = fmt.Errorf("Ongoing transaction not found")
988                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
989                 return
990         }
991         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
992         if sendOk == false {
993                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
994                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
995         }
996         return
997 }
998
999 //-------------------------------------------------------------------
1000 //
1001 //-------------------------------------------------------------------
1002 func typeofSubsMessage(v interface{}) string {
1003         if v == nil {
1004                 return "NIL"
1005         }
1006         switch v.(type) {
1007         case *e2ap.E2APSubscriptionRequest:
1008                 return "SubReq"
1009         case *e2ap.E2APSubscriptionResponse:
1010                 return "SubResp"
1011         case *e2ap.E2APSubscriptionFailure:
1012                 return "SubFail"
1013         case *e2ap.E2APSubscriptionDeleteRequest:
1014                 return "SubDelReq"
1015         case *e2ap.E2APSubscriptionDeleteResponse:
1016                 return "SubDelResp"
1017         case *e2ap.E2APSubscriptionDeleteFailure:
1018                 return "SubDelFail"
1019         default:
1020                 return "Unknown"
1021         }
1022 }
1023
1024 //-------------------------------------------------------------------
1025 //
1026 //-------------------------------------------------------------------
1027 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1028         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1029         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1030         if err != nil {
1031                 xapp.Logger.Error("%v", err)
1032         }
1033 }
1034
1035 //-------------------------------------------------------------------
1036 //
1037 //-------------------------------------------------------------------
1038 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1039
1040         if removeSubscriptionFromDb == true {
1041                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1042                 c.RemoveSubscriptionFromDb(subs)
1043         } else {
1044                 // Update is needed for successful response and merge case here
1045                 if subs.RetryFromXapp == false {
1046                         c.WriteSubscriptionToDb(subs)
1047                 }
1048         }
1049         subs.RetryFromXapp = false
1050 }
1051
1052 //-------------------------------------------------------------------
1053 //
1054 //-------------------------------------------------------------------
1055 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1056         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1057         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1058         if err != nil {
1059                 xapp.Logger.Error("%v", err)
1060         }
1061 }
1062
1063 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1064
1065         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1066
1067         // Send delete for every endpoint in the subscription
1068         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1069         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1070         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1071         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1072         if err != nil {
1073                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1074                 return
1075         }
1076         for _, endPoint := range subs.EpList.Endpoints {
1077                 params := &xapp.RMRParams{}
1078                 params.Mtype = mType
1079                 params.SubId = int(subs.GetReqId().InstanceId)
1080                 params.Xid = ""
1081                 params.Meid = subs.Meid
1082                 params.Src = endPoint.String()
1083                 params.PayloadLen = len(payload.Buf)
1084                 params.Payload = payload.Buf
1085                 params.Mbuf = nil
1086                 subs.DeleteFromDb = true
1087                 c.handleXAPPSubscriptionDeleteRequest(params)
1088         }
1089 }