58521213f71016c0fa2b025722dff624835bd820
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "sync"
26         "time"
27
28         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
29         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
30         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
33         httptransport "github.com/go-openapi/runtime/client"
34         "github.com/go-openapi/strfmt"
35         "github.com/segmentio/ksuid"
36         "github.com/spf13/viper"
37 )
38
39 //-----------------------------------------------------------------------------
40 //
41 //-----------------------------------------------------------------------------
42
// idstring builds a one-line identification string for log messages.
//
// Each entry is rendered with its String method, or as "(NIL)" when the
// entry is a nil interface value. When err is non-nil it is appended last as
// "err(<message>)". All parts are separated by exactly one space, including
// the part that follows a "(NIL)" placeholder.
//
// NOTE(review): an entry holding a typed nil pointer is not equal to nil and
// is passed to String as-is; whether that is safe depends on the concrete
// String implementation.
func idstring(err error, entries ...fmt.Stringer) string {
	var retval string
	var filler string
	for _, entry := range entries {
		if entry != nil {
			retval += filler + entry.String()
		} else {
			retval += filler + "(NIL)"
		}
		// Every emitted part, placeholder included, must be followed by a
		// separator before the next part.
		filler = " "
	}
	if err != nil {
		retval += filler + "err(" + err.Error() + ")"
	}
	return retval
}
60
61 //-----------------------------------------------------------------------------
62 //
63 //-----------------------------------------------------------------------------
64
// Package-level configuration values. They are (re)loaded by
// (*Control).ReadConfigParameters, which substitutes a hard coded default
// whenever the configured value is zero or empty.
var (
	// Timeouts and wait times, configured in milliseconds.
	e2tSubReqTimeout    time.Duration
	e2tSubDelReqTime    time.Duration
	e2tRecvMsgTimeout   time.Duration
	waitRouteCleanup_ms time.Duration

	// Maximum number of send attempts: initial try + retry.
	e2tMaxSubReqTryCount    uint64
	e2tMaxSubDelReqTryCount uint64

	// String switches; readSubsFromDb and dbRetryForever are compared
	// against the literal "true".
	checkE2State   string
	readSubsFromDb string
	dbRetryForever string

	// Number of database read attempts used when dbRetryForever is not "true".
	dbTryCount int
)
75
// Control is the subscription manager's central object. It embeds the RMR
// client and holds the registry, transaction tracker, duplicate control,
// E2 interface state and database handles used by the request handlers.
type Control struct {
	// Embedded RMR client; ReadyCB sets it from xapp.Rmr while it is still nil.
	*xapp.RMRClient
	e2ap              *E2ap                   // E2AP helper (fills request messages, ASN.1 debug print switch)
	registry          *Registry               // registry of REST and E2 subscriptions
	tracker           *Tracker                // creates and tracks xApp transactions
	restDuplicateCtrl *DuplicateCtrl          // md5sum based lookup of earlier REST requests
	e2IfState         *E2IfState              // queried for E2 connection up / under reset
	e2IfStateDb       XappRnibInterface       // created by CreateXappRnibIfInstance()
	e2SubsDb          Sdlnterface             // created by CreateSdl()
	restSubsDb        Sdlnterface             // created by CreateRESTSdl()
	CntRecvMsg        uint64                  // incremented by the REST query and subscription handlers
	ResetTestFlag     bool                    // passed on to registry.AssignToSubscription
	Counters          map[string]xapp.Counter // counter group registered under "SUBMGR"
	LoggerLevel       int                     // refreshed from xapp.Logger.GetLevel() in ReadConfigParameters
	UTTesting         bool                    // enables the start delay in SubscriptionProcessingStartDelay
}
92
// RMRMeid groups three string identifiers: PLMN id, eNB id and RAN name.
// NOTE(review): the code visible in this file uses xapp.RMRMeid instead;
// confirm whether this local type is still referenced elsewhere.
type RMRMeid struct {
	PlmnID  string
	EnbID   string
	RanName string
}
98
// Marker event types without payload.
type (
	// SubmgrRestartTestEvent is one of the event types matched by the type
	// switch in handleSubscriptionRequest.
	SubmgrRestartTestEvent struct{}

	// SubmgrRestartUpEvent is a payload-free event type.
	// NOTE(review): its producers and consumers are outside this part of the file.
	SubmgrRestartUpEvent struct{}
)
// PackSubscriptionRequestErrortEvent is the event carrying the error details
// of a failed E2 subscription request packing. handleSubscriptionRequest
// reports it as "E2 RICSubscriptionRequest pack failure".
type PackSubscriptionRequestErrortEvent struct {
	ErrorInfo ErrorInfo
}

// SetEvent stores a copy of *errorInfo in the event.
// errorInfo must not be nil: it is dereferenced unconditionally.
func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
	p.ErrorInfo = *errorInfo
}
108
// SDLWriteErrortEvent is the event carrying the error details of a failed
// SDL write. handleSubscriptionRequest reports it as "SDL write failure".
type SDLWriteErrortEvent struct {
	ErrorInfo ErrorInfo
}

