RIC-851: Updated submgr to trigger Delete Subscription Procedure
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "sync"
26         "time"
27
28         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
29         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
30         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
33         httptransport "github.com/go-openapi/runtime/client"
34         "github.com/go-openapi/strfmt"
35         "github.com/segmentio/ksuid"
36         "github.com/spf13/viper"
37 )
38
39 //-----------------------------------------------------------------------------
40 //
41 //-----------------------------------------------------------------------------
42
43 func idstring(err error, entries ...fmt.Stringer) string {
44         var retval string = ""
45         var filler string = ""
46         for _, entry := range entries {
47                 if entry != nil {
48                         retval += filler + entry.String()
49                         filler = " "
50                 } else {
51                         retval += filler + "(NIL)"
52                 }
53         }
54         if err != nil {
55                 retval += filler + "err(" + err.Error() + ")"
56                 filler = " "
57         }
58         return retval
59 }
60
61 //-----------------------------------------------------------------------------
62 //
63 //-----------------------------------------------------------------------------
64
65 var e2tSubReqTimeout time.Duration
66 var e2tSubDelReqTime time.Duration
67 var e2tRecvMsgTimeout time.Duration
68 var waitRouteCleanup_ms time.Duration
69 var e2tMaxSubReqTryCount uint64    // Initial try + retry
70 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
71 var checkE2State string
72 var readSubsFromDb string
73 var dbRetryForever string
74 var dbTryCount int
75
76 type Control struct {
77         *xapp.RMRClient
78         e2ap              *E2ap
79         registry          *Registry
80         tracker           *Tracker
81         restDuplicateCtrl *DuplicateCtrl
82         e2IfState         *E2IfState
83         e2IfStateDb       XappRnibInterface
84         e2SubsDb          Sdlnterface
85         restSubsDb        Sdlnterface
86         CntRecvMsg        uint64
87         ResetTestFlag     bool
88         Counters          map[string]xapp.Counter
89         LoggerLevel       int
90         UTTesting         bool
91 }
92
93 type RMRMeid struct {
94         PlmnID  string
95         EnbID   string
96         RanName string
97 }
98
99 type SubmgrRestartTestEvent struct{}
100 type SubmgrRestartUpEvent struct{}
101 type PackSubscriptionRequestErrortEvent struct {
102         ErrorInfo ErrorInfo
103 }
104
105 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
106         p.ErrorInfo = *errorInfo
107 }
108
109 type SDLWriteErrortEvent struct {
110         ErrorInfo ErrorInfo
111 }
112
113 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
114         s.ErrorInfo = *errorInfo
115 }
116
117 func init() {
118         xapp.Logger.Debug("SUBMGR")
119         viper.AutomaticEnv()
120         viper.SetEnvPrefix("submgr")
121         viper.AllowEmptyEnv(true)
122 }
123
124 func NewControl() *Control {
125
126         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
127         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
128
129         registry := new(Registry)
130         registry.Initialize()
131         registry.rtmgrClient = &rtmgrClient
132
133         tracker := new(Tracker)
134         tracker.Init()
135
136         restDuplicateCtrl := new(DuplicateCtrl)
137         restDuplicateCtrl.Init()
138
139         e2IfState := new(E2IfState)
140
141         c := &Control{e2ap: new(E2ap),
142                 registry:          registry,
143                 tracker:           tracker,
144                 restDuplicateCtrl: restDuplicateCtrl,
145                 e2IfState:         e2IfState,
146                 e2IfStateDb:       CreateXappRnibIfInstance(),
147                 e2SubsDb:          CreateSdl(),
148                 restSubsDb:        CreateRESTSdl(),
149                 Counters:          xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
150                 LoggerLevel:       1,
151         }
152
153         e2IfState.Init(c)
154         c.ReadConfigParameters("")
155
156         // Register REST handler for testing support
157         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
158         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
159         xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
160
161         xapp.Resource.InjectRoute("/ric/v1/get_all_e2nodes", c.GetAllE2Nodes, "GET")
162         xapp.Resource.InjectRoute("/ric/v1/get_e2node_rest_subscriptions/{ranName}", c.GetAllE2NodeRestSubscriptions, "GET")
163
164         xapp.Resource.InjectRoute("/ric/v1/get_all_xapps", c.GetAllXapps, "GET")
165         xapp.Resource.InjectRoute("/ric/v1/get_xapp_rest_restsubscriptions/{xappServiceName}", c.GetAllXappRestSubscriptions, "GET")
166         xapp.Resource.InjectRoute("/ric/v1/get_e2subscriptions/{restId}", c.GetE2Subscriptions, "GET")
167
168         xapp.Resource.InjectRoute("/ric/v1/delete_all_e2node_subscriptions/{ranName}", c.DeleteAllE2nodeSubscriptions, "DELETE")
169         xapp.Resource.InjectRoute("/ric/v1/delete_all_xapp_subscriptions/{xappServiceName}", c.DeleteAllXappSubscriptions, "DELETE")
170
171         if readSubsFromDb == "true" {
172                 // Read subscriptions from db
173                 err := c.ReadE2Subscriptions()
174                 if err != nil {
175                         xapp.Logger.Error("ReadE2Subscriptions() failed %s", err.Error())
176                 }
177                 err = c.ReadRESTSubscriptions()
178                 if err != nil {
179                         xapp.Logger.Error("ReadRESTSubscriptions() failed %s", err.Error())
180                 }
181         }
182
183         go func() {
184                 err := xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
185                 if err != nil {
186                         xapp.Logger.Error("xapp.Subscription.Listen failure: %s", err.Error())
187                 }
188         }()
189         return c
190 }
191
192 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
193         subscriptions, err := c.registry.QueryHandler()
194         if err != nil {
195                 xapp.Logger.Error("QueryHandler() failed %s", err.Error())
196         }
197
198         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
199 }
200
201 //-------------------------------------------------------------------
202 //
203 //-------------------------------------------------------------------
204 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
205         xapp.Logger.Debug("RESTQueryHandler() called")
206
207         c.CntRecvMsg++
208
209         return c.registry.QueryHandler()
210 }
211
212 //-------------------------------------------------------------------
213 //
214 //-------------------------------------------------------------------
215 func (c *Control) ReadE2Subscriptions() error {
216         var err error
217         var subIds []uint32
218         var register map[uint32]*Subscription
219         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
220                 xapp.Logger.Debug("Reading E2 subscriptions from db")
221                 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
222                 if err != nil {
223                         xapp.Logger.Error("%v", err)
224                         <-time.After(1 * time.Second)
225                 } else {
226                         c.registry.subIds = subIds
227                         c.registry.register = register
228                         go c.HandleUncompletedSubscriptions(register)
229                         return nil
230                 }
231         }
232         xapp.Logger.Debug("Continuing without retring")
233         return err
234 }
235
236 //-------------------------------------------------------------------
237 //
238 //-------------------------------------------------------------------
239 func (c *Control) ReadRESTSubscriptions() error {
240
241         xapp.Logger.Debug("ReadRESTSubscriptions()")
242         var err error
243         var restSubscriptions map[string]*RESTSubscription
244         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
245                 xapp.Logger.Debug("Reading REST subscriptions from db")
246                 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
247                 if err != nil {
248                         xapp.Logger.Error("%v", err)
249                         <-time.After(1 * time.Second)
250                 } else {
251                         // Fix REST subscriptions ongoing status after restart
252                         for restSubId, restSubscription := range restSubscriptions {
253                                 restSubscription.SubReqOngoing = false
254                                 restSubscription.SubDelReqOngoing = false
255                                 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubscription)
256                                 if err != nil {
257                                         xapp.Logger.Error("WriteRESTSubscriptionToSdl() failed:%s", err.Error())
258                                 }
259                         }
260                         c.registry.restSubscriptions = restSubscriptions
261                         return nil
262                 }
263         }
264         xapp.Logger.Debug("Continuing without retring")
265         return err
266 }
267
268 //-------------------------------------------------------------------
269 //
270 //-------------------------------------------------------------------
271 func (c *Control) ReadConfigParameters(f string) {
272
273         xapp.Logger.Debug("ReadConfigParameters")
274
275         c.LoggerLevel = int(xapp.Logger.GetLevel())
276         xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel)
277         c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
278
279         // viper.GetDuration returns nanoseconds
280         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
281         if e2tSubReqTimeout == 0 {
282                 e2tSubReqTimeout = 2000 * 1000000
283                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
284         }
285         xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
286
287         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
288         if e2tSubDelReqTime == 0 {
289                 e2tSubDelReqTime = 2000 * 1000000
290                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
291         }
292         xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
293
294         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
295         if e2tRecvMsgTimeout == 0 {
296                 e2tRecvMsgTimeout = 2000 * 1000000
297                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
298         }
299         xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
300
301         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
302         if e2tMaxSubReqTryCount == 0 {
303                 e2tMaxSubReqTryCount = 1
304                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
305         }
306         xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
307
308         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
309         if e2tMaxSubDelReqTryCount == 0 {
310                 e2tMaxSubDelReqTryCount = 1
311                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
312         }
313         xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
314
315         checkE2State = viper.GetString("controls.checkE2State")
316         if checkE2State == "" {
317                 checkE2State = "true"
318                 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
319         }
320         xapp.Logger.Debug("checkE2State= %v", checkE2State)
321
322         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
323         if readSubsFromDb == "" {
324                 readSubsFromDb = "true"
325                 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
326         }
327         xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
328
329         dbTryCount = viper.GetInt("controls.dbTryCount")
330         if dbTryCount == 0 {
331                 dbTryCount = 200
332                 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
333         }
334         xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
335
336         dbRetryForever = viper.GetString("controls.dbRetryForever")
337         if dbRetryForever == "" {
338                 dbRetryForever = "true"
339                 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
340         }
341         xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
342
343         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
344         // value 100ms used currently only in unittests.
345         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
346         if waitRouteCleanup_ms == 0 {
347                 waitRouteCleanup_ms = 5000 * 1000000
348                 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
349         }
350         xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
351 }
352
353 //-------------------------------------------------------------------
354 //
355 //-------------------------------------------------------------------
356 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
357
358         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
359         for subId, subs := range register {
360                 if subs.SubRespRcvd == false {
361                         // If policy subscription has already been made successfully unsuccessful update should not be deleted.
362                         if subs.PolicyUpdate == false {
363                                 subs.NoRespToXapp = true
364                                 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
365                                 c.SendSubscriptionDeleteReq(subs, false)
366                         }
367                 }
368         }
369 }
370
371 func (c *Control) ReadyCB(data interface{}) {
372         if c.RMRClient == nil {
373                 c.RMRClient = xapp.Rmr
374         }
375 }
376
377 func (c *Control) Run() {
378         xapp.SetReadyCB(c.ReadyCB, nil)
379         xapp.AddConfigChangeListener(c.ReadConfigParameters)
380         xapp.Run(c)
381 }
382
383 //-------------------------------------------------------------------
384 //
385 //-------------------------------------------------------------------
386 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string, xAppServiceName string) (*RESTSubscription, string, error) {
387
388         var restSubId string
389         var restSubscription *RESTSubscription
390         var err error
391
392         prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
393         if p.SubscriptionID == "" {
394                 // Subscription does not contain REST subscription Id
395                 if exists {
396                         restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
397                         if restSubscription != nil {
398                                 // Subscription not found
399                                 restSubId = prevRestSubsId
400                                 if err == nil {
401                                         xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
402                                 } else {
403                                         xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
404                                 }
405                         } else {
406                                 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
407                                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
408                         }
409                 }
410
411                 if restSubscription == nil {
412                         restSubId = ksuid.New().String()
413                         restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppServiceName, &xAppRmrEndpoint, p.Meid)
414                 }
415         } else {
416                 // Subscription contains REST subscription Id
417                 restSubId = p.SubscriptionID
418
419                 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
420                 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
421                 if err != nil {
422                         // Subscription with id in REST request does not exist
423                         xapp.Logger.Error("%s", err.Error())
424                         c.UpdateCounter(cRestSubFailToXapp)
425                         return nil, "", err
426                 }
427
428                 if !exists {
429                         xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
430                 } else {
431                         xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
432                 }
433         }
434
435         return restSubscription, restSubId, nil
436 }
437
438 //-------------------------------------------------------------------
439 //
440 //-------------------------------------------------------------------
441 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
442
443         c.CntRecvMsg++
444         c.UpdateCounter(cRestSubReqFromXapp)
445
446         subResp := models.SubscriptionResponse{}
447         p := params.(*models.SubscriptionParams)
448
449         if c.LoggerLevel > 2 {
450                 c.PrintRESTSubscriptionRequest(p)
451         }
452
453         if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
454                 xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
455                 c.UpdateCounter(cRestReqRejDueE2Down)
456                 return nil, common.SubscribeServiceUnavailableCode
457         }
458
459         if p.ClientEndpoint == nil {
460                 err := fmt.Errorf("ClientEndpoint == nil")
461                 xapp.Logger.Error("%v", err)
462                 c.UpdateCounter(cRestSubFailToXapp)
463                 return nil, common.SubscribeBadRequestCode
464         }
465
466         e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
467         if err != nil {
468                 xapp.Logger.Error("%s", err)
469                 c.UpdateCounter(cRestSubFailToXapp)
470                 return nil, common.SubscribeBadRequestCode
471         }
472         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
473         if err != nil {
474                 xapp.Logger.Error("%s", err.Error())
475                 c.UpdateCounter(cRestSubFailToXapp)
476                 return nil, common.SubscribeBadRequestCode
477         }
478
479         md5sum, err := CalculateRequestMd5sum(params)
480         if err != nil {
481                 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
482         }
483
484         restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint, p.ClientEndpoint.Host)
485         if err != nil {
486                 xapp.Logger.Error("Subscription with id in REST request does not exist")
487                 return nil, common.SubscribeNotFoundCode
488         }
489
490         subResp.SubscriptionID = &restSubId
491         subReqList := e2ap.SubscriptionRequestList{}
492         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
493         if err != nil {
494                 xapp.Logger.Error("%s", err.Error())
495                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
496                 c.registry.DeleteRESTSubscription(&restSubId)
497                 c.UpdateCounter(cRestSubFailToXapp)
498                 return nil, common.SubscribeBadRequestCode
499         }
500
501         duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
502         if duplicate {
503                 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
504                 xapp.Logger.Debug("%s", err)
505                 c.registry.DeleteRESTSubscription(&restSubId)
506                 c.UpdateCounter(cRestSubRespToXapp)
507                 return &subResp, common.SubscribeCreatedCode
508         }
509
510         c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
511         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
512
513         c.UpdateCounter(cRestSubRespToXapp)
514         return &subResp, common.SubscribeCreatedCode
515 }
516
517 //-------------------------------------------------------------------
518 //
519 //-------------------------------------------------------------------
520 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
521
522         e2SubscriptionDirectives := &E2SubscriptionDirectives{}
523         if p == nil || p.E2SubscriptionDirectives == nil {
524                 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
525                 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
526                 e2SubscriptionDirectives.CreateRMRRoute = true
527                 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
528         } else {
529                 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
530                         e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
531                 } else {
532                         return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
533                 }
534                 if p.E2SubscriptionDirectives.E2RetryCount == nil {
535                         xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
536                         e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
537                 } else {
538                         if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
539                                 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
540                         } else {
541                                 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
542                         }
543                 }
544                 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
545         }
546         xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
547         xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
548         xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
549         return e2SubscriptionDirectives, nil
550 }
551
552 //-------------------------------------------------------------------
553 //
554 //-------------------------------------------------------------------
555
556 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
557         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
558
559         c.SubscriptionProcessingStartDelay()
560         xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests))
561
562         var xAppEventInstanceID int64
563         var e2EventInstanceID int64
564         errorInfo := &ErrorInfo{}
565
566         defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
567
568         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
569                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
570                 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
571
572                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
573                 if trans == nil {
574                         // Send notification to xApp that prosessing of a Subscription Request has failed.
575                         err := fmt.Errorf("Tracking failure")
576                         errorInfo.ErrorCause = err.Error()
577                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
578                         continue
579                 }
580
581                 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
582
583                 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
584
585                 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
586                 trans.Release()
587
588                 if err != nil {
589                         if err.Error() == "TEST: restart event received" {
590                                 // This is just for UT cases. Stop here subscription processing
591                                 return
592                         }
593                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
594                 } else {
595                         e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
596                         restSubscription.AddMd5Sum(md5sum)
597                         xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
598                                 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
599                         c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans, errorInfo)
600                 }
601         }
602 }
603
604 //-------------------------------------------------------------------
605 //
606 //------------------------------------------------------------------
607 func (c *Control) SubscriptionProcessingStartDelay() {
608         if c.UTTesting == true {
609                 // This is temporary fix for the UT problem that notification arrives before subscription response
610                 // Correct fix would be to allow notification come before response and process it correctly
611                 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
612                 <-time.After(time.Millisecond * 50)
613                 xapp.Logger.Debug("Continuing after delay")
614         }
615 }
616
617 //-------------------------------------------------------------------
618 //
619 //------------------------------------------------------------------
620 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
621         restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
622
623         errorInfo := ErrorInfo{}
624
625         err := c.tracker.Track(trans)
626         if err != nil {
627                 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
628                 errorInfo.ErrorCause = err.Error()
629                 err = fmt.Errorf("Tracking failure")
630                 return nil, &errorInfo, err
631         }
632
633         subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
634         if err != nil {
635                 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
636                 return nil, &errorInfo, err
637         }
638
639         //
640         // Wake subs request
641         //
642         subs.OngoingReqCount++
643         go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0)
644         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
645         subs.OngoingReqCount--
646
647         err = nil
648         if event != nil {
649                 switch themsg := event.(type) {
650                 case *e2ap.E2APSubscriptionResponse:
651                         trans.Release()
652                         if c.e2IfState.IsE2ConnectionUp(meid) == true {
653                                 errorInfo = c.e2ap.CheckActionNotAdmittedList(xapp.RIC_SUB_RESP, themsg.ActionNotAdmittedList, c)
654                                 return themsg, &errorInfo, nil
655                         } else {
656                                 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
657                                 c.RemoveSubscriptionFromDb(subs)
658                                 err = fmt.Errorf("E2 interface down")
659                                 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
660                         }
661                 case *e2ap.E2APSubscriptionFailure:
662                         err = fmt.Errorf("RICSubscriptionFailure. E2NodeCause: (Cause:%v, Value %v)", themsg.Cause.Content, themsg.Cause.Value)
663                         errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
664                 case *PackSubscriptionRequestErrortEvent:
665                         err = fmt.Errorf("E2 RICSubscriptionRequest pack failure")
666                         errorInfo = themsg.ErrorInfo
667                 case *SDLWriteErrortEvent:
668                         err = fmt.Errorf("SDL write failure")
669                         errorInfo = themsg.ErrorInfo
670                 case *SubmgrRestartTestEvent:
671                         err = fmt.Errorf("TEST: restart event received")
672                         xapp.Logger.Debug("%s", err)
673                         return nil, &errorInfo, err
674                 default:
675                         err = fmt.Errorf("Unexpected E2 subscription response received")
676                         errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
677                         break
678                 }
679         } else {
680                 // Timer expiry
681                 err = fmt.Errorf("E2 RICSubscriptionResponse timeout")
682                 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
683                 if subs.PolicyUpdate == true {
684                         return nil, &errorInfo, err
685                 }
686         }
687
688         xapp.Logger.Error("XAPP-SubReq E2 subscription failed: %s", idstring(err, trans, subs))
689         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
690
691         return nil, &errorInfo, err
692 }
693
694 //-------------------------------------------------------------------
695 //
696 //-------------------------------------------------------------------
697 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
698         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
699
700         // Send notification to xApp that prosessing of a Subscription Request has failed.
701         e2EventInstanceID := (int64)(0)
702         if errorInfo.ErrorSource == "" {
703                 // Submgr is default source of error
704                 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
705         }
706         resp := &models.SubscriptionResponse{
707                 SubscriptionID: restSubId,
708                 SubscriptionInstances: []*models.SubscriptionInstance{
709                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
710                                 ErrorCause:          errorInfo.ErrorCause,
711                                 ErrorSource:         errorInfo.ErrorSource,
712                                 TimeoutType:         errorInfo.TimeoutType,
713                                 XappEventInstanceID: &xAppEventInstanceID},
714                 },
715         }
716         // Mark REST subscription request processed.
717         restSubscription.SetProcessed(err)
718         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
719         if trans != nil {
720                 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
721                         errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
722         } else {
723                 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
724                         errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
725         }
726
727         c.UpdateCounter(cRestSubFailNotifToXapp)
728         err = xapp.Subscription.Notify(resp, *clientEndpoint)
729         if err != nil {
730                 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
731         }
732
733         // E2 is down. Delete completely processed request safely now
734         if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
735                 c.registry.DeleteRESTSubscription(restSubId)
736                 c.RemoveRESTSubscriptionFromDb(*restSubId)
737         }
738 }
739
740 //-------------------------------------------------------------------
741 //
742 //-------------------------------------------------------------------
743 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
744         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
745
746         // Store successfully processed InstanceId for deletion
747         restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
748         restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
749
750         // Send notification to xApp that a Subscription Request has been processed.
751         resp := &models.SubscriptionResponse{
752                 SubscriptionID: restSubId,
753                 SubscriptionInstances: []*models.SubscriptionInstance{
754                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
755                                 ErrorCause:          errorInfo.ErrorCause,
756                                 ErrorSource:         errorInfo.ErrorSource,
757                                 XappEventInstanceID: &xAppEventInstanceID},
758                 },
759         }
760         // Mark REST subscription request processesd.
761         restSubscription.SetProcessed(nil)
762         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
763         xapp.Logger.Debug("Sending successful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
764                 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
765         c.UpdateCounter(cRestSubNotifToXapp)
766         err := xapp.Subscription.Notify(resp, *clientEndpoint)
767         if err != nil {
768                 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
769         }
770
771         // E2 is down. Delete completely processed request safely now
772         if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
773                 c.registry.DeleteRESTSubscription(restSubId)
774                 c.RemoveRESTSubscriptionFromDb(*restSubId)
775         }
776 }
777
778 //-------------------------------------------------------------------
779 //
780 //-------------------------------------------------------------------
781 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
782
783         c.CntRecvMsg++
784         c.UpdateCounter(cRestSubDelReqFromXapp)
785
786         xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
787
788         restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
789         if err != nil {
790                 xapp.Logger.Error("%s", err.Error())
791                 if restSubscription == nil {
792                         // Subscription was not found
793                         c.UpdateCounter(cRestSubDelRespToXapp)
794                         return common.UnsubscribeNoContentCode
795                 } else {
796                         if restSubscription.SubReqOngoing == true {
797                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
798                                 xapp.Logger.Error("%s", err.Error())
799                                 c.UpdateCounter(cRestSubDelFailToXapp)
800                                 return common.UnsubscribeBadRequestCode
801                         } else if restSubscription.SubDelReqOngoing == true {
802                                 // Previous request for same restSubId still ongoing
803                                 c.UpdateCounter(cRestSubDelRespToXapp)
804                                 return common.UnsubscribeNoContentCode
805                         }
806                 }
807         }
808
809         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
810         go func() {
811                 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
812                 for _, instanceId := range restSubscription.InstanceIds {
813                         xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0)
814
815                         if err != nil {
816                                 xapp.Logger.Error("%s", err.Error())
817                         }
818                         xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
819                         restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
820                         restSubscription.DeleteE2InstanceId(instanceId)
821                 }
822                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
823                 c.registry.DeleteRESTSubscription(&restSubId)
824                 c.RemoveRESTSubscriptionFromDb(restSubId)
825         }()
826
827         c.UpdateCounter(cRestSubDelRespToXapp)
828         return common.UnsubscribeNoContentCode
829 }
830
831 //-------------------------------------------------------------------
832 //
833 //-------------------------------------------------------------------
834 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) {
835
836         var xAppEventInstanceID int64
837         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
838         if err != nil {
839                 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
840                         restSubId, instanceId, idstring(err, nil))
841                 return xAppEventInstanceID, nil
842         }
843
844         xAppEventInstanceID = int64(subs.ReqId.Id)
845         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
846         if trans == nil {
847                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
848                 xapp.Logger.Error("%s", err.Error())
849         }
850         defer trans.Release()
851
852         err = c.tracker.Track(trans)
853         if err != nil {
854                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
855                 xapp.Logger.Error("%s", err.Error())
856                 return xAppEventInstanceID, &time.ParseError{}
857         }
858         //
859         // Wake subs delete
860         //
861         subs.OngoingDelCount++
862         go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime)
863         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
864         subs.OngoingDelCount--
865
866         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
867
868         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
869
870         return xAppEventInstanceID, nil
871 }
872
873 //-------------------------------------------------------------------
874 //
875 //-------------------------------------------------------------------
876
877 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
878         params := &xapp.RMRParams{}
879         params.Mtype = trans.GetMtype()
880         params.SubId = int(subs.GetReqId().InstanceId)
881         params.Xid = ""
882         params.Meid = subs.GetMeid()
883         params.Src = ""
884         params.PayloadLen = len(trans.Payload.Buf)
885         params.Payload = trans.Payload.Buf
886         params.Mbuf = nil
887         xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
888         err = c.SendWithRetry(params, false, 5)
889         if err != nil {
890                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
891         }
892         return err
893 }
894
895 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
896
897         params := &xapp.RMRParams{}
898         params.Mtype = trans.GetMtype()
899         params.SubId = int(subs.GetReqId().InstanceId)
900         params.Xid = trans.GetXid()
901         params.Meid = trans.GetMeid()
902         params.Src = ""
903         params.PayloadLen = len(trans.Payload.Buf)
904         params.Payload = trans.Payload.Buf
905         params.Mbuf = nil
906         xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
907         err = c.SendWithRetry(params, false, 5)
908         if err != nil {
909                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
910         }
911         return err
912 }
913
914 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
915         if c.RMRClient == nil {
916                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
917                 xapp.Logger.Error("%s", err.Error())
918                 return
919         }
920         c.CntRecvMsg++
921
922         defer c.RMRClient.Free(msg.Mbuf)
923
924         // xapp-frame might use direct access to c buffer and
925         // when msg.Mbuf is freed, someone might take it into use
926         // and payload data might be invalid inside message handle function
927         //
928         // subscriptions won't load system a lot so there is no
929         // real performance hit by cloning buffer into new go byte slice
930         cPay := append(msg.Payload[:0:0], msg.Payload...)
931         msg.Payload = cPay
932         msg.PayloadLen = len(cPay)
933
934         switch msg.Mtype {
935         case xapp.RIC_SUB_REQ:
936                 go c.handleXAPPSubscriptionRequest(msg)
937         case xapp.RIC_SUB_RESP:
938                 go c.handleE2TSubscriptionResponse(msg)
939         case xapp.RIC_SUB_FAILURE:
940                 go c.handleE2TSubscriptionFailure(msg)
941         case xapp.RIC_SUB_DEL_REQ:
942                 go c.handleXAPPSubscriptionDeleteRequest(msg)
943         case xapp.RIC_SUB_DEL_RESP:
944                 go c.handleE2TSubscriptionDeleteResponse(msg)
945         case xapp.RIC_SUB_DEL_FAILURE:
946                 go c.handleE2TSubscriptionDeleteFailure(msg)
947         case xapp.RIC_SUB_DEL_REQUIRED:
948                 go c.handleE2TSubscriptionDeleteRequired(msg)
949         default:
950                 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
951         }
952         return
953 }
954
955 //-------------------------------------------------------------------
956 // handle from XAPP Subscription Request
957 //------------------------------------------------------------------
958 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
959         xapp.Logger.Debug("MSG from XAPP: %s", params.String())
960         c.UpdateCounter(cSubReqFromXapp)
961
962         if c.e2IfState.IsE2ConnectionUp(&params.Meid.RanName) == false {
963                 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
964                 return
965         }
966
967         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
968         if err != nil {
969                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
970                 return
971         }
972
973         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
974         if trans == nil {
975                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
976                 return
977         }
978         defer trans.Release()
979
980         if err = c.tracker.Track(trans); err != nil {
981                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
982                 return
983         }
984
985         subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
986         if err != nil {
987                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
988                 return
989         }
990
991         c.wakeSubscriptionRequest(subs, trans)
992 }
993
994 //-------------------------------------------------------------------
995 // Wake Subscription Request to E2node
996 //------------------------------------------------------------------
997 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
998
999         e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(nil)
1000         if err != nil {
1001                 xapp.Logger.Error("c.GetE2SubscriptionDirectives failure: %s", err.Error())
1002         }
1003         subs.OngoingReqCount++
1004         go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms)
1005         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1006         subs.OngoingReqCount--
1007         if event != nil {
1008                 switch themsg := event.(type) {
1009                 case *e2ap.E2APSubscriptionResponse:
1010                         themsg.RequestId.Id = trans.RequestId.Id
1011                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
1012                         if err == nil {
1013                                 trans.Release()
1014                                 c.UpdateCounter(cSubRespToXapp)
1015                                 err := c.rmrSendToXapp("", subs, trans)
1016                                 if err != nil {
1017                                         xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1018                                 }
1019                                 return
1020                         }
1021                 case *e2ap.E2APSubscriptionFailure:
1022                         themsg.RequestId.Id = trans.RequestId.Id
1023                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
1024                         if err == nil {
1025                                 c.UpdateCounter(cSubFailToXapp)
1026                                 c.rmrSendToXapp("", subs, trans)
1027                         }
1028                 default:
1029                         break
1030                 }
1031         }
1032         xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1033 }
1034
1035 //-------------------------------------------------------------------
1036 // handle from XAPP Subscription Delete Request
1037 //------------------------------------------------------------------
1038 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1039         xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1040         c.UpdateCounter(cSubDelReqFromXapp)
1041
1042         if c.e2IfState.IsE2ConnectionUp(&params.Meid.RanName) == false {
1043                 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1044                 return
1045         }
1046
1047         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1048         if err != nil {
1049                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1050                 return
1051         }
1052
1053         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1054         if trans == nil {
1055                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1056                 return
1057         }
1058         defer trans.Release()
1059
1060         err = c.tracker.Track(trans)
1061         if err != nil {
1062                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1063                 return
1064         }
1065
1066         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1067         if err != nil {
1068                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1069                 return
1070         }
1071
1072         //
1073         // Wake subs delete
1074         //
1075         subs.OngoingDelCount++
1076         go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1077         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1078         subs.OngoingDelCount--
1079
1080         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1081
1082         if subs.NoRespToXapp == true {
1083                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1084                 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1085                 return
1086         }
1087
1088         // Whatever is received success, fail or timeout, send successful delete response
1089         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1090         subDelRespMsg.RequestId.Id = trans.RequestId.Id
1091         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1092         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1093         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1094         if err == nil {
1095                 c.UpdateCounter(cSubDelRespToXapp)
1096                 err := c.rmrSendToXapp("", subs, trans)
1097                 if err != nil {
1098                         xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1099                 }
1100         }
1101 }
1102
1103 //-------------------------------------------------------------------
1104 // SUBS CREATE Handling
1105 //-------------------------------------------------------------------
1106 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) {
1107
1108         var event interface{} = nil
1109         var removeSubscriptionFromDb bool = false
1110         trans := c.tracker.NewSubsTransaction(subs)
1111         subs.WaitTransactionTurn(trans)
1112         defer subs.ReleaseTransactionTurn(trans)
1113         defer trans.Release()
1114
1115         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1116
1117         subRfMsg, valid := subs.GetCachedResponse()
1118         if subRfMsg == nil && valid == true {
1119                 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1120                 switch event.(type) {
1121                 case *e2ap.E2APSubscriptionResponse:
1122                         subRfMsg, valid = subs.SetCachedResponse(event, true)
1123                         subs.SubRespRcvd = true
1124                 case *e2ap.E2APSubscriptionFailure:
1125                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1126                         xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1127                 case *SubmgrRestartTestEvent:
1128                         // This is used to simulate that no response has been received and after restart, subscriptions are restored from db
1129                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1130                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1131                         parentTrans.SendEvent(subRfMsg, 0)
1132                         return
1133                 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1134                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1135                 default:
1136                         // Timer expiry
1137                         if subs.PolicyUpdate == false {
1138                                 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1139                                 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1140                                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1141                         } else {
1142                                 subRfMsg, valid = subs.SetCachedResponse(nil, true)
1143                         }
1144                 }
1145                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1146         } else {
1147                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1148         }
1149         if valid == false {
1150                 removeSubscriptionFromDb = true
1151         }
1152
1153         err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1154         if err != nil {
1155                 valid = false
1156                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1157
1158         }
1159
1160         // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1161         if valid == false {
1162                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1163         }
1164
1165         parentTrans.SendEvent(subRfMsg, 0)
1166 }
1167
1168 //-------------------------------------------------------------------
1169 // SUBS DELETE Handling
1170 //-------------------------------------------------------------------
1171
1172 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) {
1173
1174         trans := c.tracker.NewSubsTransaction(subs)
1175         subs.WaitTransactionTurn(trans)
1176         defer subs.ReleaseTransactionTurn(trans)
1177         defer trans.Release()
1178
1179         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1180
1181         subs.mutex.Lock()
1182
1183         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1184                 subs.valid = false
1185                 subs.mutex.Unlock()
1186                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1187         } else {
1188                 subs.mutex.Unlock()
1189         }
1190
1191         // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1192         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1193         parentTrans.SendEvent(nil, 0)
1194 }
1195
1196 //-------------------------------------------------------------------
1197 // send to E2T Subscription Request
1198 //-------------------------------------------------------------------
1199 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1200         var err error
1201         var event interface{} = nil
1202         var timedOut bool = false
1203         const ricRequestorId = 123
1204
1205         subReqMsg := subs.SubReqMsg
1206         subReqMsg.RequestId = subs.GetReqId().RequestId
1207         subReqMsg.RequestId.Id = ricRequestorId
1208         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1209         if err != nil {
1210                 xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans))
1211                 return &PackSubscriptionRequestErrortEvent{
1212                         ErrorInfo{
1213                                 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1214                                 ErrorCause:  err.Error(),
1215                         },
1216                 }
1217         }
1218
1219         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1220         err = c.WriteSubscriptionToDb(subs)
1221         if err != nil {
1222                 return &SDLWriteErrortEvent{
1223                         ErrorInfo{
1224                                 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1225                                 ErrorCause:  err.Error(),
1226                         },
1227                 }
1228         }
1229
1230         for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1231                 desc := fmt.Sprintf("(retry %d)", retries)
1232                 if retries == 0 {
1233                         c.UpdateCounter(cSubReqToE2)
1234                 } else {
1235                         c.UpdateCounter(cSubReReqToE2)
1236                 }
1237                 err := c.rmrSendToE2T(desc, subs, trans)
1238                 if err != nil {
1239                         xapp.Logger.Error("rmrSendToE2T() failed:%s", err.Error())
1240                 }
1241
1242                 if subs.DoNotWaitSubResp == false {
1243                         event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1244                         if timedOut {
1245                                 c.UpdateCounter(cSubReqTimerExpiry)
1246                                 continue
1247                         }
1248                 } else {
1249                         // Simulating case where subscrition request has been sent but response has not been received before restart
1250                         event = &SubmgrRestartTestEvent{}
1251                         xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1252                 }
1253                 break
1254         }
1255         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1256         return event
1257 }
1258
1259 //-------------------------------------------------------------------
1260 // send to E2T Subscription Delete Request
1261 //-------------------------------------------------------------------
1262
1263 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1264         var err error
1265         var event interface{}
1266         var timedOut bool
1267         const ricRequestorId = 123
1268
1269         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1270         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1271         subDelReqMsg.RequestId.Id = ricRequestorId
1272         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1273         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1274         if err != nil {
1275                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1276                 return event
1277         }
1278
1279         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1280                 desc := fmt.Sprintf("(retry %d)", retries)
1281                 if retries == 0 {
1282                         c.UpdateCounter(cSubDelReqToE2)
1283                 } else {
1284                         c.UpdateCounter(cSubDelReReqToE2)
1285                 }
1286                 err := c.rmrSendToE2T(desc, subs, trans)
1287                 if err != nil {
1288                         xapp.Logger.Error("SUBS-SubDelReq: rmrSendToE2T failure: %s", idstring(err, trans, subs, parentTrans))
1289                 }
1290                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1291                 if timedOut {
1292                         c.UpdateCounter(cSubDelReqTimerExpiry)
1293                         continue
1294                 }
1295                 break
1296         }
1297         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1298         return event
1299 }
1300
1301 //-------------------------------------------------------------------
1302 // handle from E2T Subscription Response
1303 //-------------------------------------------------------------------
1304 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1305         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1306         c.UpdateCounter(cSubRespFromE2)
1307
1308         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1309         if err != nil {
1310                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1311                 return
1312         }
1313         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1314         if err != nil {
1315                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1316                 return
1317         }
1318         trans := subs.GetTransaction()
1319         if trans == nil {
1320                 err = fmt.Errorf("Ongoing transaction not found")
1321                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1322                 return
1323         }
1324         xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans)
1325         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1326         if sendOk == false {
1327                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1328                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1329         }
1330         return
1331 }
1332
1333 //-------------------------------------------------------------------
1334 // handle from E2T Subscription Failure
1335 //-------------------------------------------------------------------
1336 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1337         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1338         c.UpdateCounter(cSubFailFromE2)
1339         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1340         if err != nil {
1341                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1342                 return
1343         }
1344         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1345         if err != nil {
1346                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1347                 return
1348         }
1349         trans := subs.GetTransaction()
1350         if trans == nil {
1351                 err = fmt.Errorf("Ongoing transaction not found")
1352                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1353                 return
1354         }
1355         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1356         if sendOk == false {
1357                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1358                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1359         }
1360         return
1361 }
1362
1363 //-------------------------------------------------------------------
1364 // handle from E2T Subscription Delete Response
1365 //-------------------------------------------------------------------
1366 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) {
1367         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1368         c.UpdateCounter(cSubDelRespFromE2)
1369         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1370         if err != nil {
1371                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1372                 return
1373         }
1374         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1375         if err != nil {
1376                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1377                 return
1378         }
1379         trans := subs.GetTransaction()
1380         if trans == nil {
1381                 err = fmt.Errorf("Ongoing transaction not found")
1382                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1383                 return
1384         }
1385         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1386         if sendOk == false {
1387                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1388                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1389         }
1390         return
1391 }
1392
1393 //-------------------------------------------------------------------
1394 // handle from E2T Subscription Delete Failure
1395 //-------------------------------------------------------------------
1396 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1397         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1398         c.UpdateCounter(cSubDelFailFromE2)
1399         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1400         if err != nil {
1401                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1402                 return
1403         }
1404         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1405         if err != nil {
1406                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1407                 return
1408         }
1409         trans := subs.GetTransaction()
1410         if trans == nil {
1411                 err = fmt.Errorf("Ongoing transaction not found")
1412                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1413                 return
1414         }
1415         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1416         if sendOk == false {
1417                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1418                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1419         }
1420         return
1421 }
1422
1423 //-------------------------------------------------------------------
1424 //
1425 //-------------------------------------------------------------------
1426 func typeofSubsMessage(v interface{}) string {
1427         if v == nil {
1428                 return "NIL"
1429         }
1430         switch v.(type) {
1431         //case *e2ap.E2APSubscriptionRequest:
1432         //      return "SubReq"
1433         case *e2ap.E2APSubscriptionResponse:
1434                 return "SubResp"
1435         case *e2ap.E2APSubscriptionFailure:
1436                 return "SubFail"
1437         //case *e2ap.E2APSubscriptionDeleteRequest:
1438         //      return "SubDelReq"
1439         case *e2ap.E2APSubscriptionDeleteResponse:
1440                 return "SubDelResp"
1441         case *e2ap.E2APSubscriptionDeleteFailure:
1442                 return "SubDelFail"
1443         default:
1444                 return "Unknown"
1445         }
1446 }
1447
1448 //-------------------------------------------------------------------
1449 //
1450 //-------------------------------------------------------------------
1451 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1452         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1453         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1454         if err != nil {
1455                 xapp.Logger.Error("%v", err)
1456                 return err
1457         }
1458         return nil
1459 }
1460
1461 //-------------------------------------------------------------------
1462 //
1463 //-------------------------------------------------------------------
1464 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1465
1466         if removeSubscriptionFromDb == true {
1467                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1468                 c.RemoveSubscriptionFromDb(subs)
1469         } else {
1470                 // Update is needed for successful response and merge case here
1471                 if subs.RetryFromXapp == false {
1472                         err := c.WriteSubscriptionToDb(subs)
1473                         return err
1474                 }
1475         }
1476         subs.RetryFromXapp = false
1477         return nil
1478 }
1479
1480 //-------------------------------------------------------------------
1481 //
1482 //-------------------------------------------------------------------
1483 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1484         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1485         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1486         if err != nil {
1487                 xapp.Logger.Error("%v", err)
1488         }
1489 }
1490
1491 //-------------------------------------------------------------------
1492 //
1493 //-------------------------------------------------------------------
1494 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1495         xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1496         err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1497         if err != nil {
1498                 xapp.Logger.Error("%v", err)
1499         }
1500 }
1501
1502 //-------------------------------------------------------------------
1503 //
1504 //-------------------------------------------------------------------
1505 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1506
1507         if removeRestSubscriptionFromDb == true {
1508                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1509                 c.RemoveRESTSubscriptionFromDb(restSubId)
1510         } else {
1511                 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1512         }
1513 }
1514
1515 //-------------------------------------------------------------------
1516 //
1517 //-------------------------------------------------------------------
1518 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1519         xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1520         err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1521         if err != nil {
1522                 xapp.Logger.Error("%v", err)
1523         }
1524 }
1525
1526 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription, e2SubsDelRequired bool) {
1527
1528         if c.UTTesting == true {
1529                 // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests
1530                 c.registry.mutex = new(sync.Mutex)
1531         }
1532
1533         const ricRequestorId = 123
1534         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1535
1536         // Send delete for every endpoint in the subscription
1537         if subs.PolicyUpdate == false {
1538                 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1539                 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1540                 subDelReqMsg.RequestId.Id = ricRequestorId
1541                 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1542                 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1543                 if err != nil {
1544                         xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1545                         return
1546                 }
1547                 for _, endPoint := range subs.EpList.Endpoints {
1548                         params := &xapp.RMRParams{}
1549                         params.Mtype = mType
1550                         params.SubId = int(subs.GetReqId().InstanceId)
1551                         params.Xid = ""
1552                         params.Meid = subs.Meid
1553                         params.Src = endPoint.String()
1554                         params.PayloadLen = len(payload.Buf)
1555                         params.Payload = payload.Buf
1556                         params.Mbuf = nil
1557                         subs.DeleteFromDb = true
1558                         if !e2SubsDelRequired {
1559                                 c.handleXAPPSubscriptionDeleteRequest(params)
1560                         } else {
1561                                 c.SendSubscriptionDeleteReqToE2T(subs, params)
1562                         }
1563                 }
1564         }
1565 }
1566
1567 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1568
1569         fmt.Println("CRESTSubscriptionRequest")
1570
1571         if p == nil {
1572                 return
1573         }
1574
1575         if p.SubscriptionID != "" {
1576                 fmt.Println("  SubscriptionID = ", p.SubscriptionID)
1577         } else {
1578                 fmt.Println("  SubscriptionID = ''")
1579         }
1580
1581         fmt.Printf("  ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1582
1583         if p.ClientEndpoint.HTTPPort != nil {
1584                 fmt.Printf("  ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1585         } else {
1586                 fmt.Println("  ClientEndpoint.HTTPPort = nil")
1587         }
1588
1589         if p.ClientEndpoint.RMRPort != nil {
1590                 fmt.Printf("  ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1591         } else {
1592                 fmt.Println("  ClientEndpoint.RMRPort = nil")
1593         }
1594
1595         if p.Meid != nil {
1596                 fmt.Printf("  Meid = %s\n", *p.Meid)
1597         } else {
1598                 fmt.Println("  Meid = nil")
1599         }
1600
1601         if p.E2SubscriptionDirectives == nil {
1602                 fmt.Println("  E2SubscriptionDirectives = nil")
1603         } else {
1604                 fmt.Println("  E2SubscriptionDirectives")
1605                 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1606                         fmt.Println("    E2RetryCount == nil")
1607                 } else {
1608                         fmt.Printf("    E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1609                 }
1610                 fmt.Printf("    E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1611                 fmt.Printf("    RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1612         }
1613         for _, subscriptionDetail := range p.SubscriptionDetails {
1614                 if p.RANFunctionID != nil {
1615                         fmt.Printf("  RANFunctionID = %v\n", *p.RANFunctionID)
1616                 } else {
1617                         fmt.Println("  RANFunctionID = nil")
1618                 }
1619                 fmt.Printf("  SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1620                 fmt.Printf("  SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1621
1622                 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1623                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1624                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1625                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1626
1627                         if actionToBeSetup.SubsequentAction != nil {
1628                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1629                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1630                         } else {
1631                                 fmt.Println("  SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1632                         }
1633                 }
1634         }
1635 }
1636
1637 //-------------------------------------------------------------------
1638 // handle from E2T Subscription Delete Required
1639 //-------------------------------------------------------------------
1640 func (c *Control) handleE2TSubscriptionDeleteRequired(params *xapp.RMRParams) {
1641         xapp.Logger.Info("MSG from E2T: %s", params.String())
1642         c.UpdateCounter(cSubDelRequFromE2)
1643         subsDelRequMsg, err := c.e2ap.UnpackSubscriptionDeleteRequired(params.Payload)
1644         if err != nil {
1645                 xapp.Logger.Error("MSG-SubDelRequired: %s", idstring(err, params))
1646                 //c.sendE2TErrorIndication(nil)
1647                 return
1648         }
1649         var subscriptions = map[string][]e2ap.E2APSubscriptionDeleteRequired{}
1650         var subDB = []*Subscription{}
1651         for _, subsTobeRemove := range subsDelRequMsg.E2APSubscriptionDeleteRequiredRequests {
1652                 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subsTobeRemove.RequestId.InstanceId})
1653                 if err != nil {
1654                         xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1655                         continue
1656                 }
1657                 // Check if Delete Subscription Already triggered
1658                 if subs.OngoingDelCount > 0 {
1659                         continue
1660                 }
1661                 subDB = append(subDB, subs)
1662                 for _, endpoint := range subs.EpList.Endpoints {
1663                         subscriptions[endpoint.Addr] = append(subscriptions[endpoint.Addr], subsTobeRemove)
1664                 }
1665                 // Sending Subscription Delete Request to E2T
1666                 //      c.SendSubscriptionDeleteReq(subs, true)
1667         }
1668         for _, subsTobeRemove := range subDB {
1669                 // Sending Subscription Delete Request to E2T
1670                 c.SendSubscriptionDeleteReq(subsTobeRemove, true)
1671         }
1672 }
1673
1674 //-----------------------------------------------------------------
1675 // Initiate RIC Subscription Delete Request after receiving
1676 // RIC Subscription Delete Required from E2T
1677 //-----------------------------------------------------------------
1678 func (c *Control) SendSubscriptionDeleteReqToE2T(subs *Subscription, params *xapp.RMRParams) {
1679         xapp.Logger.Debug("MSG TO E2T: %s", params.String())
1680         c.UpdateCounter(cSubDelReqToE2)
1681
1682         if c.e2IfState.IsE2ConnectionUp(&params.Meid.RanName) == false {
1683                 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1684                 return
1685         }
1686
1687         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subs.ReqId.RequestId, params.Meid)
1688         if trans == nil {
1689                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1690                 return
1691         }
1692         defer trans.Release()
1693
1694         err := c.tracker.Track(trans)
1695         if err != nil {
1696                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1697                 return
1698         }
1699
1700         //
1701         // Wake subs delete
1702         //
1703         subs.OngoingDelCount++
1704         go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1705         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1706         subs.OngoingDelCount--
1707
1708         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1709
1710         if subs.NoRespToXapp == true {
1711                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1712                 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1713                 return
1714         }
1715 }