227eb0c6fbd8c852fc2539958a13685201d45104
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "sync"
26         "time"
27
28         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
29         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
30         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
33         httptransport "github.com/go-openapi/runtime/client"
34         "github.com/go-openapi/strfmt"
35         "github.com/segmentio/ksuid"
36         "github.com/spf13/viper"
37 )
38
39 //-----------------------------------------------------------------------------
40 //
41 //-----------------------------------------------------------------------------
42
43 func idstring(err error, entries ...fmt.Stringer) string {
44         var retval string = ""
45         var filler string = ""
46         for _, entry := range entries {
47                 if entry != nil {
48                         retval += filler + entry.String()
49                         filler = " "
50                 } else {
51                         retval += filler + "(NIL)"
52                 }
53         }
54         if err != nil {
55                 retval += filler + "err(" + err.Error() + ")"
56                 filler = " "
57         }
58         return retval
59 }
60
61 //-----------------------------------------------------------------------------
62 //
63 //-----------------------------------------------------------------------------
64
65 var e2tSubReqTimeout time.Duration
66 var e2tSubDelReqTime time.Duration
67 var e2tRecvMsgTimeout time.Duration
68 var waitRouteCleanup_ms time.Duration
69 var e2tMaxSubReqTryCount uint64    // Initial try + retry
70 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
71 var checkE2State string
72 var readSubsFromDb string
73 var dbRetryForever string
74 var dbTryCount int
75
76 type Control struct {
77         *xapp.RMRClient
78         e2ap              *E2ap
79         registry          *Registry
80         tracker           *Tracker
81         restDuplicateCtrl *DuplicateCtrl
82         e2IfState         *E2IfState
83         e2IfStateDb       XappRnibInterface
84         e2SubsDb          Sdlnterface
85         restSubsDb        Sdlnterface
86         CntRecvMsg        uint64
87         ResetTestFlag     bool
88         Counters          map[string]xapp.Counter
89         LoggerLevel       int
90         UTTesting         bool
91 }
92
93 type RMRMeid struct {
94         PlmnID  string
95         EnbID   string
96         RanName string
97 }
98
99 type SubmgrRestartTestEvent struct{}
100 type SubmgrRestartUpEvent struct{}
101 type PackSubscriptionRequestErrortEvent struct {
102         ErrorInfo ErrorInfo
103 }
104
105 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
106         p.ErrorInfo = *errorInfo
107 }
108
109 type SDLWriteErrortEvent struct {
110         ErrorInfo ErrorInfo
111 }
112
113 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
114         s.ErrorInfo = *errorInfo
115 }
116
117 func init() {
118         xapp.Logger.Debug("SUBMGR")
119         viper.AutomaticEnv()
120         viper.SetEnvPrefix("submgr")
121         viper.AllowEmptyEnv(true)
122 }
123
124 func NewControl() *Control {
125
126         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
127         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
128
129         registry := new(Registry)
130         registry.Initialize()
131         registry.rtmgrClient = &rtmgrClient
132
133         tracker := new(Tracker)
134         tracker.Init()
135
136         restDuplicateCtrl := new(DuplicateCtrl)
137         restDuplicateCtrl.Init()
138
139         e2IfState := new(E2IfState)
140
141         c := &Control{e2ap: new(E2ap),
142                 registry:          registry,
143                 tracker:           tracker,
144                 restDuplicateCtrl: restDuplicateCtrl,
145                 e2IfState:         e2IfState,
146                 e2IfStateDb:       CreateXappRnibIfInstance(),
147                 e2SubsDb:          CreateSdl(),
148                 restSubsDb:        CreateRESTSdl(),
149                 Counters:          xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
150                 LoggerLevel:       1,
151         }
152
153         e2IfState.Init(c)
154         c.ReadConfigParameters("")
155
156         // Register REST handler for testing support
157         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
158         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
159         xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
160
161         xapp.Resource.InjectRoute("/ric/v1/get_all_e2nodes", c.GetAllE2Nodes, "GET")
162         xapp.Resource.InjectRoute("/ric/v1/get_e2node_rest_subscriptions/{ranName}", c.GetAllE2NodeRestSubscriptions, "GET")
163
164         xapp.Resource.InjectRoute("/ric/v1/get_all_xapps", c.GetAllXapps, "GET")
165         xapp.Resource.InjectRoute("/ric/v1/get_xapp_rest_restsubscriptions/{xappServiceName}", c.GetAllXappRestSubscriptions, "GET")
166         xapp.Resource.InjectRoute("/ric/v1/get_e2subscriptions/{restId}", c.GetE2Subscriptions, "GET")
167
168         xapp.Resource.InjectRoute("/ric/v1/delete_all_e2node_subscriptions/{ranName}", c.DeleteAllE2nodeSubscriptions, "DELETE")
169         xapp.Resource.InjectRoute("/ric/v1/delete_all_xapp_subscriptions/{xappServiceName}", c.DeleteAllXappSubscriptions, "DELETE")
170
171         if readSubsFromDb == "true" {
172                 // Read subscriptions from db
173                 err := c.ReadE2Subscriptions()
174                 if err != nil {
175                         xapp.Logger.Error("ReadE2Subscriptions() failed %s", err.Error())
176                 }
177                 err = c.ReadRESTSubscriptions()
178                 if err != nil {
179                         xapp.Logger.Error("ReadRESTSubscriptions() failed %s", err.Error())
180                 }
181         }
182
183         go func() {
184                 err := xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
185                 if err != nil {
186                         xapp.Logger.Error("xapp.Subscription.Listen failure: %s", err.Error())
187                 }
188         }()
189         return c
190 }
191
192 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
193         subscriptions, err := c.registry.QueryHandler()
194         if err != nil {
195                 xapp.Logger.Error("QueryHandler() failed %s", err.Error())
196         }
197
198         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
199 }
200
201 //-------------------------------------------------------------------
202 //
203 //-------------------------------------------------------------------
204 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
205         xapp.Logger.Debug("RESTQueryHandler() called")
206
207         c.CntRecvMsg++
208
209         return c.registry.QueryHandler()
210 }
211
212 //-------------------------------------------------------------------
213 //
214 //-------------------------------------------------------------------
215 func (c *Control) ReadE2Subscriptions() error {
216         var err error
217         var subIds []uint32
218         var register map[uint32]*Subscription
219         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
220                 xapp.Logger.Debug("Reading E2 subscriptions from db")
221                 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
222                 if err != nil {
223                         xapp.Logger.Error("%v", err)
224                         <-time.After(1 * time.Second)
225                 } else {
226                         c.registry.subIds = subIds
227                         c.registry.register = register
228                         go c.HandleUncompletedSubscriptions(register)
229                         return nil
230                 }
231         }
232         xapp.Logger.Debug("Continuing without retring")
233         return err
234 }
235
236 //-------------------------------------------------------------------
237 //
238 //-------------------------------------------------------------------
239 func (c *Control) ReadRESTSubscriptions() error {
240
241         xapp.Logger.Debug("ReadRESTSubscriptions()")
242         var err error
243         var restSubscriptions map[string]*RESTSubscription
244         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
245                 xapp.Logger.Debug("Reading REST subscriptions from db")
246                 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
247                 if err != nil {
248                         xapp.Logger.Error("%v", err)
249                         <-time.After(1 * time.Second)
250                 } else {
251                         // Fix REST subscriptions ongoing status after restart
252                         for restSubId, restSubscription := range restSubscriptions {
253                                 restSubscription.SubReqOngoing = false
254                                 restSubscription.SubDelReqOngoing = false
255                                 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubscription)
256                                 if err != nil {
257                                         xapp.Logger.Error("WriteRESTSubscriptionToSdl() failed:%s", err.Error())
258                                 }
259                         }
260                         c.registry.restSubscriptions = restSubscriptions
261                         return nil
262                 }
263         }
264         xapp.Logger.Debug("Continuing without retring")
265         return err
266 }
267
268 //-------------------------------------------------------------------
269 //
270 //-------------------------------------------------------------------
271 func (c *Control) ReadConfigParameters(f string) {
272
273         xapp.Logger.Debug("ReadConfigParameters")
274
275         c.LoggerLevel = int(xapp.Logger.GetLevel())
276         xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel)
277         c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
278
279         // viper.GetDuration returns nanoseconds
280         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
281         if e2tSubReqTimeout == 0 {
282                 e2tSubReqTimeout = 2000 * 1000000
283                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
284         }
285         xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
286
287         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
288         if e2tSubDelReqTime == 0 {
289                 e2tSubDelReqTime = 2000 * 1000000
290                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
291         }
292         xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
293
294         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
295         if e2tRecvMsgTimeout == 0 {
296                 e2tRecvMsgTimeout = 2000 * 1000000
297                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
298         }
299         xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
300
301         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
302         if e2tMaxSubReqTryCount == 0 {
303                 e2tMaxSubReqTryCount = 1
304                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
305         }
306         xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
307
308         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
309         if e2tMaxSubDelReqTryCount == 0 {
310                 e2tMaxSubDelReqTryCount = 1
311                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
312         }
313         xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
314
315         checkE2State = viper.GetString("controls.checkE2State")
316         if checkE2State == "" {
317                 checkE2State = "true"
318                 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
319         }
320         xapp.Logger.Debug("checkE2State= %v", checkE2State)
321
322         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
323         if readSubsFromDb == "" {
324                 readSubsFromDb = "true"
325                 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
326         }
327         xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
328
329         dbTryCount = viper.GetInt("controls.dbTryCount")
330         if dbTryCount == 0 {
331                 dbTryCount = 200
332                 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
333         }
334         xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
335
336         dbRetryForever = viper.GetString("controls.dbRetryForever")
337         if dbRetryForever == "" {
338                 dbRetryForever = "true"
339                 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
340         }
341         xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
342
343         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
344         // value 100ms used currently only in unittests.
345         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
346         if waitRouteCleanup_ms == 0 {
347                 waitRouteCleanup_ms = 5000 * 1000000
348                 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
349         }
350         xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
351 }
352
353 //-------------------------------------------------------------------
354 //
355 //-------------------------------------------------------------------
356 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
357
358         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
359         for subId, subs := range register {
360                 if subs.SubRespRcvd == false {
361                         // If policy subscription has already been made successfully unsuccessful update should not be deleted.
362                         if subs.PolicyUpdate == false {
363                                 subs.NoRespToXapp = true
364                                 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
365                                 c.SendSubscriptionDeleteReq(subs, false)
366                         }
367                 }
368         }
369 }
370
371 func (c *Control) ReadyCB(data interface{}) {
372         if c.RMRClient == nil {
373                 c.RMRClient = xapp.Rmr
374         }
375 }
376
377 func (c *Control) Run() {
378         xapp.SetReadyCB(c.ReadyCB, nil)
379         xapp.AddConfigChangeListener(c.ReadConfigParameters)
380         xapp.Run(c)
381 }
382
383 //-------------------------------------------------------------------
384 //
385 //-------------------------------------------------------------------
386 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string, xAppServiceName string) (*RESTSubscription, string, error) {
387
388         var restSubId string
389         var restSubscription *RESTSubscription
390         var err error
391
392         prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
393         if p.SubscriptionID == "" {
394                 // Subscription does not contain REST subscription Id
395                 if exists {
396                         restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
397                         if restSubscription != nil {
398                                 // Subscription not found
399                                 restSubId = prevRestSubsId
400                                 if err == nil {
401                                         xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
402                                 } else {
403                                         xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
404                                 }
405                         } else {
406                                 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
407                                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
408                         }
409                 }
410
411                 if restSubscription == nil {
412                         restSubId = ksuid.New().String()
413                         restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppServiceName, &xAppRmrEndpoint, p.Meid)
414                 }
415         } else {
416                 // Subscription contains REST subscription Id
417                 restSubId = p.SubscriptionID
418
419                 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
420                 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
421                 if err != nil {
422                         // Subscription with id in REST request does not exist
423                         xapp.Logger.Error("%s", err.Error())
424                         c.UpdateCounter(cRestSubFailToXapp)
425                         return nil, "", err
426                 }
427
428                 if !exists {
429                         xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
430                 } else {
431                         xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
432                 }
433         }
434
435         return restSubscription, restSubId, nil
436 }
437
438 //-------------------------------------------------------------------
439 //
440 //-------------------------------------------------------------------
441 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
442
443         c.CntRecvMsg++
444         c.UpdateCounter(cRestSubReqFromXapp)
445
446         subResp := models.SubscriptionResponse{}
447         p := params.(*models.SubscriptionParams)
448
449         if c.LoggerLevel > 2 {
450                 c.PrintRESTSubscriptionRequest(p)
451         }
452
453         if c.e2IfState.IsE2ConnectionUp(p.Meid) == false || c.e2IfState.IsE2ConnectionUnderReset(p.Meid) == true {
454                 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
455                         xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
456                 } else if c.e2IfState.IsE2ConnectionUnderReset(p.Meid) == true {
457                         xapp.Logger.Error("E2 Node for ranName %v UNDER RESET", *p.Meid)
458                 }
459                 c.UpdateCounter(cRestReqRejDueE2Down)
460                 return nil, common.SubscribeServiceUnavailableCode
461         }
462
463         if p.ClientEndpoint == nil {
464                 err := fmt.Errorf("ClientEndpoint == nil")
465                 xapp.Logger.Error("%v", err)
466                 c.UpdateCounter(cRestSubFailToXapp)
467                 return nil, common.SubscribeBadRequestCode
468         }
469
470         e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
471         if err != nil {
472                 xapp.Logger.Error("%s", err)
473                 c.UpdateCounter(cRestSubFailToXapp)
474                 return nil, common.SubscribeBadRequestCode
475         }
476         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
477         if err != nil {
478                 xapp.Logger.Error("%s", err.Error())
479                 c.UpdateCounter(cRestSubFailToXapp)
480                 return nil, common.SubscribeBadRequestCode
481         }
482
483         md5sum, err := CalculateRequestMd5sum(params)
484         if err != nil {
485                 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
486         }
487
488         restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint, p.ClientEndpoint.Host)
489         if err != nil {
490                 xapp.Logger.Error("Subscription with id in REST request does not exist")
491                 return nil, common.SubscribeNotFoundCode
492         }
493
494         subResp.SubscriptionID = &restSubId
495         subReqList := e2ap.SubscriptionRequestList{}
496         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
497         if err != nil {
498                 xapp.Logger.Error("%s", err.Error())
499                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
500                 c.registry.DeleteRESTSubscription(&restSubId)
501                 c.UpdateCounter(cRestSubFailToXapp)
502                 return nil, common.SubscribeBadRequestCode
503         }
504
505         duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
506         if duplicate {
507                 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
508                 xapp.Logger.Debug("%s", err)
509                 c.registry.DeleteRESTSubscription(&restSubId)
510                 c.UpdateCounter(cRestSubRespToXapp)
511                 return &subResp, common.SubscribeCreatedCode
512         }
513
514         c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
515         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
516
517         c.UpdateCounter(cRestSubRespToXapp)
518         return &subResp, common.SubscribeCreatedCode
519 }
520
521 //-------------------------------------------------------------------
522 //
523 //-------------------------------------------------------------------
524 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
525
526         e2SubscriptionDirectives := &E2SubscriptionDirectives{}
527         if p == nil || p.E2SubscriptionDirectives == nil {
528                 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
529                 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
530                 e2SubscriptionDirectives.CreateRMRRoute = true
531                 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
532         } else {
533                 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
534                         e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
535                 } else {
536                         return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
537                 }
538                 if p.E2SubscriptionDirectives.E2RetryCount == nil {
539                         xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
540                         e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
541                 } else {
542                         if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
543                                 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
544                         } else {
545                                 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
546                         }
547                 }
548                 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
549         }
550         xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
551         xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
552         xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
553         return e2SubscriptionDirectives, nil
554 }
555
556 //-------------------------------------------------------------------
557 //
558 //-------------------------------------------------------------------
559
560 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
561         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
562
563         c.SubscriptionProcessingStartDelay()
564         xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests))
565
566         var xAppEventInstanceID int64
567         var e2EventInstanceID int64
568         errorInfo := &ErrorInfo{}
569
570         defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
571
572         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
573                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
574                 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
575
576                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
577                 if trans == nil {
578                         // Send notification to xApp that prosessing of a Subscription Request has failed.
579                         err := fmt.Errorf("Tracking failure")
580                         errorInfo.ErrorCause = err.Error()
581                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
582                         continue
583                 }
584
585                 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
586
587                 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
588
589                 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
590                 trans.Release()
591
592                 if err != nil {
593                         if err.Error() == "TEST: restart event received" {
594                                 // This is just for UT cases. Stop here subscription processing
595                                 return
596                         }
597                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
598                 } else {
599                         e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
600                         restSubscription.AddMd5Sum(md5sum)
601                         xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
602                                 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
603                         c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans, errorInfo)
604                 }
605         }
606 }
607
608 //-------------------------------------------------------------------
609 //
610 //------------------------------------------------------------------
611 func (c *Control) SubscriptionProcessingStartDelay() {
612         if c.UTTesting == true {
613                 // This is temporary fix for the UT problem that notification arrives before subscription response
614                 // Correct fix would be to allow notification come before response and process it correctly
615                 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
616                 <-time.After(time.Millisecond * 50)
617                 xapp.Logger.Debug("Continuing after delay")
618         }
619 }
620
621 //-------------------------------------------------------------------
622 //
623 //------------------------------------------------------------------
624 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
625         restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
626
627         errorInfo := ErrorInfo{}
628
629         err := c.tracker.Track(trans)
630         if err != nil {
631                 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
632                 errorInfo.ErrorCause = err.Error()
633                 err = fmt.Errorf("Tracking failure")
634                 return nil, &errorInfo, err
635         }
636
637         subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
638         if err != nil {
639                 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
640                 return nil, &errorInfo, err
641         }
642
643         //
644         // Wake subs request
645         //
646         subs.OngoingReqCount++
647         go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0)
648         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
649         subs.OngoingReqCount--
650
651         err = nil
652         if event != nil {
653                 switch themsg := event.(type) {
654                 case *e2ap.E2APSubscriptionResponse:
655                         trans.Release()
656                         if c.e2IfState.IsE2ConnectionUp(meid) == true {
657                                 errorInfo = c.e2ap.CheckActionNotAdmittedList(xapp.RIC_SUB_RESP, themsg.ActionNotAdmittedList, c)
658                                 return themsg, &errorInfo, nil
659                         } else {
660                                 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
661                                 c.RemoveSubscriptionFromDb(subs)
662                                 err = fmt.Errorf("E2 interface down")
663                                 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
664                         }
665                 case *e2ap.E2APSubscriptionFailure:
666                         err = fmt.Errorf("RICSubscriptionFailure. E2NodeCause: (Cause:%v, Value %v)", themsg.Cause.Content, themsg.Cause.Value)
667                         errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
668                 case *PackSubscriptionRequestErrortEvent:
669                         err = fmt.Errorf("E2 RICSubscriptionRequest pack failure")
670                         errorInfo = themsg.ErrorInfo
671                 case *SDLWriteErrortEvent:
672                         err = fmt.Errorf("SDL write failure")
673                         errorInfo = themsg.ErrorInfo
674                 case *SubmgrRestartTestEvent:
675                         err = fmt.Errorf("TEST: restart event received")
676                         xapp.Logger.Debug("%s", err)
677                         return nil, &errorInfo, err
678                 default:
679                         err = fmt.Errorf("Unexpected E2 subscription response received")
680                         errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
681                         break
682                 }
683         } else {
684                 // Timer expiry
685                 err = fmt.Errorf("E2 RICSubscriptionResponse timeout")
686                 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
687                 if subs.PolicyUpdate == true {
688                         return nil, &errorInfo, err
689                 }
690         }
691
692         xapp.Logger.Error("XAPP-SubReq E2 subscription failed: %s", idstring(err, trans, subs))
693         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
694
695         return nil, &errorInfo, err
696 }
697
698 //-------------------------------------------------------------------
699 //
700 //-------------------------------------------------------------------
701 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
702         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
703
704         // Send notification to xApp that prosessing of a Subscription Request has failed.
705         e2EventInstanceID := (int64)(0)
706         if errorInfo.ErrorSource == "" {
707                 // Submgr is default source of error
708                 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
709         }
710         resp := &models.SubscriptionResponse{
711                 SubscriptionID: restSubId,
712                 SubscriptionInstances: []*models.SubscriptionInstance{
713                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
714                                 ErrorCause:          errorInfo.ErrorCause,
715                                 ErrorSource:         errorInfo.ErrorSource,
716                                 TimeoutType:         errorInfo.TimeoutType,
717                                 XappEventInstanceID: &xAppEventInstanceID},
718                 },
719         }
720         // Mark REST subscription request processed.
721         restSubscription.SetProcessed(err)
722         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
723         if trans != nil {
724                 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
725                         errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
726         } else {
727                 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
728                         errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
729         }
730
731         c.UpdateCounter(cRestSubFailNotifToXapp)
732         err = xapp.Subscription.Notify(resp, *clientEndpoint)
733         if err != nil {
734                 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
735         }
736
737         // E2 is down. Delete completely processed request safely now
738         if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
739                 c.registry.DeleteRESTSubscription(restSubId)
740                 c.RemoveRESTSubscriptionFromDb(*restSubId)
741         }
742 }
743
744 //-------------------------------------------------------------------
745 //
746 //-------------------------------------------------------------------
747 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
748         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
749
750         // Store successfully processed InstanceId for deletion
751         restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
752         restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
753
754         // Send notification to xApp that a Subscription Request has been processed.
755         resp := &models.SubscriptionResponse{
756                 SubscriptionID: restSubId,
757                 SubscriptionInstances: []*models.SubscriptionInstance{
758                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
759                                 ErrorCause:          errorInfo.ErrorCause,
760                                 ErrorSource:         errorInfo.ErrorSource,
761                                 XappEventInstanceID: &xAppEventInstanceID},
762                 },
763         }
764         // Mark REST subscription request processesd.
765         restSubscription.SetProcessed(nil)
766         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
767         xapp.Logger.Debug("Sending successful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
768                 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
769         c.UpdateCounter(cRestSubNotifToXapp)
770         err := xapp.Subscription.Notify(resp, *clientEndpoint)
771         if err != nil {
772                 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
773         }
774
775         // E2 is down. Delete completely processed request safely now
776         if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
777                 c.registry.DeleteRESTSubscription(restSubId)
778                 c.RemoveRESTSubscriptionFromDb(*restSubId)
779         }
780 }
781
782 //-------------------------------------------------------------------
783 //
784 //-------------------------------------------------------------------
785 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
786
787         c.CntRecvMsg++
788         c.UpdateCounter(cRestSubDelReqFromXapp)
789
790         xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
791
792         restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
793         if err != nil {
794                 xapp.Logger.Error("%s", err.Error())
795                 if restSubscription == nil {
796                         // Subscription was not found
797                         c.UpdateCounter(cRestSubDelRespToXapp)
798                         return common.UnsubscribeNoContentCode
799                 } else {
800                         if restSubscription.SubReqOngoing == true {
801                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
802                                 xapp.Logger.Error("%s", err.Error())
803                                 c.UpdateCounter(cRestSubDelFailToXapp)
804                                 return common.UnsubscribeBadRequestCode
805                         } else if restSubscription.SubDelReqOngoing == true {
806                                 // Previous request for same restSubId still ongoing
807                                 c.UpdateCounter(cRestSubDelRespToXapp)
808                                 return common.UnsubscribeNoContentCode
809                         }
810                 }
811         }
812
813         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
814         go func() {
815                 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
816                 for _, instanceId := range restSubscription.InstanceIds {
817                         xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0)
818
819                         if err != nil {
820                                 xapp.Logger.Error("%s", err.Error())
821                         }
822                         xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
823                         restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
824                         restSubscription.DeleteE2InstanceId(instanceId)
825                 }
826                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
827                 c.registry.DeleteRESTSubscription(&restSubId)
828                 c.RemoveRESTSubscriptionFromDb(restSubId)
829         }()
830
831         c.UpdateCounter(cRestSubDelRespToXapp)
832         return common.UnsubscribeNoContentCode
833 }
834
835 //-------------------------------------------------------------------
836 //
837 //-------------------------------------------------------------------
838 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) {
839
840         var xAppEventInstanceID int64
841         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
842         if err != nil {
843                 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
844                         restSubId, instanceId, idstring(err, nil))
845                 return xAppEventInstanceID, nil
846         }
847
848         xAppEventInstanceID = int64(subs.ReqId.Id)
849         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
850         if trans == nil {
851                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
852                 xapp.Logger.Error("%s", err.Error())
853         }
854         defer trans.Release()
855
856         err = c.tracker.Track(trans)
857         if err != nil {
858                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
859                 xapp.Logger.Error("%s", err.Error())
860                 return xAppEventInstanceID, &time.ParseError{}
861         }
862         //
863         // Wake subs delete
864         //
865         subs.OngoingDelCount++
866         go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime)
867         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
868         subs.OngoingDelCount--
869
870         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
871
872         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
873
874         return xAppEventInstanceID, nil
875 }
876
877 //-------------------------------------------------------------------
878 //
879 //-------------------------------------------------------------------
880
881 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
882         params := &xapp.RMRParams{}
883         params.Mtype = trans.GetMtype()
884         params.SubId = int(subs.GetReqId().InstanceId)
885         params.Xid = ""
886         params.Meid = subs.GetMeid()
887         params.Src = ""
888         params.PayloadLen = len(trans.Payload.Buf)
889         params.Payload = trans.Payload.Buf
890         params.Mbuf = nil
891         xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
892         err = c.SendWithRetry(params, false, 5)
893         if err != nil {
894                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
895         }
896         return err
897 }
898
899 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
900
901         params := &xapp.RMRParams{}
902         params.Mtype = trans.GetMtype()
903         params.SubId = int(subs.GetReqId().InstanceId)
904         params.Xid = trans.GetXid()
905         params.Meid = trans.GetMeid()
906         params.Src = ""
907         params.PayloadLen = len(trans.Payload.Buf)
908         params.Payload = trans.Payload.Buf
909         params.Mbuf = nil
910         xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
911         err = c.SendWithRetry(params, false, 5)
912         if err != nil {
913                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
914         }
915         return err
916 }
917
918 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
919         if c.RMRClient == nil {
920                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
921                 xapp.Logger.Error("%s", err.Error())
922                 return
923         }
924         c.CntRecvMsg++
925
926         defer c.RMRClient.Free(msg.Mbuf)
927
928         // xapp-frame might use direct access to c buffer and
929         // when msg.Mbuf is freed, someone might take it into use
930         // and payload data might be invalid inside message handle function
931         //
932         // subscriptions won't load system a lot so there is no
933         // real performance hit by cloning buffer into new go byte slice
934         cPay := append(msg.Payload[:0:0], msg.Payload...)
935         msg.Payload = cPay
936         msg.PayloadLen = len(cPay)
937
938         switch msg.Mtype {
939         case xapp.RIC_SUB_REQ:
940                 go c.handleXAPPSubscriptionRequest(msg)
941         case xapp.RIC_SUB_RESP:
942                 go c.handleE2TSubscriptionResponse(msg)
943         case xapp.RIC_SUB_FAILURE:
944                 go c.handleE2TSubscriptionFailure(msg)
945         case xapp.RIC_SUB_DEL_REQ:
946                 go c.handleXAPPSubscriptionDeleteRequest(msg)
947         case xapp.RIC_SUB_DEL_RESP:
948                 go c.handleE2TSubscriptionDeleteResponse(msg)
949         case xapp.RIC_SUB_DEL_FAILURE:
950                 go c.handleE2TSubscriptionDeleteFailure(msg)
951         case xapp.RIC_SUB_DEL_REQUIRED:
952                 go c.handleE2TSubscriptionDeleteRequired(msg)
953         default:
954                 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
955         }
956         return
957 }
958
959 //-------------------------------------------------------------------
960 // handle from XAPP Subscription Request
961 //------------------------------------------------------------------
962 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
963         xapp.Logger.Debug("MSG from XAPP: %s", params.String())
964         c.UpdateCounter(cSubReqFromXapp)
965
966         if c.e2IfState.IsE2ConnectionUp(&params.Meid.RanName) == false {
967                 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
968                 return
969         }
970
971         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
972         if err != nil {
973                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
974                 return
975         }
976
977         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
978         if trans == nil {
979                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
980                 return
981         }
982         defer trans.Release()
983
984         if err = c.tracker.Track(trans); err != nil {
985                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
986                 return
987         }
988
989         subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
990         if err != nil {
991                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
992                 return
993         }
994
995         c.wakeSubscriptionRequest(subs, trans)
996 }
997
998 //-------------------------------------------------------------------
999 // Wake Subscription Request to E2node
1000 //------------------------------------------------------------------
1001 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
1002
1003         e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(nil)
1004         if err != nil {
1005                 xapp.Logger.Error("c.GetE2SubscriptionDirectives failure: %s", err.Error())
1006         }
1007         subs.OngoingReqCount++
1008         go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms)
1009         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1010         subs.OngoingReqCount--
1011         if event != nil {
1012                 switch themsg := event.(type) {
1013                 case *e2ap.E2APSubscriptionResponse:
1014                         themsg.RequestId.Id = trans.RequestId.Id
1015                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
1016                         if err == nil {
1017                                 trans.Release()
1018                                 c.UpdateCounter(cSubRespToXapp)
1019                                 err := c.rmrSendToXapp("", subs, trans)
1020                                 if err != nil {
1021                                         xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1022                                 }
1023                                 return
1024                         }
1025                 case *e2ap.E2APSubscriptionFailure:
1026                         themsg.RequestId.Id = trans.RequestId.Id
1027                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
1028                         if err == nil {
1029                                 c.UpdateCounter(cSubFailToXapp)
1030                                 c.rmrSendToXapp("", subs, trans)
1031                         }
1032                 default:
1033                         break
1034                 }
1035         }
1036         xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1037 }
1038
1039 //-------------------------------------------------------------------
1040 // handle from XAPP Subscription Delete Request
1041 //------------------------------------------------------------------
1042 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1043         xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1044         c.UpdateCounter(cSubDelReqFromXapp)
1045
1046         if c.e2IfState.IsE2ConnectionUp(&params.Meid.RanName) == false {
1047                 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1048                 return
1049         }
1050
1051         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1052         if err != nil {
1053                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1054                 return
1055         }
1056
1057         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1058         if trans == nil {
1059                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1060                 return
1061         }
1062         defer trans.Release()
1063
1064         err = c.tracker.Track(trans)
1065         if err != nil {
1066                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1067                 return
1068         }
1069
1070         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1071         if err != nil {
1072                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1073                 return
1074         }
1075
1076         //
1077         // Wake subs delete
1078         //
1079         subs.OngoingDelCount++
1080         go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1081         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1082         subs.OngoingDelCount--
1083
1084         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1085
1086         if subs.NoRespToXapp == true {
1087                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1088                 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1089                 return
1090         }
1091
1092         // Whatever is received success, fail or timeout, send successful delete response
1093         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1094         subDelRespMsg.RequestId.Id = trans.RequestId.Id
1095         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1096         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1097         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1098         if err == nil {
1099                 c.UpdateCounter(cSubDelRespToXapp)
1100                 err := c.rmrSendToXapp("", subs, trans)
1101                 if err != nil {
1102                         xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1103                 }
1104         }
1105 }
1106
1107 //-------------------------------------------------------------------
1108 // SUBS CREATE Handling
1109 //-------------------------------------------------------------------
1110 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) {
1111
1112         var event interface{} = nil
1113         var removeSubscriptionFromDb bool = false
1114         trans := c.tracker.NewSubsTransaction(subs)
1115         subs.WaitTransactionTurn(trans)
1116         defer subs.ReleaseTransactionTurn(trans)
1117         defer trans.Release()
1118
1119         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1120
1121         subRfMsg, valid := subs.GetCachedResponse()
1122         if subRfMsg == nil && valid == true {
1123                 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1124                 switch event.(type) {
1125                 case *e2ap.E2APSubscriptionResponse:
1126                         subRfMsg, valid = subs.SetCachedResponse(event, true)
1127                         subs.SubRespRcvd = true
1128                 case *e2ap.E2APSubscriptionFailure:
1129                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1130                         xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1131                 case *SubmgrRestartTestEvent:
1132                         // This is used to simulate that no response has been received and after restart, subscriptions are restored from db
1133                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1134                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1135                         parentTrans.SendEvent(subRfMsg, 0)
1136                         return
1137                 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1138                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1139                 default:
1140                         // Timer expiry
1141                         if subs.PolicyUpdate == false {
1142                                 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1143                                 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1144                                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1145                         } else {
1146                                 subRfMsg, valid = subs.SetCachedResponse(nil, true)
1147                         }
1148                 }
1149                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1150         } else {
1151                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1152         }
1153         if valid == false {
1154                 removeSubscriptionFromDb = true
1155         }
1156
1157         err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1158         if err != nil {
1159                 valid = false
1160                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1161
1162         }
1163
1164         // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1165         if valid == false {
1166                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1167         }
1168
1169         parentTrans.SendEvent(subRfMsg, 0)
1170 }
1171
1172 //-------------------------------------------------------------------
1173 // SUBS DELETE Handling
1174 //-------------------------------------------------------------------
1175
1176 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) {
1177
1178         trans := c.tracker.NewSubsTransaction(subs)
1179         subs.WaitTransactionTurn(trans)
1180         defer subs.ReleaseTransactionTurn(trans)
1181         defer trans.Release()
1182
1183         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1184
1185         subs.mutex.Lock()
1186
1187         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1188                 subs.valid = false
1189                 subs.mutex.Unlock()
1190                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1191         } else {
1192                 subs.mutex.Unlock()
1193         }
1194
1195         // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1196         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1197         parentTrans.SendEvent(nil, 0)
1198 }
1199
1200 //-------------------------------------------------------------------
1201 // send to E2T Subscription Request
1202 //-------------------------------------------------------------------
1203 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1204         var err error
1205         var event interface{} = nil
1206         var timedOut bool = false
1207         const ricRequestorId = 123
1208
1209         subReqMsg := subs.SubReqMsg
1210         subReqMsg.RequestId = subs.GetReqId().RequestId
1211         subReqMsg.RequestId.Id = ricRequestorId
1212         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1213         if err != nil {
1214                 xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans))
1215                 return &PackSubscriptionRequestErrortEvent{
1216                         ErrorInfo{
1217                                 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1218                                 ErrorCause:  err.Error(),
1219                         },
1220                 }
1221         }
1222
1223         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1224         err = c.WriteSubscriptionToDb(subs)
1225         if err != nil {
1226                 return &SDLWriteErrortEvent{
1227                         ErrorInfo{
1228                                 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1229                                 ErrorCause:  err.Error(),
1230                         },
1231                 }
1232         }
1233
1234         for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1235                 desc := fmt.Sprintf("(retry %d)", retries)
1236                 if retries == 0 {
1237                         c.UpdateCounter(cSubReqToE2)
1238                 } else {
1239                         c.UpdateCounter(cSubReReqToE2)
1240                 }
1241                 err := c.rmrSendToE2T(desc, subs, trans)
1242                 if err != nil {
1243                         xapp.Logger.Error("rmrSendToE2T() failed:%s", err.Error())
1244                 }
1245
1246                 if subs.DoNotWaitSubResp == false {
1247                         event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1248                         if timedOut {
1249                                 c.UpdateCounter(cSubReqTimerExpiry)
1250                                 continue
1251                         }
1252                 } else {
1253                         // Simulating case where subscrition request has been sent but response has not been received before restart
1254                         event = &SubmgrRestartTestEvent{}
1255                         xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1256                 }
1257                 break
1258         }
1259         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1260         return event
1261 }
1262
1263 //-------------------------------------------------------------------
1264 // send to E2T Subscription Delete Request
1265 //-------------------------------------------------------------------
1266
1267 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1268         var err error
1269         var event interface{}
1270         var timedOut bool
1271         const ricRequestorId = 123
1272
1273         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1274         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1275         subDelReqMsg.RequestId.Id = ricRequestorId
1276         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1277         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1278         if err != nil {
1279                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1280                 return event
1281         }
1282
1283         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1284                 desc := fmt.Sprintf("(retry %d)", retries)
1285                 if retries == 0 {
1286                         c.UpdateCounter(cSubDelReqToE2)
1287                 } else {
1288                         c.UpdateCounter(cSubDelReReqToE2)
1289                 }
1290                 err := c.rmrSendToE2T(desc, subs, trans)
1291                 if err != nil {
1292                         xapp.Logger.Error("SUBS-SubDelReq: rmrSendToE2T failure: %s", idstring(err, trans, subs, parentTrans))
1293                 }
1294                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1295                 if timedOut {
1296                         c.UpdateCounter(cSubDelReqTimerExpiry)
1297                         continue
1298                 }
1299                 break
1300         }
1301         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1302         return event
1303 }
1304
1305 //-------------------------------------------------------------------
1306 // handle from E2T Subscription Response
1307 //-------------------------------------------------------------------
1308 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1309         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1310         c.UpdateCounter(cSubRespFromE2)
1311
1312         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1313         if err != nil {
1314                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1315                 return
1316         }
1317         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1318         if err != nil {
1319                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1320                 return
1321         }
1322         trans := subs.GetTransaction()
1323         if trans == nil {
1324                 err = fmt.Errorf("Ongoing transaction not found")
1325                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1326                 return
1327         }
1328         xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans)
1329         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1330         if sendOk == false {
1331                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1332                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1333         }
1334         return
1335 }
1336
1337 //-------------------------------------------------------------------
1338 // handle from E2T Subscription Failure
1339 //-------------------------------------------------------------------
1340 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1341         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1342         c.UpdateCounter(cSubFailFromE2)
1343         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1344         if err != nil {
1345                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1346                 return
1347         }
1348         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1349         if err != nil {
1350                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1351                 return
1352         }
1353         trans := subs.GetTransaction()
1354         if trans == nil {
1355                 err = fmt.Errorf("Ongoing transaction not found")
1356                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1357                 return
1358         }
1359         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1360         if sendOk == false {
1361                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1362                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1363         }
1364         return
1365 }
1366
1367 //-------------------------------------------------------------------
1368 // handle from E2T Subscription Delete Response
1369 //-------------------------------------------------------------------
1370 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) {
1371         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1372         c.UpdateCounter(cSubDelRespFromE2)
1373         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1374         if err != nil {
1375                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1376                 return
1377         }
1378         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1379         if err != nil {
1380                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1381                 return
1382         }
1383         trans := subs.GetTransaction()
1384         if trans == nil {
1385                 err = fmt.Errorf("Ongoing transaction not found")
1386                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1387                 return
1388         }
1389         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1390         if sendOk == false {
1391                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1392                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1393         }
1394         return
1395 }
1396
1397 //-------------------------------------------------------------------
1398 // handle from E2T Subscription Delete Failure
1399 //-------------------------------------------------------------------
1400 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1401         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1402         c.UpdateCounter(cSubDelFailFromE2)
1403         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1404         if err != nil {
1405                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1406                 return
1407         }
1408         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1409         if err != nil {
1410                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1411                 return
1412         }
1413         trans := subs.GetTransaction()
1414         if trans == nil {
1415                 err = fmt.Errorf("Ongoing transaction not found")
1416                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1417                 return
1418         }
1419         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1420         if sendOk == false {
1421                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1422                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1423         }
1424         return
1425 }
1426
1427 //-------------------------------------------------------------------
1428 //
1429 //-------------------------------------------------------------------
1430 func typeofSubsMessage(v interface{}) string {
1431         if v == nil {
1432                 return "NIL"
1433         }
1434         switch v.(type) {
1435         //case *e2ap.E2APSubscriptionRequest:
1436         //      return "SubReq"
1437         case *e2ap.E2APSubscriptionResponse:
1438                 return "SubResp"
1439         case *e2ap.E2APSubscriptionFailure:
1440                 return "SubFail"
1441         //case *e2ap.E2APSubscriptionDeleteRequest:
1442         //      return "SubDelReq"
1443         case *e2ap.E2APSubscriptionDeleteResponse:
1444                 return "SubDelResp"
1445         case *e2ap.E2APSubscriptionDeleteFailure:
1446                 return "SubDelFail"
1447         default:
1448                 return "Unknown"
1449         }
1450 }
1451
1452 //-------------------------------------------------------------------
1453 //
1454 //-------------------------------------------------------------------
1455 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1456         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1457         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1458         if err != nil {
1459                 xapp.Logger.Error("%v", err)
1460                 return err
1461         }
1462         return nil
1463 }
1464
1465 //-------------------------------------------------------------------
1466 //
1467 //-------------------------------------------------------------------
1468 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1469
1470         if removeSubscriptionFromDb == true {
1471                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1472                 c.RemoveSubscriptionFromDb(subs)
1473         } else {
1474                 // Update is needed for successful response and merge case here
1475                 if subs.RetryFromXapp == false {
1476                         err := c.WriteSubscriptionToDb(subs)
1477                         return err
1478                 }
1479         }
1480         subs.RetryFromXapp = false
1481         return nil
1482 }
1483
1484 //-------------------------------------------------------------------
1485 //
1486 //-------------------------------------------------------------------
1487 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1488         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1489         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1490         if err != nil {
1491                 xapp.Logger.Error("%v", err)
1492         }
1493 }
1494
1495 //-------------------------------------------------------------------
1496 //
1497 //-------------------------------------------------------------------
1498 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1499         xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1500         err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1501         if err != nil {
1502                 xapp.Logger.Error("%v", err)
1503         }
1504 }
1505
1506 //-------------------------------------------------------------------
1507 //
1508 //-------------------------------------------------------------------
1509 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1510
1511         if removeRestSubscriptionFromDb == true {
1512                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1513                 c.RemoveRESTSubscriptionFromDb(restSubId)
1514         } else {
1515                 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1516         }
1517 }
1518
1519 //-------------------------------------------------------------------
1520 //
1521 //-------------------------------------------------------------------
1522 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1523         xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1524         err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1525         if err != nil {
1526                 xapp.Logger.Error("%v", err)
1527         }
1528 }
1529
1530 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription, e2SubsDelRequired bool) {
1531
1532         if c.UTTesting == true {
1533                 // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests
1534                 c.registry.mutex = new(sync.Mutex)
1535         }
1536
1537         const ricRequestorId = 123
1538         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1539
1540         // Send delete for every endpoint in the subscription
1541         if subs.PolicyUpdate == false {
1542                 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1543                 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1544                 subDelReqMsg.RequestId.Id = ricRequestorId
1545                 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1546                 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1547                 if err != nil {
1548                         xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1549                         return
1550                 }
1551                 for _, endPoint := range subs.EpList.Endpoints {
1552                         params := &xapp.RMRParams{}
1553                         params.Mtype = mType
1554                         params.SubId = int(subs.GetReqId().InstanceId)
1555                         params.Xid = ""
1556                         params.Meid = subs.Meid
1557                         params.Src = endPoint.String()
1558                         params.PayloadLen = len(payload.Buf)
1559                         params.Payload = payload.Buf
1560                         params.Mbuf = nil
1561                         subs.DeleteFromDb = true
1562                         if !e2SubsDelRequired {
1563                                 c.handleXAPPSubscriptionDeleteRequest(params)
1564                         } else {
1565                                 c.SendSubscriptionDeleteReqToE2T(subs, params)
1566                         }
1567                 }
1568         }
1569 }
1570
1571 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1572
1573         fmt.Println("CRESTSubscriptionRequest")
1574
1575         if p == nil {
1576                 return
1577         }
1578
1579         if p.SubscriptionID != "" {
1580                 fmt.Println("  SubscriptionID = ", p.SubscriptionID)
1581         } else {
1582                 fmt.Println("  SubscriptionID = ''")
1583         }
1584
1585         fmt.Printf("  ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1586
1587         if p.ClientEndpoint.HTTPPort != nil {
1588                 fmt.Printf("  ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1589         } else {
1590                 fmt.Println("  ClientEndpoint.HTTPPort = nil")
1591         }
1592
1593         if p.ClientEndpoint.RMRPort != nil {
1594                 fmt.Printf("  ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1595         } else {
1596                 fmt.Println("  ClientEndpoint.RMRPort = nil")
1597         }
1598
1599         if p.Meid != nil {
1600                 fmt.Printf("  Meid = %s\n", *p.Meid)
1601         } else {
1602                 fmt.Println("  Meid = nil")
1603         }
1604
1605         if p.E2SubscriptionDirectives == nil {
1606                 fmt.Println("  E2SubscriptionDirectives = nil")
1607         } else {
1608                 fmt.Println("  E2SubscriptionDirectives")
1609                 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1610                         fmt.Println("    E2RetryCount == nil")
1611                 } else {
1612                         fmt.Printf("    E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1613                 }
1614                 fmt.Printf("    E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1615                 fmt.Printf("    RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1616         }
1617         for _, subscriptionDetail := range p.SubscriptionDetails {
1618                 if p.RANFunctionID != nil {
1619                         fmt.Printf("  RANFunctionID = %v\n", *p.RANFunctionID)
1620                 } else {
1621                         fmt.Println("  RANFunctionID = nil")
1622                 }
1623                 fmt.Printf("  SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1624                 fmt.Printf("  SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1625
1626                 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1627                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1628                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1629                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1630
1631                         if actionToBeSetup.SubsequentAction != nil {
1632                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1633                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1634                         } else {
1635                                 fmt.Println("  SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1636                         }
1637                 }
1638         }
1639 }
1640
1641 //-------------------------------------------------------------------
1642 // handle from E2T Subscription Delete Required
1643 //-------------------------------------------------------------------
1644 func (c *Control) handleE2TSubscriptionDeleteRequired(params *xapp.RMRParams) {
1645         xapp.Logger.Info("MSG from E2T: %s", params.String())
1646         c.UpdateCounter(cSubDelRequFromE2)
1647         subsDelRequMsg, err := c.e2ap.UnpackSubscriptionDeleteRequired(params.Payload)
1648         if err != nil {
1649                 xapp.Logger.Error("MSG-SubDelRequired: %s", idstring(err, params))
1650                 //c.sendE2TErrorIndication(nil)
1651                 return
1652         }
1653         var subscriptions = map[string][]e2ap.E2APSubscriptionDeleteRequired{}
1654         var subDB = []*Subscription{}
1655         for _, subsTobeRemove := range subsDelRequMsg.E2APSubscriptionDeleteRequiredRequests {
1656                 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subsTobeRemove.RequestId.InstanceId})
1657                 if err != nil {
1658                         xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1659                         continue
1660                 }
1661                 // Check if Delete Subscription Already triggered
1662                 if subs.OngoingDelCount > 0 {
1663                         continue
1664                 }
1665                 subDB = append(subDB, subs)
1666                 for _, endpoint := range subs.EpList.Endpoints {
1667                         subscriptions[endpoint.Addr] = append(subscriptions[endpoint.Addr], subsTobeRemove)
1668                 }
1669                 // Sending Subscription Delete Request to E2T
1670                 //      c.SendSubscriptionDeleteReq(subs, true)
1671         }
1672         for _, subsTobeRemove := range subDB {
1673                 // Sending Subscription Delete Request to E2T
1674                 c.SendSubscriptionDeleteReq(subsTobeRemove, true)
1675         }
1676 }
1677
1678 //-----------------------------------------------------------------
1679 // Initiate RIC Subscription Delete Request after receiving
1680 // RIC Subscription Delete Required from E2T
1681 //-----------------------------------------------------------------
1682 func (c *Control) SendSubscriptionDeleteReqToE2T(subs *Subscription, params *xapp.RMRParams) {
1683         xapp.Logger.Debug("MSG TO E2T: %s", params.String())
1684         c.UpdateCounter(cSubDelReqToE2)
1685
1686         if c.e2IfState.IsE2ConnectionUp(&params.Meid.RanName) == false {
1687                 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1688                 return
1689         }
1690
1691         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subs.ReqId.RequestId, params.Meid)
1692         if trans == nil {
1693                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1694                 return
1695         }
1696         defer trans.Release()
1697
1698         err := c.tracker.Track(trans)
1699         if err != nil {
1700                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1701                 return
1702         }
1703
1704         //
1705         // Wake subs delete
1706         //
1707         subs.OngoingDelCount++
1708         go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1709         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1710         subs.OngoingDelCount--
1711
1712         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1713
1714         if subs.NoRespToXapp == true {
1715                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1716                 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1717                 return
1718         }
1719 }