70570cd022ea3eb0b6d5c13ba695fd956eb48bad
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/segmentio/ksuid"
38         "github.com/spf13/viper"
39 )
40
41 //-----------------------------------------------------------------------------
42 //
43 //-----------------------------------------------------------------------------
44
45 func idstring(err error, entries ...fmt.Stringer) string {
46         var retval string = ""
47         var filler string = ""
48         for _, entry := range entries {
49                 retval += filler + entry.String()
50                 filler = " "
51         }
52         if err != nil {
53                 retval += filler + "err(" + err.Error() + ")"
54                 filler = " "
55         }
56         return retval
57 }
58
59 //-----------------------------------------------------------------------------
60 //
61 //-----------------------------------------------------------------------------
62
63 var e2tSubReqTimeout time.Duration
64 var e2tSubDelReqTime time.Duration
65 var e2tRecvMsgTimeout time.Duration
66 var waitRouteCleanup_ms time.Duration
67 var e2tMaxSubReqTryCount uint64    // Initial try + retry
68 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
69 var readSubsFromDb string
70
71 type Control struct {
72         *xapp.RMRClient
73         e2ap          *E2ap
74         registry      *Registry
75         tracker       *Tracker
76         db            Sdlnterface
77         CntRecvMsg    uint64
78         ResetTestFlag bool
79         Counters      map[string]xapp.Counter
80         LoggerLevel   uint32
81 }
82
83 type RMRMeid struct {
84         PlmnID  string
85         EnbID   string
86         RanName string
87 }
88
89 type SubmgrRestartTestEvent struct{}
90 type SubmgrRestartUpEvent struct{}
91
92 func init() {
93         xapp.Logger.Info("SUBMGR")
94         viper.AutomaticEnv()
95         viper.SetEnvPrefix("submgr")
96         viper.AllowEmptyEnv(true)
97 }
98
99 func NewControl() *Control {
100
101         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
102         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
103
104         registry := new(Registry)
105         registry.Initialize()
106         registry.rtmgrClient = &rtmgrClient
107
108         tracker := new(Tracker)
109         tracker.Init()
110
111         c := &Control{e2ap: new(E2ap),
112                 registry:    registry,
113                 tracker:     tracker,
114                 db:          CreateSdl(),
115                 Counters:    xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
116                 LoggerLevel: 3,
117         }
118         c.ReadConfigParameters("")
119
120         // Register REST handler for testing support
121         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
122         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
123
124         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
125
126         if readSubsFromDb == "false" {
127                 return c
128         }
129
130         // Read subscriptions from db
131         xapp.Logger.Info("Reading subscriptions from db")
132         subIds, register, err := c.ReadAllSubscriptionsFromSdl()
133         if err != nil {
134                 xapp.Logger.Error("%v", err)
135         } else {
136                 c.registry.subIds = subIds
137                 c.registry.register = register
138                 c.HandleUncompletedSubscriptions(register)
139         }
140         return c
141 }
142
143 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
144         subscriptions, _ := c.registry.QueryHandler()
145         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
146 }
147
148 //-------------------------------------------------------------------
149 //
150 //-------------------------------------------------------------------
151 func (c *Control) ReadConfigParameters(f string) {
152
153         // viper.GetDuration returns nanoseconds
154         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
155         if e2tSubReqTimeout == 0 {
156                 e2tSubReqTimeout = 2000 * 1000000
157         }
158         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
159         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
160         if e2tSubDelReqTime == 0 {
161                 e2tSubDelReqTime = 2000 * 1000000
162         }
163         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
164         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
165         if e2tRecvMsgTimeout == 0 {
166                 e2tRecvMsgTimeout = 2000 * 1000000
167         }
168         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
169
170         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
171         // value 100ms used currently only in unittests.
172         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
173         if waitRouteCleanup_ms == 0 {
174                 waitRouteCleanup_ms = 5000 * 1000000
175         }
176         xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
177
178         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
179         if e2tMaxSubReqTryCount == 0 {
180                 e2tMaxSubReqTryCount = 1
181         }
182         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
183
184         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
185         if e2tMaxSubDelReqTryCount == 0 {
186                 e2tMaxSubDelReqTryCount = 1
187         }
188         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
189
190         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
191         if readSubsFromDb == "" {
192                 readSubsFromDb = "true"
193         }
194         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
195         c.LoggerLevel = viper.GetUint32("logger.level")
196         if c.LoggerLevel == 0 {
197                 c.LoggerLevel = 3
198         }
199 }
200
201 //-------------------------------------------------------------------
202 //
203 //-------------------------------------------------------------------
204 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
205
206         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
207         for subId, subs := range register {
208                 if subs.SubRespRcvd == false {
209                         subs.NoRespToXapp = true
210                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
211                         c.SendSubscriptionDeleteReq(subs)
212                 }
213         }
214 }
215
216 func (c *Control) ReadyCB(data interface{}) {
217         if c.RMRClient == nil {
218                 c.RMRClient = xapp.Rmr
219         }
220 }
221
222 func (c *Control) Run() {
223         xapp.SetReadyCB(c.ReadyCB, nil)
224         xapp.AddConfigChangeListener(c.ReadConfigParameters)
225         xapp.Run(c)
226 }
227
228 //-------------------------------------------------------------------
229 //
230 //-------------------------------------------------------------------
231 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
232
233         c.CntRecvMsg++
234         c.UpdateCounter(cRestSubReqFromXapp)
235
236         subResp := models.SubscriptionResponse{}
237         p := params.(*models.SubscriptionParams)
238
239         if c.LoggerLevel > 2 {
240                 c.PrintRESTSubscriptionRequest(p)
241         }
242
243         if p.ClientEndpoint == nil {
244                 xapp.Logger.Error("ClientEndpoint == nil")
245                 c.UpdateCounter(cRestSubFailToXapp)
246                 return nil, fmt.Errorf("")
247         }
248
249         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
250         if err != nil {
251                 xapp.Logger.Error("%s", err.Error())
252                 c.UpdateCounter(cRestSubFailToXapp)
253                 return nil, err
254         }
255         var restSubId string
256         var restSubscription *RESTSubscription
257         if p.SubscriptionID == "" {
258                 restSubId = ksuid.New().String()
259                 restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
260                 if err != nil {
261                         xapp.Logger.Error("%s", err.Error())
262                         c.UpdateCounter(cRestSubFailToXapp)
263                         return nil, err
264                 }
265
266         } else {
267                 restSubId = p.SubscriptionID
268                 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
269                 if err != nil {
270                         xapp.Logger.Error("%s", err.Error())
271                         c.UpdateCounter(cRestSubFailToXapp)
272                         return nil, err
273                 }
274         }
275
276         subResp.SubscriptionID = &restSubId
277         subReqList := e2ap.SubscriptionRequestList{}
278         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
279         if err != nil {
280                 xapp.Logger.Error("%s", err.Error())
281                 c.registry.DeleteRESTSubscription(&restSubId)
282                 c.UpdateCounter(cRestSubFailToXapp)
283                 return nil, err
284         }
285
286         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId)
287
288         c.UpdateCounter(cRestSubRespToXapp)
289         return &subResp, nil
290 }
291
292 //-------------------------------------------------------------------
293 //
294 //-------------------------------------------------------------------
295
296 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
297         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string) {
298
299         xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
300
301         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*clientEndpoint)
302         if err != nil {
303                 c.registry.DeleteRESTSubscription(restSubId)
304                 xapp.Logger.Error("XAPP-SubReq transaction not created, endpoint createtion failed for RESTSubId=%s, Meid=%s", *restSubId, *meid)
305                 return
306         }
307
308         var xAppEventInstanceID int64
309         var e2EventInstanceID int64
310         var errorCause string
311         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
312                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
313
314                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
315                 if trans == nil {
316                         c.registry.DeleteRESTSubscription(restSubId)
317                         xapp.Logger.Error("XAPP-SubReq transaction not created. RESTSubId=%s, EndPoint=%s, Meid=%s", *restSubId, xAppRmrEndpoint, *meid)
318                         return
319                 }
320
321                 defer trans.Release()
322                 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
323                 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
324                 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
325                 if err != nil {
326                         // Send notification to xApp that prosessing of a Subscription Request has failed.
327                         e2EventInstanceID = (int64)(0)
328                         errorCause = err.Error()
329                         resp := &models.SubscriptionResponse{
330                                 SubscriptionID: restSubId,
331                                 SubscriptionInstances: []*models.SubscriptionInstance{
332                                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
333                                                 ErrorCause:          &errorCause,
334                                                 XappEventInstanceID: &xAppEventInstanceID},
335                                 },
336                         }
337                         // Mark REST subscription request processed.
338                         restSubscription.SetProcessed()
339                         xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
340                                 errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
341                         xapp.Subscription.Notify(resp, *clientEndpoint)
342                         c.UpdateCounter(cRestSubFailNotifToXapp)
343                 } else {
344                         e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
345
346                         xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
347                                 index, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
348
349                         // Store successfully processed InstanceId for deletion
350                         restSubscription.AddE2InstanceId(subRespMsg.RequestId.InstanceId)
351                         restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
352
353                         // Send notification to xApp that a Subscription Request has been processed.
354                         resp := &models.SubscriptionResponse{
355                                 SubscriptionID: restSubId,
356                                 SubscriptionInstances: []*models.SubscriptionInstance{
357                                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
358                                                 ErrorCause:          nil,
359                                                 XappEventInstanceID: &xAppEventInstanceID},
360                                 },
361                         }
362                         // Mark REST subscription request processesd.
363                         restSubscription.SetProcessed()
364                         xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
365                                 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
366                         xapp.Subscription.Notify(resp, *clientEndpoint)
367                         c.UpdateCounter(cRestSubNotifToXapp)
368
369                 }
370         }
371 }
372
373 //-------------------------------------------------------------------
374 //
375 //------------------------------------------------------------------
376 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
377         restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
378
379         err := c.tracker.Track(trans)
380         if err != nil {
381                 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
382                 err = fmt.Errorf("Tracking failure")
383                 return nil, err
384         }
385
386         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
387         if err != nil {
388                 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
389                 return nil, err
390         }
391
392         //
393         // Wake subs request
394         //
395         go c.handleSubscriptionCreate(subs, trans)
396         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
397
398         err = nil
399         if event != nil {
400                 switch themsg := event.(type) {
401                 case *e2ap.E2APSubscriptionResponse:
402                         trans.Release()
403                         return themsg, nil
404                 case *e2ap.E2APSubscriptionFailure:
405                         err = fmt.Errorf("E2 SubscriptionFailure received")
406                         return nil, err
407                 default:
408                         err = fmt.Errorf("unexpected E2 subscription response received")
409                         break
410                 }
411         } else {
412                 err = fmt.Errorf("E2 subscription response timeout")
413         }
414
415         xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
416         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
417         return nil, err
418 }
419
420 //-------------------------------------------------------------------
421 //
422 //-------------------------------------------------------------------
423 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
424
425         c.CntRecvMsg++
426         c.UpdateCounter(cRestSubDelReqFromXapp)
427
428         xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
429
430         restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
431         if err != nil {
432                 xapp.Logger.Error("%s", err.Error())
433                 if restSubscription == nil {
434                         // Subscription was not found
435                         return nil
436                 } else {
437                         if restSubscription.SubReqOngoing == true {
438                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
439                                 xapp.Logger.Error("%s", err.Error())
440                                 return err
441                         } else if restSubscription.SubDelReqOngoing == true {
442                                 // Previous request for same restSubId still ongoing
443                                 return nil
444                         }
445                 }
446         }
447
448         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
449         go func() {
450                 for _, instanceId := range restSubscription.InstanceIds {
451                         xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
452
453                         if err != nil {
454                                 xapp.Logger.Error("%s", err.Error())
455                                 //return err
456                         }
457                         xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
458                         restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
459                         restSubscription.DeleteE2InstanceId(instanceId)
460                 }
461                 c.registry.DeleteRESTSubscription(&restSubId)
462         }()
463
464         c.UpdateCounter(cRestSubDelRespToXapp)
465
466         return nil
467 }
468
469 //-------------------------------------------------------------------
470 //
471 //-------------------------------------------------------------------
472 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
473
474         var xAppEventInstanceID int64
475         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
476         if err != nil {
477                 xapp.Logger.Info("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
478                         restSubId, instanceId, idstring(err, nil))
479                 return xAppEventInstanceID, nil
480         }
481
482         xAppEventInstanceID = int64(subs.ReqId.Id)
483         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
484         if trans == nil {
485                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
486                 xapp.Logger.Error("%s", err.Error())
487         }
488         defer trans.Release()
489
490         err = c.tracker.Track(trans)
491         if err != nil {
492                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
493                 xapp.Logger.Error("%s", err.Error())
494                 return xAppEventInstanceID, &time.ParseError{}
495         }
496         //
497         // Wake subs delete
498         //
499         go c.handleSubscriptionDelete(subs, trans)
500         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
501
502         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
503
504         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
505
506         return xAppEventInstanceID, nil
507 }
508
509 //-------------------------------------------------------------------
510 //
511 //-------------------------------------------------------------------
512 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
513         xapp.Logger.Info("QueryHandler() called")
514
515         c.CntRecvMsg++
516
517         return c.registry.QueryHandler()
518 }
519
520 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
521         xapp.Logger.Info("TestRestHandler() called")
522
523         pathParams := mux.Vars(r)
524         s := pathParams["testId"]
525
526         // This can be used to delete single subscription from db
527         if contains := strings.Contains(s, "deletesubid="); contains == true {
528                 var splits = strings.Split(s, "=")
529                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
530                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
531                         c.RemoveSubscriptionFromSdl(uint32(subId))
532                         return
533                 }
534         }
535
536         // This can be used to remove all subscriptions db from
537         if s == "emptydb" {
538                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
539                 c.RemoveAllSubscriptionsFromSdl()
540                 return
541         }
542
543         // This is meant to cause submgr's restart in testing
544         if s == "restart" {
545                 xapp.Logger.Info("os.Exit(1) called")
546                 os.Exit(1)
547         }
548
549         xapp.Logger.Info("Unsupported rest command received %s", s)
550 }
551
552 //-------------------------------------------------------------------
553 //
554 //-------------------------------------------------------------------
555
556 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
557         params := &xapp.RMRParams{}
558         params.Mtype = trans.GetMtype()
559         params.SubId = int(subs.GetReqId().InstanceId)
560         params.Xid = ""
561         params.Meid = subs.GetMeid()
562         params.Src = ""
563         params.PayloadLen = len(trans.Payload.Buf)
564         params.Payload = trans.Payload.Buf
565         params.Mbuf = nil
566         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
567         err = c.SendWithRetry(params, false, 5)
568         if err != nil {
569                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
570         }
571         return err
572 }
573
574 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
575
576         params := &xapp.RMRParams{}
577         params.Mtype = trans.GetMtype()
578         params.SubId = int(subs.GetReqId().InstanceId)
579         params.Xid = trans.GetXid()
580         params.Meid = trans.GetMeid()
581         params.Src = ""
582         params.PayloadLen = len(trans.Payload.Buf)
583         params.Payload = trans.Payload.Buf
584         params.Mbuf = nil
585         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
586         err = c.SendWithRetry(params, false, 5)
587         if err != nil {
588                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
589         }
590         return err
591 }
592
593 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
594         if c.RMRClient == nil {
595                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
596                 xapp.Logger.Error("%s", err.Error())
597                 return
598         }
599         c.CntRecvMsg++
600
601         defer c.RMRClient.Free(msg.Mbuf)
602
603         // xapp-frame might use direct access to c buffer and
604         // when msg.Mbuf is freed, someone might take it into use
605         // and payload data might be invalid inside message handle function
606         //
607         // subscriptions won't load system a lot so there is no
608         // real performance hit by cloning buffer into new go byte slice
609         cPay := append(msg.Payload[:0:0], msg.Payload...)
610         msg.Payload = cPay
611         msg.PayloadLen = len(cPay)
612
613         switch msg.Mtype {
614         case xapp.RIC_SUB_REQ:
615                 go c.handleXAPPSubscriptionRequest(msg)
616         case xapp.RIC_SUB_RESP:
617                 go c.handleE2TSubscriptionResponse(msg)
618         case xapp.RIC_SUB_FAILURE:
619                 go c.handleE2TSubscriptionFailure(msg)
620         case xapp.RIC_SUB_DEL_REQ:
621                 go c.handleXAPPSubscriptionDeleteRequest(msg)
622         case xapp.RIC_SUB_DEL_RESP:
623                 go c.handleE2TSubscriptionDeleteResponse(msg)
624         case xapp.RIC_SUB_DEL_FAILURE:
625                 go c.handleE2TSubscriptionDeleteFailure(msg)
626         default:
627                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
628         }
629         return
630 }
631
632 //-------------------------------------------------------------------
633 // handle from XAPP Subscription Request
634 //------------------------------------------------------------------
635 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
636         xapp.Logger.Info("MSG from XAPP: %s", params.String())
637         c.UpdateCounter(cSubReqFromXapp)
638
639         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
640         if err != nil {
641                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
642                 return
643         }
644
645         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
646         if trans == nil {
647                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
648                 return
649         }
650         defer trans.Release()
651
652         if err = c.tracker.Track(trans); err != nil {
653                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
654                 return
655         }
656
657         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
658         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
659         if err != nil {
660                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
661                 return
662         }
663
664         c.wakeSubscriptionRequest(subs, trans)
665 }
666
667 //-------------------------------------------------------------------
668 // Wake Subscription Request to E2node
669 //------------------------------------------------------------------
670 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
671
672         go c.handleSubscriptionCreate(subs, trans)
673         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
674         var err error
675         if event != nil {
676                 switch themsg := event.(type) {
677                 case *e2ap.E2APSubscriptionResponse:
678                         themsg.RequestId.Id = trans.RequestId.Id
679                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
680                         if err == nil {
681                                 trans.Release()
682                                 c.UpdateCounter(cSubRespToXapp)
683                                 c.rmrSendToXapp("", subs, trans)
684                                 return
685                         }
686                 case *e2ap.E2APSubscriptionFailure:
687                         themsg.RequestId.Id = trans.RequestId.Id
688                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
689                         if err == nil {
690                                 c.UpdateCounter(cSubFailToXapp)
691                                 c.rmrSendToXapp("", subs, trans)
692                         }
693                 default:
694                         break
695                 }
696         }
697         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
698         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
699 }
700
701 //-------------------------------------------------------------------
702 // handle from XAPP Subscription Delete Request
703 //------------------------------------------------------------------
704 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
705         xapp.Logger.Info("MSG from XAPP: %s", params.String())
706         c.UpdateCounter(cSubDelReqFromXapp)
707
708         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
709         if err != nil {
710                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
711                 return
712         }
713
714         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
715         if trans == nil {
716                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
717                 return
718         }
719         defer trans.Release()
720
721         err = c.tracker.Track(trans)
722         if err != nil {
723                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
724                 return
725         }
726
727         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
728         if err != nil {
729                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
730                 return
731         }
732
733         //
734         // Wake subs delete
735         //
736         go c.handleSubscriptionDelete(subs, trans)
737         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
738
739         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
740
741         if subs.NoRespToXapp == true {
742                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
743                 return
744         }
745
746         // Whatever is received success, fail or timeout, send successful delete response
747         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
748         subDelRespMsg.RequestId.Id = trans.RequestId.Id
749         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
750         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
751         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
752         if err == nil {
753                 c.UpdateCounter(cSubDelRespToXapp)
754                 c.rmrSendToXapp("", subs, trans)
755         }
756
757         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
758         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
759 }
760
761 //-------------------------------------------------------------------
762 // SUBS CREATE Handling
763 //-------------------------------------------------------------------
764 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
765
766         var removeSubscriptionFromDb bool = false
767         trans := c.tracker.NewSubsTransaction(subs)
768         subs.WaitTransactionTurn(trans)
769         defer subs.ReleaseTransactionTurn(trans)
770         defer trans.Release()
771
772         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
773
774         subRfMsg, valid := subs.GetCachedResponse()
775         if subRfMsg == nil && valid == true {
776                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
777                 switch event.(type) {
778                 case *e2ap.E2APSubscriptionResponse:
779                         subRfMsg, valid = subs.SetCachedResponse(event, true)
780                         subs.SubRespRcvd = true
781                 case *e2ap.E2APSubscriptionFailure:
782                         removeSubscriptionFromDb = true
783                         subRfMsg, valid = subs.SetCachedResponse(event, false)
784                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
785                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
786                 case *SubmgrRestartTestEvent:
787                         // This simulates that no response has been received and after restart subscriptions are restored from db
788                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
789                         return
790                 default:
791                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
792                         removeSubscriptionFromDb = true
793                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
794                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
795                 }
796                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
797         } else {
798                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
799         }
800
801         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
802         if valid == false {
803                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
804         }
805
806         c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
807         parentTrans.SendEvent(subRfMsg, 0)
808 }
809
810 //-------------------------------------------------------------------
811 // SUBS DELETE Handling
812 //-------------------------------------------------------------------
813
814 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
815
816         trans := c.tracker.NewSubsTransaction(subs)
817         subs.WaitTransactionTurn(trans)
818         defer subs.ReleaseTransactionTurn(trans)
819         defer trans.Release()
820
821         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
822
823         subs.mutex.Lock()
824
825         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
826                 subs.valid = false
827                 subs.mutex.Unlock()
828                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
829         } else {
830                 subs.mutex.Unlock()
831         }
832         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
833         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
834         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
835         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
836         c.registry.UpdateSubscriptionToDb(subs, c)
837         parentTrans.SendEvent(nil, 0)
838 }
839
840 //-------------------------------------------------------------------
841 // send to E2T Subscription Request
842 //-------------------------------------------------------------------
843 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
844         var err error
845         var event interface{} = nil
846         var timedOut bool = false
847         const ricRequestorId = 123
848
849         subReqMsg := subs.SubReqMsg
850         subReqMsg.RequestId = subs.GetReqId().RequestId
851         subReqMsg.RequestId.Id = ricRequestorId
852         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
853         if err != nil {
854                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
855                 return event
856         }
857
858         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
859         c.WriteSubscriptionToDb(subs)
860
861         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
862                 desc := fmt.Sprintf("(retry %d)", retries)
863                 if retries == 0 {
864                         c.UpdateCounter(cSubReqToE2)
865                 } else {
866                         c.UpdateCounter(cSubReReqToE2)
867                 }
868                 c.rmrSendToE2T(desc, subs, trans)
869                 if subs.DoNotWaitSubResp == false {
870                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
871                         if timedOut {
872                                 c.UpdateCounter(cSubReqTimerExpiry)
873                                 continue
874                         }
875                 } else {
876                         // Simulating case where subscrition request has been sent but response has not been received before restart
877                         event = &SubmgrRestartTestEvent{}
878                 }
879                 break
880         }
881         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
882         return event
883 }
884
885 //-------------------------------------------------------------------
886 // send to E2T Subscription Delete Request
887 //-------------------------------------------------------------------
888
889 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
890         var err error
891         var event interface{}
892         var timedOut bool
893         const ricRequestorId = 123
894
895         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
896         subDelReqMsg.RequestId = subs.GetReqId().RequestId
897         subDelReqMsg.RequestId.Id = ricRequestorId
898         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
899         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
900         if err != nil {
901                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
902                 return event
903         }
904
905         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
906                 desc := fmt.Sprintf("(retry %d)", retries)
907                 if retries == 0 {
908                         c.UpdateCounter(cSubDelReqToE2)
909                 } else {
910                         c.UpdateCounter(cSubDelReReqToE2)
911                 }
912                 c.rmrSendToE2T(desc, subs, trans)
913                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
914                 if timedOut {
915                         c.UpdateCounter(cSubDelReqTimerExpiry)
916                         continue
917                 }
918                 break
919         }
920         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
921         return event
922 }
923
924 //-------------------------------------------------------------------
925 // handle from E2T Subscription Response
926 //-------------------------------------------------------------------
927 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
928         xapp.Logger.Info("MSG from E2T: %s", params.String())
929         c.UpdateCounter(cSubRespFromE2)
930
931         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
932         if err != nil {
933                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
934                 return
935         }
936         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
937         if err != nil {
938                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
939                 return
940         }
941         trans := subs.GetTransaction()
942         if trans == nil {
943                 err = fmt.Errorf("Ongoing transaction not found")
944                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
945                 return
946         }
947         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
948         if sendOk == false {
949                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
950                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
951         }
952         return
953 }
954
955 //-------------------------------------------------------------------
956 // handle from E2T Subscription Failure
957 //-------------------------------------------------------------------
958 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
959         xapp.Logger.Info("MSG from E2T: %s", params.String())
960         c.UpdateCounter(cSubFailFromE2)
961         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
962         if err != nil {
963                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
964                 return
965         }
966         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
967         if err != nil {
968                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
969                 return
970         }
971         trans := subs.GetTransaction()
972         if trans == nil {
973                 err = fmt.Errorf("Ongoing transaction not found")
974                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
975                 return
976         }
977         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
978         if sendOk == false {
979                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
980                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
981         }
982         return
983 }
984
985 //-------------------------------------------------------------------
986 // handle from E2T Subscription Delete Response
987 //-------------------------------------------------------------------
988 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
989         xapp.Logger.Info("MSG from E2T: %s", params.String())
990         c.UpdateCounter(cSubDelRespFromE2)
991         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
992         if err != nil {
993                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
994                 return
995         }
996         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
997         if err != nil {
998                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
999                 return
1000         }
1001         trans := subs.GetTransaction()
1002         if trans == nil {
1003                 err = fmt.Errorf("Ongoing transaction not found")
1004                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1005                 return
1006         }
1007         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1008         if sendOk == false {
1009                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1010                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1011         }
1012         return
1013 }
1014
1015 //-------------------------------------------------------------------
1016 // handle from E2T Subscription Delete Failure
1017 //-------------------------------------------------------------------
1018 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1019         xapp.Logger.Info("MSG from E2T: %s", params.String())
1020         c.UpdateCounter(cSubDelFailFromE2)
1021         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1022         if err != nil {
1023                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1024                 return
1025         }
1026         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1027         if err != nil {
1028                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1029                 return
1030         }
1031         trans := subs.GetTransaction()
1032         if trans == nil {
1033                 err = fmt.Errorf("Ongoing transaction not found")
1034                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1035                 return
1036         }
1037         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1038         if sendOk == false {
1039                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1040                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1041         }
1042         return
1043 }
1044
1045 //-------------------------------------------------------------------
1046 //
1047 //-------------------------------------------------------------------
1048 func typeofSubsMessage(v interface{}) string {
1049         if v == nil {
1050                 return "NIL"
1051         }
1052         switch v.(type) {
1053         //case *e2ap.E2APSubscriptionRequest:
1054         //      return "SubReq"
1055         case *e2ap.E2APSubscriptionResponse:
1056                 return "SubResp"
1057         case *e2ap.E2APSubscriptionFailure:
1058                 return "SubFail"
1059         //case *e2ap.E2APSubscriptionDeleteRequest:
1060         //      return "SubDelReq"
1061         case *e2ap.E2APSubscriptionDeleteResponse:
1062                 return "SubDelResp"
1063         case *e2ap.E2APSubscriptionDeleteFailure:
1064                 return "SubDelFail"
1065         default:
1066                 return "Unknown"
1067         }
1068 }
1069
1070 //-------------------------------------------------------------------
1071 //
1072 //-------------------------------------------------------------------
1073 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1074         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1075         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1076         if err != nil {
1077                 xapp.Logger.Error("%v", err)
1078         }
1079 }
1080
1081 //-------------------------------------------------------------------
1082 //
1083 //-------------------------------------------------------------------
1084 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1085
1086         if removeSubscriptionFromDb == true {
1087                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1088                 c.RemoveSubscriptionFromDb(subs)
1089         } else {
1090                 // Update is needed for successful response and merge case here
1091                 if subs.RetryFromXapp == false {
1092                         c.WriteSubscriptionToDb(subs)
1093                 }
1094         }
1095         subs.RetryFromXapp = false
1096 }
1097
1098 //-------------------------------------------------------------------
1099 //
1100 //-------------------------------------------------------------------
1101 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1102         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1103         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1104         if err != nil {
1105                 xapp.Logger.Error("%v", err)
1106         }
1107 }
1108
1109 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1110
1111         const ricRequestorId = 123
1112         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1113
1114         // Send delete for every endpoint in the subscription
1115         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1116         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1117         subDelReqMsg.RequestId.Id = ricRequestorId
1118         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1119         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1120         if err != nil {
1121                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1122                 return
1123         }
1124         for _, endPoint := range subs.EpList.Endpoints {
1125                 params := &xapp.RMRParams{}
1126                 params.Mtype = mType
1127                 params.SubId = int(subs.GetReqId().InstanceId)
1128                 params.Xid = ""
1129                 params.Meid = subs.Meid
1130                 params.Src = endPoint.String()
1131                 params.PayloadLen = len(payload.Buf)
1132                 params.Payload = payload.Buf
1133                 params.Mbuf = nil
1134                 subs.DeleteFromDb = true
1135                 c.handleXAPPSubscriptionDeleteRequest(params)
1136         }
1137 }
1138
1139 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1140
1141         fmt.Println("CRESTSubscriptionRequest")
1142         fmt.Printf("  ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1143
1144         if p.ClientEndpoint.HTTPPort != nil {
1145                 fmt.Printf("  ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1146         } else {
1147                 fmt.Println("  ClientEndpoint.HTTPPort = nil")
1148         }
1149
1150         if p.ClientEndpoint.RMRPort != nil {
1151                 fmt.Printf("  ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1152         } else {
1153                 fmt.Println("  ClientEndpoint.RMRPort = nil")
1154         }
1155
1156         if p.Meid != nil {
1157                 fmt.Printf("  Meid = %s\n", *p.Meid)
1158         } else {
1159                 fmt.Println("  Meid = nil")
1160         }
1161
1162         for _, subscriptionDetail := range p.SubscriptionDetails {
1163                 if p.RANFunctionID != nil {
1164                         fmt.Printf("  RANFunctionID = %v\n", *p.RANFunctionID)
1165                 } else {
1166                         fmt.Println("  RANFunctionID = nil")
1167                 }
1168                 fmt.Printf("  SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1169                 fmt.Printf("  SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1170
1171                 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1172                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1173                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1174                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1175
1176                         if actionToBeSetup.SubsequentAction != nil {
1177                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1178                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1179                         } else {
1180                                 fmt.Println("  SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1181                         }
1182                 }
1183         }
1184 }