REST duplicate detection
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/segmentio/ksuid"
38         "github.com/spf13/viper"
39 )
40
41 //-----------------------------------------------------------------------------
42 //
43 //-----------------------------------------------------------------------------
44
// idstring builds a space-separated identification string for log messages.
//
// Each non-nil entry contributes its String() value, in argument order. When
// err is non-nil, "err(<message>)" is appended as the final element.
//
// Nil entries are skipped: a literal nil in the variadic list arrives as a nil
// fmt.Stringer, and calling String() on it would panic.
func idstring(err error, entries ...fmt.Stringer) string {
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		if entry == nil {
			continue
		}
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	return strings.Join(parts, " ")
}
58
59 //-----------------------------------------------------------------------------
60 //
61 //-----------------------------------------------------------------------------
62
63 var e2tSubReqTimeout time.Duration
64 var e2tSubDelReqTime time.Duration
65 var e2tRecvMsgTimeout time.Duration
66 var waitRouteCleanup_ms time.Duration
67 var e2tMaxSubReqTryCount uint64    // Initial try + retry
68 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
69 var readSubsFromDb string
70 var restDuplicateCtrl duplicateCtrl
71
72 type Control struct {
73         *xapp.RMRClient
74         e2ap          *E2ap
75         registry      *Registry
76         tracker       *Tracker
77         db            Sdlnterface
78         CntRecvMsg    uint64
79         ResetTestFlag bool
80         Counters      map[string]xapp.Counter
81         LoggerLevel   uint32
82 }
83
// RMRMeid carries the identifiers of a managed entity.
// NOTE(review): the code visible in this file uses xapp.RMRMeid rather than
// this type - confirm whether it is still referenced elsewhere.
type RMRMeid struct {
	PlmnID  string // PLMN identifier
	EnbID   string // eNB identifier
	RanName string // RAN name
}
89
// Marker event types carrying no data.
// NOTE(review): presumably used by restart test cases - confirm against users.
type (
	SubmgrRestartTestEvent struct{}
	SubmgrRestartUpEvent   struct{}
)
92
93 func init() {
94         xapp.Logger.Info("SUBMGR")
95         viper.AutomaticEnv()
96         viper.SetEnvPrefix("submgr")
97         viper.AllowEmptyEnv(true)
98 }
99
100 func NewControl() *Control {
101
102         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
103         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
104
105         registry := new(Registry)
106         registry.Initialize()
107         registry.rtmgrClient = &rtmgrClient
108
109         tracker := new(Tracker)
110         tracker.Init()
111
112         c := &Control{e2ap: new(E2ap),
113                 registry:    registry,
114                 tracker:     tracker,
115                 db:          CreateSdl(),
116                 Counters:    xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
117                 LoggerLevel: 3,
118         }
119         c.ReadConfigParameters("")
120
121         // Register REST handler for testing support
122         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
123         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
124
125         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
126
127         if readSubsFromDb == "false" {
128                 return c
129         }
130
131         restDuplicateCtrl.Init()
132
133         // Read subscriptions from db
134         xapp.Logger.Info("Reading subscriptions from db")
135         subIds, register, err := c.ReadAllSubscriptionsFromSdl()
136         if err != nil {
137                 xapp.Logger.Error("%v", err)
138         } else {
139                 c.registry.subIds = subIds
140                 c.registry.register = register
141                 c.HandleUncompletedSubscriptions(register)
142         }
143         return c
144 }
145
146 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
147         subscriptions, _ := c.registry.QueryHandler()
148         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
149 }
150
151 //-------------------------------------------------------------------
152 //
153 //-------------------------------------------------------------------
154 func (c *Control) ReadConfigParameters(f string) {
155
156         // viper.GetDuration returns nanoseconds
157         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
158         if e2tSubReqTimeout == 0 {
159                 e2tSubReqTimeout = 2000 * 1000000
160         }
161         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
162         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
163         if e2tSubDelReqTime == 0 {
164                 e2tSubDelReqTime = 2000 * 1000000
165         }
166         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
167         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
168         if e2tRecvMsgTimeout == 0 {
169                 e2tRecvMsgTimeout = 2000 * 1000000
170         }
171         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
172
173         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
174         // value 100ms used currently only in unittests.
175         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
176         if waitRouteCleanup_ms == 0 {
177                 waitRouteCleanup_ms = 5000 * 1000000
178         }
179         xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
180
181         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
182         if e2tMaxSubReqTryCount == 0 {
183                 e2tMaxSubReqTryCount = 1
184         }
185         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
186
187         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
188         if e2tMaxSubDelReqTryCount == 0 {
189                 e2tMaxSubDelReqTryCount = 1
190         }
191         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
192
193         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
194         if readSubsFromDb == "" {
195                 readSubsFromDb = "true"
196         }
197         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
198         c.LoggerLevel = viper.GetUint32("logger.level")
199         if c.LoggerLevel == 0 {
200                 c.LoggerLevel = 3
201         }
202 }
203
204 //-------------------------------------------------------------------
205 //
206 //-------------------------------------------------------------------
207 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
208
209         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
210         for subId, subs := range register {
211                 if subs.SubRespRcvd == false {
212                         subs.NoRespToXapp = true
213                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
214                         c.SendSubscriptionDeleteReq(subs)
215                 }
216         }
217 }
218
219 func (c *Control) ReadyCB(data interface{}) {
220         if c.RMRClient == nil {
221                 c.RMRClient = xapp.Rmr
222         }
223 }
224
225 func (c *Control) Run() {
226         xapp.SetReadyCB(c.ReadyCB, nil)
227         xapp.AddConfigChangeListener(c.ReadConfigParameters)
228         xapp.Run(c)
229 }
230
231 //-------------------------------------------------------------------
232 //
233 //-------------------------------------------------------------------
234 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
235
236         c.CntRecvMsg++
237         c.UpdateCounter(cRestSubReqFromXapp)
238
239         subResp := models.SubscriptionResponse{}
240         p := params.(*models.SubscriptionParams)
241
242         if c.LoggerLevel > 2 {
243                 c.PrintRESTSubscriptionRequest(p)
244         }
245
246         if p.ClientEndpoint == nil {
247                 xapp.Logger.Error("ClientEndpoint == nil")
248                 c.UpdateCounter(cRestSubFailToXapp)
249                 return nil, fmt.Errorf("")
250         }
251
252         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
253         if err != nil {
254                 xapp.Logger.Error("%s", err.Error())
255                 c.UpdateCounter(cRestSubFailToXapp)
256                 return nil, err
257         }
258         var restSubId string
259         var restSubscription *RESTSubscription
260         if p.SubscriptionID == "" {
261                 restSubId = ksuid.New().String()
262                 restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
263                 if err != nil {
264                         xapp.Logger.Error("%s", err.Error())
265                         c.UpdateCounter(cRestSubFailToXapp)
266                         return nil, err
267                 }
268
269         } else {
270                 restSubId = p.SubscriptionID
271                 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
272                 if err != nil {
273                         xapp.Logger.Error("%s", err.Error())
274                         c.UpdateCounter(cRestSubFailToXapp)
275                         return nil, err
276                 }
277         }
278
279         subResp.SubscriptionID = &restSubId
280         subReqList := e2ap.SubscriptionRequestList{}
281         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
282         if err != nil {
283                 xapp.Logger.Error("%s", err.Error())
284                 c.registry.DeleteRESTSubscription(&restSubId)
285                 c.UpdateCounter(cRestSubFailToXapp)
286                 return nil, err
287         }
288
289         err, duplicate, md5sum := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, params)
290
291         if err != nil {
292                 // We were unable to detect whether this request was duplicate or not, proceed
293                 xapp.Logger.Info("%s - proceeding with the request", err.Error())
294         } else {
295                 if duplicate {
296                         if *p.SubscriptionDetails[0].ActionToBeSetupList[0].ActionType == "report" {
297                                 xapp.Logger.Info("Retransmission blocker dropped for report typer of request")
298                                 c.UpdateCounter(cRestSubRespToXapp)
299                                 return &subResp, nil
300                         }
301                 }
302                 restSubscription.Md5sumOngoing = md5sum
303         }
304
305         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint)
306
307         c.UpdateCounter(cRestSubRespToXapp)
308         return &subResp, nil
309 }
310
311 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
312         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
313
314         // Send notification to xApp that prosessing of a Subscription Request has failed.
315         e2EventInstanceID := (int64)(0)
316         errorCause := err.Error()
317         resp := &models.SubscriptionResponse{
318                 SubscriptionID: restSubId,
319                 SubscriptionInstances: []*models.SubscriptionInstance{
320                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
321                                 ErrorCause:          &errorCause,
322                                 XappEventInstanceID: &xAppEventInstanceID},
323                 },
324         }
325         // Mark REST subscription request processed.
326         restSubscription.SetProcessed()
327         if trans != nil {
328                 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
329                         errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
330         } else {
331                 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
332                         errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
333         }
334         xapp.Subscription.Notify(resp, *clientEndpoint)
335         c.UpdateCounter(cRestSubFailNotifToXapp)
336 }
337
338 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
339         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
340
341         // Store successfully processed InstanceId for deletion
342         restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
343         restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
344
345         // Send notification to xApp that a Subscription Request has been processed.
346         resp := &models.SubscriptionResponse{
347                 SubscriptionID: restSubId,
348                 SubscriptionInstances: []*models.SubscriptionInstance{
349                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
350                                 ErrorCause:          nil,
351                                 XappEventInstanceID: &xAppEventInstanceID},
352                 },
353         }
354         // Mark REST subscription request processesd.
355         restSubscription.SetProcessed()
356         xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
357                 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
358         xapp.Subscription.Notify(resp, *clientEndpoint)
359         c.UpdateCounter(cRestSubNotifToXapp)
360 }
361
362 //-------------------------------------------------------------------
363 //
364 //-------------------------------------------------------------------
365
366 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
367         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string) {
368
369         xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
370
371         var xAppEventInstanceID int64
372         var e2EventInstanceID int64
373
374         defer restDuplicateCtrl.TransactionComplete(restSubscription.Md5sumOngoing)
375
376         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
377                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
378                 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
379
380                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
381                 if trans == nil {
382                         // Send notification to xApp that prosessing of a Subscription Request has failed.
383                         err := fmt.Errorf("Tracking failure")
384                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
385                         continue
386                 }
387
388                 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
389
390                 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
391                 if err != nil {
392                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
393                 } else {
394                         e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
395                         xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
396                                 index, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
397                         c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
398                 }
399                 trans.Release()
400         }
401 }
402
403 //-------------------------------------------------------------------
404 //
405 //------------------------------------------------------------------
406 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
407         restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
408
409         err := c.tracker.Track(trans)
410         if err != nil {
411                 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
412                 err = fmt.Errorf("Tracking failure")
413                 return nil, err
414         }
415
416         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
417         if err != nil {
418                 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
419                 return nil, err
420         }
421
422         //
423         // Wake subs request
424         //
425         go c.handleSubscriptionCreate(subs, trans)
426         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
427
428         err = nil
429         if event != nil {
430                 switch themsg := event.(type) {
431                 case *e2ap.E2APSubscriptionResponse:
432                         trans.Release()
433                         return themsg, nil
434                 case *e2ap.E2APSubscriptionFailure:
435                         err = fmt.Errorf("E2 SubscriptionFailure received")
436                         return nil, err
437                 default:
438                         err = fmt.Errorf("unexpected E2 subscription response received")
439                         break
440                 }
441         } else {
442                 err = fmt.Errorf("E2 subscription response timeout")
443         }
444
445         xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
446         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
447         return nil, err
448 }
449
450 //-------------------------------------------------------------------
451 //
452 //-------------------------------------------------------------------
453 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
454
455         c.CntRecvMsg++
456         c.UpdateCounter(cRestSubDelReqFromXapp)
457
458         xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
459
460         restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
461         if err != nil {
462                 xapp.Logger.Error("%s", err.Error())
463                 if restSubscription == nil {
464                         // Subscription was not found
465                         return nil
466                 } else {
467                         if restSubscription.SubReqOngoing == true {
468                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
469                                 xapp.Logger.Error("%s", err.Error())
470                                 return err
471                         } else if restSubscription.SubDelReqOngoing == true {
472                                 // Previous request for same restSubId still ongoing
473                                 return nil
474                         }
475                 }
476         }
477
478         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
479         go func() {
480                 for _, instanceId := range restSubscription.InstanceIds {
481                         xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
482
483                         if err != nil {
484                                 xapp.Logger.Error("%s", err.Error())
485                                 //return err
486                         }
487                         xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
488                         restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
489                         restSubscription.DeleteE2InstanceId(instanceId)
490                 }
491                 c.registry.DeleteRESTSubscription(&restSubId)
492         }()
493
494         c.UpdateCounter(cRestSubDelRespToXapp)
495
496         return nil
497 }
498
499 //-------------------------------------------------------------------
500 //
501 //-------------------------------------------------------------------
502 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
503
504         var xAppEventInstanceID int64
505         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
506         if err != nil {
507                 xapp.Logger.Info("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
508                         restSubId, instanceId, idstring(err, nil))
509                 return xAppEventInstanceID, nil
510         }
511
512         xAppEventInstanceID = int64(subs.ReqId.Id)
513         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
514         if trans == nil {
515                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
516                 xapp.Logger.Error("%s", err.Error())
517         }
518         defer trans.Release()
519
520         err = c.tracker.Track(trans)
521         if err != nil {
522                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
523                 xapp.Logger.Error("%s", err.Error())
524                 return xAppEventInstanceID, &time.ParseError{}
525         }
526         //
527         // Wake subs delete
528         //
529         go c.handleSubscriptionDelete(subs, trans)
530         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
531
532         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
533
534         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
535
536         return xAppEventInstanceID, nil
537 }
538
539 //-------------------------------------------------------------------
540 //
541 //-------------------------------------------------------------------
542 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
543         xapp.Logger.Info("QueryHandler() called")
544
545         c.CntRecvMsg++
546
547         return c.registry.QueryHandler()
548 }
549
550 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
551         xapp.Logger.Info("TestRestHandler() called")
552
553         pathParams := mux.Vars(r)
554         s := pathParams["testId"]
555
556         // This can be used to delete single subscription from db
557         if contains := strings.Contains(s, "deletesubid="); contains == true {
558                 var splits = strings.Split(s, "=")
559                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
560                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
561                         c.RemoveSubscriptionFromSdl(uint32(subId))
562                         return
563                 }
564         }
565
566         // This can be used to remove all subscriptions db from
567         if s == "emptydb" {
568                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
569                 c.RemoveAllSubscriptionsFromSdl()
570                 return
571         }
572
573         // This is meant to cause submgr's restart in testing
574         if s == "restart" {
575                 xapp.Logger.Info("os.Exit(1) called")
576                 os.Exit(1)
577         }
578
579         xapp.Logger.Info("Unsupported rest command received %s", s)
580 }
581
582 //-------------------------------------------------------------------
583 //
584 //-------------------------------------------------------------------
585
586 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
587         params := &xapp.RMRParams{}
588         params.Mtype = trans.GetMtype()
589         params.SubId = int(subs.GetReqId().InstanceId)
590         params.Xid = ""
591         params.Meid = subs.GetMeid()
592         params.Src = ""
593         params.PayloadLen = len(trans.Payload.Buf)
594         params.Payload = trans.Payload.Buf
595         params.Mbuf = nil
596         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
597         err = c.SendWithRetry(params, false, 5)
598         if err != nil {
599                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
600         }
601         return err
602 }
603
604 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
605
606         params := &xapp.RMRParams{}
607         params.Mtype = trans.GetMtype()
608         params.SubId = int(subs.GetReqId().InstanceId)
609         params.Xid = trans.GetXid()
610         params.Meid = trans.GetMeid()
611         params.Src = ""
612         params.PayloadLen = len(trans.Payload.Buf)
613         params.Payload = trans.Payload.Buf
614         params.Mbuf = nil
615         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
616         err = c.SendWithRetry(params, false, 5)
617         if err != nil {
618                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
619         }
620         return err
621 }
622
623 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
624         if c.RMRClient == nil {
625                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
626                 xapp.Logger.Error("%s", err.Error())
627                 return
628         }
629         c.CntRecvMsg++
630
631         defer c.RMRClient.Free(msg.Mbuf)
632
633         // xapp-frame might use direct access to c buffer and
634         // when msg.Mbuf is freed, someone might take it into use
635         // and payload data might be invalid inside message handle function
636         //
637         // subscriptions won't load system a lot so there is no
638         // real performance hit by cloning buffer into new go byte slice
639         cPay := append(msg.Payload[:0:0], msg.Payload...)
640         msg.Payload = cPay
641         msg.PayloadLen = len(cPay)
642
643         switch msg.Mtype {
644         case xapp.RIC_SUB_REQ:
645                 go c.handleXAPPSubscriptionRequest(msg)
646         case xapp.RIC_SUB_RESP:
647                 go c.handleE2TSubscriptionResponse(msg)
648         case xapp.RIC_SUB_FAILURE:
649                 go c.handleE2TSubscriptionFailure(msg)
650         case xapp.RIC_SUB_DEL_REQ:
651                 go c.handleXAPPSubscriptionDeleteRequest(msg)
652         case xapp.RIC_SUB_DEL_RESP:
653                 go c.handleE2TSubscriptionDeleteResponse(msg)
654         case xapp.RIC_SUB_DEL_FAILURE:
655                 go c.handleE2TSubscriptionDeleteFailure(msg)
656         default:
657                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
658         }
659         return
660 }
661
662 //-------------------------------------------------------------------
663 // handle from XAPP Subscription Request
664 //------------------------------------------------------------------
665 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
666         xapp.Logger.Info("MSG from XAPP: %s", params.String())
667         c.UpdateCounter(cSubReqFromXapp)
668
669         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
670         if err != nil {
671                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
672                 return
673         }
674
675         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
676         if trans == nil {
677                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
678                 return
679         }
680         defer trans.Release()
681
682         if err = c.tracker.Track(trans); err != nil {
683                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
684                 return
685         }
686
687         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
688         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
689         if err != nil {
690                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
691                 return
692         }
693
694         c.wakeSubscriptionRequest(subs, trans)
695 }
696
697 //-------------------------------------------------------------------
698 // Wake Subscription Request to E2node
699 //------------------------------------------------------------------
700 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
701
702         go c.handleSubscriptionCreate(subs, trans)
703         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
704         var err error
705         if event != nil {
706                 switch themsg := event.(type) {
707                 case *e2ap.E2APSubscriptionResponse:
708                         themsg.RequestId.Id = trans.RequestId.Id
709                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
710                         if err == nil {
711                                 trans.Release()
712                                 c.UpdateCounter(cSubRespToXapp)
713                                 c.rmrSendToXapp("", subs, trans)
714                                 return
715                         }
716                 case *e2ap.E2APSubscriptionFailure:
717                         themsg.RequestId.Id = trans.RequestId.Id
718                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
719                         if err == nil {
720                                 c.UpdateCounter(cSubFailToXapp)
721                                 c.rmrSendToXapp("", subs, trans)
722                         }
723                 default:
724                         break
725                 }
726         }
727         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
728         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
729 }
730
731 //-------------------------------------------------------------------
732 // handle from XAPP Subscription Delete Request
733 //------------------------------------------------------------------
734 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
735         xapp.Logger.Info("MSG from XAPP: %s", params.String())
736         c.UpdateCounter(cSubDelReqFromXapp)
737
738         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
739         if err != nil {
740                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
741                 return
742         }
743
744         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
745         if trans == nil {
746                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
747                 return
748         }
749         defer trans.Release()
750
751         err = c.tracker.Track(trans)
752         if err != nil {
753                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
754                 return
755         }
756
757         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
758         if err != nil {
759                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
760                 return
761         }
762
763         //
764         // Wake subs delete
765         //
766         go c.handleSubscriptionDelete(subs, trans)
767         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
768
769         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
770
771         if subs.NoRespToXapp == true {
772                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
773                 return
774         }
775
776         // Whatever is received success, fail or timeout, send successful delete response
777         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
778         subDelRespMsg.RequestId.Id = trans.RequestId.Id
779         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
780         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
781         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
782         if err == nil {
783                 c.UpdateCounter(cSubDelRespToXapp)
784                 c.rmrSendToXapp("", subs, trans)
785         }
786
787         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
788         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
789 }
790
791 //-------------------------------------------------------------------
792 // SUBS CREATE Handling
793 //-------------------------------------------------------------------
794 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
795
796         var removeSubscriptionFromDb bool = false
797         trans := c.tracker.NewSubsTransaction(subs)
798         subs.WaitTransactionTurn(trans)
799         defer subs.ReleaseTransactionTurn(trans)
800         defer trans.Release()
801
802         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
803
804         subRfMsg, valid := subs.GetCachedResponse()
805         if subRfMsg == nil && valid == true {
806                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
807                 switch event.(type) {
808                 case *e2ap.E2APSubscriptionResponse:
809                         subRfMsg, valid = subs.SetCachedResponse(event, true)
810                         subs.SubRespRcvd = true
811                 case *e2ap.E2APSubscriptionFailure:
812                         removeSubscriptionFromDb = true
813                         subRfMsg, valid = subs.SetCachedResponse(event, false)
814                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
815                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
816                 case *SubmgrRestartTestEvent:
817                         // This simulates that no response has been received and after restart subscriptions are restored from db
818                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
819                         return
820                 default:
821                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
822                         removeSubscriptionFromDb = true
823                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
824                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
825                 }
826                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
827         } else {
828                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
829         }
830
831         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
832         if valid == false {
833                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
834         }
835
836         c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
837         parentTrans.SendEvent(subRfMsg, 0)
838 }
839
840 //-------------------------------------------------------------------
841 // SUBS DELETE Handling
842 //-------------------------------------------------------------------
843
844 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
845
846         trans := c.tracker.NewSubsTransaction(subs)
847         subs.WaitTransactionTurn(trans)
848         defer subs.ReleaseTransactionTurn(trans)
849         defer trans.Release()
850
851         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
852
853         subs.mutex.Lock()
854
855         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
856                 subs.valid = false
857                 subs.mutex.Unlock()
858                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
859         } else {
860                 subs.mutex.Unlock()
861         }
862         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
863         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
864         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
865         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
866         c.registry.UpdateSubscriptionToDb(subs, c)
867         parentTrans.SendEvent(nil, 0)
868 }
869
870 //-------------------------------------------------------------------
871 // send to E2T Subscription Request
872 //-------------------------------------------------------------------
873 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
874         var err error
875         var event interface{} = nil
876         var timedOut bool = false
877         const ricRequestorId = 123
878
879         subReqMsg := subs.SubReqMsg
880         subReqMsg.RequestId = subs.GetReqId().RequestId
881         subReqMsg.RequestId.Id = ricRequestorId
882         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
883         if err != nil {
884                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
885                 return event
886         }
887
888         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
889         c.WriteSubscriptionToDb(subs)
890
891         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
892                 desc := fmt.Sprintf("(retry %d)", retries)
893                 if retries == 0 {
894                         c.UpdateCounter(cSubReqToE2)
895                 } else {
896                         c.UpdateCounter(cSubReReqToE2)
897                 }
898                 c.rmrSendToE2T(desc, subs, trans)
899                 if subs.DoNotWaitSubResp == false {
900                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
901                         if timedOut {
902                                 c.UpdateCounter(cSubReqTimerExpiry)
903                                 continue
904                         }
905                 } else {
906                         // Simulating case where subscrition request has been sent but response has not been received before restart
907                         event = &SubmgrRestartTestEvent{}
908                 }
909                 break
910         }
911         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
912         return event
913 }
914
915 //-------------------------------------------------------------------
916 // send to E2T Subscription Delete Request
917 //-------------------------------------------------------------------
918
919 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
920         var err error
921         var event interface{}
922         var timedOut bool
923         const ricRequestorId = 123
924
925         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
926         subDelReqMsg.RequestId = subs.GetReqId().RequestId
927         subDelReqMsg.RequestId.Id = ricRequestorId
928         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
929         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
930         if err != nil {
931                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
932                 return event
933         }
934
935         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
936                 desc := fmt.Sprintf("(retry %d)", retries)
937                 if retries == 0 {
938                         c.UpdateCounter(cSubDelReqToE2)
939                 } else {
940                         c.UpdateCounter(cSubDelReReqToE2)
941                 }
942                 c.rmrSendToE2T(desc, subs, trans)
943                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
944                 if timedOut {
945                         c.UpdateCounter(cSubDelReqTimerExpiry)
946                         continue
947                 }
948                 break
949         }
950         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
951         return event
952 }
953
954 //-------------------------------------------------------------------
955 // handle from E2T Subscription Response
956 //-------------------------------------------------------------------
957 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
958         xapp.Logger.Info("MSG from E2T: %s", params.String())
959         c.UpdateCounter(cSubRespFromE2)
960
961         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
962         if err != nil {
963                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
964                 return
965         }
966         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
967         if err != nil {
968                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
969                 return
970         }
971         trans := subs.GetTransaction()
972         if trans == nil {
973                 err = fmt.Errorf("Ongoing transaction not found")
974                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
975                 return
976         }
977         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
978         if sendOk == false {
979                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
980                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
981         }
982         return
983 }
984
985 //-------------------------------------------------------------------
986 // handle from E2T Subscription Failure
987 //-------------------------------------------------------------------
988 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
989         xapp.Logger.Info("MSG from E2T: %s", params.String())
990         c.UpdateCounter(cSubFailFromE2)
991         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
992         if err != nil {
993                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
994                 return
995         }
996         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
997         if err != nil {
998                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
999                 return
1000         }
1001         trans := subs.GetTransaction()
1002         if trans == nil {
1003                 err = fmt.Errorf("Ongoing transaction not found")
1004                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1005                 return
1006         }
1007         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1008         if sendOk == false {
1009                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1010                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1011         }
1012         return
1013 }
1014
1015 //-------------------------------------------------------------------
1016 // handle from E2T Subscription Delete Response
1017 //-------------------------------------------------------------------
1018 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1019         xapp.Logger.Info("MSG from E2T: %s", params.String())
1020         c.UpdateCounter(cSubDelRespFromE2)
1021         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1022         if err != nil {
1023                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1024                 return
1025         }
1026         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1027         if err != nil {
1028                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1029                 return
1030         }
1031         trans := subs.GetTransaction()
1032         if trans == nil {
1033                 err = fmt.Errorf("Ongoing transaction not found")
1034                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1035                 return
1036         }
1037         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1038         if sendOk == false {
1039                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1040                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1041         }
1042         return
1043 }
1044
1045 //-------------------------------------------------------------------
1046 // handle from E2T Subscription Delete Failure
1047 //-------------------------------------------------------------------
1048 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1049         xapp.Logger.Info("MSG from E2T: %s", params.String())
1050         c.UpdateCounter(cSubDelFailFromE2)
1051         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1052         if err != nil {
1053                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1054                 return
1055         }
1056         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1057         if err != nil {
1058                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1059                 return
1060         }
1061         trans := subs.GetTransaction()
1062         if trans == nil {
1063                 err = fmt.Errorf("Ongoing transaction not found")
1064                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1065                 return
1066         }
1067         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1068         if sendOk == false {
1069                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1070                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1071         }
1072         return
1073 }
1074
1075 //-------------------------------------------------------------------
1076 //
1077 //-------------------------------------------------------------------
1078 func typeofSubsMessage(v interface{}) string {
1079         if v == nil {
1080                 return "NIL"
1081         }
1082         switch v.(type) {
1083         //case *e2ap.E2APSubscriptionRequest:
1084         //      return "SubReq"
1085         case *e2ap.E2APSubscriptionResponse:
1086                 return "SubResp"
1087         case *e2ap.E2APSubscriptionFailure:
1088                 return "SubFail"
1089         //case *e2ap.E2APSubscriptionDeleteRequest:
1090         //      return "SubDelReq"
1091         case *e2ap.E2APSubscriptionDeleteResponse:
1092                 return "SubDelResp"
1093         case *e2ap.E2APSubscriptionDeleteFailure:
1094                 return "SubDelFail"
1095         default:
1096                 return "Unknown"
1097         }
1098 }
1099
1100 //-------------------------------------------------------------------
1101 //
1102 //-------------------------------------------------------------------
1103 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1104         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1105         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1106         if err != nil {
1107                 xapp.Logger.Error("%v", err)
1108         }
1109 }
1110
1111 //-------------------------------------------------------------------
1112 //
1113 //-------------------------------------------------------------------
1114 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1115
1116         if removeSubscriptionFromDb == true {
1117                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1118                 c.RemoveSubscriptionFromDb(subs)
1119         } else {
1120                 // Update is needed for successful response and merge case here
1121                 if subs.RetryFromXapp == false {
1122                         c.WriteSubscriptionToDb(subs)
1123                 }
1124         }
1125         subs.RetryFromXapp = false
1126 }
1127
1128 //-------------------------------------------------------------------
1129 //
1130 //-------------------------------------------------------------------
1131 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1132         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1133         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1134         if err != nil {
1135                 xapp.Logger.Error("%v", err)
1136         }
1137 }
1138
1139 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1140
1141         const ricRequestorId = 123
1142         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1143
1144         // Send delete for every endpoint in the subscription
1145         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1146         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1147         subDelReqMsg.RequestId.Id = ricRequestorId
1148         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1149         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1150         if err != nil {
1151                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1152                 return
1153         }
1154         for _, endPoint := range subs.EpList.Endpoints {
1155                 params := &xapp.RMRParams{}
1156                 params.Mtype = mType
1157                 params.SubId = int(subs.GetReqId().InstanceId)
1158                 params.Xid = ""
1159                 params.Meid = subs.Meid
1160                 params.Src = endPoint.String()
1161                 params.PayloadLen = len(payload.Buf)
1162                 params.Payload = payload.Buf
1163                 params.Mbuf = nil
1164                 subs.DeleteFromDb = true
1165                 c.handleXAPPSubscriptionDeleteRequest(params)
1166         }
1167 }
1168
1169 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1170
1171         fmt.Println("CRESTSubscriptionRequest")
1172         fmt.Printf("  ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1173
1174         if p.ClientEndpoint.HTTPPort != nil {
1175                 fmt.Printf("  ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1176         } else {
1177                 fmt.Println("  ClientEndpoint.HTTPPort = nil")
1178         }
1179
1180         if p.ClientEndpoint.RMRPort != nil {
1181                 fmt.Printf("  ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1182         } else {
1183                 fmt.Println("  ClientEndpoint.RMRPort = nil")
1184         }
1185
1186         if p.Meid != nil {
1187                 fmt.Printf("  Meid = %s\n", *p.Meid)
1188         } else {
1189                 fmt.Println("  Meid = nil")
1190         }
1191
1192         for _, subscriptionDetail := range p.SubscriptionDetails {
1193                 if p.RANFunctionID != nil {
1194                         fmt.Printf("  RANFunctionID = %v\n", *p.RANFunctionID)
1195                 } else {
1196                         fmt.Println("  RANFunctionID = nil")
1197                 }
1198                 fmt.Printf("  SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1199                 fmt.Printf("  SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1200
1201                 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1202                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1203                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1204                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1205
1206                         if actionToBeSetup.SubsequentAction != nil {
1207                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1208                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1209                         } else {
1210                                 fmt.Println("  SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1211                         }
1212                 }
1213         }
1214 }