Merge "Support for handling unordered IEs in RIC Subscription Response messgae"
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "sync"
26         "time"
27
28         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
29         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
30         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
33         httptransport "github.com/go-openapi/runtime/client"
34         "github.com/go-openapi/strfmt"
35         "github.com/segmentio/ksuid"
36         "github.com/spf13/viper"
37 )
38
39 //-----------------------------------------------------------------------------
40 //
41 //-----------------------------------------------------------------------------
42
43 func idstring(err error, entries ...fmt.Stringer) string {
44         var retval string = ""
45         var filler string = ""
46         for _, entry := range entries {
47                 if entry != nil {
48                         retval += filler + entry.String()
49                         filler = " "
50                 } else {
51                         retval += filler + "(NIL)"
52                 }
53         }
54         if err != nil {
55                 retval += filler + "err(" + err.Error() + ")"
56                 filler = " "
57         }
58         return retval
59 }
60
61 //-----------------------------------------------------------------------------
62 //
63 //-----------------------------------------------------------------------------
64
65 var e2tSubReqTimeout time.Duration
66 var e2tSubDelReqTime time.Duration
67 var e2tRecvMsgTimeout time.Duration
68 var waitRouteCleanup_ms time.Duration
69 var e2tMaxSubReqTryCount uint64    // Initial try + retry
70 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
71 var checkE2State string
72 var readSubsFromDb string
73 var dbRetryForever string
74 var dbTryCount int
75 var e2IEOrderCheckValue uint8
76
77 type Control struct {
78         *xapp.RMRClient
79         e2ap              *E2ap
80         registry          *Registry
81         tracker           *Tracker
82         restDuplicateCtrl *DuplicateCtrl
83         e2IfState         *E2IfState
84         e2IfStateDb       XappRnibInterface
85         e2SubsDb          Sdlnterface
86         restSubsDb        Sdlnterface
87         CntRecvMsg        uint64
88         ResetTestFlag     bool
89         Counters          map[string]xapp.Counter
90         LoggerLevel       int
91         UTTesting         bool
92 }
93
94 type RMRMeid struct {
95         PlmnID  string
96         EnbID   string
97         RanName string
98 }
99
100 type SubmgrRestartTestEvent struct{}
101 type SubmgrRestartUpEvent struct{}
102 type PackSubscriptionRequestErrortEvent struct {
103         ErrorInfo ErrorInfo
104 }
105
106 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
107         p.ErrorInfo = *errorInfo
108 }
109
110 type SDLWriteErrortEvent struct {
111         ErrorInfo ErrorInfo
112 }
113
114 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
115         s.ErrorInfo = *errorInfo
116 }
117
118 func init() {
119         xapp.Logger.Debug("SUBMGR")
120         viper.AutomaticEnv()
121         viper.SetEnvPrefix("submgr")
122         viper.AllowEmptyEnv(true)
123 }
124
125 func NewControl() *Control {
126
127         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
128         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
129
130         registry := new(Registry)
131         registry.Initialize()
132         registry.rtmgrClient = &rtmgrClient
133
134         tracker := new(Tracker)
135         tracker.Init()
136
137         restDuplicateCtrl := new(DuplicateCtrl)
138         restDuplicateCtrl.Init()
139
140         e2IfState := new(E2IfState)
141
142         c := &Control{e2ap: new(E2ap),
143                 registry:          registry,
144                 tracker:           tracker,
145                 restDuplicateCtrl: restDuplicateCtrl,
146                 e2IfState:         e2IfState,
147                 e2IfStateDb:       CreateXappRnibIfInstance(),
148                 e2SubsDb:          CreateSdl(),
149                 restSubsDb:        CreateRESTSdl(),
150                 Counters:          xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
151                 LoggerLevel:       1,
152         }
153
154         e2IfState.Init(c)
155         c.ReadConfigParameters("")
156
157         // Register REST handler for testing support
158         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
159         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
160         xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
161
162         xapp.Resource.InjectRoute("/ric/v1/get_all_e2nodes", c.GetAllE2Nodes, "GET")
163         xapp.Resource.InjectRoute("/ric/v1/get_e2node_rest_subscriptions/{ranName}", c.GetAllE2NodeRestSubscriptions, "GET")
164
165         xapp.Resource.InjectRoute("/ric/v1/get_all_xapps", c.GetAllXapps, "GET")
166         xapp.Resource.InjectRoute("/ric/v1/get_xapp_rest_restsubscriptions/{xappServiceName}", c.GetAllXappRestSubscriptions, "GET")
167         xapp.Resource.InjectRoute("/ric/v1/get_e2subscriptions/{restId}", c.GetE2Subscriptions, "GET")
168
169         xapp.Resource.InjectRoute("/ric/v1/delete_all_e2node_subscriptions/{ranName}", c.DeleteAllE2nodeSubscriptions, "DELETE")
170         xapp.Resource.InjectRoute("/ric/v1/delete_all_xapp_subscriptions/{xappServiceName}", c.DeleteAllXappSubscriptions, "DELETE")
171
172         if readSubsFromDb == "true" {
173                 // Read subscriptions from db
174                 err := c.ReadE2Subscriptions()
175                 if err != nil {
176                         xapp.Logger.Error("ReadE2Subscriptions() failed %s", err.Error())
177                 }
178                 err = c.ReadRESTSubscriptions()
179                 if err != nil {
180                         xapp.Logger.Error("ReadRESTSubscriptions() failed %s", err.Error())
181                 }
182         }
183
184         go func() {
185                 err := xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
186                 if err != nil {
187                         xapp.Logger.Error("xapp.Subscription.Listen failure: %s", err.Error())
188                 }
189         }()
190         return c
191 }
192
193 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
194         subscriptions, err := c.registry.QueryHandler()
195         if err != nil {
196                 xapp.Logger.Error("QueryHandler() failed %s", err.Error())
197         }
198
199         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
200 }
201
202 //-------------------------------------------------------------------
203 //
204 //-------------------------------------------------------------------
205 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
206         xapp.Logger.Debug("RESTQueryHandler() called")
207
208         c.CntRecvMsg++
209
210         return c.registry.QueryHandler()
211 }
212
213 //-------------------------------------------------------------------
214 //
215 //-------------------------------------------------------------------
216 func (c *Control) ReadE2Subscriptions() error {
217         var err error
218         var subIds []uint32
219         var register map[uint32]*Subscription
220         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
221                 xapp.Logger.Debug("Reading E2 subscriptions from db")
222                 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
223                 if err != nil {
224                         xapp.Logger.Error("%v", err)
225                         <-time.After(1 * time.Second)
226                 } else {
227                         c.registry.subIds = subIds
228                         c.registry.register = register
229                         go c.HandleUncompletedSubscriptions(register)
230                         return nil
231                 }
232         }
233         xapp.Logger.Debug("Continuing without retring")
234         return err
235 }
236
237 //-------------------------------------------------------------------
238 //
239 //-------------------------------------------------------------------
240 func (c *Control) ReadRESTSubscriptions() error {
241
242         xapp.Logger.Debug("ReadRESTSubscriptions()")
243         var err error
244         var restSubscriptions map[string]*RESTSubscription
245         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
246                 xapp.Logger.Debug("Reading REST subscriptions from db")
247                 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
248                 if err != nil {
249                         xapp.Logger.Error("%v", err)
250                         <-time.After(1 * time.Second)
251                 } else {
252                         // Fix REST subscriptions ongoing status after restart
253                         for restSubId, restSubscription := range restSubscriptions {
254                                 restSubscription.SubReqOngoing = false
255                                 restSubscription.SubDelReqOngoing = false
256                                 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubscription)
257                                 if err != nil {
258                                         xapp.Logger.Error("WriteRESTSubscriptionToSdl() failed:%s", err.Error())
259                                 }
260                         }
261                         c.registry.restSubscriptions = restSubscriptions
262                         return nil
263                 }
264         }
265         xapp.Logger.Debug("Continuing without retring")
266         return err
267 }
268
269 //-------------------------------------------------------------------
270 //
271 //-------------------------------------------------------------------
272 func (c *Control) ReadConfigParameters(f string) {
273
274         xapp.Logger.Debug("ReadConfigParameters")
275
276         c.LoggerLevel = int(xapp.Logger.GetLevel())
277         xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel)
278         c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
279
280         // viper.GetDuration returns nanoseconds
281         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
282         if e2tSubReqTimeout == 0 {
283                 e2tSubReqTimeout = 2000 * 1000000
284                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
285         }
286         xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
287
288         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
289         if e2tSubDelReqTime == 0 {
290                 e2tSubDelReqTime = 2000 * 1000000
291                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
292         }
293         xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
294
295         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
296         if e2tRecvMsgTimeout == 0 {
297                 e2tRecvMsgTimeout = 2000 * 1000000
298                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
299         }
300         xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
301
302         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
303         if e2tMaxSubReqTryCount == 0 {
304                 e2tMaxSubReqTryCount = 1
305                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
306         }
307         xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
308
309         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
310         if e2tMaxSubDelReqTryCount == 0 {
311                 e2tMaxSubDelReqTryCount = 1
312                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
313         }
314         xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
315
316         checkE2State = viper.GetString("controls.checkE2State")
317         if checkE2State == "" {
318                 checkE2State = "true"
319                 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
320         }
321         xapp.Logger.Debug("checkE2State= %v", checkE2State)
322
323         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
324         if readSubsFromDb == "" {
325                 readSubsFromDb = "true"
326                 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
327         }
328         xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
329
330         dbTryCount = viper.GetInt("controls.dbTryCount")
331         if dbTryCount == 0 {
332                 dbTryCount = 200
333                 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
334         }
335         xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
336
337         dbRetryForever = viper.GetString("controls.dbRetryForever")
338         if dbRetryForever == "" {
339                 dbRetryForever = "true"
340                 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
341         }
342         xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
343
344         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
345         // value 100ms used currently only in unittests.
346         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
347         if waitRouteCleanup_ms == 0 {
348                 waitRouteCleanup_ms = 5000 * 1000000
349                 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
350         }
351         xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
352
353         viper.SetDefault("controls.checkE2IEOrder", 1)
354         e2IEOrderCheckValue = uint8(viper.GetUint("controls.checkE2IEOrder"))
355         c.e2ap.SetE2IEOrderCheck(e2IEOrderCheckValue)
356         xapp.Logger.Debug("e2IEOrderCheck= %v", e2IEOrderCheckValue)
357 }
358
359 //-------------------------------------------------------------------
360 //
361 //-------------------------------------------------------------------
362 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
363
364         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
365         for subId, subs := range register {
366                 if subs.SubRespRcvd == false {
367                         // If policy subscription has already been made successfully unsuccessful update should not be deleted.
368                         if subs.PolicyUpdate == false {
369                                 subs.NoRespToXapp = true
370                                 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
371                                 c.SendSubscriptionDeleteReq(subs, false)
372                         }
373                 }
374         }
375 }
376
377 func (c *Control) ReadyCB(data interface{}) {
378         if c.RMRClient == nil {
379                 c.RMRClient = xapp.Rmr
380         }
381 }
382
383 func (c *Control) Run() {
384         xapp.SetReadyCB(c.ReadyCB, nil)
385         xapp.AddConfigChangeListener(c.ReadConfigParameters)
386         xapp.Run(c)
387 }
388
389 //-------------------------------------------------------------------
390 //
391 //-------------------------------------------------------------------
392 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string, xAppServiceName string) (*RESTSubscription, string, error) {
393
394         var restSubId string
395         var restSubscription *RESTSubscription
396         var err error
397
398         prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
399         if p.SubscriptionID == "" {
400                 // Subscription does not contain REST subscription Id
401                 if exists {
402                         restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
403                         if restSubscription != nil {
404                                 // Subscription not found
405                                 restSubId = prevRestSubsId
406                                 if err == nil {
407                                         xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
408                                 } else {
409                                         xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
410                                 }
411                         } else {
412                                 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
413                                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
414                         }
415                 }
416
417                 if restSubscription == nil {
418                         restSubId = ksuid.New().String()
419                         restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppServiceName, &xAppRmrEndpoint, p.Meid)
420                 }
421         } else {
422                 // Subscription contains REST subscription Id
423                 restSubId = p.SubscriptionID
424
425                 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
426                 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
427                 if err != nil {
428                         // Subscription with id in REST request does not exist
429                         xapp.Logger.Error("%s", err.Error())
430                         c.UpdateCounter(cRestSubFailToXapp)
431                         return nil, "", err
432                 }
433
434                 if !exists {
435                         xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
436                 } else {
437                         xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
438                 }
439         }
440
441         return restSubscription, restSubId, nil
442 }
443
444 //-------------------------------------------------------------------
445 //
446 //-------------------------------------------------------------------
447 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
448
449         c.CntRecvMsg++
450         c.UpdateCounter(cRestSubReqFromXapp)
451
452         subResp := models.SubscriptionResponse{}
453         p := params.(*models.SubscriptionParams)
454
455         if c.LoggerLevel > 2 {
456                 c.PrintRESTSubscriptionRequest(p)
457         }
458
459         if c.e2IfState.IsE2ConnectionUp(p.Meid) == false || c.e2IfState.IsE2ConnectionUnderReset(p.Meid) == true {
460                 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
461                         xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
462                 } else if c.e2IfState.IsE2ConnectionUnderReset(p.Meid) == true {
463                         xapp.Logger.Error("E2 Node for ranName %v UNDER RESET", *p.Meid)
464                 }
465                 c.UpdateCounter(cRestReqRejDueE2Down)
466                 return nil, common.SubscribeServiceUnavailableCode
467         }
468
469         if p.ClientEndpoint == nil {
470                 err := fmt.Errorf("ClientEndpoint == nil")
471                 xapp.Logger.Error("%v", err)
472                 c.UpdateCounter(cRestSubFailToXapp)
473                 return nil, common.SubscribeBadRequestCode
474         }
475
476         e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
477         if err != nil {
478                 xapp.Logger.Error("%s", err)
479                 c.UpdateCounter(cRestSubFailToXapp)
480                 return nil, common.SubscribeBadRequestCode
481         }
482         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
483         if err != nil {
484                 xapp.Logger.Error("%s", err.Error())
485                 c.UpdateCounter(cRestSubFailToXapp)
486                 return nil, common.SubscribeBadRequestCode
487         }
488
489         md5sum, err := CalculateRequestMd5sum(params)
490         if err != nil {
491                 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
492         }
493
494         restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint, p.ClientEndpoint.Host)
495         if err != nil {
496                 xapp.Logger.Error("Subscription with id in REST request does not exist")
497                 return nil, common.SubscribeNotFoundCode
498         }
499
500         subResp.SubscriptionID = &restSubId
501         subReqList := e2ap.SubscriptionRequestList{}
502         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
503         if err != nil {
504                 xapp.Logger.Error("%s", err.Error())
505                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
506                 c.registry.DeleteRESTSubscription(&restSubId)
507                 c.UpdateCounter(cRestSubFailToXapp)
508                 return nil, common.SubscribeBadRequestCode
509         }
510
511         duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
512         if duplicate {
513                 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
514                 xapp.Logger.Debug("%s", err)
515                 c.registry.DeleteRESTSubscription(&restSubId)
516                 c.UpdateCounter(cRestSubRespToXapp)
517                 return &subResp, common.SubscribeCreatedCode
518         }
519
520         c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
521         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
522
523         c.UpdateCounter(cRestSubRespToXapp)
524         return &subResp, common.SubscribeCreatedCode
525 }
526
527 //-------------------------------------------------------------------
528 //
529 //-------------------------------------------------------------------
530 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
531
532         e2SubscriptionDirectives := &E2SubscriptionDirectives{}
533         if p == nil || p.E2SubscriptionDirectives == nil {
534                 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
535                 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
536                 e2SubscriptionDirectives.CreateRMRRoute = true
537                 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
538         } else {
539                 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
540                         e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
541                 } else {
542                         return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
543                 }
544                 if p.E2SubscriptionDirectives.E2RetryCount == nil {
545                         xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
546                         e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
547                 } else {
548                         if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
549                                 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
550                         } else {
551                                 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
552                         }
553                 }
554                 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
555         }
556         xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
557         xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
558         xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
559         return e2SubscriptionDirectives, nil
560 }
561
562 //-------------------------------------------------------------------
563 //
564 //-------------------------------------------------------------------
565
566 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
567         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
568
569         c.SubscriptionProcessingStartDelay()
570         xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests))
571
572         var xAppEventInstanceID int64
573         var e2EventInstanceID int64
574         errorInfo := &ErrorInfo{}
575
576         defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
577
578         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
579                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
580                 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
581
582                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
583                 if trans == nil {
584                         // Send notification to xApp that prosessing of a Subscription Request has failed.
585                         err := fmt.Errorf("Tracking failure")
586                         errorInfo.ErrorCause = err.Error()
587                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
588                         continue
589                 }
590
591                 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
592
593                 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
594
595                 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
596                 trans.Release()
597
598                 if err != nil {
599                         if err.Error() == "TEST: restart event received" {
600                                 // This is just for UT cases. Stop here subscription processing
601                                 return
602                         }
603                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
604                 } else {
605                         e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
606                         restSubscription.AddMd5Sum(md5sum)
607                         xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
608                                 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
609                         c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans, errorInfo)
610                 }
611         }
612 }
613
614 //-------------------------------------------------------------------
615 //
616 //------------------------------------------------------------------
617 func (c *Control) SubscriptionProcessingStartDelay() {
618         if c.UTTesting == true {
619                 // This is temporary fix for the UT problem that notification arrives before subscription response
620                 // Correct fix would be to allow notification come before response and process it correctly
621                 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
622                 <-time.After(time.Millisecond * 50)
623                 xapp.Logger.Debug("Continuing after delay")
624         }
625 }
626
627 //-------------------------------------------------------------------
628 //
629 //------------------------------------------------------------------
630 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
631         restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
632
633         errorInfo := ErrorInfo{}
634
635         err := c.tracker.Track(trans)
636         if err != nil {
637                 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
638                 errorInfo.ErrorCause = err.Error()
639                 err = fmt.Errorf("Tracking failure")
640                 return nil, &errorInfo, err
641         }
642
643         subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
644         if err != nil {
645                 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
646                 return nil, &errorInfo, err
647         }
648
649         //
650         // Wake subs request
651         //
652         subs.OngoingReqCount++
653         go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0)
654         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
655         subs.OngoingReqCount--
656
657         err = nil
658         if event != nil {
659                 switch themsg := event.(type) {
660                 case *e2ap.E2APSubscriptionResponse:
661                         trans.Release()
662                         if c.e2IfState.IsE2ConnectionUp(meid) == true {
663                                 errorInfo = c.e2ap.CheckActionNotAdmittedList(xapp.RIC_SUB_RESP, themsg.ActionNotAdmittedList, c)
664                                 return themsg, &errorInfo, nil
665                         } else {
666                                 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
667                                 c.RemoveSubscriptionFromDb(subs)
668                                 err = fmt.Errorf("E2 interface down")
669                                 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
670                         }
671                 case *e2ap.E2APSubscriptionFailure:
672                         err = fmt.Errorf("RICSubscriptionFailure. E2NodeCause: (Cause:%v, Value %v)", themsg.Cause.Content, themsg.Cause.Value)
673                         errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
674                 case *PackSubscriptionRequestErrortEvent:
675                         err = fmt.Errorf("E2 RICSubscriptionRequest pack failure")
676                         errorInfo = themsg.ErrorInfo
677                 case *SDLWriteErrortEvent:
678                         err = fmt.Errorf("SDL write failure")
679                         errorInfo = themsg.ErrorInfo
680                 case *SubmgrRestartTestEvent:
681                         err = fmt.Errorf("TEST: restart event received")
682                         xapp.Logger.Debug("%s", err)
683                         return nil, &errorInfo, err
684                 default:
685                         err = fmt.Errorf("Unexpected E2 subscription response received")
686                         errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
687                         break
688                 }
689         } else {
690                 // Timer expiry
691                 err = fmt.Errorf("E2 RICSubscriptionResponse timeout")
692                 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
693                 if subs.PolicyUpdate == true {
694                         return nil, &errorInfo, err
695                 }
696         }
697
698         xapp.Logger.Error("XAPP-SubReq E2 subscription failed: %s", idstring(err, trans, subs))
699         // If policy type subscription fails we cannot remove it only internally. Once subscription has been created
700         // successfully, it must be deleted on both sides.
701         if subs.PolicyUpdate == false {
702                 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
703         }
704
705         return nil, &errorInfo, err
706 }
707
708 //-------------------------------------------------------------------
709 //
710 //-------------------------------------------------------------------
711 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
712         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
713
714         // Send notification to xApp that prosessing of a Subscription Request has failed.
715         e2EventInstanceID := (int64)(0)
716         if errorInfo.ErrorSource == "" {
717                 // Submgr is default source of error
718                 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
719         }
720         resp := &models.SubscriptionResponse{
721                 SubscriptionID: restSubId,
722                 SubscriptionInstances: []*models.SubscriptionInstance{
723                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
724                                 ErrorCause:          errorInfo.ErrorCause,
725                                 ErrorSource:         errorInfo.ErrorSource,
726                                 TimeoutType:         errorInfo.TimeoutType,
727                                 XappEventInstanceID: &xAppEventInstanceID},
728                 },
729         }
730         // Mark REST subscription request processed.
731         restSubscription.SetProcessed(err)
732         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
733         if trans != nil {
734                 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
735                         errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
736         } else {
737                 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
738                         errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
739         }
740
741         c.UpdateCounter(cRestSubFailNotifToXapp)
742         err = xapp.Subscription.Notify(resp, *clientEndpoint)
743         if err != nil {
744                 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
745         }
746
747         // E2 is down. Delete completely processed request safely now
748         if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
749                 c.registry.DeleteRESTSubscription(restSubId)
750                 c.RemoveRESTSubscriptionFromDb(*restSubId)
751         }
752 }
753
754 //-------------------------------------------------------------------
755 //
756 //-------------------------------------------------------------------
757 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
758         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
759
760         // Store successfully processed InstanceId for deletion
761         restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
762         restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
763
764         // Send notification to xApp that a Subscription Request has been processed.
765         resp := &models.SubscriptionResponse{
766                 SubscriptionID: restSubId,
767                 SubscriptionInstances: []*models.SubscriptionInstance{
768                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
769                                 ErrorCause:          errorInfo.ErrorCause,
770                                 ErrorSource:         errorInfo.ErrorSource,
771                                 XappEventInstanceID: &xAppEventInstanceID},
772                 },
773         }
774         // Mark REST subscription request processesd.
775         restSubscription.SetProcessed(nil)
776         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
777         xapp.Logger.Debug("Sending successful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
778                 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
779         c.UpdateCounter(cRestSubNotifToXapp)
780         err := xapp.Subscription.Notify(resp, *clientEndpoint)
781         if err != nil {
782                 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
783         }
784
785         // E2 is down. Delete completely processed request safely now
786         if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
787                 c.registry.DeleteRESTSubscription(restSubId)
788                 c.RemoveRESTSubscriptionFromDb(*restSubId)
789         }
790 }
791
792 //-------------------------------------------------------------------
793 //
794 //-------------------------------------------------------------------
795 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
796
797         c.CntRecvMsg++
798         c.UpdateCounter(cRestSubDelReqFromXapp)
799
800         xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
801
802         restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
803         if err != nil {
804                 xapp.Logger.Error("%s", err.Error())
805                 if restSubscription == nil {
806                         // Subscription was not found
807                         c.UpdateCounter(cRestSubDelRespToXapp)
808                         return common.UnsubscribeNoContentCode
809                 } else {
810                         if restSubscription.SubReqOngoing == true {
811                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
812                                 xapp.Logger.Error("%s", err.Error())
813                                 c.UpdateCounter(cRestSubDelFailToXapp)
814                                 return common.UnsubscribeBadRequestCode
815                         } else if restSubscription.SubDelReqOngoing == true {
816                                 // Previous request for same restSubId still ongoing
817                                 c.UpdateCounter(cRestSubDelRespToXapp)
818                                 return common.UnsubscribeNoContentCode
819                         }
820                 }
821         }
822
823         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
824         go func() {
825                 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
826                 for _, instanceId := range restSubscription.InstanceIds {
827                         xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0)
828
829                         if err != nil {
830                                 xapp.Logger.Error("%s", err.Error())
831                         }
832                         xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
833                         restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
834                         restSubscription.DeleteE2InstanceId(instanceId)
835                 }
836                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
837                 c.registry.DeleteRESTSubscription(&restSubId)
838                 c.RemoveRESTSubscriptionFromDb(restSubId)
839         }()
840
841         c.UpdateCounter(cRestSubDelRespToXapp)
842         return common.UnsubscribeNoContentCode
843 }
844
845 //-------------------------------------------------------------------
846 //
847 //-------------------------------------------------------------------
848 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) {
849
850         var xAppEventInstanceID int64
851         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
852         if err != nil {
853                 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
854                         restSubId, instanceId, idstring(err, nil))
855                 return xAppEventInstanceID, nil
856         }
857
858         xAppEventInstanceID = int64(subs.ReqId.Id)
859         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
860         if trans == nil {
861                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
862                 xapp.Logger.Error("%s", err.Error())
863         }
864         defer trans.Release()
865
866         err = c.tracker.Track(trans)
867         if err != nil {
868                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
869                 xapp.Logger.Error("%s", err.Error())
870                 return xAppEventInstanceID, &time.ParseError{}
871         }
872         //
873         // Wake subs delete
874         //
875         subs.OngoingDelCount++
876         go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime)
877         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
878         subs.OngoingDelCount--
879
880         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
881
882         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
883
884         return xAppEventInstanceID, nil
885 }
886
887 //-------------------------------------------------------------------
888 //
889 //-------------------------------------------------------------------
890
891 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
892         params := &xapp.RMRParams{}
893         params.Mtype = trans.GetMtype()
894         params.SubId = int(subs.GetReqId().InstanceId)
895         params.Xid = ""
896         params.Meid = subs.GetMeid()
897         params.Src = ""
898         params.PayloadLen = len(trans.Payload.Buf)
899         params.Payload = trans.Payload.Buf
900         params.Mbuf = nil
901         xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
902         err = c.SendWithRetry(params, false, 5)
903         if err != nil {
904                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
905         }
906         return err
907 }
908
909 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
910
911         params := &xapp.RMRParams{}
912         params.Mtype = trans.GetMtype()
913         params.SubId = int(subs.GetReqId().InstanceId)
914         params.Xid = trans.GetXid()
915         params.Meid = trans.GetMeid()
916         params.Src = ""
917         params.PayloadLen = len(trans.Payload.Buf)
918         params.Payload = trans.Payload.Buf
919         params.Mbuf = nil
920         xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
921         err = c.SendWithRetry(params, false, 5)
922         if err != nil {
923                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
924         }
925         return err
926 }
927
928 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
929         if c.RMRClient == nil {
930                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
931                 xapp.Logger.Error("%s", err.Error())
932                 return
933         }
934         c.CntRecvMsg++
935
936         defer c.RMRClient.Free(msg.Mbuf)
937
938         // xapp-frame might use direct access to c buffer and
939         // when msg.Mbuf is freed, someone might take it into use
940         // and payload data might be invalid inside message handle function
941         //
942         // subscriptions won't load system a lot so there is no
943         // real performance hit by cloning buffer into new go byte slice
944         cPay := append(msg.Payload[:0:0], msg.Payload...)
945         msg.Payload = cPay
946         msg.PayloadLen = len(cPay)
947
948         switch msg.Mtype {
949         case xapp.RIC_SUB_REQ:
950                 go c.handleXAPPSubscriptionRequest(msg)
951         case xapp.RIC_SUB_RESP:
952                 go c.handleE2TSubscriptionResponse(msg)
953         case xapp.RIC_SUB_FAILURE:
954                 go c.handleE2TSubscriptionFailure(msg)
955         case xapp.RIC_SUB_DEL_REQ:
956                 go c.handleXAPPSubscriptionDeleteRequest(msg)
957         case xapp.RIC_SUB_DEL_RESP:
958                 go c.handleE2TSubscriptionDeleteResponse(msg)
959         case xapp.RIC_SUB_DEL_FAILURE:
960                 go c.handleE2TSubscriptionDeleteFailure(msg)
961         case xapp.RIC_SUB_DEL_REQUIRED:
962                 go c.handleE2TSubscriptionDeleteRequired(msg)
963         default:
964                 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
965         }
966         return
967 }
968
969 //-------------------------------------------------------------------
970 // handle from XAPP Subscription Request
971 //------------------------------------------------------------------
972 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
973         xapp.Logger.Debug("MSG from XAPP: %s", params.String())
974         c.UpdateCounter(cSubReqFromXapp)
975
976         if c.e2IfState.IsE2ConnectionUp(&params.Meid.RanName) == false {
977                 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
978                 return
979         }
980
981         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
982         if err != nil {
983                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
984                 return
985         }
986
987         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
988         if trans == nil {
989                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
990                 return
991         }
992         defer trans.Release()
993
994         if err = c.tracker.Track(trans); err != nil {
995                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
996                 return
997         }
998
999         subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
1000         if err != nil {
1001                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1002                 return
1003         }
1004
1005         c.wakeSubscriptionRequest(subs, trans)
1006 }
1007
1008 //-------------------------------------------------------------------
1009 // Wake Subscription Request to E2node
1010 //------------------------------------------------------------------
1011 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
1012
1013         e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(nil)
1014         if err != nil {
1015                 xapp.Logger.Error("c.GetE2SubscriptionDirectives failure: %s", err.Error())
1016         }
1017         subs.OngoingReqCount++
1018         go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms)
1019         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1020         subs.OngoingReqCount--
1021         if event != nil {
1022                 switch themsg := event.(type) {
1023                 case *e2ap.E2APSubscriptionResponse:
1024                         themsg.RequestId.Id = trans.RequestId.Id
1025                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
1026                         if err == nil {
1027                                 trans.Release()
1028                                 c.UpdateCounter(cSubRespToXapp)
1029                                 err := c.rmrSendToXapp("", subs, trans)
1030                                 if err != nil {
1031                                         xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1032                                 }
1033                                 return
1034                         }
1035                 case *e2ap.E2APSubscriptionFailure:
1036                         themsg.RequestId.Id = trans.RequestId.Id
1037                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
1038                         if err == nil {
1039                                 c.UpdateCounter(cSubFailToXapp)
1040                                 c.rmrSendToXapp("", subs, trans)
1041                         }
1042                 default:
1043                         break
1044                 }
1045         }
1046         xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1047 }
1048
1049 //-------------------------------------------------------------------
1050 // handle from XAPP Subscription Delete Request
1051 //------------------------------------------------------------------
1052 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1053         xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1054         c.UpdateCounter(cSubDelReqFromXapp)
1055
1056         if c.e2IfState.IsE2ConnectionUp(&params.Meid.RanName) == false {
1057                 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1058                 return
1059         }
1060
1061         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1062         if err != nil {
1063                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1064                 return
1065         }
1066
1067         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1068         if trans == nil {
1069                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1070                 return
1071         }
1072         defer trans.Release()
1073
1074         err = c.tracker.Track(trans)
1075         if err != nil {
1076                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1077                 return
1078         }
1079
1080         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1081         if err != nil {
1082                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1083                 return
1084         }
1085
1086         //
1087         // Wake subs delete
1088         //
1089         subs.OngoingDelCount++
1090         go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1091         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1092         subs.OngoingDelCount--
1093
1094         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1095
1096         if subs.NoRespToXapp == true {
1097                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1098                 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1099                 return
1100         }
1101
1102         // Whatever is received success, fail or timeout, send successful delete response
1103         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1104         subDelRespMsg.RequestId.Id = trans.RequestId.Id
1105         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1106         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1107         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1108         if err == nil {
1109                 c.UpdateCounter(cSubDelRespToXapp)
1110                 err := c.rmrSendToXapp("", subs, trans)
1111                 if err != nil {
1112                         xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1113                 }
1114         }
1115 }
1116
1117 //-------------------------------------------------------------------
1118 // SUBS CREATE Handling
1119 //-------------------------------------------------------------------
1120 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) {
1121
1122         var event interface{} = nil
1123         var removeSubscriptionFromDb bool = false
1124         trans := c.tracker.NewSubsTransaction(subs)
1125         subs.WaitTransactionTurn(trans)
1126         defer subs.ReleaseTransactionTurn(trans)
1127         defer trans.Release()
1128
1129         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1130
1131         subRfMsg, valid := subs.GetCachedResponse()
1132         if subRfMsg == nil && valid == true {
1133                 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1134                 switch event.(type) {
1135                 case *e2ap.E2APSubscriptionResponse:
1136                         subRfMsg, valid = subs.SetCachedResponse(event, true)
1137                         subs.SubRespRcvd = true
1138                 case *e2ap.E2APSubscriptionFailure:
1139                         if subs.PolicyUpdate == false {
1140                                 subRfMsg, valid = subs.SetCachedResponse(event, false)
1141                         } else {
1142                                 // In policy update case where subscription has already been created successfully in Gnb
1143                                 // we cannot delete subscription internally in submgr
1144                                 subRfMsg, valid = subs.SetCachedResponse(event, true)
1145                         }
1146                         xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1147                 case *SubmgrRestartTestEvent:
1148                         // This is used to simulate that no response has been received and after restart, subscriptions are restored from db
1149                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1150                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1151                         parentTrans.SendEvent(subRfMsg, 0)
1152                         return
1153                 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1154                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1155                 default:
1156                         // Timer expiry
1157                         if subs.PolicyUpdate == false {
1158                                 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1159                                 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1160                                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1161                         } else {
1162                                 subRfMsg, valid = subs.SetCachedResponse(nil, true)
1163                         }
1164                 }
1165                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1166         } else {
1167                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1168         }
1169         xapp.Logger.Debug("subs.PolicyUpdate: %v", subs.PolicyUpdate)
1170         xapp.Logger.Debug("subs: %v", subs)
1171
1172         if valid == false {
1173                 removeSubscriptionFromDb = true
1174         }
1175
1176         err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1177         if err != nil {
1178                 valid = false
1179                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1180
1181         }
1182
1183         // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1184         if valid == false {
1185                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1186         }
1187
1188         parentTrans.SendEvent(subRfMsg, 0)
1189 }
1190
1191 //-------------------------------------------------------------------
1192 // SUBS DELETE Handling
1193 //-------------------------------------------------------------------
1194
1195 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) {
1196
1197         trans := c.tracker.NewSubsTransaction(subs)
1198         subs.WaitTransactionTurn(trans)
1199         defer subs.ReleaseTransactionTurn(trans)
1200         defer trans.Release()
1201
1202         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1203
1204         subs.mutex.Lock()
1205
1206         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1207                 subs.valid = false
1208                 subs.mutex.Unlock()
1209                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1210         } else {
1211                 subs.mutex.Unlock()
1212         }
1213
1214         // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1215         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1216         parentTrans.SendEvent(nil, 0)
1217 }
1218
1219 //-------------------------------------------------------------------
1220 // send to E2T Subscription Request
1221 //-------------------------------------------------------------------
1222 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1223         var err error
1224         var event interface{} = nil
1225         var timedOut bool = false
1226         const ricRequestorId = 123
1227
1228         subReqMsg := subs.SubReqMsg
1229         subReqMsg.RequestId = subs.GetReqId().RequestId
1230         subReqMsg.RequestId.Id = ricRequestorId
1231         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1232         if err != nil {
1233                 xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans))
1234                 return &PackSubscriptionRequestErrortEvent{
1235                         ErrorInfo{
1236                                 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1237                                 ErrorCause:  err.Error(),
1238                         },
1239                 }
1240         }
1241
1242         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1243         err = c.WriteSubscriptionToDb(subs)
1244         if err != nil {
1245                 return &SDLWriteErrortEvent{
1246                         ErrorInfo{
1247                                 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1248                                 ErrorCause:  err.Error(),
1249                         },
1250                 }
1251         }
1252
1253         for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1254                 desc := fmt.Sprintf("(retry %d)", retries)
1255                 if retries == 0 {
1256                         c.UpdateCounter(cSubReqToE2)
1257                 } else {
1258                         c.UpdateCounter(cSubReReqToE2)
1259                 }
1260                 err := c.rmrSendToE2T(desc, subs, trans)
1261                 if err != nil {
1262                         xapp.Logger.Error("rmrSendToE2T() failed:%s", err.Error())
1263                 }
1264
1265                 if subs.DoNotWaitSubResp == false {
1266                         event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1267                         if timedOut {
1268                                 c.UpdateCounter(cSubReqTimerExpiry)
1269                                 continue
1270                         }
1271                 } else {
1272                         // Simulating case where subscrition request has been sent but response has not been received before restart
1273                         event = &SubmgrRestartTestEvent{}
1274                         xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1275                 }
1276                 break
1277         }
1278         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1279         return event
1280 }
1281
1282 //-------------------------------------------------------------------
1283 // send to E2T Subscription Delete Request
1284 //-------------------------------------------------------------------
1285
1286 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1287         var err error
1288         var event interface{}
1289         var timedOut bool
1290         const ricRequestorId = 123
1291
1292         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1293         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1294         subDelReqMsg.RequestId.Id = ricRequestorId
1295         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1296         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1297         if err != nil {
1298                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1299                 return event
1300         }
1301
1302         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1303                 desc := fmt.Sprintf("(retry %d)", retries)
1304                 if retries == 0 {
1305                         c.UpdateCounter(cSubDelReqToE2)
1306                 } else {
1307                         c.UpdateCounter(cSubDelReReqToE2)
1308                 }
1309                 err := c.rmrSendToE2T(desc, subs, trans)
1310                 if err != nil {
1311                         xapp.Logger.Error("SUBS-SubDelReq: rmrSendToE2T failure: %s", idstring(err, trans, subs, parentTrans))
1312                 }
1313                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1314                 if timedOut {
1315                         c.UpdateCounter(cSubDelReqTimerExpiry)
1316                         continue
1317                 }
1318                 break
1319         }
1320         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1321         return event
1322 }
1323
1324 //-------------------------------------------------------------------
1325 // handle from E2T Subscription Response
1326 //-------------------------------------------------------------------
1327 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1328         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1329         c.UpdateCounter(cSubRespFromE2)
1330
1331         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1332         if err != nil {
1333                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1334                 return
1335         }
1336         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1337         if err != nil {
1338                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1339                 return
1340         }
1341         trans := subs.GetTransaction()
1342         if trans == nil {
1343                 err = fmt.Errorf("Ongoing transaction not found")
1344                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1345                 return
1346         }
1347         xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans)
1348         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1349         if sendOk == false {
1350                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1351                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1352         }
1353         return
1354 }
1355
1356 //-------------------------------------------------------------------
1357 // handle from E2T Subscription Failure
1358 //-------------------------------------------------------------------
1359 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1360         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1361         c.UpdateCounter(cSubFailFromE2)
1362         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1363         if err != nil {
1364                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1365                 return
1366         }
1367         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1368         if err != nil {
1369                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1370                 return
1371         }
1372         trans := subs.GetTransaction()
1373         if trans == nil {
1374                 err = fmt.Errorf("Ongoing transaction not found")
1375                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1376                 return
1377         }
1378         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1379         if sendOk == false {
1380                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1381                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1382         }
1383         return
1384 }
1385
1386 //-------------------------------------------------------------------
1387 // handle from E2T Subscription Delete Response
1388 //-------------------------------------------------------------------
1389 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) {
1390         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1391         c.UpdateCounter(cSubDelRespFromE2)
1392         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1393         if err != nil {
1394                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1395                 return
1396         }
1397         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1398         if err != nil {
1399                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1400                 return
1401         }
1402         trans := subs.GetTransaction()
1403         if trans == nil {
1404                 err = fmt.Errorf("Ongoing transaction not found")
1405                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1406                 return
1407         }
1408         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1409         if sendOk == false {
1410                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1411                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1412         }
1413         return
1414 }
1415
1416 //-------------------------------------------------------------------
1417 // handle from E2T Subscription Delete Failure
1418 //-------------------------------------------------------------------
1419 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1420         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1421         c.UpdateCounter(cSubDelFailFromE2)
1422         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1423         if err != nil {
1424                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1425                 return
1426         }
1427         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1428         if err != nil {
1429                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1430                 return
1431         }
1432         trans := subs.GetTransaction()
1433         if trans == nil {
1434                 err = fmt.Errorf("Ongoing transaction not found")
1435                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1436                 return
1437         }
1438         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1439         if sendOk == false {
1440                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1441                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1442         }
1443         return
1444 }
1445
1446 //-------------------------------------------------------------------
1447 //
1448 //-------------------------------------------------------------------
1449 func typeofSubsMessage(v interface{}) string {
1450         if v == nil {
1451                 return "NIL"
1452         }
1453         switch v.(type) {
1454         //case *e2ap.E2APSubscriptionRequest:
1455         //      return "SubReq"
1456         case *e2ap.E2APSubscriptionResponse:
1457                 return "SubResp"
1458         case *e2ap.E2APSubscriptionFailure:
1459                 return "SubFail"
1460         //case *e2ap.E2APSubscriptionDeleteRequest:
1461         //      return "SubDelReq"
1462         case *e2ap.E2APSubscriptionDeleteResponse:
1463                 return "SubDelResp"
1464         case *e2ap.E2APSubscriptionDeleteFailure:
1465                 return "SubDelFail"
1466         default:
1467                 return "Unknown"
1468         }
1469 }
1470
1471 //-------------------------------------------------------------------
1472 //
1473 //-------------------------------------------------------------------
1474 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1475         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1476         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1477         if err != nil {
1478                 xapp.Logger.Error("%v", err)
1479                 return err
1480         }
1481         return nil
1482 }
1483
1484 //-------------------------------------------------------------------
1485 //
1486 //-------------------------------------------------------------------
1487 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1488
1489         if removeSubscriptionFromDb == true {
1490                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1491                 c.RemoveSubscriptionFromDb(subs)
1492         } else {
1493                 // Update is needed for successful response and merge case here
1494                 if subs.RetryFromXapp == false {
1495                         err := c.WriteSubscriptionToDb(subs)
1496                         return err
1497                 }
1498         }
1499         subs.RetryFromXapp = false
1500         return nil
1501 }
1502
1503 //-------------------------------------------------------------------
1504 //
1505 //-------------------------------------------------------------------
1506 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1507         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1508         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1509         if err != nil {
1510                 xapp.Logger.Error("%v", err)
1511         }
1512 }
1513
1514 //-------------------------------------------------------------------
1515 //
1516 //-------------------------------------------------------------------
1517 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1518         xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1519         err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1520         if err != nil {
1521                 xapp.Logger.Error("%v", err)
1522         }
1523 }
1524
1525 //-------------------------------------------------------------------
1526 //
1527 //-------------------------------------------------------------------
1528 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1529
1530         if removeRestSubscriptionFromDb == true {
1531                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1532                 c.RemoveRESTSubscriptionFromDb(restSubId)
1533         } else {
1534                 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1535         }
1536 }
1537
1538 //-------------------------------------------------------------------
1539 //
1540 //-------------------------------------------------------------------
1541 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1542         xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1543         err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1544         if err != nil {
1545                 xapp.Logger.Error("%v", err)
1546         }
1547 }
1548
1549 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription, e2SubsDelRequired bool) {
1550
1551         if c.UTTesting == true {
1552                 // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests
1553                 c.registry.mutex = new(sync.Mutex)
1554         }
1555
1556         const ricRequestorId = 123
1557         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1558
1559         // Send delete for every endpoint in the subscription
1560         if subs.PolicyUpdate == false {
1561                 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1562                 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1563                 subDelReqMsg.RequestId.Id = ricRequestorId
1564                 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1565                 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1566                 if err != nil {
1567                         xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1568                         return
1569                 }
1570                 for _, endPoint := range subs.EpList.Endpoints {
1571                         params := &xapp.RMRParams{}
1572                         params.Mtype = mType
1573                         params.SubId = int(subs.GetReqId().InstanceId)
1574                         params.Xid = ""
1575                         params.Meid = subs.Meid
1576                         params.Src = endPoint.String()
1577                         params.PayloadLen = len(payload.Buf)
1578                         params.Payload = payload.Buf
1579                         params.Mbuf = nil
1580                         subs.DeleteFromDb = true
1581                         if !e2SubsDelRequired {
1582                                 c.handleXAPPSubscriptionDeleteRequest(params)
1583                         } else {
1584                                 c.SendSubscriptionDeleteReqToE2T(subs, params)
1585                         }
1586                 }
1587         }
1588 }
1589
1590 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1591
1592         fmt.Println("CRESTSubscriptionRequest")
1593
1594         if p == nil {
1595                 return
1596         }
1597
1598         if p.SubscriptionID != "" {
1599                 fmt.Println("  SubscriptionID = ", p.SubscriptionID)
1600         } else {
1601                 fmt.Println("  SubscriptionID = ''")
1602         }
1603
1604         fmt.Printf("  ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1605
1606         if p.ClientEndpoint.HTTPPort != nil {
1607                 fmt.Printf("  ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1608         } else {
1609                 fmt.Println("  ClientEndpoint.HTTPPort = nil")
1610         }
1611
1612         if p.ClientEndpoint.RMRPort != nil {
1613                 fmt.Printf("  ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1614         } else {
1615                 fmt.Println("  ClientEndpoint.RMRPort = nil")
1616         }
1617
1618         if p.Meid != nil {
1619                 fmt.Printf("  Meid = %s\n", *p.Meid)
1620         } else {
1621                 fmt.Println("  Meid = nil")
1622         }
1623
1624         if p.E2SubscriptionDirectives == nil {
1625                 fmt.Println("  E2SubscriptionDirectives = nil")
1626         } else {
1627                 fmt.Println("  E2SubscriptionDirectives")
1628                 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1629                         fmt.Println("    E2RetryCount == nil")
1630                 } else {
1631                         fmt.Printf("    E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1632                 }
1633                 fmt.Printf("    E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1634                 fmt.Printf("    RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1635         }
1636         for _, subscriptionDetail := range p.SubscriptionDetails {
1637                 if p.RANFunctionID != nil {
1638                         fmt.Printf("  RANFunctionID = %v\n", *p.RANFunctionID)
1639                 } else {
1640                         fmt.Println("  RANFunctionID = nil")
1641                 }
1642                 fmt.Printf("  SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1643                 fmt.Printf("  SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1644
1645                 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1646                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1647                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1648                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1649
1650                         if actionToBeSetup.SubsequentAction != nil {
1651                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1652                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1653                         } else {
1654                                 fmt.Println("  SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1655                         }
1656                 }
1657         }
1658 }
1659
1660 //-------------------------------------------------------------------
1661 // handle from E2T Subscription Delete Required
1662 //-------------------------------------------------------------------
1663 func (c *Control) handleE2TSubscriptionDeleteRequired(params *xapp.RMRParams) {
1664         xapp.Logger.Info("MSG from E2T: %s", params.String())
1665         c.UpdateCounter(cSubDelRequFromE2)
1666         subsDelRequMsg, err := c.e2ap.UnpackSubscriptionDeleteRequired(params.Payload)
1667         if err != nil {
1668                 xapp.Logger.Error("MSG-SubDelRequired: %s", idstring(err, params))
1669                 //c.sendE2TErrorIndication(nil)
1670                 return
1671         }
1672         var subscriptions = map[string][]e2ap.E2APSubscriptionDeleteRequired{}
1673         var subDB = []*Subscription{}
1674         for _, subsTobeRemove := range subsDelRequMsg.E2APSubscriptionDeleteRequiredRequests {
1675                 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subsTobeRemove.RequestId.InstanceId})
1676                 if err != nil {
1677                         xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1678                         continue
1679                 }
1680                 // Check if Delete Subscription Already triggered
1681                 if subs.OngoingDelCount > 0 {
1682                         continue
1683                 }
1684                 subDB = append(subDB, subs)
1685                 for _, endpoint := range subs.EpList.Endpoints {
1686                         subscriptions[endpoint.Addr] = append(subscriptions[endpoint.Addr], subsTobeRemove)
1687                 }
1688                 // Sending Subscription Delete Request to E2T
1689                 //      c.SendSubscriptionDeleteReq(subs, true)
1690         }
1691         for _, subsTobeRemove := range subDB {
1692                 // Sending Subscription Delete Request to E2T
1693                 c.SendSubscriptionDeleteReq(subsTobeRemove, true)
1694         }
1695 }
1696
1697 //-----------------------------------------------------------------
1698 // Initiate RIC Subscription Delete Request after receiving
1699 // RIC Subscription Delete Required from E2T
1700 //-----------------------------------------------------------------
1701 func (c *Control) SendSubscriptionDeleteReqToE2T(subs *Subscription, params *xapp.RMRParams) {
1702         xapp.Logger.Debug("MSG TO E2T: %s", params.String())
1703         c.UpdateCounter(cSubDelReqToE2)
1704
1705         if c.e2IfState.IsE2ConnectionUp(&params.Meid.RanName) == false {
1706                 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1707                 return
1708         }
1709
1710         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subs.ReqId.RequestId, params.Meid)
1711         if trans == nil {
1712                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1713                 return
1714         }
1715         defer trans.Release()
1716
1717         err := c.tracker.Track(trans)
1718         if err != nil {
1719                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1720                 return
1721         }
1722
1723         //
1724         // Wake subs delete
1725         //
1726         subs.OngoingDelCount++
1727         go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1728         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1729         subs.OngoingDelCount--
1730
1731         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1732
1733         if subs.NoRespToXapp == true {
1734                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1735                 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1736                 return
1737         }
1738 }