Increment counters before sending Notification response
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/segmentio/ksuid"
38         "github.com/spf13/viper"
39 )
40
41 //-----------------------------------------------------------------------------
42 //
43 //-----------------------------------------------------------------------------
44
// idstring builds a single-line identification string for log messages.
//
// The String() form of every entry is joined with single spaces, in the
// order given. When err is non-nil, "err(<message>)" is appended as the last
// element. An empty string is returned when there is nothing to print.
//
// Nil entries are skipped: calling String() on a nil interface value panics,
// and callers in this file do pass a literal nil (idstring(err, nil)).
// A non-nil interface wrapping a nil pointer is still forwarded to String().
func idstring(err error, entries ...fmt.Stringer) string {
	var b strings.Builder
	sep := ""
	for _, entry := range entries {
		if entry == nil {
			continue
		}
		b.WriteString(sep)
		b.WriteString(entry.String())
		sep = " "
	}
	if err != nil {
		b.WriteString(sep)
		b.WriteString("err(" + err.Error() + ")")
	}
	return b.String()
}
58
59 //-----------------------------------------------------------------------------
60 //
61 //-----------------------------------------------------------------------------
62
63 var e2tSubReqTimeout time.Duration
64 var e2tSubDelReqTime time.Duration
65 var e2tRecvMsgTimeout time.Duration
66 var waitRouteCleanup_ms time.Duration
67 var e2tMaxSubReqTryCount uint64    // Initial try + retry
68 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
69 var readSubsFromDb string
70 var restDuplicateCtrl duplicateCtrl
71
// Control is the subscription manager's central object. It implements the
// REST subscription callbacks handed to xapp.Subscription.Listen and the RMR
// message consumer (Consume) passed to xapp.Run.
type Control struct {
	// Embedded RMR client. It stays nil until ReadyCB stores xapp.Rmr in it;
	// Consume rejects messages while it is nil.
	*xapp.RMRClient
	// E2AP helper used to fill, pack and unpack subscription messages.
	e2ap          *E2ap
	// Registry of REST and E2 subscriptions.
	registry      *Registry
	// Tracker creates and tracks xApp transactions.
	tracker       *Tracker
	// Subscription storage created with CreateSdl().
	db            Sdlnterface
	// Incremented by the REST handlers and by Consume for each received message.
	CntRecvMsg    uint64
	// Passed through to registry.AssignToSubscription.
	ResetTestFlag bool
	// Counter group registered under the name "SUBMGR".
	Counters      map[string]xapp.Counter
	// Read from "logger.level" (default 3). SubscriptionHandler prints the
	// REST request only when it is greater than 2.
	LoggerLevel   uint32
}
83
// RMRMeid holds the identity fields of a managed entity.
// NOTE(review): the code visible in this file builds xapp.RMRMeid values
// instead of this type - confirm where this one is still used.
type RMRMeid struct {
	PlmnID  string
	EnbID   string
	RanName string
}
89
// SubmgrRestartTestEvent and SubmgrRestartUpEvent are empty marker types.
// NOTE(review): presumably delivered as transaction events by the restart
// test support - confirm against the code that sends them.
type SubmgrRestartTestEvent struct{}
type SubmgrRestartUpEvent struct{}
92
// init logs the component name and configures viper's environment handling
// before any configuration value is read.
func init() {
	xapp.Logger.Info("SUBMGR")
	// Let viper look up configuration keys in the environment as well.
	viper.AutomaticEnv()
	// Environment variables are looked up with the "submgr" prefix.
	viper.SetEnvPrefix("submgr")
	// Treat an environment variable that is set but empty as a set value.
	viper.AllowEmptyEnv(true)
}
99
100 func NewControl() *Control {
101
102         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
103         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
104
105         registry := new(Registry)
106         registry.Initialize()
107         registry.rtmgrClient = &rtmgrClient
108
109         tracker := new(Tracker)
110         tracker.Init()
111
112         c := &Control{e2ap: new(E2ap),
113                 registry:    registry,
114                 tracker:     tracker,
115                 db:          CreateSdl(),
116                 Counters:    xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
117                 LoggerLevel: 3,
118         }
119         c.ReadConfigParameters("")
120
121         // Register REST handler for testing support
122         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
123         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
124
125         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
126
127         if readSubsFromDb == "false" {
128                 return c
129         }
130
131         restDuplicateCtrl.Init()
132
133         // Read subscriptions from db
134         xapp.Logger.Info("Reading subscriptions from db")
135         subIds, register, err := c.ReadAllSubscriptionsFromSdl()
136         if err != nil {
137                 xapp.Logger.Error("%v", err)
138         } else {
139                 c.registry.subIds = subIds
140                 c.registry.register = register
141                 c.HandleUncompletedSubscriptions(register)
142         }
143         return c
144 }
145
146 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
147         subscriptions, _ := c.registry.QueryHandler()
148         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
149 }
150
151 //-------------------------------------------------------------------
152 //
153 //-------------------------------------------------------------------
154 func (c *Control) ReadConfigParameters(f string) {
155
156         // viper.GetDuration returns nanoseconds
157         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
158         if e2tSubReqTimeout == 0 {
159                 e2tSubReqTimeout = 2000 * 1000000
160         }
161         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
162         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
163         if e2tSubDelReqTime == 0 {
164                 e2tSubDelReqTime = 2000 * 1000000
165         }
166         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
167         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
168         if e2tRecvMsgTimeout == 0 {
169                 e2tRecvMsgTimeout = 2000 * 1000000
170         }
171         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
172
173         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
174         // value 100ms used currently only in unittests.
175         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
176         if waitRouteCleanup_ms == 0 {
177                 waitRouteCleanup_ms = 5000 * 1000000
178         }
179         xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
180
181         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
182         if e2tMaxSubReqTryCount == 0 {
183                 e2tMaxSubReqTryCount = 1
184         }
185         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
186
187         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
188         if e2tMaxSubDelReqTryCount == 0 {
189                 e2tMaxSubDelReqTryCount = 1
190         }
191         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
192
193         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
194         if readSubsFromDb == "" {
195                 readSubsFromDb = "true"
196         }
197         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
198         c.LoggerLevel = viper.GetUint32("logger.level")
199         if c.LoggerLevel == 0 {
200                 c.LoggerLevel = 3
201         }
202 }
203
204 //-------------------------------------------------------------------
205 //
206 //-------------------------------------------------------------------
207 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
208
209         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
210         for subId, subs := range register {
211                 if subs.SubRespRcvd == false {
212                         subs.NoRespToXapp = true
213                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
214                         c.SendSubscriptionDeleteReq(subs)
215                 }
216         }
217 }
218
// ReadyCB is the callback registered with xapp.SetReadyCB in Run. It adopts
// the framework's RMR client (xapp.Rmr) unless one has already been set.
// The data argument is not used.
func (c *Control) ReadyCB(data interface{}) {
	if c.RMRClient == nil {
		c.RMRClient = xapp.Rmr
	}
}
224
// Run registers the ready callback and the configuration change listener
// and then hands control to the xapp framework with c as the consumer.
func (c *Control) Run() {
	xapp.SetReadyCB(c.ReadyCB, nil)
	xapp.AddConfigChangeListener(c.ReadConfigParameters)
	xapp.Run(c)
}
230
231 //-------------------------------------------------------------------
232 //
233 //-------------------------------------------------------------------
234 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
235
236         c.CntRecvMsg++
237         c.UpdateCounter(cRestSubReqFromXapp)
238
239         subResp := models.SubscriptionResponse{}
240         p := params.(*models.SubscriptionParams)
241
242         if c.LoggerLevel > 2 {
243                 c.PrintRESTSubscriptionRequest(p)
244         }
245
246         if p.ClientEndpoint == nil {
247                 xapp.Logger.Error("ClientEndpoint == nil")
248                 c.UpdateCounter(cRestSubFailToXapp)
249                 return nil, fmt.Errorf("")
250         }
251
252         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
253         if err != nil {
254                 xapp.Logger.Error("%s", err.Error())
255                 c.UpdateCounter(cRestSubFailToXapp)
256                 return nil, err
257         }
258         var restSubId string
259         var restSubscription *RESTSubscription
260         if p.SubscriptionID == "" {
261                 restSubId = ksuid.New().String()
262                 restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
263                 if err != nil {
264                         xapp.Logger.Error("%s", err.Error())
265                         c.UpdateCounter(cRestSubFailToXapp)
266                         return nil, err
267                 }
268
269         } else {
270                 restSubId = p.SubscriptionID
271                 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
272                 if err != nil {
273                         xapp.Logger.Error("%s", err.Error())
274                         c.UpdateCounter(cRestSubFailToXapp)
275                         return nil, err
276                 }
277         }
278
279         subResp.SubscriptionID = &restSubId
280         subReqList := e2ap.SubscriptionRequestList{}
281         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
282         if err != nil {
283                 xapp.Logger.Error("%s", err.Error())
284                 c.registry.DeleteRESTSubscription(&restSubId)
285                 c.UpdateCounter(cRestSubFailToXapp)
286                 return nil, err
287         }
288
289         err, duplicate, md5sum := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, params)
290
291         if err != nil {
292                 // We were unable to detect whether this request was duplicate or not, proceed
293                 xapp.Logger.Info("%s - proceeding with the request", err.Error())
294         } else {
295                 if duplicate {
296                         if *p.SubscriptionDetails[0].ActionToBeSetupList[0].ActionType == "report" {
297                                 xapp.Logger.Info("Retransmission blocker dropped for report typer of request")
298                                 c.UpdateCounter(cRestSubRespToXapp)
299                                 return &subResp, nil
300                         }
301                 }
302                 restSubscription.Md5sumOngoing = md5sum
303         }
304
305         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint)
306
307         c.UpdateCounter(cRestSubRespToXapp)
308         return &subResp, nil
309 }
310
311 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
312         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
313
314         // Send notification to xApp that prosessing of a Subscription Request has failed.
315         e2EventInstanceID := (int64)(0)
316         errorCause := err.Error()
317         resp := &models.SubscriptionResponse{
318                 SubscriptionID: restSubId,
319                 SubscriptionInstances: []*models.SubscriptionInstance{
320                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
321                                 ErrorCause:          &errorCause,
322                                 XappEventInstanceID: &xAppEventInstanceID},
323                 },
324         }
325         // Mark REST subscription request processed.
326         restSubscription.SetProcessed()
327         if trans != nil {
328                 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
329                         errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
330         } else {
331                 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
332                         errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
333         }
334
335         c.UpdateCounter(cRestSubFailNotifToXapp)
336         xapp.Subscription.Notify(resp, *clientEndpoint)
337 }
338
339 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
340         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
341
342         // Store successfully processed InstanceId for deletion
343         restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
344         restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
345
346         // Send notification to xApp that a Subscription Request has been processed.
347         resp := &models.SubscriptionResponse{
348                 SubscriptionID: restSubId,
349                 SubscriptionInstances: []*models.SubscriptionInstance{
350                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
351                                 ErrorCause:          nil,
352                                 XappEventInstanceID: &xAppEventInstanceID},
353                 },
354         }
355         // Mark REST subscription request processesd.
356         restSubscription.SetProcessed()
357         xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
358                 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
359
360         c.UpdateCounter(cRestSubNotifToXapp)
361         xapp.Subscription.Notify(resp, *clientEndpoint)
362 }
363
364 //-------------------------------------------------------------------
365 //
366 //-------------------------------------------------------------------
367
368 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
369         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string) {
370
371         xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
372
373         var xAppEventInstanceID int64
374         var e2EventInstanceID int64
375
376         defer restDuplicateCtrl.TransactionComplete(restSubscription.Md5sumOngoing)
377
378         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
379                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
380                 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
381
382                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
383                 if trans == nil {
384                         // Send notification to xApp that prosessing of a Subscription Request has failed.
385                         err := fmt.Errorf("Tracking failure")
386                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
387                         continue
388                 }
389
390                 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
391
392                 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
393                 if err != nil {
394                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
395                 } else {
396                         e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
397                         xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
398                                 index, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
399                         c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
400                 }
401                 trans.Release()
402         }
403 }
404
405 //-------------------------------------------------------------------
406 //
407 //------------------------------------------------------------------
408 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
409         restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
410
411         err := c.tracker.Track(trans)
412         if err != nil {
413                 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
414                 err = fmt.Errorf("Tracking failure")
415                 return nil, err
416         }
417
418         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
419         if err != nil {
420                 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
421                 return nil, err
422         }
423
424         //
425         // Wake subs request
426         //
427         go c.handleSubscriptionCreate(subs, trans)
428         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
429
430         err = nil
431         if event != nil {
432                 switch themsg := event.(type) {
433                 case *e2ap.E2APSubscriptionResponse:
434                         trans.Release()
435                         return themsg, nil
436                 case *e2ap.E2APSubscriptionFailure:
437                         err = fmt.Errorf("E2 SubscriptionFailure received")
438                         return nil, err
439                 default:
440                         err = fmt.Errorf("unexpected E2 subscription response received")
441                         break
442                 }
443         } else {
444                 err = fmt.Errorf("E2 subscription response timeout")
445         }
446
447         xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
448         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
449         return nil, err
450 }
451
452 //-------------------------------------------------------------------
453 //
454 //-------------------------------------------------------------------
455 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
456
457         c.CntRecvMsg++
458         c.UpdateCounter(cRestSubDelReqFromXapp)
459
460         xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
461
462         restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
463         if err != nil {
464                 xapp.Logger.Error("%s", err.Error())
465                 if restSubscription == nil {
466                         // Subscription was not found
467                         return nil
468                 } else {
469                         if restSubscription.SubReqOngoing == true {
470                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
471                                 xapp.Logger.Error("%s", err.Error())
472                                 return err
473                         } else if restSubscription.SubDelReqOngoing == true {
474                                 // Previous request for same restSubId still ongoing
475                                 return nil
476                         }
477                 }
478         }
479
480         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
481         go func() {
482                 for _, instanceId := range restSubscription.InstanceIds {
483                         xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
484
485                         if err != nil {
486                                 xapp.Logger.Error("%s", err.Error())
487                                 //return err
488                         }
489                         xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
490                         restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
491                         restSubscription.DeleteE2InstanceId(instanceId)
492                 }
493                 c.registry.DeleteRESTSubscription(&restSubId)
494         }()
495
496         c.UpdateCounter(cRestSubDelRespToXapp)
497
498         return nil
499 }
500
501 //-------------------------------------------------------------------
502 //
503 //-------------------------------------------------------------------
504 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
505
506         var xAppEventInstanceID int64
507         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
508         if err != nil {
509                 xapp.Logger.Info("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
510                         restSubId, instanceId, idstring(err, nil))
511                 return xAppEventInstanceID, nil
512         }
513
514         xAppEventInstanceID = int64(subs.ReqId.Id)
515         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
516         if trans == nil {
517                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
518                 xapp.Logger.Error("%s", err.Error())
519         }
520         defer trans.Release()
521
522         err = c.tracker.Track(trans)
523         if err != nil {
524                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
525                 xapp.Logger.Error("%s", err.Error())
526                 return xAppEventInstanceID, &time.ParseError{}
527         }
528         //
529         // Wake subs delete
530         //
531         go c.handleSubscriptionDelete(subs, trans)
532         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
533
534         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
535
536         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
537
538         return xAppEventInstanceID, nil
539 }
540
541 //-------------------------------------------------------------------
542 //
543 //-------------------------------------------------------------------
// QueryHandler is the REST query callback given to
// xapp.Subscription.Listen. It counts the received message and returns the
// registry's subscription list together with the registry's error.
func (c *Control) QueryHandler() (models.SubscriptionList, error) {
	xapp.Logger.Info("QueryHandler() called")

	c.CntRecvMsg++

	return c.registry.QueryHandler()
}
551
552 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
553         xapp.Logger.Info("TestRestHandler() called")
554
555         pathParams := mux.Vars(r)
556         s := pathParams["testId"]
557
558         // This can be used to delete single subscription from db
559         if contains := strings.Contains(s, "deletesubid="); contains == true {
560                 var splits = strings.Split(s, "=")
561                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
562                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
563                         c.RemoveSubscriptionFromSdl(uint32(subId))
564                         return
565                 }
566         }
567
568         // This can be used to remove all subscriptions db from
569         if s == "emptydb" {
570                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
571                 c.RemoveAllSubscriptionsFromSdl()
572                 return
573         }
574
575         // This is meant to cause submgr's restart in testing
576         if s == "restart" {
577                 xapp.Logger.Info("os.Exit(1) called")
578                 os.Exit(1)
579         }
580
581         xapp.Logger.Info("Unsupported rest command received %s", s)
582 }
583
584 //-------------------------------------------------------------------
585 //
586 //-------------------------------------------------------------------
587
588 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
589         params := &xapp.RMRParams{}
590         params.Mtype = trans.GetMtype()
591         params.SubId = int(subs.GetReqId().InstanceId)
592         params.Xid = ""
593         params.Meid = subs.GetMeid()
594         params.Src = ""
595         params.PayloadLen = len(trans.Payload.Buf)
596         params.Payload = trans.Payload.Buf
597         params.Mbuf = nil
598         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
599         err = c.SendWithRetry(params, false, 5)
600         if err != nil {
601                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
602         }
603         return err
604 }
605
606 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
607
608         params := &xapp.RMRParams{}
609         params.Mtype = trans.GetMtype()
610         params.SubId = int(subs.GetReqId().InstanceId)
611         params.Xid = trans.GetXid()
612         params.Meid = trans.GetMeid()
613         params.Src = ""
614         params.PayloadLen = len(trans.Payload.Buf)
615         params.Payload = trans.Payload.Buf
616         params.Mbuf = nil
617         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
618         err = c.SendWithRetry(params, false, 5)
619         if err != nil {
620                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
621         }
622         return err
623 }
624
625 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
626         if c.RMRClient == nil {
627                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
628                 xapp.Logger.Error("%s", err.Error())
629                 return
630         }
631         c.CntRecvMsg++
632
633         defer c.RMRClient.Free(msg.Mbuf)
634
635         // xapp-frame might use direct access to c buffer and
636         // when msg.Mbuf is freed, someone might take it into use
637         // and payload data might be invalid inside message handle function
638         //
639         // subscriptions won't load system a lot so there is no
640         // real performance hit by cloning buffer into new go byte slice
641         cPay := append(msg.Payload[:0:0], msg.Payload...)
642         msg.Payload = cPay
643         msg.PayloadLen = len(cPay)
644
645         switch msg.Mtype {
646         case xapp.RIC_SUB_REQ:
647                 go c.handleXAPPSubscriptionRequest(msg)
648         case xapp.RIC_SUB_RESP:
649                 go c.handleE2TSubscriptionResponse(msg)
650         case xapp.RIC_SUB_FAILURE:
651                 go c.handleE2TSubscriptionFailure(msg)
652         case xapp.RIC_SUB_DEL_REQ:
653                 go c.handleXAPPSubscriptionDeleteRequest(msg)
654         case xapp.RIC_SUB_DEL_RESP:
655                 go c.handleE2TSubscriptionDeleteResponse(msg)
656         case xapp.RIC_SUB_DEL_FAILURE:
657                 go c.handleE2TSubscriptionDeleteFailure(msg)
658         default:
659                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
660         }
661         return
662 }
663
664 //-------------------------------------------------------------------
665 // handle from XAPP Subscription Request
666 //------------------------------------------------------------------
667 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
668         xapp.Logger.Info("MSG from XAPP: %s", params.String())
669         c.UpdateCounter(cSubReqFromXapp)
670
671         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
672         if err != nil {
673                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
674                 return
675         }
676
677         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
678         if trans == nil {
679                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
680                 return
681         }
682         defer trans.Release()
683
684         if err = c.tracker.Track(trans); err != nil {
685                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
686                 return
687         }
688
689         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
690         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
691         if err != nil {
692                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
693                 return
694         }
695
696         c.wakeSubscriptionRequest(subs, trans)
697 }
698
699 //-------------------------------------------------------------------
700 // Wake Subscription Request to E2node
701 //------------------------------------------------------------------
// wakeSubscriptionRequest starts the subscription creation for subs in a
// goroutine, waits for the resulting event on trans and forwards the outcome
// to the xApp over RMR.
//
// A subscription response is packed and sent, after which the function
// returns. A subscription failure is packed and sent too, but execution then
// continues to the "failed" log line at the end. That log line is also
// reached when no event arrives, when the event is of any other type, or
// when packing fails.
func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {

	go c.handleSubscriptionCreate(subs, trans)
	event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
	var err error
	if event != nil {
		switch themsg := event.(type) {
		case *e2ap.E2APSubscriptionResponse:
			// Answer with the request id the xApp used in its own request.
			themsg.RequestId.Id = trans.RequestId.Id
			trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
			if err == nil {
				// NOTE(review): the transaction is released before it is used
				// for sending - confirm that Release leaves trans usable.
				trans.Release()
				c.UpdateCounter(cSubRespToXapp)
				c.rmrSendToXapp("", subs, trans)
				return
			}
		case *e2ap.E2APSubscriptionFailure:
			// Answer with the request id the xApp used in its own request.
			themsg.RequestId.Id = trans.RequestId.Id
			trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
			if err == nil {
				c.UpdateCounter(cSubFailToXapp)
				c.rmrSendToXapp("", subs, trans)
			}
		default:
			break
		}
	}
	// err is nil here when a failure message was sent successfully or when
	// no usable event was received.
	xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
	//c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
}
732
733 //-------------------------------------------------------------------
734 // handle from XAPP Subscription Delete Request
735 //------------------------------------------------------------------
736 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
737         xapp.Logger.Info("MSG from XAPP: %s", params.String())
738         c.UpdateCounter(cSubDelReqFromXapp)
739
740         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
741         if err != nil {
742                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
743                 return
744         }
745
746         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
747         if trans == nil {
748                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
749                 return
750         }
751         defer trans.Release()
752
753         err = c.tracker.Track(trans)
754         if err != nil {
755                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
756                 return
757         }
758
759         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
760         if err != nil {
761                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
762                 return
763         }
764
765         //
766         // Wake subs delete
767         //
768         go c.handleSubscriptionDelete(subs, trans)
769         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
770
771         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
772
773         if subs.NoRespToXapp == true {
774                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
775                 return
776         }
777
778         // Whatever is received success, fail or timeout, send successful delete response
779         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
780         subDelRespMsg.RequestId.Id = trans.RequestId.Id
781         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
782         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
783         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
784         if err == nil {
785                 c.UpdateCounter(cSubDelRespToXapp)
786                 c.rmrSendToXapp("", subs, trans)
787         }
788
789         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
790         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
791 }
792
793 //-------------------------------------------------------------------
794 // SUBS CREATE Handling
795 //-------------------------------------------------------------------
796 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
797
798         var removeSubscriptionFromDb bool = false
799         trans := c.tracker.NewSubsTransaction(subs)
800         subs.WaitTransactionTurn(trans)
801         defer subs.ReleaseTransactionTurn(trans)
802         defer trans.Release()
803
804         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
805
806         subRfMsg, valid := subs.GetCachedResponse()
807         if subRfMsg == nil && valid == true {
808                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
809                 switch event.(type) {
810                 case *e2ap.E2APSubscriptionResponse:
811                         subRfMsg, valid = subs.SetCachedResponse(event, true)
812                         subs.SubRespRcvd = true
813                 case *e2ap.E2APSubscriptionFailure:
814                         removeSubscriptionFromDb = true
815                         subRfMsg, valid = subs.SetCachedResponse(event, false)
816                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
817                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
818                 case *SubmgrRestartTestEvent:
819                         // This simulates that no response has been received and after restart subscriptions are restored from db
820                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
821                         return
822                 default:
823                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
824                         removeSubscriptionFromDb = true
825                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
826                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
827                 }
828                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
829         } else {
830                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
831         }
832
833         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
834         if valid == false {
835                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
836         }
837
838         c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
839         parentTrans.SendEvent(subRfMsg, 0)
840 }
841
842 //-------------------------------------------------------------------
843 // SUBS DELETE Handling
844 //-------------------------------------------------------------------
845
846 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
847
848         trans := c.tracker.NewSubsTransaction(subs)
849         subs.WaitTransactionTurn(trans)
850         defer subs.ReleaseTransactionTurn(trans)
851         defer trans.Release()
852
853         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
854
855         subs.mutex.Lock()
856
857         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
858                 subs.valid = false
859                 subs.mutex.Unlock()
860                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
861         } else {
862                 subs.mutex.Unlock()
863         }
864         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
865         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
866         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
867         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
868         c.registry.UpdateSubscriptionToDb(subs, c)
869         parentTrans.SendEvent(nil, 0)
870 }
871
872 //-------------------------------------------------------------------
873 // send to E2T Subscription Request
874 //-------------------------------------------------------------------
875 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
876         var err error
877         var event interface{} = nil
878         var timedOut bool = false
879         const ricRequestorId = 123
880
881         subReqMsg := subs.SubReqMsg
882         subReqMsg.RequestId = subs.GetReqId().RequestId
883         subReqMsg.RequestId.Id = ricRequestorId
884         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
885         if err != nil {
886                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
887                 return event
888         }
889
890         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
891         c.WriteSubscriptionToDb(subs)
892
893         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
894                 desc := fmt.Sprintf("(retry %d)", retries)
895                 if retries == 0 {
896                         c.UpdateCounter(cSubReqToE2)
897                 } else {
898                         c.UpdateCounter(cSubReReqToE2)
899                 }
900                 c.rmrSendToE2T(desc, subs, trans)
901                 if subs.DoNotWaitSubResp == false {
902                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
903                         if timedOut {
904                                 c.UpdateCounter(cSubReqTimerExpiry)
905                                 continue
906                         }
907                 } else {
908                         // Simulating case where subscrition request has been sent but response has not been received before restart
909                         event = &SubmgrRestartTestEvent{}
910                 }
911                 break
912         }
913         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
914         return event
915 }
916
917 //-------------------------------------------------------------------
918 // send to E2T Subscription Delete Request
919 //-------------------------------------------------------------------
920
921 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
922         var err error
923         var event interface{}
924         var timedOut bool
925         const ricRequestorId = 123
926
927         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
928         subDelReqMsg.RequestId = subs.GetReqId().RequestId
929         subDelReqMsg.RequestId.Id = ricRequestorId
930         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
931         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
932         if err != nil {
933                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
934                 return event
935         }
936
937         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
938                 desc := fmt.Sprintf("(retry %d)", retries)
939                 if retries == 0 {
940                         c.UpdateCounter(cSubDelReqToE2)
941                 } else {
942                         c.UpdateCounter(cSubDelReReqToE2)
943                 }
944                 c.rmrSendToE2T(desc, subs, trans)
945                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
946                 if timedOut {
947                         c.UpdateCounter(cSubDelReqTimerExpiry)
948                         continue
949                 }
950                 break
951         }
952         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
953         return event
954 }
955
956 //-------------------------------------------------------------------
957 // handle from E2T Subscription Response
958 //-------------------------------------------------------------------
959 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
960         xapp.Logger.Info("MSG from E2T: %s", params.String())
961         c.UpdateCounter(cSubRespFromE2)
962
963         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
964         if err != nil {
965                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
966                 return
967         }
968         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
969         if err != nil {
970                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
971                 return
972         }
973         trans := subs.GetTransaction()
974         if trans == nil {
975                 err = fmt.Errorf("Ongoing transaction not found")
976                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
977                 return
978         }
979         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
980         if sendOk == false {
981                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
982                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
983         }
984         return
985 }
986
987 //-------------------------------------------------------------------
988 // handle from E2T Subscription Failure
989 //-------------------------------------------------------------------
990 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
991         xapp.Logger.Info("MSG from E2T: %s", params.String())
992         c.UpdateCounter(cSubFailFromE2)
993         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
994         if err != nil {
995                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
996                 return
997         }
998         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
999         if err != nil {
1000                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1001                 return
1002         }
1003         trans := subs.GetTransaction()
1004         if trans == nil {
1005                 err = fmt.Errorf("Ongoing transaction not found")
1006                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1007                 return
1008         }
1009         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1010         if sendOk == false {
1011                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1012                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1013         }
1014         return
1015 }
1016
1017 //-------------------------------------------------------------------
1018 // handle from E2T Subscription Delete Response
1019 //-------------------------------------------------------------------
1020 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1021         xapp.Logger.Info("MSG from E2T: %s", params.String())
1022         c.UpdateCounter(cSubDelRespFromE2)
1023         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1024         if err != nil {
1025                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1026                 return
1027         }
1028         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1029         if err != nil {
1030                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1031                 return
1032         }
1033         trans := subs.GetTransaction()
1034         if trans == nil {
1035                 err = fmt.Errorf("Ongoing transaction not found")
1036                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1037                 return
1038         }
1039         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1040         if sendOk == false {
1041                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1042                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1043         }
1044         return
1045 }
1046
1047 //-------------------------------------------------------------------
1048 // handle from E2T Subscription Delete Failure
1049 //-------------------------------------------------------------------
1050 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1051         xapp.Logger.Info("MSG from E2T: %s", params.String())
1052         c.UpdateCounter(cSubDelFailFromE2)
1053         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1054         if err != nil {
1055                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1056                 return
1057         }
1058         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1059         if err != nil {
1060                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1061                 return
1062         }
1063         trans := subs.GetTransaction()
1064         if trans == nil {
1065                 err = fmt.Errorf("Ongoing transaction not found")
1066                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1067                 return
1068         }
1069         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1070         if sendOk == false {
1071                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1072                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1073         }
1074         return
1075 }
1076
1077 //-------------------------------------------------------------------
1078 //
1079 //-------------------------------------------------------------------
1080 func typeofSubsMessage(v interface{}) string {
1081         if v == nil {
1082                 return "NIL"
1083         }
1084         switch v.(type) {
1085         //case *e2ap.E2APSubscriptionRequest:
1086         //      return "SubReq"
1087         case *e2ap.E2APSubscriptionResponse:
1088                 return "SubResp"
1089         case *e2ap.E2APSubscriptionFailure:
1090                 return "SubFail"
1091         //case *e2ap.E2APSubscriptionDeleteRequest:
1092         //      return "SubDelReq"
1093         case *e2ap.E2APSubscriptionDeleteResponse:
1094                 return "SubDelResp"
1095         case *e2ap.E2APSubscriptionDeleteFailure:
1096                 return "SubDelFail"
1097         default:
1098                 return "Unknown"
1099         }
1100 }
1101
1102 //-------------------------------------------------------------------
1103 //
1104 //-------------------------------------------------------------------
1105 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1106         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1107         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1108         if err != nil {
1109                 xapp.Logger.Error("%v", err)
1110         }
1111 }
1112
1113 //-------------------------------------------------------------------
1114 //
1115 //-------------------------------------------------------------------
1116 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1117
1118         if removeSubscriptionFromDb == true {
1119                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1120                 c.RemoveSubscriptionFromDb(subs)
1121         } else {
1122                 // Update is needed for successful response and merge case here
1123                 if subs.RetryFromXapp == false {
1124                         c.WriteSubscriptionToDb(subs)
1125                 }
1126         }
1127         subs.RetryFromXapp = false
1128 }
1129
1130 //-------------------------------------------------------------------
1131 //
1132 //-------------------------------------------------------------------
1133 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1134         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1135         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1136         if err != nil {
1137                 xapp.Logger.Error("%v", err)
1138         }
1139 }
1140
1141 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1142
1143         const ricRequestorId = 123
1144         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1145
1146         // Send delete for every endpoint in the subscription
1147         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1148         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1149         subDelReqMsg.RequestId.Id = ricRequestorId
1150         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1151         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1152         if err != nil {
1153                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1154                 return
1155         }
1156         for _, endPoint := range subs.EpList.Endpoints {
1157                 params := &xapp.RMRParams{}
1158                 params.Mtype = mType
1159                 params.SubId = int(subs.GetReqId().InstanceId)
1160                 params.Xid = ""
1161                 params.Meid = subs.Meid
1162                 params.Src = endPoint.String()
1163                 params.PayloadLen = len(payload.Buf)
1164                 params.Payload = payload.Buf
1165                 params.Mbuf = nil
1166                 subs.DeleteFromDb = true
1167                 c.handleXAPPSubscriptionDeleteRequest(params)
1168         }
1169 }
1170
1171 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1172
1173         fmt.Println("CRESTSubscriptionRequest")
1174
1175         if p.SubscriptionID != "" {
1176                 fmt.Println("  SubscriptionID = ", p.SubscriptionID)
1177         } else {
1178                 fmt.Println("  SubscriptionID = ''")
1179         }
1180
1181         fmt.Printf("  ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1182
1183         if p.ClientEndpoint.HTTPPort != nil {
1184                 fmt.Printf("  ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1185         } else {
1186                 fmt.Println("  ClientEndpoint.HTTPPort = nil")
1187         }
1188
1189         if p.ClientEndpoint.RMRPort != nil {
1190                 fmt.Printf("  ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1191         } else {
1192                 fmt.Println("  ClientEndpoint.RMRPort = nil")
1193         }
1194
1195         if p.Meid != nil {
1196                 fmt.Printf("  Meid = %s\n", *p.Meid)
1197         } else {
1198                 fmt.Println("  Meid = nil")
1199         }
1200
1201         for _, subscriptionDetail := range p.SubscriptionDetails {
1202                 if p.RANFunctionID != nil {
1203                         fmt.Printf("  RANFunctionID = %v\n", *p.RANFunctionID)
1204                 } else {
1205                         fmt.Println("  RANFunctionID = nil")
1206                 }
1207                 fmt.Printf("  SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1208                 fmt.Printf("  SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1209
1210                 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1211                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1212                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1213                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1214
1215                         if actionToBeSetup.SubsequentAction != nil {
1216                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1217                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1218                         } else {
1219                                 fmt.Println("  SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1220                         }
1221                 }
1222         }
1223 }