// SetEvent stores a copy of *errorInfo in the event.
// errorInfo must not be nil: it is dereferenced unconditionally.
func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
	s.ErrorInfo = *errorInfo
}
116
// init logs a start-up marker and configures viper's environment handling:
// automatic environment lookup, the "submgr" variable prefix, and empty
// environment values treated as set.
func init() {
	xapp.Logger.Debug("SUBMGR")
	viper.AutomaticEnv()
	viper.SetEnvPrefix("submgr")
	viper.AllowEmptyEnv(true)
}
123
124 func NewControl() *Control {
125
126         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
127         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
128
129         registry := new(Registry)
130         registry.Initialize()
131         registry.rtmgrClient = &rtmgrClient
132
133         tracker := new(Tracker)
134         tracker.Init()
135
136         restDuplicateCtrl := new(DuplicateCtrl)
137         restDuplicateCtrl.Init()
138
139         e2IfState := new(E2IfState)
140
141         c := &Control{e2ap: new(E2ap),
142                 registry:          registry,
143                 tracker:           tracker,
144                 restDuplicateCtrl: restDuplicateCtrl,
145                 e2IfState:         e2IfState,
146                 e2IfStateDb:       CreateXappRnibIfInstance(),
147                 e2SubsDb:          CreateSdl(),
148                 restSubsDb:        CreateRESTSdl(),
149                 Counters:          xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
150                 LoggerLevel:       1,
151         }
152
153         e2IfState.Init(c)
154         c.ReadConfigParameters("")
155
156         // Register REST handler for testing support
157         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
158         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
159         xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
160
161         xapp.Resource.InjectRoute("/ric/v1/get_all_e2nodes", c.GetAllE2Nodes, "GET")
162         xapp.Resource.InjectRoute("/ric/v1/get_e2node_rest_subscriptions/{ranName}", c.GetAllE2NodeRestSubscriptions, "GET")
163
164         xapp.Resource.InjectRoute("/ric/v1/get_all_xapps", c.GetAllXapps, "GET")
165         xapp.Resource.InjectRoute("/ric/v1/get_xapp_rest_restsubscriptions/{xappServiceName}", c.GetAllXappRestSubscriptions, "GET")
166         xapp.Resource.InjectRoute("/ric/v1/get_e2subscriptions/{restId}", c.GetE2Subscriptions, "GET")
167
168         xapp.Resource.InjectRoute("/ric/v1/delete_all_e2node_subscriptions/{ranName}", c.DeleteAllE2nodeSubscriptions, "DELETE")
169         xapp.Resource.InjectRoute("/ric/v1/delete_all_xapp_subscriptions/{xappServiceName}", c.DeleteAllXappSubscriptions, "DELETE")
170
171         if readSubsFromDb == "true" {
172                 // Read subscriptions from db
173                 err := c.ReadE2Subscriptions()
174                 if err != nil {
175                         xapp.Logger.Error("ReadE2Subscriptions() failed %s", err.Error())
176                 }
177                 err = c.ReadRESTSubscriptions()
178                 if err != nil {
179                         xapp.Logger.Error("ReadRESTSubscriptions() failed %s", err.Error())
180                 }
181         }
182
183         go func() {
184                 err := xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
185                 if err != nil {
186                         xapp.Logger.Error("xapp.Subscription.Listen failure: %s", err.Error())
187                 }
188         }()
189         return c
190 }
191
192 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
193         subscriptions, err := c.registry.QueryHandler()
194         if err != nil {
195                 xapp.Logger.Error("QueryHandler() failed %s", err.Error())
196         }
197
198         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
199 }
200
201 //-------------------------------------------------------------------
202 //
203 //-------------------------------------------------------------------
204 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
205         xapp.Logger.Debug("RESTQueryHandler() called")
206
207         c.CntRecvMsg++
208
209         return c.registry.QueryHandler()
210 }
211
212 //-------------------------------------------------------------------
213 //
214 //-------------------------------------------------------------------
215 func (c *Control) ReadE2Subscriptions() error {
216         var err error
217         var subIds []uint32
218         var register map[uint32]*Subscription
219         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
220                 xapp.Logger.Debug("Reading E2 subscriptions from db")
221                 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
222                 if err != nil {
223                         xapp.Logger.Error("%v", err)
224                         <-time.After(1 * time.Second)
225                 } else {
226                         c.registry.subIds = subIds
227                         c.registry.register = register
228                         go c.HandleUncompletedSubscriptions(register)
229                         return nil
230                 }
231         }
232         xapp.Logger.Debug("Continuing without retring")
233         return err
234 }
235
236 //-------------------------------------------------------------------
237 //
238 //-------------------------------------------------------------------
239 func (c *Control) ReadRESTSubscriptions() error {
240
241         xapp.Logger.Debug("ReadRESTSubscriptions()")
242         var err error
243         var restSubscriptions map[string]*RESTSubscription
244         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
245                 xapp.Logger.Debug("Reading REST subscriptions from db")
246                 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
247                 if err != nil {
248                         xapp.Logger.Error("%v", err)
249                         <-time.After(1 * time.Second)
250                 } else {
251                         // Fix REST subscriptions ongoing status after restart
252                         for restSubId, restSubscription := range restSubscriptions {
253                                 restSubscription.SubReqOngoing = false
254                                 restSubscription.SubDelReqOngoing = false
255                                 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubscription)
256                                 if err != nil {
257                                         xapp.Logger.Error("WriteRESTSubscriptionToSdl() failed:%s", err.Error())
258                                 }
259                         }
260                         c.registry.restSubscriptions = restSubscriptions
261                         return nil
262                 }
263         }
264         xapp.Logger.Debug("Continuing without retring")
265         return err
266 }
267
268 //-------------------------------------------------------------------
269 //
270 //-------------------------------------------------------------------
271 func (c *Control) ReadConfigParameters(f string) {
272
273         xapp.Logger.Debug("ReadConfigParameters")
274
275         c.LoggerLevel = int(xapp.Logger.GetLevel())
276         xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel)
277         c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
278
279         // viper.GetDuration returns nanoseconds
280         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
281         if e2tSubReqTimeout == 0 {
282                 e2tSubReqTimeout = 2000 * 1000000
283                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
284         }
285         xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
286
287         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
288         if e2tSubDelReqTime == 0 {
289                 e2tSubDelReqTime = 2000 * 1000000
290                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
291         }
292         xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
293
294         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
295         if e2tRecvMsgTimeout == 0 {
296                 e2tRecvMsgTimeout = 2000 * 1000000
297                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
298         }
299         xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
300
301         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
302         if e2tMaxSubReqTryCount == 0 {
303                 e2tMaxSubReqTryCount = 1
304                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
305         }
306         xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
307
308         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
309         if e2tMaxSubDelReqTryCount == 0 {
310                 e2tMaxSubDelReqTryCount = 1
311                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
312         }
313         xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
314
315         checkE2State = viper.GetString("controls.checkE2State")
316         if checkE2State == "" {
317                 checkE2State = "true"
318                 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
319         }
320         xapp.Logger.Debug("checkE2State= %v", checkE2State)
321
322         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
323         if readSubsFromDb == "" {
324                 readSubsFromDb = "true"
325                 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
326         }
327         xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
328
329         dbTryCount = viper.GetInt("controls.dbTryCount")
330         if dbTryCount == 0 {
331                 dbTryCount = 200
332                 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
333         }
334         xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
335
336         dbRetryForever = viper.GetString("controls.dbRetryForever")
337         if dbRetryForever == "" {
338                 dbRetryForever = "true"
339                 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
340         }
341         xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
342
343         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
344         // value 100ms used currently only in unittests.
345         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
346         if waitRouteCleanup_ms == 0 {
347                 waitRouteCleanup_ms = 5000 * 1000000
348                 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
349         }
350         xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
351 }
352
353 //-------------------------------------------------------------------
354 //
355 //-------------------------------------------------------------------
356 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
357
358         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
359         for subId, subs := range register {
360                 if subs.SubRespRcvd == false {
361                         // If policy subscription has already been made successfully unsuccessful update should not be deleted.
362                         if subs.PolicyUpdate == false {
363                                 subs.NoRespToXapp = true
364                                 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
365                                 c.SendSubscriptionDeleteReq(subs, false)
366                         }
367                 }
368         }
369 }
370
371 func (c *Control) ReadyCB(data interface{}) {
372         if c.RMRClient == nil {
373                 c.RMRClient = xapp.Rmr
374         }
375 }
376
// Run registers ReadyCB as the ready callback and ReadConfigParameters as a
// configuration change listener, then hands control to xapp.Run.
func (c *Control) Run() {
	xapp.SetReadyCB(c.ReadyCB, nil)
	xapp.AddConfigChangeListener(c.ReadConfigParameters)
	xapp.Run(c)
}
382
383 //-------------------------------------------------------------------
384 //
385 //-------------------------------------------------------------------
386 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string, xAppServiceName string) (*RESTSubscription, string, error) {
387
388         var restSubId string
389         var restSubscription *RESTSubscription
390         var err error
391
392         prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
393         if p.SubscriptionID == "" {
394                 // Subscription does not contain REST subscription Id
395                 if exists {
396                         restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
397                         if restSubscription != nil {
398                                 // Subscription not found
399                                 restSubId = prevRestSubsId
400                                 if err == nil {
401                                         xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
402                                 } else {
403                                         xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
404                                 }
405                         } else {
406                                 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
407                                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
408                         }
409                 }
410
411                 if restSubscription == nil {
412                         restSubId = ksuid.New().String()
413                         restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppServiceName, &xAppRmrEndpoint, p.Meid)
414                 }
415         } else {
416                 // Subscription contains REST subscription Id
417                 restSubId = p.SubscriptionID
418
419                 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
420                 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
421                 if err != nil {
422                         // Subscription with id in REST request does not exist
423                         xapp.Logger.Error("%s", err.Error())
424                         c.UpdateCounter(cRestSubFailToXapp)
425                         return nil, "", err
426                 }
427
428                 if !exists {
429                         xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
430                 } else {
431                         xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
432                 }
433         }
434
435         return restSubscription, restSubId, nil
436 }
437
//-------------------------------------------------------------------
// RESTSubscriptionHandler handles a REST subscription request.
//
// params must hold a *models.SubscriptionParams; the type assertion
// is unchecked. The request is validated, the REST subscription is
// looked up or created, and the E2 subscription requests are filled.
// The actual E2 processing runs asynchronously in a goroutine.
//
// Returns the response carrying the REST subscription ID together
// with SubscribeCreatedCode, or nil with an error code:
// service unavailable (E2 connection down or under reset),
// bad request (invalid input) or not found (unknown subscription ID).
//-------------------------------------------------------------------
func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {

	c.CntRecvMsg++
	c.UpdateCounter(cRestSubReqFromXapp)

	subResp := models.SubscriptionResponse{}
	p := params.(*models.SubscriptionParams)

	if c.LoggerLevel > 2 {
		c.PrintRESTSubscriptionRequest(p)
	}

	// Reject the request while the E2 connection is down or under reset.
	if c.e2IfState.IsE2ConnectionUp(p.Meid) == false || c.e2IfState.IsE2ConnectionUnderReset(p.Meid) == true {
		if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
			xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
		} else if c.e2IfState.IsE2ConnectionUnderReset(p.Meid) == true {
			xapp.Logger.Error("E2 Node for ranName %v UNDER RESET", *p.Meid)
		}
		c.UpdateCounter(cRestReqRejDueE2Down)
		return nil, common.SubscribeServiceUnavailableCode
	}

	if p.ClientEndpoint == nil {
		err := fmt.Errorf("ClientEndpoint == nil")
		xapp.Logger.Error("%v", err)
		c.UpdateCounter(cRestSubFailToXapp)
		return nil, common.SubscribeBadRequestCode
	}

	e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
	if err != nil {
		xapp.Logger.Error("%s", err)
		c.UpdateCounter(cRestSubFailToXapp)
		return nil, common.SubscribeBadRequestCode
	}
	_, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
	if err != nil {
		xapp.Logger.Error("%s", err.Error())
		c.UpdateCounter(cRestSubFailToXapp)
		return nil, common.SubscribeBadRequestCode
	}

	// A failing md5sum calculation is only logged; processing continues with
	// whatever md5sum value was returned.
	md5sum, err := CalculateRequestMd5sum(params)
	if err != nil {
		xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
	}

	restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint, p.ClientEndpoint.Host)
	if err != nil {
		xapp.Logger.Error("Subscription with id in REST request does not exist")
		return nil, common.SubscribeNotFoundCode
	}

	subResp.SubscriptionID = &restSubId
	subReqList := e2ap.SubscriptionRequestList{}
	err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
	if err != nil {
		// Undo the md5sum cache entry and the REST subscription for a request
		// that cannot be converted into E2 subscription requests.
		xapp.Logger.Error("%s", err.Error())
		c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
		c.registry.DeleteRESTSubscription(&restSubId)
		c.UpdateCounter(cRestSubFailToXapp)
		return nil, common.SubscribeBadRequestCode
	}

	// A retransmission of a request that is still being processed is
	// acknowledged directly without starting new processing.
	duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
	if duplicate {
		err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
		xapp.Logger.Debug("%s", err)
		c.registry.DeleteRESTSubscription(&restSubId)
		c.UpdateCounter(cRestSubRespToXapp)
		return &subResp, common.SubscribeCreatedCode
	}

	// Persist the REST subscription, then process the E2 subscription
	// requests asynchronously; the response is returned immediately.
	c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
	go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)

	c.UpdateCounter(cRestSubRespToXapp)
	return &subResp, common.SubscribeCreatedCode
}
520
521 //-------------------------------------------------------------------
522 //
523 //-------------------------------------------------------------------
524 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
525
526         e2SubscriptionDirectives := &E2SubscriptionDirectives{}
527         if p == nil || p.E2SubscriptionDirectives == nil {
528                 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
529                 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
530                 e2SubscriptionDirectives.CreateRMRRoute = true
531                 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
532         } else {
533                 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
534                         e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
535                 } else {
536                         return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
537                 }
538                 if p.E2SubscriptionDirectives.E2RetryCount == nil {
539                         xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
540                         e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
541                 } else {
542                         if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
543                                 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
544                         } else {
545                                 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
546                         }
547                 }
548                 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
549         }
550         xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
551         xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
552         xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
553         return e2SubscriptionDirectives, nil
554 }
555
556 //-------------------------------------------------------------------
557 //
558 //-------------------------------------------------------------------
559
560 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
561         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
562
563         c.SubscriptionProcessingStartDelay()
564         xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests))
565
566         var xAppEventInstanceID int64
567         var e2EventInstanceID int64
568         errorInfo := &ErrorInfo{}
569
570         defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
571
572         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
573                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
574                 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
575
576                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
577                 if trans == nil {
578                         // Send notification to xApp that prosessing of a Subscription Request has failed.
579                         err := fmt.Errorf("Tracking failure")
580                         errorInfo.ErrorCause = err.Error()
581                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
582                         continue
583                 }
584
585                 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
586
587                 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
588
589                 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
590                 trans.Release()
591
592                 if err != nil {
593                         if err.Error() == "TEST: restart event received" {
594                                 // This is just for UT cases. Stop here subscription processing
595                                 return
596                         }
597                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
598                 } else {
599                         e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
600                         restSubscription.AddMd5Sum(md5sum)
601                         xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
602                                 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
603                         c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans, errorInfo)
604                 }
605         }
606 }
607
608 //-------------------------------------------------------------------
609 //
610 //------------------------------------------------------------------
611 func (c *Control) SubscriptionProcessingStartDelay() {
612         if c.UTTesting == true {
613                 // This is temporary fix for the UT problem that notification arrives before subscription response
614                 // Correct fix would be to allow notification come before response and process it correctly
615                 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
616                 <-time.After(time.Millisecond * 50)
617                 xapp.Logger.Debug("Continuing after delay")
618         }
619 }
620
621 //-------------------------------------------------------------------
622 //
623 //------------------------------------------------------------------
624 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
625         restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
626
627         errorInfo := ErrorInfo{}
628
629         err := c.tracker.Track(trans)
630         if err != nil {
631                 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
632                 errorInfo.ErrorCause = err.Error()
633                 err = fmt.Errorf("Tracking failure")
634                 return nil, &errorInfo, err
635         }
636
637         subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
638         if err != nil {
639                 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
640                 return nil, &errorInfo, err
641         }
642
643         //
644         // Wake subs request
645         //
646         subs.OngoingReqCount++
647         go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0)
648         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
649         subs.OngoingReqCount--
650
651         err = nil
652         if event != nil {
653                 switch themsg := event.(type) {
654                 case *e2ap.E2APSubscriptionResponse:
655                         trans.Release()
656                         if c.e2IfState.IsE2ConnectionUp(meid) == true {
657                                 errorInfo = c.e2ap.CheckActionNotAdmittedList(xapp.RIC_SUB_RESP, themsg.ActionNotAdmittedList, c)
658                                 return themsg, &errorInfo, nil
659                         } else {
660                                 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
661                                 c.RemoveSubscriptionFromDb(subs)
662                                 err = fmt.Errorf("E2 interface down")
663                                 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
664                         }
665                 case *e2ap.E2APSubscriptionFailure:
666                         err = fmt.Errorf("RICSubscriptionFailure. E2NodeCause: (Cause:%v, Value %v)", themsg.Cause.Content, themsg.Cause.Value)
667                         errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
668                 case *PackSubscriptionRequestErrortEvent:
669                         err = fmt.Errorf("E2 RICSubscriptionRequest pack failure")
670                         errorInfo = themsg.ErrorInfo
671                 case *SDLWriteErrortEvent:
672                         err = fmt.Errorf("SDL write failure")
673                         errorInfo = themsg.ErrorInfo
674                 case *SubmgrRestartTestEvent:
675                         err = fmt.Errorf("TEST: restart event received")
676                         xapp.Logger.Debug("%s", err)
677                         return nil, &errorInfo, err
678                 default:
679                         err = fmt.Errorf("Unexpected E2 subscription response received")
680                         errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
681                         break
682                 }
683         } else {
684                 // Timer expiry
685                 err = fmt.Errorf("E2 RICSubscriptionResponse timeout")
686                 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
687                 if subs.PolicyUpdate == true {
688                         return nil, &errorInfo, err
689                 }
690         }
691
692         xapp.Logger.Error("XAPP-SubReq E2 subscription failed: %s", idstring(err, trans, subs))
693         // If policy type subscription fails we cannot remove it only internally. Once subscription has been created
694         // successfully, it must be deleted on both sides.
695         if subs.PolicyUpdate == false {
696                 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
697         }
698
699         return nil, &errorInfo, err
700 }
701
702 //-------------------------------------------------------------------
703 //
704 //-------------------------------------------------------------------
705 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
706         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
707
708         // Send notification to xApp that prosessing of a Subscription Request has failed.
709         e2EventInstanceID := (int64)(0)
710         if errorInfo.ErrorSource == "" {
711                 // Submgr is default source of error
712                 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
713         }
714         resp := &models.SubscriptionResponse{
715                 SubscriptionID: restSubId,
716                 SubscriptionInstances: []*models.SubscriptionInstance{
717                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
718                                 ErrorCause:          errorInfo.ErrorCause,
719                                 ErrorSource:         errorInfo.ErrorSource,
720                                 TimeoutType:         errorInfo.TimeoutType,
721                                 XappEventInstanceID: &xAppEventInstanceID},
722                 },
723         }
724         // Mark REST subscription request processed.
725         restSubscription.SetProcessed(err)
726         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
727         if trans != nil {
728                 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
729                         errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
730         } else {
731                 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
732                         errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
733         }
734
735         c.UpdateCounter(cRestSubFailNotifToXapp)
736         err = xapp.Subscription.Notify(resp, *clientEndpoint)
737         if err != nil {
738                 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
739         }
740
741         // E2 is down. Delete completely processed request safely now
742         if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
743                 c.registry.DeleteRESTSubscription(restSubId)
744                 c.RemoveRESTSubscriptionFromDb(*restSubId)
745         }
746 }
747
748 //-------------------------------------------------------------------
749 //
750 //-------------------------------------------------------------------
751 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
752         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
753
754         // Store successfully processed InstanceId for deletion
755         restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
756         restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
757
758         // Send notification to xApp that a Subscription Request has been processed.
759         resp := &models.SubscriptionResponse{
760                 SubscriptionID: restSubId,
761                 SubscriptionInstances: []*models.SubscriptionInstance{
762                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
763                                 ErrorCause:          errorInfo.ErrorCause,
764                                 ErrorSource:         errorInfo.ErrorSource,
765                                 XappEventInstanceID: &xAppEventInstanceID},
766                 },
767         }
768         // Mark REST subscription request processesd.
769         restSubscription.SetProcessed(nil)
770         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
771         xapp.Logger.Debug("Sending successful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
772                 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
773         c.UpdateCounter(cRestSubNotifToXapp)
774         err := xapp.Subscription.Notify(resp, *clientEndpoint)
775         if err != nil {
776                 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
777         }
778
779         // E2 is down. Delete completely processed request safely now
780         if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
781                 c.registry.DeleteRESTSubscription(restSubId)
782                 c.RemoveRESTSubscriptionFromDb(*restSubId)
783         }
784 }
785
786 //-------------------------------------------------------------------
787 //
788 //-------------------------------------------------------------------
789 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
790
791         c.CntRecvMsg++
792         c.UpdateCounter(cRestSubDelReqFromXapp)
793
794         xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
795
796         restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
797         if err != nil {
798                 xapp.Logger.Error("%s", err.Error())
799                 if restSubscription == nil {
800                         // Subscription was not found
801                         c.UpdateCounter(cRestSubDelRespToXapp)
802                         return common.UnsubscribeNoContentCode
803                 } else {
804                         if restSubscription.SubReqOngoing == true {
805                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
806                                 xapp.Logger.Error("%s", err.Error())
807                                 c.UpdateCounter(cRestSubDelFailToXapp)
808                                 return common.UnsubscribeBadRequestCode
809                         } else if restSubscription.SubDelReqOngoing == true {
810                                 // Previous request for same restSubId still ongoing
811                                 c.UpdateCounter(cRestSubDelRespToXapp)
812                                 return common.UnsubscribeNoContentCode
813                         }
814                 }
815         }
816
817         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
818         go func() {
819                 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
820                 for _, instanceId := range restSubscription.InstanceIds {
821                         xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0)
822
823                         if err != nil {
824                                 xapp.Logger.Error("%s", err.Error())
825                         }
826                         xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
827                         restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
828                         restSubscription.DeleteE2InstanceId(instanceId)
829                 }
830                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
831                 c.registry.DeleteRESTSubscription(&restSubId)
832                 c.RemoveRESTSubscriptionFromDb(restSubId)
833         }()
834
835         c.UpdateCounter(cRestSubDelRespToXapp)
836         return common.UnsubscribeNoContentCode
837 }
838
839 //-------------------------------------------------------------------
840 //
841 //-------------------------------------------------------------------
842 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) {
843
844         var xAppEventInstanceID int64
845         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
846         if err != nil {
847                 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
848                         restSubId, instanceId, idstring(err, nil))
849                 return xAppEventInstanceID, nil
850         }
851
852         xAppEventInstanceID = int64(subs.ReqId.Id)
853         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
854         if trans == nil {
855                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
856                 xapp.Logger.Error("%s", err.Error())
857         }
858         defer trans.Release()
859
860         err = c.tracker.Track(trans)
861         if err != nil {
862                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
863                 xapp.Logger.Error("%s", err.Error())
864                 return xAppEventInstanceID, &time.ParseError{}
865         }
866         //
867         // Wake subs delete
868         //
869         subs.OngoingDelCount++
870         go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime)
871         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
872         subs.OngoingDelCount--
873
874         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
875
876         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
877
878         return xAppEventInstanceID, nil
879 }
880
881 //-------------------------------------------------------------------
882 //
883 //-------------------------------------------------------------------
884
885 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
886         params := &xapp.RMRParams{}
887         params.Mtype = trans.GetMtype()
888         params.SubId = int(subs.GetReqId().InstanceId)
889         params.Xid = ""
890         params.Meid = subs.GetMeid()
891         params.Src = ""
892         params.PayloadLen = len(trans.Payload.Buf)
893         params.Payload = trans.Payload.Buf
894         params.Mbuf = nil
895         xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
896         err = c.SendWithRetry(params, false, 5)
897         if err != nil {
898                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
899         }
900         return err
901 }
902
903 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
904
905         params := &xapp.RMRParams{}
906         params.Mtype = trans.GetMtype()
907         params.SubId = int(subs.GetReqId().InstanceId)
908         params.Xid = trans.GetXid()
909         params.Meid = trans.GetMeid()
910         params.Src = ""
911         params.PayloadLen = len(trans.Payload.Buf)
912         params.Payload = trans.Payload.Buf
913         params.Mbuf = nil
914         xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
915         err = c.SendWithRetry(params, false, 5)
916         if err != nil {
917                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
918         }
919         return err
920 }
921
922 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
923         if c.RMRClient == nil {
924                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
925                 xapp.Logger.Error("%s", err.Error())
926                 return
927         }
928         c.CntRecvMsg++
929
930         defer c.RMRClient.Free(msg.Mbuf)
931
932         // xapp-frame might use direct access to c buffer and
933         // when msg.Mbuf is freed, someone might take it into use
934         // and payload data might be invalid inside message handle function
935         //
936         // subscriptions won't load system a lot so there is no
937         // real performance hit by cloning buffer into new go byte slice
938         cPay := append(msg.Payload[:0:0], msg.Payload...)
939         msg.Payload = cPay
940         msg.PayloadLen = len(cPay)
941
942         switch msg.Mtype {
943         case xapp.RIC_SUB_REQ:
944                 go c.handleXAPPSubscriptionRequest(msg)
945         case xapp.RIC_SUB_RESP:
946                 go c.handleE2TSubscriptionResponse(msg)
947         case xapp.RIC_SUB_FAILURE:
948                 go c.handleE2TSubscriptionFailure(msg)
949         case xapp.RIC_SUB_DEL_REQ:
950                 go c.handleXAPPSubscriptionDeleteRequest(msg)
951         case xapp.RIC_SUB_DEL_RESP:
952                 go c.handleE2TSubscriptionDeleteResponse(msg)
953         case xapp.RIC_SUB_DEL_FAILURE:
954                 go c.handleE2TSubscriptionDeleteFailure(msg)
955         case xapp.RIC_SUB_DEL_REQUIRED:
956                 go c.handleE2TSubscriptionDeleteRequired(msg)
957         default:
958                 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
959         }
960         return
961 }
962
963 //-------------------------------------------------------------------
964 // handle from XAPP Subscription Request
965 //------------------------------------------------------------------
966 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
967         xapp.Logger.Debug("MSG from XAPP: %s", params.String())
968         c.UpdateCounter(cSubReqFromXapp)
969
970         if c.e2IfState.IsE2ConnectionUp(&params.Meid.RanName) == false {
971                 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
972                 return
973         }
974
975         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
976         if err != nil {
977                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
978                 return
979         }
980
981         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
982         if trans == nil {
983                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
984                 return
985         }
986         defer trans.Release()
987
988         if err = c.tracker.Track(trans); err != nil {
989                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
990                 return
991         }
992
993         subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
994         if err != nil {
995                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
996                 return
997         }
998
999         c.wakeSubscriptionRequest(subs, trans)
1000 }
1001
1002 //-------------------------------------------------------------------
1003 // Wake Subscription Request to E2node
1004 //------------------------------------------------------------------
1005 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
1006
1007         e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(nil)
1008         if err != nil {
1009                 xapp.Logger.Error("c.GetE2SubscriptionDirectives failure: %s", err.Error())
1010         }
1011         subs.OngoingReqCount++
1012         go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms)
1013         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1014         subs.OngoingReqCount--
1015         if event != nil {
1016                 switch themsg := event.(type) {
1017                 case *e2ap.E2APSubscriptionResponse:
1018                         themsg.RequestId.Id = trans.RequestId.Id
1019                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
1020                         if err == nil {
1021                                 trans.Release()
1022                                 c.UpdateCounter(cSubRespToXapp)
1023                                 err := c.rmrSendToXapp("", subs, trans)
1024                                 if err != nil {
1025                                         xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1026                                 }
1027                                 return
1028                         }
1029                 case *e2ap.E2APSubscriptionFailure:
1030                         themsg.RequestId.Id = trans.RequestId.Id
1031                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
1032                         if err == nil {
1033                                 c.UpdateCounter(cSubFailToXapp)
1034                                 c.rmrSendToXapp("", subs, trans)
1035                         }
1036                 default:
1037                         break
1038                 }
1039         }
1040         xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1041 }
1042
1043 //-------------------------------------------------------------------
1044 // handle from XAPP Subscription Delete Request
1045 //------------------------------------------------------------------
1046 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1047         xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1048         c.UpdateCounter(cSubDelReqFromXapp)
1049
1050         if c.e2IfState.IsE2ConnectionUp(&params.Meid.RanName) == false {
1051                 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1052                 return
1053         }
1054
1055         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1056         if err != nil {
1057                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1058                 return
1059         }
1060
1061         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1062         if trans == nil {
1063                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1064                 return
1065         }
1066         defer trans.Release()
1067
1068         err = c.tracker.Track(trans)
1069         if err != nil {
1070                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1071                 return
1072         }
1073
1074         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1075         if err != nil {
1076                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1077                 return
1078         }
1079
1080         //
1081         // Wake subs delete
1082         //
1083         subs.OngoingDelCount++
1084         go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1085         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1086         subs.OngoingDelCount--
1087
1088         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1089
1090         if subs.NoRespToXapp == true {
1091                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1092                 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1093                 return
1094         }
1095
1096         // Whatever is received success, fail or timeout, send successful delete response
1097         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1098         subDelRespMsg.RequestId.Id = trans.RequestId.Id
1099         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1100         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1101         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1102         if err == nil {
1103                 c.UpdateCounter(cSubDelRespToXapp)
1104                 err := c.rmrSendToXapp("", subs, trans)
1105                 if err != nil {
1106                         xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1107                 }
1108         }
1109 }
1110
1111 //-------------------------------------------------------------------
1112 // SUBS CREATE Handling
1113 //-------------------------------------------------------------------
1114 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) {
1115
1116         var event interface{} = nil
1117         var removeSubscriptionFromDb bool = false
1118         trans := c.tracker.NewSubsTransaction(subs)
1119         subs.WaitTransactionTurn(trans)
1120         defer subs.ReleaseTransactionTurn(trans)
1121         defer trans.Release()
1122
1123         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1124
1125         subRfMsg, valid := subs.GetCachedResponse()
1126         if subRfMsg == nil && valid == true {
1127                 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1128                 switch event.(type) {
1129                 case *e2ap.E2APSubscriptionResponse:
1130                         subRfMsg, valid = subs.SetCachedResponse(event, true)
1131                         subs.SubRespRcvd = true
1132                 case *e2ap.E2APSubscriptionFailure:
1133                         if subs.PolicyUpdate == false {
1134                                 subRfMsg, valid = subs.SetCachedResponse(event, false)
1135                         } else {
1136                                 // In policy update case where subscription has already been created successfully in Gnb
1137                                 // we cannot delete subscription internally in submgr
1138                                 subRfMsg, valid = subs.SetCachedResponse(event, true)
1139                         }
1140                         xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1141                 case *SubmgrRestartTestEvent:
1142                         // This is used to simulate that no response has been received and after restart, subscriptions are restored from db
1143                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1144                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1145                         parentTrans.SendEvent(subRfMsg, 0)
1146                         return
1147                 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1148                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1149                 default:
1150                         // Timer expiry
1151                         if subs.PolicyUpdate == false {
1152                                 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1153                                 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1154                                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1155                         } else {
1156                                 subRfMsg, valid = subs.SetCachedResponse(nil, true)
1157                         }
1158                 }
1159                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1160         } else {
1161                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1162         }
1163         xapp.Logger.Debug("subs.PolicyUpdate: %v", subs.PolicyUpdate)
1164         xapp.Logger.Debug("subs: %v", subs)
1165
1166         if valid == false {
1167                 removeSubscriptionFromDb = true
1168         }
1169
1170         err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1171         if err != nil {
1172                 valid = false
1173                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1174
1175         }
1176
1177         // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1178         if valid == false {
1179                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1180         }
1181
1182         parentTrans.SendEvent(subRfMsg, 0)
1183 }
1184
1185 //-------------------------------------------------------------------
1186 // SUBS DELETE Handling
1187 //-------------------------------------------------------------------
1188
1189 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) {
1190
1191         trans := c.tracker.NewSubsTransaction(subs)
1192         subs.WaitTransactionTurn(trans)
1193         defer subs.ReleaseTransactionTurn(trans)
1194         defer trans.Release()
1195
1196         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1197
1198         subs.mutex.Lock()
1199
1200         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1201                 subs.valid = false
1202                 subs.mutex.Unlock()
1203                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1204         } else {
1205                 subs.mutex.Unlock()
1206         }
1207
1208         // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1209         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1210         parentTrans.SendEvent(nil, 0)
1211 }
1212
1213 //-------------------------------------------------------------------
1214 // send to E2T Subscription Request
1215 //-------------------------------------------------------------------
1216 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1217         var err error
1218         var event interface{} = nil
1219         var timedOut bool = false
1220         const ricRequestorId = 123
1221
1222         subReqMsg := subs.SubReqMsg
1223         subReqMsg.RequestId = subs.GetReqId().RequestId
1224         subReqMsg.RequestId.Id = ricRequestorId
1225         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1226         if err != nil {
1227                 xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans))
1228                 return &PackSubscriptionRequestErrortEvent{
1229                         ErrorInfo{
1230                                 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1231                                 ErrorCause:  err.Error(),
1232                         },
1233                 }
1234         }
1235
1236         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1237         err = c.WriteSubscriptionToDb(subs)
1238         if err != nil {
1239                 return &SDLWriteErrortEvent{
1240                         ErrorInfo{
1241                                 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1242                                 ErrorCause:  err.Error(),
1243                         },
1244                 }
1245         }
1246
1247         for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1248                 desc := fmt.Sprintf("(retry %d)", retries)
1249                 if retries == 0 {
1250                         c.UpdateCounter(cSubReqToE2)
1251                 } else {
1252                         c.UpdateCounter(cSubReReqToE2)
1253                 }
1254                 err := c.rmrSendToE2T(desc, subs, trans)
1255                 if err != nil {
1256                         xapp.Logger.Error("rmrSendToE2T() failed:%s", err.Error())
1257                 }
1258
1259                 if subs.DoNotWaitSubResp == false {
1260                         event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1261                         if timedOut {
1262                                 c.UpdateCounter(cSubReqTimerExpiry)
1263                                 continue
1264                         }
1265                 } else {
1266                         // Simulating case where subscrition request has been sent but response has not been received before restart
1267                         event = &SubmgrRestartTestEvent{}
1268                         xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1269                 }
1270                 break
1271         }
1272         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1273         return event
1274 }
1275
1276 //-------------------------------------------------------------------
1277 // send to E2T Subscription Delete Request
1278 //-------------------------------------------------------------------
1279
1280 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1281         var err error
1282         var event interface{}
1283         var timedOut bool
1284         const ricRequestorId = 123
1285
1286         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1287         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1288         subDelReqMsg.RequestId.Id = ricRequestorId
1289         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1290         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1291         if err != nil {
1292                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1293                 return event
1294         }
1295
1296         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1297                 desc := fmt.Sprintf("(retry %d)", retries)
1298                 if retries == 0 {
1299                         c.UpdateCounter(cSubDelReqToE2)
1300                 } else {
1301                         c.UpdateCounter(cSubDelReReqToE2)
1302                 }
1303                 err := c.rmrSendToE2T(desc, subs, trans)
1304                 if err != nil {
1305                         xapp.Logger.Error("SUBS-SubDelReq: rmrSendToE2T failure: %s", idstring(err, trans, subs, parentTrans))
1306                 }
1307                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1308                 if timedOut {
1309                         c.UpdateCounter(cSubDelReqTimerExpiry)
1310                         continue
1311                 }
1312                 break
1313         }
1314         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1315         return event
1316 }
1317
1318 //-------------------------------------------------------------------
1319 // handle from E2T Subscription Response
1320 //-------------------------------------------------------------------
1321 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1322         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1323         c.UpdateCounter(cSubRespFromE2)
1324
1325         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1326         if err != nil {
1327                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1328                 return
1329         }
1330         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1331         if err != nil {
1332                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1333                 return
1334         }
1335         trans := subs.GetTransaction()
1336         if trans == nil {
1337                 err = fmt.Errorf("Ongoing transaction not found")
1338                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1339                 return
1340         }
1341         xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans)
1342         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1343         if sendOk == false {
1344                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1345                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1346         }
1347         return
1348 }
1349
1350 //-------------------------------------------------------------------
1351 // handle from E2T Subscription Failure
1352 //-------------------------------------------------------------------
1353 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1354         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1355         c.UpdateCounter(cSubFailFromE2)
1356         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1357         if err != nil {
1358                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1359                 return
1360         }
1361         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1362         if err != nil {
1363                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1364                 return
1365         }
1366         trans := subs.GetTransaction()
1367         if trans == nil {
1368                 err = fmt.Errorf("Ongoing transaction not found")
1369                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1370                 return
1371         }
1372         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1373         if sendOk == false {
1374                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1375                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1376         }
1377         return
1378 }
1379
1380 //-------------------------------------------------------------------
1381 // handle from E2T Subscription Delete Response
1382 //-------------------------------------------------------------------
1383 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) {
1384         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1385         c.UpdateCounter(cSubDelRespFromE2)
1386         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1387         if err != nil {
1388                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1389                 return
1390         }
1391         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1392         if err != nil {
1393                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1394                 return
1395         }
1396         trans := subs.GetTransaction()
1397         if trans == nil {
1398                 err = fmt.Errorf("Ongoing transaction not found")
1399                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1400                 return
1401         }
1402         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1403         if sendOk == false {
1404                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1405                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1406         }
1407         return
1408 }
1409
1410 //-------------------------------------------------------------------
1411 // handle from E2T Subscription Delete Failure
1412 //-------------------------------------------------------------------
1413 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1414         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1415         c.UpdateCounter(cSubDelFailFromE2)
1416         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1417         if err != nil {
1418                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1419                 return
1420         }
1421         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1422         if err != nil {
1423                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1424                 return
1425         }
1426         trans := subs.GetTransaction()
1427         if trans == nil {
1428                 err = fmt.Errorf("Ongoing transaction not found")
1429                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1430                 return
1431         }
1432         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1433         if sendOk == false {
1434                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1435                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1436         }
1437         return
1438 }
1439
1440 //-------------------------------------------------------------------
1441 //
1442 //-------------------------------------------------------------------
1443 func typeofSubsMessage(v interface{}) string {
1444         if v == nil {
1445                 return "NIL"
1446         }
1447         switch v.(type) {
1448         //case *e2ap.E2APSubscriptionRequest:
1449         //      return "SubReq"
1450         case *e2ap.E2APSubscriptionResponse:
1451                 return "SubResp"
1452         case *e2ap.E2APSubscriptionFailure:
1453                 return "SubFail"
1454         //case *e2ap.E2APSubscriptionDeleteRequest:
1455         //      return "SubDelReq"
1456         case *e2ap.E2APSubscriptionDeleteResponse:
1457                 return "SubDelResp"
1458         case *e2ap.E2APSubscriptionDeleteFailure:
1459                 return "SubDelFail"
1460         default:
1461                 return "Unknown"
1462         }
1463 }
1464
1465 //-------------------------------------------------------------------
1466 //
1467 //-------------------------------------------------------------------
1468 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1469         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1470         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1471         if err != nil {
1472                 xapp.Logger.Error("%v", err)
1473                 return err
1474         }
1475         return nil
1476 }
1477
1478 //-------------------------------------------------------------------
1479 //
1480 //-------------------------------------------------------------------
1481 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1482
1483         if removeSubscriptionFromDb == true {
1484                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1485                 c.RemoveSubscriptionFromDb(subs)
1486         } else {
1487                 // Update is needed for successful response and merge case here
1488                 if subs.RetryFromXapp == false {
1489                         err := c.WriteSubscriptionToDb(subs)
1490                         return err
1491                 }
1492         }
1493         subs.RetryFromXapp = false
1494         return nil
1495 }
1496
1497 //-------------------------------------------------------------------
1498 //
1499 //-------------------------------------------------------------------
1500 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1501         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1502         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1503         if err != nil {
1504                 xapp.Logger.Error("%v", err)
1505         }
1506 }
1507
1508 //-------------------------------------------------------------------
1509 //
1510 //-------------------------------------------------------------------
1511 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1512         xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1513         err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1514         if err != nil {
1515                 xapp.Logger.Error("%v", err)
1516         }
1517 }
1518
1519 //-------------------------------------------------------------------
1520 //
1521 //-------------------------------------------------------------------
1522 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1523
1524         if removeRestSubscriptionFromDb == true {
1525                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1526                 c.RemoveRESTSubscriptionFromDb(restSubId)
1527         } else {
1528                 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1529         }
1530 }
1531
1532 //-------------------------------------------------------------------
1533 //
1534 //-------------------------------------------------------------------
1535 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1536         xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1537         err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1538         if err != nil {
1539                 xapp.Logger.Error("%v", err)
1540         }
1541 }
1542
1543 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription, e2SubsDelRequired bool) {
1544
1545         if c.UTTesting == true {
1546                 // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests
1547                 c.registry.mutex = new(sync.Mutex)
1548         }
1549
1550         const ricRequestorId = 123
1551         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1552
1553         // Send delete for every endpoint in the subscription
1554         if subs.PolicyUpdate == false {
1555                 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1556                 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1557                 subDelReqMsg.RequestId.Id = ricRequestorId
1558                 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1559                 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1560                 if err != nil {
1561                         xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1562                         return
1563                 }
1564                 for _, endPoint := range subs.EpList.Endpoints {
1565                         params := &xapp.RMRParams{}
1566                         params.Mtype = mType
1567                         params.SubId = int(subs.GetReqId().InstanceId)
1568                         params.Xid = ""
1569                         params.Meid = subs.Meid
1570                         params.Src = endPoint.String()
1571                         params.PayloadLen = len(payload.Buf)
1572                         params.Payload = payload.Buf
1573                         params.Mbuf = nil
1574                         subs.DeleteFromDb = true
1575                         if !e2SubsDelRequired {
1576                                 c.handleXAPPSubscriptionDeleteRequest(params)
1577                         } else {
1578                                 c.SendSubscriptionDeleteReqToE2T(subs, params)
1579                         }
1580                 }
1581         }
1582 }
1583
1584 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1585
1586         fmt.Println("CRESTSubscriptionRequest")
1587
1588         if p == nil {
1589                 return
1590         }
1591
1592         if p.SubscriptionID != "" {
1593                 fmt.Println("  SubscriptionID = ", p.SubscriptionID)
1594         } else {
1595                 fmt.Println("  SubscriptionID = ''")
1596         }
1597
1598         fmt.Printf("  ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1599
1600         if p.ClientEndpoint.HTTPPort != nil {
1601                 fmt.Printf("  ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1602         } else {
1603                 fmt.Println("  ClientEndpoint.HTTPPort = nil")
1604         }
1605
1606         if p.ClientEndpoint.RMRPort != nil {
1607                 fmt.Printf("  ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1608         } else {
1609                 fmt.Println("  ClientEndpoint.RMRPort = nil")
1610         }
1611
1612         if p.Meid != nil {
1613                 fmt.Printf("  Meid = %s\n", *p.Meid)
1614         } else {
1615                 fmt.Println("  Meid = nil")
1616         }
1617
1618         if p.E2SubscriptionDirectives == nil {
1619                 fmt.Println("  E2SubscriptionDirectives = nil")
1620         } else {
1621                 fmt.Println("  E2SubscriptionDirectives")
1622                 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1623                         fmt.Println("    E2RetryCount == nil")
1624                 } else {
1625                         fmt.Printf("    E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1626                 }
1627                 fmt.Printf("    E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1628                 fmt.Printf("    RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1629         }
1630         for _, subscriptionDetail := range p.SubscriptionDetails {
1631                 if p.RANFunctionID != nil {
1632                         fmt.Printf("  RANFunctionID = %v\n", *p.RANFunctionID)
1633                 } else {
1634                         fmt.Println("  RANFunctionID = nil")
1635                 }
1636                 fmt.Printf("  SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1637                 fmt.Printf("  SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1638
1639                 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1640                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1641                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1642                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1643
1644                         if actionToBeSetup.SubsequentAction != nil {
1645                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1646                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1647                         } else {
1648                                 fmt.Println("  SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1649                         }
1650                 }
1651         }
1652 }
1653
1654 //-------------------------------------------------------------------
1655 // handle from E2T Subscription Delete Required
1656 //-------------------------------------------------------------------
1657 func (c *Control) handleE2TSubscriptionDeleteRequired(params *xapp.RMRParams) {
1658         xapp.Logger.Info("MSG from E2T: %s", params.String())
1659         c.UpdateCounter(cSubDelRequFromE2)
1660         subsDelRequMsg, err := c.e2ap.UnpackSubscriptionDeleteRequired(params.Payload)
1661         if err != nil {
1662                 xapp.Logger.Error("MSG-SubDelRequired: %s", idstring(err, params))
1663                 //c.sendE2TErrorIndication(nil)
1664                 return
1665         }
1666         var subscriptions = map[string][]e2ap.E2APSubscriptionDeleteRequired{}
1667         var subDB = []*Subscription{}
1668         for _, subsTobeRemove := range subsDelRequMsg.E2APSubscriptionDeleteRequiredRequests {
1669                 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subsTobeRemove.RequestId.InstanceId})
1670                 if err != nil {
1671                         xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1672                         continue
1673                 }
1674                 // Check if Delete Subscription Already triggered
1675                 if subs.OngoingDelCount > 0 {
1676                         continue
1677                 }
1678                 subDB = append(subDB, subs)
1679                 for _, endpoint := range subs.EpList.Endpoints {
1680                         subscriptions[endpoint.Addr] = append(subscriptions[endpoint.Addr], subsTobeRemove)
1681                 }
1682                 // Sending Subscription Delete Request to E2T
1683                 //      c.SendSubscriptionDeleteReq(subs, true)
1684         }
1685         for _, subsTobeRemove := range subDB {
1686                 // Sending Subscription Delete Request to E2T
1687                 c.SendSubscriptionDeleteReq(subsTobeRemove, true)
1688         }
1689 }
1690
1691 //-----------------------------------------------------------------
1692 // Initiate RIC Subscription Delete Request after receiving
1693 // RIC Subscription Delete Required from E2T
1694 //-----------------------------------------------------------------
1695 func (c *Control) SendSubscriptionDeleteReqToE2T(subs *Subscription, params *xapp.RMRParams) {
1696         xapp.Logger.Debug("MSG TO E2T: %s", params.String())
1697         c.UpdateCounter(cSubDelReqToE2)
1698
1699         if c.e2IfState.IsE2ConnectionUp(&params.Meid.RanName) == false {
1700                 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1701                 return
1702         }
1703
1704         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subs.ReqId.RequestId, params.Meid)
1705         if trans == nil {
1706                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1707                 return
1708         }
1709         defer trans.Release()
1710
1711         err := c.tracker.Track(trans)
1712         if err != nil {
1713                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1714                 return
1715         }
1716
1717         //
1718         // Wake subs delete
1719         //
1720         subs.OngoingDelCount++
1721         go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1722         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1723         subs.OngoingDelCount--
1724
1725         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1726
1727         if subs.NoRespToXapp == true {
1728                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1729                 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1730                 return
1731         }
1732 }