Fix for Submgr crash added
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/segmentio/ksuid"
38         "github.com/spf13/viper"
39 )
40
41 //-----------------------------------------------------------------------------
42 //
43 //-----------------------------------------------------------------------------
44
// idstring renders the given entries, followed by an optional error, as one
// space-separated string intended for log messages.
//
// Every non-nil entry contributes its String() value and every nil entry
// contributes the placeholder "(NIL)". A non-nil err is appended last as
// "err(<message>)". With no entries and a nil err the result is "".
//
// NOTE(review): an entry that wraps a typed nil pointer is a non-nil interface
// value, so its String method is still called; whether that is safe depends on
// the concrete type.
func idstring(err error, entries ...fmt.Stringer) string {
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		if entry == nil {
			// The placeholder is separated like any other item, so it can
			// no longer fuse with whatever follows it.
			parts = append(parts, "(NIL)")
			continue
		}
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	return strings.Join(parts, " ")
}
62
63 //-----------------------------------------------------------------------------
64 //
65 //-----------------------------------------------------------------------------
66
// Package-level settings. All of them are (re)assigned by
// Control.ReadConfigParameters from the "controls.*" configuration keys.
var (
	// Time limits, read from the corresponding "*_ms" configuration keys.
	e2tSubReqTimeout    time.Duration
	e2tSubDelReqTime    time.Duration
	e2tRecvMsgTimeout   time.Duration
	waitRouteCleanup_ms time.Duration

	// Attempt limits for E2 requests.
	e2tMaxSubReqTryCount    uint64 // Initial try + retry
	e2tMaxSubDelReqTryCount uint64 // Initial try + retry

	// Database start-up behaviour. The two switches are kept as the strings
	// "true"/"false" and compared textually by their users.
	readSubsFromDb string
	dbRetryForever string
	dbTryCount     int
)
76
// Control is the central object of the subscription manager: it carries the
// helpers created in NewControl and is the receiver of the REST handlers and
// subscription processing methods in this file.
type Control struct {
	// Embedded RMR client; set to xapp.Rmr by ReadyCB when still nil.
	*xapp.RMRClient
	// Helper used to fill E2AP subscription request messages.
	e2ap              *E2ap
	// Registry of E2 and REST subscriptions.
	registry          *Registry
	// Creates and tracks xApp transactions.
	tracker           *Tracker
	// md5sum based detection of duplicate/retransmitted REST requests.
	restDuplicateCtrl *DuplicateCtrl
	// Obtained from CreateSdl() in NewControl.
	e2SubsDb          Sdlnterface
	// Obtained from CreateRESTSdl() in NewControl.
	restSubsDb        Sdlnterface
	// Incremented by SubscriptionHandler, SubscriptionDeleteHandlerCB and QueryHandler.
	CntRecvMsg        uint64
	// Passed through to registry.AssignToSubscription.
	ResetTestFlag     bool
	// Counter group registered with xapp.Metric under the name "SUBMGR".
	Counters          map[string]xapp.Counter
	// Read from "logger.level" (3 when unset); values above 2 make
	// SubscriptionHandler print incoming REST subscription requests.
	LoggerLevel       uint32
	// When true, subscription processing is delayed by 50 ms
	// (see SubscriptionProcessingStartDelay).
	UTTesting         bool
}
91
// RMRMeid groups three string identifiers: PLMN id, eNB id and RAN name.
// NOTE(review): the code visible in this file uses xapp.RMRMeid rather than
// this type - confirm whether it is still needed.
type RMRMeid struct {
	PlmnID  string
	EnbID   string
	RanName string
}
97
// SubmgrRestartTestEvent and SubmgrRestartUpEvent are empty marker types.
// NOTE(review): presumably used as event values by restart tests; they are not
// referenced in the visible part of this file - confirm against their users.
type SubmgrRestartTestEvent struct{}
type SubmgrRestartUpEvent struct{}
100
101 func init() {
102         xapp.Logger.Info("SUBMGR")
103         viper.AutomaticEnv()
104         viper.SetEnvPrefix("submgr")
105         viper.AllowEmptyEnv(true)
106 }
107
108 func NewControl() *Control {
109
110         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
111         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
112
113         registry := new(Registry)
114         registry.Initialize()
115         registry.rtmgrClient = &rtmgrClient
116
117         tracker := new(Tracker)
118         tracker.Init()
119
120         restDuplicateCtrl := new(DuplicateCtrl)
121         restDuplicateCtrl.Init()
122
123         c := &Control{e2ap: new(E2ap),
124                 registry:          registry,
125                 tracker:           tracker,
126                 restDuplicateCtrl: restDuplicateCtrl,
127                 e2SubsDb:          CreateSdl(),
128                 restSubsDb:        CreateRESTSdl(),
129                 Counters:          xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
130                 LoggerLevel:       4,
131         }
132         c.ReadConfigParameters("")
133
134         // Register REST handler for testing support
135         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
136         xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
137         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
138
139         if readSubsFromDb == "false" {
140                 return c
141         }
142
143         // Read subscriptions from db
144         c.ReadE2Subscriptions()
145         c.ReadRESTSubscriptions()
146
147         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
148
149         return c
150 }
151
152 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
153         subscriptions, _ := c.registry.QueryHandler()
154         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
155 }
156
157 //-------------------------------------------------------------------
158 //
159 //-------------------------------------------------------------------
160 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
161         xapp.Logger.Info("GetAllRestSubscriptions() called")
162         response := c.registry.GetAllRestSubscriptions()
163         w.Write(response)
164 }
165
166 //-------------------------------------------------------------------
167 //
168 //-------------------------------------------------------------------
169 func (c *Control) ReadE2Subscriptions() error {
170         var err error
171         var subIds []uint32
172         var register map[uint32]*Subscription
173         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
174                 xapp.Logger.Info("Reading E2 subscriptions from db")
175                 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
176                 if err != nil {
177                         xapp.Logger.Error("%v", err)
178                         <-time.After(1 * time.Second)
179                 } else {
180                         c.registry.subIds = subIds
181                         c.registry.register = register
182                         c.HandleUncompletedSubscriptions(register)
183                         return nil
184                 }
185         }
186         xapp.Logger.Info("Continuing without retring")
187         return err
188 }
189
190 //-------------------------------------------------------------------
191 //
192 //-------------------------------------------------------------------
193 func (c *Control) ReadRESTSubscriptions() error {
194         var err error
195         var restSubscriptions map[string]*RESTSubscription
196         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
197                 xapp.Logger.Info("Reading REST subscriptions from db")
198                 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
199                 if err != nil {
200                         xapp.Logger.Error("%v", err)
201                         <-time.After(1 * time.Second)
202                 } else {
203                         c.registry.restSubscriptions = restSubscriptions
204                         return nil
205                 }
206         }
207         xapp.Logger.Info("Continuing without retring")
208         return err
209 }
210
211 //-------------------------------------------------------------------
212 //
213 //-------------------------------------------------------------------
214 func (c *Control) ReadConfigParameters(f string) {
215
216         // viper.GetDuration returns nanoseconds
217         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
218         if e2tSubReqTimeout == 0 {
219                 e2tSubReqTimeout = 2000 * 1000000
220         }
221         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
222         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
223         if e2tSubDelReqTime == 0 {
224                 e2tSubDelReqTime = 2000 * 1000000
225         }
226         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
227         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
228         if e2tRecvMsgTimeout == 0 {
229                 e2tRecvMsgTimeout = 2000 * 1000000
230         }
231         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
232
233         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
234         if e2tMaxSubReqTryCount == 0 {
235                 e2tMaxSubReqTryCount = 1
236         }
237         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
238
239         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
240         if e2tMaxSubDelReqTryCount == 0 {
241                 e2tMaxSubDelReqTryCount = 1
242         }
243         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
244
245         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
246         if readSubsFromDb == "" {
247                 readSubsFromDb = "true"
248         }
249         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
250
251         dbTryCount = viper.GetInt("controls.dbTryCount")
252         if dbTryCount == 0 {
253                 dbTryCount = 200
254         }
255         xapp.Logger.Info("dbTryCount %v", dbTryCount)
256
257         dbRetryForever = viper.GetString("controls.dbRetryForever")
258         if dbRetryForever == "" {
259                 dbRetryForever = "true"
260         }
261         xapp.Logger.Info("dbRetryForever %v", dbRetryForever)
262
263         c.LoggerLevel = viper.GetUint32("logger.level")
264         if c.LoggerLevel == 0 {
265                 c.LoggerLevel = 3
266         }
267         xapp.Logger.Info("LoggerLevel %v", c.LoggerLevel)
268
269         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
270         // value 100ms used currently only in unittests.
271         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
272         if waitRouteCleanup_ms == 0 {
273                 waitRouteCleanup_ms = 5000 * 1000000
274         }
275         xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
276 }
277
278 //-------------------------------------------------------------------
279 //
280 //-------------------------------------------------------------------
281 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
282
283         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
284         for subId, subs := range register {
285                 if subs.SubRespRcvd == false {
286                         subs.NoRespToXapp = true
287                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
288                         c.SendSubscriptionDeleteReq(subs)
289                 }
290         }
291 }
292
293 func (c *Control) ReadyCB(data interface{}) {
294         if c.RMRClient == nil {
295                 c.RMRClient = xapp.Rmr
296         }
297 }
298
// Run registers the ready callback and the configuration change listener and
// then hands c to xapp.Run.
func (c *Control) Run() {
	// ReadyCB installs the RMR client once the framework reports readiness.
	xapp.SetReadyCB(c.ReadyCB, nil)
	// Reload the configuration parameters on configuration changes.
	xapp.AddConfigChangeListener(c.ReadConfigParameters)
	// NOTE(review): presumably blocks for the lifetime of the xApp - confirm.
	xapp.Run(c)
}
304
305 //-------------------------------------------------------------------
306 //
307 //-------------------------------------------------------------------
308 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
309
310         var restSubId string
311         var restSubscription *RESTSubscription
312         var err error
313
314         prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
315         if p.SubscriptionID == "" {
316                 if exists {
317                         restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
318                         if restSubscription != nil {
319                                 restSubId = prevRestSubsId
320                                 if err == nil {
321                                         xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
322                                 } else {
323                                         xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
324                                 }
325                         } else {
326                                 xapp.Logger.Info("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
327                                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
328                         }
329                 }
330
331                 if restSubscription == nil {
332                         restSubId = ksuid.New().String()
333                         restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
334                         if err != nil {
335                                 xapp.Logger.Error("%s", err.Error())
336                                 c.UpdateCounter(cRestSubFailToXapp)
337                                 return nil, "", err
338                         }
339                 }
340         } else {
341                 restSubId = p.SubscriptionID
342
343                 xapp.Logger.Info("RestSubscription ID %s provided via REST request", restSubId)
344
345                 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
346                 if err != nil {
347                         xapp.Logger.Error("%s", err.Error())
348                         c.UpdateCounter(cRestSubFailToXapp)
349                         return nil, "", err
350                 }
351
352                 if !exists {
353                         xapp.Logger.Info("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
354                 } else {
355                         xapp.Logger.Info("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
356                 }
357         }
358
359         return restSubscription, restSubId, nil
360 }
361
362 //-------------------------------------------------------------------
363 //
364 //-------------------------------------------------------------------
365 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
366
367         c.CntRecvMsg++
368         c.UpdateCounter(cRestSubReqFromXapp)
369
370         subResp := models.SubscriptionResponse{}
371         p := params.(*models.SubscriptionParams)
372
373         if c.LoggerLevel > 2 {
374                 c.PrintRESTSubscriptionRequest(p)
375         }
376
377         if p.ClientEndpoint == nil {
378                 xapp.Logger.Error("ClientEndpoint == nil")
379                 c.UpdateCounter(cRestSubFailToXapp)
380                 return nil, fmt.Errorf("")
381         }
382
383         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
384         if err != nil {
385                 xapp.Logger.Error("%s", err.Error())
386                 c.UpdateCounter(cRestSubFailToXapp)
387                 return nil, err
388         }
389
390         md5sum, err := CalculateRequestMd5sum(params)
391         if err != nil {
392                 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
393         }
394
395         restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
396         if err != nil {
397                 xapp.Logger.Error("Failed to get/allocate REST subscription")
398                 return nil, err
399         }
400
401         subResp.SubscriptionID = &restSubId
402         subReqList := e2ap.SubscriptionRequestList{}
403         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
404         if err != nil {
405                 xapp.Logger.Error("%s", err.Error())
406                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
407                 c.registry.DeleteRESTSubscription(&restSubId)
408                 c.UpdateCounter(cRestSubFailToXapp)
409                 return nil, err
410         }
411
412         duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
413         if duplicate {
414                 xapp.Logger.Info("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
415                 c.UpdateCounter(cRestSubRespToXapp)
416                 return &subResp, nil
417         }
418
419         c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
420
421         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum)
422
423         c.UpdateCounter(cRestSubRespToXapp)
424         return &subResp, nil
425 }
426
427 //-------------------------------------------------------------------
428 //
429 //-------------------------------------------------------------------
430
431 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
432         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string) {
433
434         c.SubscriptionProcessingStartDelay()
435         xapp.Logger.Debug("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
436
437         var xAppEventInstanceID int64
438         var e2EventInstanceID int64
439
440         defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
441
442         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
443                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
444                 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
445
446                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
447                 if trans == nil {
448                         // Send notification to xApp that prosessing of a Subscription Request has failed.
449                         err := fmt.Errorf("Tracking failure")
450                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
451                         continue
452                 }
453
454                 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
455
456                 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId)
457
458                 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
459                 trans.Release()
460
461                 if err != nil {
462                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
463                 } else {
464                         e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
465                         restSubscription.AddMd5Sum(md5sum)
466                         xapp.Logger.Info("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
467                                 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
468                         c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
469                 }
470         }
471 }
472
473 //-------------------------------------------------------------------
474 //
475 //------------------------------------------------------------------
476 func (c *Control) SubscriptionProcessingStartDelay() {
477         if c.UTTesting == true {
478                 // This is temporary fix for the UT problem that notification arrives before subscription response
479                 // Correct fix would be to allow notification come before response and process it correctly
480                 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
481                 <-time.After(time.Millisecond * 50)
482                 xapp.Logger.Debug("Continuing after delay")
483         }
484 }
485
486 //-------------------------------------------------------------------
487 //
488 //------------------------------------------------------------------
489 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
490         restSubId string) (*e2ap.E2APSubscriptionResponse, error) {
491
492         err := c.tracker.Track(trans)
493         if err != nil {
494                 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
495                 err = fmt.Errorf("Tracking failure")
496                 return nil, err
497         }
498
499         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
500         if err != nil {
501                 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
502                 return nil, err
503         }
504
505         //
506         // Wake subs request
507         //
508         go c.handleSubscriptionCreate(subs, trans)
509         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
510
511         err = nil
512         if event != nil {
513                 switch themsg := event.(type) {
514                 case *e2ap.E2APSubscriptionResponse:
515                         trans.Release()
516                         return themsg, nil
517                 case *e2ap.E2APSubscriptionFailure:
518                         err = fmt.Errorf("E2 SubscriptionFailure received")
519                         return nil, err
520                 default:
521                         err = fmt.Errorf("unexpected E2 subscription response received")
522                         break
523                 }
524         } else {
525                 err = fmt.Errorf("E2 subscription response timeout")
526         }
527
528         xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
529         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
530         return nil, err
531 }
532
533 //-------------------------------------------------------------------
534 //
535 //-------------------------------------------------------------------
536 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
537         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
538
539         // Send notification to xApp that prosessing of a Subscription Request has failed.
540         e2EventInstanceID := (int64)(0)
541         errorCause := err.Error()
542         resp := &models.SubscriptionResponse{
543                 SubscriptionID: restSubId,
544                 SubscriptionInstances: []*models.SubscriptionInstance{
545                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
546                                 ErrorCause:          &errorCause,
547                                 XappEventInstanceID: &xAppEventInstanceID},
548                 },
549         }
550         // Mark REST subscription request processed.
551         restSubscription.SetProcessed(err)
552         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
553         if trans != nil {
554                 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
555                         errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
556         } else {
557                 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
558                         errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
559         }
560
561         c.UpdateCounter(cRestSubFailNotifToXapp)
562         xapp.Subscription.Notify(resp, *clientEndpoint)
563 }
564
565 //-------------------------------------------------------------------
566 //
567 //-------------------------------------------------------------------
568 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
569         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
570
571         // Store successfully processed InstanceId for deletion
572         restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
573         restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
574
575         // Send notification to xApp that a Subscription Request has been processed.
576         resp := &models.SubscriptionResponse{
577                 SubscriptionID: restSubId,
578                 SubscriptionInstances: []*models.SubscriptionInstance{
579                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
580                                 ErrorCause:          nil,
581                                 XappEventInstanceID: &xAppEventInstanceID},
582                 },
583         }
584         // Mark REST subscription request processesd.
585         restSubscription.SetProcessed(nil)
586         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
587         xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
588                 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
589
590         c.UpdateCounter(cRestSubNotifToXapp)
591         xapp.Subscription.Notify(resp, *clientEndpoint)
592 }
593
594 //-------------------------------------------------------------------
595 //
596 //-------------------------------------------------------------------
597 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
598
599         c.CntRecvMsg++
600         c.UpdateCounter(cRestSubDelReqFromXapp)
601
602         xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
603
604         restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
605         if err != nil {
606                 xapp.Logger.Error("%s", err.Error())
607                 if restSubscription == nil {
608                         // Subscription was not found
609                         return nil
610                 } else {
611                         if restSubscription.SubReqOngoing == true {
612                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
613                                 xapp.Logger.Error("%s", err.Error())
614                                 return err
615                         } else if restSubscription.SubDelReqOngoing == true {
616                                 // Previous request for same restSubId still ongoing
617                                 return nil
618                         }
619                 }
620         }
621
622         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
623         go func() {
624                 xapp.Logger.Info("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
625                 for _, instanceId := range restSubscription.InstanceIds {
626                         xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
627
628                         if err != nil {
629                                 xapp.Logger.Error("%s", err.Error())
630                                 //return err
631                         }
632                         xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
633                         restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
634                         restSubscription.DeleteE2InstanceId(instanceId)
635                 }
636                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
637                 c.registry.DeleteRESTSubscription(&restSubId)
638                 c.RemoveRESTSubscriptionFromDb(restSubId)
639         }()
640
641         c.UpdateCounter(cRestSubDelRespToXapp)
642
643         return nil
644 }
645
646 //-------------------------------------------------------------------
647 //
648 //-------------------------------------------------------------------
649 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
650
651         var xAppEventInstanceID int64
652         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
653         if err != nil {
654                 xapp.Logger.Info("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
655                         restSubId, instanceId, idstring(err, nil))
656                 return xAppEventInstanceID, nil
657         }
658
659         xAppEventInstanceID = int64(subs.ReqId.Id)
660         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
661         if trans == nil {
662                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
663                 xapp.Logger.Error("%s", err.Error())
664         }
665         defer trans.Release()
666
667         err = c.tracker.Track(trans)
668         if err != nil {
669                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
670                 xapp.Logger.Error("%s", err.Error())
671                 return xAppEventInstanceID, &time.ParseError{}
672         }
673         //
674         // Wake subs delete
675         //
676         go c.handleSubscriptionDelete(subs, trans)
677         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
678
679         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
680
681         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
682
683         return xAppEventInstanceID, nil
684 }
685
686 //-------------------------------------------------------------------
687 //
688 //-------------------------------------------------------------------
689 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
690         xapp.Logger.Info("QueryHandler() called")
691
692         c.CntRecvMsg++
693
694         return c.registry.QueryHandler()
695 }
696
697 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
698         xapp.Logger.Info("TestRestHandler() called")
699
700         pathParams := mux.Vars(r)
701         s := pathParams["testId"]
702
703         // This can be used to delete single subscription from db
704         if contains := strings.Contains(s, "deletesubid="); contains == true {
705                 var splits = strings.Split(s, "=")
706                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
707                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
708                         c.RemoveSubscriptionFromSdl(uint32(subId))
709                         return
710                 }
711         }
712
713         // This can be used to remove all subscriptions db from
714         if s == "emptydb" {
715                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
716                 c.RemoveAllSubscriptionsFromSdl()
717                 c.RemoveAllRESTSubscriptionsFromSdl()
718                 return
719         }
720
721         // This is meant to cause submgr's restart in testing
722         if s == "restart" {
723                 xapp.Logger.Info("os.Exit(1) called")
724                 os.Exit(1)
725         }
726
727         xapp.Logger.Info("Unsupported rest command received %s", s)
728 }
729
730 //-------------------------------------------------------------------
731 //
732 //-------------------------------------------------------------------
733
734 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
735         params := &xapp.RMRParams{}
736         params.Mtype = trans.GetMtype()
737         params.SubId = int(subs.GetReqId().InstanceId)
738         params.Xid = ""
739         params.Meid = subs.GetMeid()
740         params.Src = ""
741         params.PayloadLen = len(trans.Payload.Buf)
742         params.Payload = trans.Payload.Buf
743         params.Mbuf = nil
744         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
745         err = c.SendWithRetry(params, false, 5)
746         if err != nil {
747                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
748         }
749         return err
750 }
751
752 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
753
754         params := &xapp.RMRParams{}
755         params.Mtype = trans.GetMtype()
756         params.SubId = int(subs.GetReqId().InstanceId)
757         params.Xid = trans.GetXid()
758         params.Meid = trans.GetMeid()
759         params.Src = ""
760         params.PayloadLen = len(trans.Payload.Buf)
761         params.Payload = trans.Payload.Buf
762         params.Mbuf = nil
763         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
764         err = c.SendWithRetry(params, false, 5)
765         if err != nil {
766                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
767         }
768         return err
769 }
770
771 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
772         if c.RMRClient == nil {
773                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
774                 xapp.Logger.Error("%s", err.Error())
775                 return
776         }
777         c.CntRecvMsg++
778
779         defer c.RMRClient.Free(msg.Mbuf)
780
781         // xapp-frame might use direct access to c buffer and
782         // when msg.Mbuf is freed, someone might take it into use
783         // and payload data might be invalid inside message handle function
784         //
785         // subscriptions won't load system a lot so there is no
786         // real performance hit by cloning buffer into new go byte slice
787         cPay := append(msg.Payload[:0:0], msg.Payload...)
788         msg.Payload = cPay
789         msg.PayloadLen = len(cPay)
790
791         switch msg.Mtype {
792         case xapp.RIC_SUB_REQ:
793                 go c.handleXAPPSubscriptionRequest(msg)
794         case xapp.RIC_SUB_RESP:
795                 go c.handleE2TSubscriptionResponse(msg)
796         case xapp.RIC_SUB_FAILURE:
797                 go c.handleE2TSubscriptionFailure(msg)
798         case xapp.RIC_SUB_DEL_REQ:
799                 go c.handleXAPPSubscriptionDeleteRequest(msg)
800         case xapp.RIC_SUB_DEL_RESP:
801                 go c.handleE2TSubscriptionDeleteResponse(msg)
802         case xapp.RIC_SUB_DEL_FAILURE:
803                 go c.handleE2TSubscriptionDeleteFailure(msg)
804         default:
805                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
806         }
807         return
808 }
809
810 //-------------------------------------------------------------------
811 // handle from XAPP Subscription Request
812 //------------------------------------------------------------------
813 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
814         xapp.Logger.Info("MSG from XAPP: %s", params.String())
815         c.UpdateCounter(cSubReqFromXapp)
816
817         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
818         if err != nil {
819                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
820                 return
821         }
822
823         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
824         if trans == nil {
825                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
826                 return
827         }
828         defer trans.Release()
829
830         if err = c.tracker.Track(trans); err != nil {
831                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
832                 return
833         }
834
835         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
836         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
837         if err != nil {
838                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
839                 return
840         }
841
842         c.wakeSubscriptionRequest(subs, trans)
843 }
844
845 //-------------------------------------------------------------------
846 // Wake Subscription Request to E2node
847 //------------------------------------------------------------------
848 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
849
850         go c.handleSubscriptionCreate(subs, trans)
851         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
852         var err error
853         if event != nil {
854                 switch themsg := event.(type) {
855                 case *e2ap.E2APSubscriptionResponse:
856                         themsg.RequestId.Id = trans.RequestId.Id
857                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
858                         if err == nil {
859                                 trans.Release()
860                                 c.UpdateCounter(cSubRespToXapp)
861                                 c.rmrSendToXapp("", subs, trans)
862                                 return
863                         }
864                 case *e2ap.E2APSubscriptionFailure:
865                         themsg.RequestId.Id = trans.RequestId.Id
866                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
867                         if err == nil {
868                                 c.UpdateCounter(cSubFailToXapp)
869                                 c.rmrSendToXapp("", subs, trans)
870                         }
871                 default:
872                         break
873                 }
874         }
875         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
876         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
877 }
878
879 //-------------------------------------------------------------------
880 // handle from XAPP Subscription Delete Request
881 //------------------------------------------------------------------
882 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
883         xapp.Logger.Info("MSG from XAPP: %s", params.String())
884         c.UpdateCounter(cSubDelReqFromXapp)
885
886         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
887         if err != nil {
888                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
889                 return
890         }
891
892         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
893         if trans == nil {
894                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
895                 return
896         }
897         defer trans.Release()
898
899         err = c.tracker.Track(trans)
900         if err != nil {
901                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
902                 return
903         }
904
905         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
906         if err != nil {
907                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
908                 return
909         }
910
911         //
912         // Wake subs delete
913         //
914         go c.handleSubscriptionDelete(subs, trans)
915         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
916
917         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
918
919         if subs.NoRespToXapp == true {
920                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
921                 return
922         }
923
924         // Whatever is received success, fail or timeout, send successful delete response
925         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
926         subDelRespMsg.RequestId.Id = trans.RequestId.Id
927         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
928         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
929         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
930         if err == nil {
931                 c.UpdateCounter(cSubDelRespToXapp)
932                 c.rmrSendToXapp("", subs, trans)
933         }
934
935         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
936         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
937 }
938
939 //-------------------------------------------------------------------
940 // SUBS CREATE Handling
941 //-------------------------------------------------------------------
942 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
943
944         var removeSubscriptionFromDb bool = false
945         trans := c.tracker.NewSubsTransaction(subs)
946         subs.WaitTransactionTurn(trans)
947         defer subs.ReleaseTransactionTurn(trans)
948         defer trans.Release()
949
950         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
951
952         subRfMsg, valid := subs.GetCachedResponse()
953         if subRfMsg == nil && valid == true {
954                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
955                 switch event.(type) {
956                 case *e2ap.E2APSubscriptionResponse:
957                         subRfMsg, valid = subs.SetCachedResponse(event, true)
958                         subs.SubRespRcvd = true
959                 case *e2ap.E2APSubscriptionFailure:
960                         removeSubscriptionFromDb = true
961                         subRfMsg, valid = subs.SetCachedResponse(event, false)
962                         xapp.Logger.Info("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
963                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
964                 case *SubmgrRestartTestEvent:
965                         // This simulates that no response has been received and after restart subscriptions are restored from db
966                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
967                         return
968                 default:
969                         xapp.Logger.Info("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
970                         removeSubscriptionFromDb = true
971                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
972                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
973                 }
974                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
975         } else {
976                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
977         }
978
979         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
980         if valid == false {
981                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
982         }
983
984         c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
985         parentTrans.SendEvent(subRfMsg, 0)
986 }
987
988 //-------------------------------------------------------------------
989 // SUBS DELETE Handling
990 //-------------------------------------------------------------------
991
992 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
993
994         trans := c.tracker.NewSubsTransaction(subs)
995         subs.WaitTransactionTurn(trans)
996         defer subs.ReleaseTransactionTurn(trans)
997         defer trans.Release()
998
999         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1000
1001         subs.mutex.Lock()
1002
1003         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1004                 subs.valid = false
1005                 subs.mutex.Unlock()
1006                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1007         } else {
1008                 subs.mutex.Unlock()
1009         }
1010         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1011         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
1012         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
1013         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1014         c.registry.UpdateSubscriptionToDb(subs, c)
1015         parentTrans.SendEvent(nil, 0)
1016 }
1017
1018 //-------------------------------------------------------------------
1019 // send to E2T Subscription Request
1020 //-------------------------------------------------------------------
1021 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1022         var err error
1023         var event interface{} = nil
1024         var timedOut bool = false
1025         const ricRequestorId = 123
1026
1027         subReqMsg := subs.SubReqMsg
1028         subReqMsg.RequestId = subs.GetReqId().RequestId
1029         subReqMsg.RequestId.Id = ricRequestorId
1030         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1031         if err != nil {
1032                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
1033                 return event
1034         }
1035
1036         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1037         c.WriteSubscriptionToDb(subs)
1038
1039         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
1040                 desc := fmt.Sprintf("(retry %d)", retries)
1041                 if retries == 0 {
1042                         c.UpdateCounter(cSubReqToE2)
1043                 } else {
1044                         c.UpdateCounter(cSubReReqToE2)
1045                 }
1046                 c.rmrSendToE2T(desc, subs, trans)
1047                 if subs.DoNotWaitSubResp == false {
1048                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
1049                         if timedOut {
1050                                 c.UpdateCounter(cSubReqTimerExpiry)
1051                                 continue
1052                         }
1053                 } else {
1054                         // Simulating case where subscrition request has been sent but response has not been received before restart
1055                         event = &SubmgrRestartTestEvent{}
1056                 }
1057                 break
1058         }
1059         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1060         return event
1061 }
1062
1063 //-------------------------------------------------------------------
1064 // send to E2T Subscription Delete Request
1065 //-------------------------------------------------------------------
1066
1067 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1068         var err error
1069         var event interface{}
1070         var timedOut bool
1071         const ricRequestorId = 123
1072
1073         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1074         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1075         subDelReqMsg.RequestId.Id = ricRequestorId
1076         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1077         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1078         if err != nil {
1079                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1080                 return event
1081         }
1082
1083         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1084                 desc := fmt.Sprintf("(retry %d)", retries)
1085                 if retries == 0 {
1086                         c.UpdateCounter(cSubDelReqToE2)
1087                 } else {
1088                         c.UpdateCounter(cSubDelReReqToE2)
1089                 }
1090                 c.rmrSendToE2T(desc, subs, trans)
1091                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1092                 if timedOut {
1093                         c.UpdateCounter(cSubDelReqTimerExpiry)
1094                         continue
1095                 }
1096                 break
1097         }
1098         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1099         return event
1100 }
1101
1102 //-------------------------------------------------------------------
1103 // handle from E2T Subscription Response
1104 //-------------------------------------------------------------------
1105 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1106         xapp.Logger.Info("MSG from E2T: %s", params.String())
1107         c.UpdateCounter(cSubRespFromE2)
1108
1109         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1110         if err != nil {
1111                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1112                 return
1113         }
1114         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1115         if err != nil {
1116                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1117                 return
1118         }
1119         trans := subs.GetTransaction()
1120         if trans == nil {
1121                 err = fmt.Errorf("Ongoing transaction not found")
1122                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1123                 return
1124         }
1125         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1126         if sendOk == false {
1127                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1128                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1129         }
1130         return
1131 }
1132
1133 //-------------------------------------------------------------------
1134 // handle from E2T Subscription Failure
1135 //-------------------------------------------------------------------
1136 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1137         xapp.Logger.Info("MSG from E2T: %s", params.String())
1138         c.UpdateCounter(cSubFailFromE2)
1139         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1140         if err != nil {
1141                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1142                 return
1143         }
1144         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1145         if err != nil {
1146                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1147                 return
1148         }
1149         trans := subs.GetTransaction()
1150         if trans == nil {
1151                 err = fmt.Errorf("Ongoing transaction not found")
1152                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1153                 return
1154         }
1155         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1156         if sendOk == false {
1157                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1158                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1159         }
1160         return
1161 }
1162
1163 //-------------------------------------------------------------------
1164 // handle from E2T Subscription Delete Response
1165 //-------------------------------------------------------------------
1166 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1167         xapp.Logger.Info("MSG from E2T: %s", params.String())
1168         c.UpdateCounter(cSubDelRespFromE2)
1169         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1170         if err != nil {
1171                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1172                 return
1173         }
1174         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1175         if err != nil {
1176                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1177                 return
1178         }
1179         trans := subs.GetTransaction()
1180         if trans == nil {
1181                 err = fmt.Errorf("Ongoing transaction not found")
1182                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1183                 return
1184         }
1185         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1186         if sendOk == false {
1187                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1188                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1189         }
1190         return
1191 }
1192
1193 //-------------------------------------------------------------------
1194 // handle from E2T Subscription Delete Failure
1195 //-------------------------------------------------------------------
1196 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1197         xapp.Logger.Info("MSG from E2T: %s", params.String())
1198         c.UpdateCounter(cSubDelFailFromE2)
1199         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1200         if err != nil {
1201                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1202                 return
1203         }
1204         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1205         if err != nil {
1206                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1207                 return
1208         }
1209         trans := subs.GetTransaction()
1210         if trans == nil {
1211                 err = fmt.Errorf("Ongoing transaction not found")
1212                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1213                 return
1214         }
1215         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1216         if sendOk == false {
1217                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1218                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1219         }
1220         return
1221 }
1222
1223 //-------------------------------------------------------------------
1224 //
1225 //-------------------------------------------------------------------
1226 func typeofSubsMessage(v interface{}) string {
1227         if v == nil {
1228                 return "NIL"
1229         }
1230         switch v.(type) {
1231         //case *e2ap.E2APSubscriptionRequest:
1232         //      return "SubReq"
1233         case *e2ap.E2APSubscriptionResponse:
1234                 return "SubResp"
1235         case *e2ap.E2APSubscriptionFailure:
1236                 return "SubFail"
1237         //case *e2ap.E2APSubscriptionDeleteRequest:
1238         //      return "SubDelReq"
1239         case *e2ap.E2APSubscriptionDeleteResponse:
1240                 return "SubDelResp"
1241         case *e2ap.E2APSubscriptionDeleteFailure:
1242                 return "SubDelFail"
1243         default:
1244                 return "Unknown"
1245         }
1246 }
1247
1248 //-------------------------------------------------------------------
1249 //
1250 //-------------------------------------------------------------------
1251 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1252         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1253         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1254         if err != nil {
1255                 xapp.Logger.Error("%v", err)
1256         }
1257 }
1258
1259 //-------------------------------------------------------------------
1260 //
1261 //-------------------------------------------------------------------
1262 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1263
1264         if removeSubscriptionFromDb == true {
1265                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1266                 c.RemoveSubscriptionFromDb(subs)
1267         } else {
1268                 // Update is needed for successful response and merge case here
1269                 if subs.RetryFromXapp == false {
1270                         c.WriteSubscriptionToDb(subs)
1271                 }
1272         }
1273         subs.RetryFromXapp = false
1274 }
1275
1276 //-------------------------------------------------------------------
1277 //
1278 //-------------------------------------------------------------------
1279 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1280         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1281         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1282         if err != nil {
1283                 xapp.Logger.Error("%v", err)
1284         }
1285 }
1286
1287 //-------------------------------------------------------------------
1288 //
1289 //-------------------------------------------------------------------
1290 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1291         xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1292         err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1293         if err != nil {
1294                 xapp.Logger.Error("%v", err)
1295         }
1296 }
1297
1298 //-------------------------------------------------------------------
1299 //
1300 //-------------------------------------------------------------------
1301 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1302
1303         if removeRestSubscriptionFromDb == true {
1304                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1305                 c.RemoveRESTSubscriptionFromDb(restSubId)
1306         } else {
1307                 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1308         }
1309 }
1310
1311 //-------------------------------------------------------------------
1312 //
1313 //-------------------------------------------------------------------
1314 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1315         xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1316         err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1317         if err != nil {
1318                 xapp.Logger.Error("%v", err)
1319         }
1320 }
1321
1322 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1323
1324         const ricRequestorId = 123
1325         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1326
1327         // Send delete for every endpoint in the subscription
1328         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1329         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1330         subDelReqMsg.RequestId.Id = ricRequestorId
1331         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1332         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1333         if err != nil {
1334                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1335                 return
1336         }
1337         for _, endPoint := range subs.EpList.Endpoints {
1338                 params := &xapp.RMRParams{}
1339                 params.Mtype = mType
1340                 params.SubId = int(subs.GetReqId().InstanceId)
1341                 params.Xid = ""
1342                 params.Meid = subs.Meid
1343                 params.Src = endPoint.String()
1344                 params.PayloadLen = len(payload.Buf)
1345                 params.Payload = payload.Buf
1346                 params.Mbuf = nil
1347                 subs.DeleteFromDb = true
1348                 c.handleXAPPSubscriptionDeleteRequest(params)
1349         }
1350 }
1351
1352 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1353
1354         fmt.Println("CRESTSubscriptionRequest")
1355
1356         if p == nil {
1357                 return
1358         }
1359
1360         if p.SubscriptionID != "" {
1361                 fmt.Println("  SubscriptionID = ", p.SubscriptionID)
1362         } else {
1363                 fmt.Println("  SubscriptionID = ''")
1364         }
1365
1366         fmt.Printf("  ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1367
1368         if p.ClientEndpoint.HTTPPort != nil {
1369                 fmt.Printf("  ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1370         } else {
1371                 fmt.Println("  ClientEndpoint.HTTPPort = nil")
1372         }
1373
1374         if p.ClientEndpoint.RMRPort != nil {
1375                 fmt.Printf("  ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1376         } else {
1377                 fmt.Println("  ClientEndpoint.RMRPort = nil")
1378         }
1379
1380         if p.Meid != nil {
1381                 fmt.Printf("  Meid = %s\n", *p.Meid)
1382         } else {
1383                 fmt.Println("  Meid = nil")
1384         }
1385
1386         for _, subscriptionDetail := range p.SubscriptionDetails {
1387                 if p.RANFunctionID != nil {
1388                         fmt.Printf("  RANFunctionID = %v\n", *p.RANFunctionID)
1389                 } else {
1390                         fmt.Println("  RANFunctionID = nil")
1391                 }
1392                 fmt.Printf("  SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1393                 fmt.Printf("  SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1394
1395                 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1396                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1397                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1398                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1399
1400                         if actionToBeSetup.SubsequentAction != nil {
1401                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1402                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1403                         } else {
1404                                 fmt.Println("  SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1405                         }
1406                 }
1407         }
1408 }