RIC-1024: ErrorIndication handling at Submgr
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "sync"
26         "time"
27
28         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
29         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
30         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
33         httptransport "github.com/go-openapi/runtime/client"
34         "github.com/go-openapi/strfmt"
35         "github.com/segmentio/ksuid"
36         "github.com/spf13/viper"
37 )
38
39 //-----------------------------------------------------------------------------
40 //
41 //-----------------------------------------------------------------------------
42
// idstring builds a single-line identification string for log messages.
//
// Every entry contributes its String() value; a nil entry is rendered as the
// placeholder "(NIL)". When err is non-nil it is appended last in the form
// "err(<message>)". Parts are always separated by exactly one space.
//
// NOTE: an interface value that wraps a typed nil pointer does not compare
// equal to nil, so String() is still invoked on such an entry.
func idstring(err error, entries ...fmt.Stringer) string {
	retval := ""
	filler := ""
	for _, entry := range entries {
		part := "(NIL)"
		if entry != nil {
			part = entry.String()
		}
		retval += filler + part
		// Every emitted part, including the nil placeholder, must arm the
		// separator; otherwise a leading nil entry runs into the next part.
		filler = " "
	}
	if err != nil {
		retval += filler + "err(" + err.Error() + ")"
	}
	return retval
}
60
61 //-----------------------------------------------------------------------------
62 //
63 //-----------------------------------------------------------------------------
64
65 var e2tSubReqTimeout time.Duration
66 var e2tSubDelReqTime time.Duration
67 var e2tRecvMsgTimeout time.Duration
68 var waitRouteCleanup_ms time.Duration
69 var e2tMaxSubReqTryCount uint64    // Initial try + retry
70 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
71 var checkE2State string
72 var readSubsFromDb string
73 var dbRetryForever string
74 var dbTryCount int
75 var e2IEOrderCheckValue uint8
76
77 type Control struct {
78         *xapp.RMRClient
79         e2ap              *E2ap
80         registry          *Registry
81         tracker           *Tracker
82         restDuplicateCtrl *DuplicateCtrl
83         e2IfState         *E2IfState
84         e2IfStateDb       XappRnibInterface
85         e2SubsDb          Sdlnterface
86         restSubsDb        Sdlnterface
87         CntRecvMsg        uint64
88         ResetTestFlag     bool
89         Counters          map[string]xapp.Counter
90         LoggerLevel       int
91         UTTesting         bool
92 }
93
// RMRMeid groups three string identifiers: PlmnID, EnbID and RanName.
// NOTE(review): presumably mirrors the managed-entity id carried in RMR
// messages - confirm against the users of this type.
type RMRMeid struct {
	PlmnID, EnbID, RanName string
}
99
// Data-less marker event types.
// NOTE(review): presumably delivered to waiting transactions to signal a
// (simulated) submgr restart - confirm against the code that sends them.
type (
	// SubmgrRestartTestEvent carries no payload.
	SubmgrRestartTestEvent struct{}

	// SubmgrRestartUpEvent carries no payload.
	SubmgrRestartUpEvent struct{}
)
102 type PackSubscriptionRequestErrortEvent struct {
103         ErrorInfo ErrorInfo
104 }
105
106 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
107         p.ErrorInfo = *errorInfo
108 }
109
110 type SDLWriteErrortEvent struct {
111         ErrorInfo ErrorInfo
112 }
113
114 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
115         s.ErrorInfo = *errorInfo
116 }
117
118 func init() {
119         xapp.Logger.Debug("SUBMGR")
120         viper.AutomaticEnv()
121         viper.SetEnvPrefix("submgr")
122         viper.AllowEmptyEnv(true)
123 }
124
125 func NewControl() *Control {
126
127         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
128         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
129
130         registry := new(Registry)
131         registry.Initialize()
132         registry.rtmgrClient = &rtmgrClient
133
134         tracker := new(Tracker)
135         tracker.Init()
136
137         restDuplicateCtrl := new(DuplicateCtrl)
138         restDuplicateCtrl.Init()
139
140         e2IfState := new(E2IfState)
141
142         c := &Control{e2ap: new(E2ap),
143                 registry:          registry,
144                 tracker:           tracker,
145                 restDuplicateCtrl: restDuplicateCtrl,
146                 e2IfState:         e2IfState,
147                 e2IfStateDb:       CreateXappRnibIfInstance(),
148                 e2SubsDb:          CreateSdl(),
149                 restSubsDb:        CreateRESTSdl(),
150                 Counters:          xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
151                 LoggerLevel:       1,
152         }
153
154         e2IfState.Init(c)
155         c.ReadConfigParameters("")
156
157         // Register REST handler for testing support
158         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
159         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
160         xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
161
162         xapp.Resource.InjectRoute("/ric/v1/get_all_e2nodes", c.GetAllE2Nodes, "GET")
163         xapp.Resource.InjectRoute("/ric/v1/get_e2node_rest_subscriptions/{ranName}", c.GetAllE2NodeRestSubscriptions, "GET")
164
165         xapp.Resource.InjectRoute("/ric/v1/get_all_xapps", c.GetAllXapps, "GET")
166         xapp.Resource.InjectRoute("/ric/v1/get_xapp_rest_restsubscriptions/{xappServiceName}", c.GetAllXappRestSubscriptions, "GET")
167         xapp.Resource.InjectRoute("/ric/v1/get_e2subscriptions/{restId}", c.GetE2Subscriptions, "GET")
168
169         xapp.Resource.InjectRoute("/ric/v1/delete_all_e2node_subscriptions/{ranName}", c.DeleteAllE2nodeSubscriptions, "DELETE")
170         xapp.Resource.InjectRoute("/ric/v1/delete_all_xapp_subscriptions/{xappServiceName}", c.DeleteAllXappSubscriptions, "DELETE")
171
172         if readSubsFromDb == "true" {
173                 // Read subscriptions from db
174                 err := c.ReadE2Subscriptions()
175                 if err != nil {
176                         xapp.Logger.Error("ReadE2Subscriptions() failed %s", err.Error())
177                 }
178                 err = c.ReadRESTSubscriptions()
179                 if err != nil {
180                         xapp.Logger.Error("ReadRESTSubscriptions() failed %s", err.Error())
181                 }
182         }
183
184         go func() {
185                 err := xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
186                 if err != nil {
187                         xapp.Logger.Error("xapp.Subscription.Listen failure: %s", err.Error())
188                 }
189         }()
190         return c
191 }
192
193 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
194         subscriptions, err := c.registry.QueryHandler()
195         if err != nil {
196                 xapp.Logger.Error("QueryHandler() failed %s", err.Error())
197         }
198
199         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
200 }
201
202 //-------------------------------------------------------------------
203 //
204 //-------------------------------------------------------------------
205 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
206         xapp.Logger.Debug("RESTQueryHandler() called")
207
208         c.CntRecvMsg++
209
210         return c.registry.QueryHandler()
211 }
212
213 //-------------------------------------------------------------------
214 //
215 //-------------------------------------------------------------------
216 func (c *Control) ReadE2Subscriptions() error {
217         var err error
218         var subIds []uint32
219         var register map[uint32]*Subscription
220         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
221                 xapp.Logger.Debug("Reading E2 subscriptions from db")
222                 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
223                 if err != nil {
224                         xapp.Logger.Error("%v", err)
225                         <-time.After(1 * time.Second)
226                 } else {
227                         c.registry.subIds = subIds
228                         c.registry.register = register
229                         go c.HandleUncompletedSubscriptions(register)
230                         return nil
231                 }
232         }
233         xapp.Logger.Debug("Continuing without retring")
234         return err
235 }
236
237 //-------------------------------------------------------------------
238 //
239 //-------------------------------------------------------------------
240 func (c *Control) ReadRESTSubscriptions() error {
241
242         xapp.Logger.Debug("ReadRESTSubscriptions()")
243         var err error
244         var restSubscriptions map[string]*RESTSubscription
245         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
246                 xapp.Logger.Debug("Reading REST subscriptions from db")
247                 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
248                 if err != nil {
249                         xapp.Logger.Error("%v", err)
250                         <-time.After(1 * time.Second)
251                 } else {
252                         // Fix REST subscriptions ongoing status after restart
253                         for restSubId, restSubscription := range restSubscriptions {
254                                 restSubscription.SubReqOngoing = false
255                                 restSubscription.SubDelReqOngoing = false
256                                 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubscription)
257                                 if err != nil {
258                                         xapp.Logger.Error("WriteRESTSubscriptionToSdl() failed:%s", err.Error())
259                                 }
260                         }
261                         c.registry.restSubscriptions = restSubscriptions
262                         return nil
263                 }
264         }
265         xapp.Logger.Debug("Continuing without retring")
266         return err
267 }
268
269 //-------------------------------------------------------------------
270 //
271 //-------------------------------------------------------------------
272 func (c *Control) ReadConfigParameters(f string) {
273
274         xapp.Logger.Debug("ReadConfigParameters")
275
276         c.LoggerLevel = int(xapp.Logger.GetLevel())
277         xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel)
278         c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
279
280         // viper.GetDuration returns nanoseconds
281         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
282         if e2tSubReqTimeout == 0 {
283                 e2tSubReqTimeout = 2000 * 1000000
284                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
285         }
286         xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
287
288         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
289         if e2tSubDelReqTime == 0 {
290                 e2tSubDelReqTime = 2000 * 1000000
291                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
292         }
293         xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
294
295         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
296         if e2tRecvMsgTimeout == 0 {
297                 e2tRecvMsgTimeout = 2000 * 1000000
298                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
299         }
300         xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
301
302         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
303         if e2tMaxSubReqTryCount == 0 {
304                 e2tMaxSubReqTryCount = 1
305                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
306         }
307         xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
308
309         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
310         if e2tMaxSubDelReqTryCount == 0 {
311                 e2tMaxSubDelReqTryCount = 1
312                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
313         }
314         xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
315
316         checkE2State = viper.GetString("controls.checkE2State")
317         if checkE2State == "" {
318                 checkE2State = "true"
319                 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
320         }
321         xapp.Logger.Debug("checkE2State= %v", checkE2State)
322
323         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
324         if readSubsFromDb == "" {
325                 readSubsFromDb = "true"
326                 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
327         }
328         xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
329
330         dbTryCount = viper.GetInt("controls.dbTryCount")
331         if dbTryCount == 0 {
332                 dbTryCount = 200
333                 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
334         }
335         xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
336
337         dbRetryForever = viper.GetString("controls.dbRetryForever")
338         if dbRetryForever == "" {
339                 dbRetryForever = "true"
340                 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
341         }
342         xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
343
344         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
345         // value 100ms used currently only in unittests.
346         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
347         if waitRouteCleanup_ms == 0 {
348                 waitRouteCleanup_ms = 5000 * 1000000
349                 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
350         }
351         xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
352
353         viper.SetDefault("controls.checkE2IEOrder", 1)
354         e2IEOrderCheckValue = uint8(viper.GetUint("controls.checkE2IEOrder"))
355         c.e2ap.SetE2IEOrderCheck(e2IEOrderCheckValue)
356         xapp.Logger.Debug("e2IEOrderCheck= %v", e2IEOrderCheckValue)
357 }
358
359 //-------------------------------------------------------------------
360 //
361 //-------------------------------------------------------------------
362 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
363
364         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
365         for subId, subs := range register {
366                 if subs.SubRespRcvd == false {
367                         // If policy subscription has already been made successfully unsuccessful update should not be deleted.
368                         if subs.PolicyUpdate == false {
369                                 subs.NoRespToXapp = true
370                                 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
371                                 c.SendSubscriptionDeleteReq(subs, false)
372                         }
373                 }
374         }
375 }
376
377 func (c *Control) ReadyCB(data interface{}) {
378         if c.RMRClient == nil {
379                 c.RMRClient = xapp.Rmr
380         }
381 }
382
383 func (c *Control) Run() {
384         xapp.SetReadyCB(c.ReadyCB, nil)
385         xapp.AddConfigChangeListener(c.ReadConfigParameters)
386         xapp.Run(c)
387 }
388
389 //-------------------------------------------------------------------
390 //
391 //-------------------------------------------------------------------
392 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string, xAppServiceName string) (*RESTSubscription, string, error) {
393
394         var restSubId string
395         var restSubscription *RESTSubscription
396         var err error
397
398         prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
399         if p.SubscriptionID == "" {
400                 // Subscription does not contain REST subscription Id
401                 if exists {
402                         restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
403                         if restSubscription != nil {
404                                 // Subscription not found
405                                 restSubId = prevRestSubsId
406                                 if err == nil {
407                                         xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
408                                 } else {
409                                         xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
410                                 }
411                         } else {
412                                 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
413                                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
414                         }
415                 }
416
417                 if restSubscription == nil {
418                         restSubId = ksuid.New().String()
419                         restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppServiceName, &xAppRmrEndpoint, p.Meid)
420                 }
421         } else {
422                 // Subscription contains REST subscription Id
423                 restSubId = p.SubscriptionID
424
425                 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
426                 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
427                 if err != nil {
428                         // Subscription with id in REST request does not exist
429                         xapp.Logger.Error("%s", err.Error())
430                         c.UpdateCounter(cRestSubFailToXapp)
431                         return nil, "", err
432                 }
433
434                 if !exists {
435                         xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
436                 } else {
437                         xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
438                 }
439         }
440
441         return restSubscription, restSubId, nil
442 }
443
444 //-------------------------------------------------------------------
445 //
446 //-------------------------------------------------------------------
447 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
448
449         c.CntRecvMsg++
450         c.UpdateCounter(cRestSubReqFromXapp)
451
452         subResp := models.SubscriptionResponse{}
453         p := params.(*models.SubscriptionParams)
454
455         if c.LoggerLevel > 2 {
456                 c.PrintRESTSubscriptionRequest(p)
457         }
458
459         if c.e2IfState.IsE2ConnectionUp(p.Meid) == false || c.e2IfState.IsE2ConnectionUnderReset(p.Meid) == true {
460                 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
461                         xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
462                 } else if c.e2IfState.IsE2ConnectionUnderReset(p.Meid) == true {
463                         xapp.Logger.Error("E2 Node for ranName %v UNDER RESET", *p.Meid)
464                 }
465                 c.UpdateCounter(cRestReqRejDueE2Down)
466                 return nil, common.SubscribeServiceUnavailableCode
467         }
468
469         if p.ClientEndpoint == nil {
470                 err := fmt.Errorf("ClientEndpoint == nil")
471                 xapp.Logger.Error("%v", err)
472                 c.UpdateCounter(cRestSubFailToXapp)
473                 return nil, common.SubscribeBadRequestCode
474         }
475
476         e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
477         if err != nil {
478                 xapp.Logger.Error("%s", err)
479                 c.UpdateCounter(cRestSubFailToXapp)
480                 return nil, common.SubscribeBadRequestCode
481         }
482         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
483         if err != nil {
484                 xapp.Logger.Error("%s", err.Error())
485                 c.UpdateCounter(cRestSubFailToXapp)
486                 return nil, common.SubscribeBadRequestCode
487         }
488
489         md5sum, err := CalculateRequestMd5sum(params)
490         if err != nil {
491                 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
492         }
493
494         restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint, p.ClientEndpoint.Host)
495         if err != nil {
496                 xapp.Logger.Error("Subscription with id in REST request does not exist")
497                 return nil, common.SubscribeNotFoundCode
498         }
499
500         subResp.SubscriptionID = &restSubId
501         subReqList := e2ap.SubscriptionRequestList{}
502         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
503         if err != nil {
504                 xapp.Logger.Error("%s", err.Error())
505                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
506                 c.registry.DeleteRESTSubscription(&restSubId)
507                 c.UpdateCounter(cRestSubFailToXapp)
508                 return nil, common.SubscribeBadRequestCode
509         }
510
511         duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
512         if duplicate {
513                 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
514                 xapp.Logger.Debug("%s", err)
515                 c.registry.DeleteRESTSubscription(&restSubId)
516                 c.UpdateCounter(cRestSubRespToXapp)
517                 return &subResp, common.SubscribeCreatedCode
518         }
519
520         c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
521         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
522
523         c.UpdateCounter(cRestSubRespToXapp)
524         return &subResp, common.SubscribeCreatedCode
525 }
526
527 //-------------------------------------------------------------------
528 //
529 //-------------------------------------------------------------------
530 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
531
532         e2SubscriptionDirectives := &E2SubscriptionDirectives{}
533         if p == nil || p.E2SubscriptionDirectives == nil {
534                 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
535                 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
536                 e2SubscriptionDirectives.CreateRMRRoute = true
537                 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
538         } else {
539                 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
540                         e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
541                 } else {
542                         return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
543                 }
544                 if p.E2SubscriptionDirectives.E2RetryCount == nil {
545                         xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
546                         e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
547                 } else {
548                         if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
549                                 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
550                         } else {
551                                 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
552                         }
553                 }
554                 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
555         }
556         xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
557         xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
558         xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
559         return e2SubscriptionDirectives, nil
560 }
561
562 //-------------------------------------------------------------------
563 //
564 //-------------------------------------------------------------------
565
566 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
567         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
568
569         c.SubscriptionProcessingStartDelay()
570         xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests))
571
572         var xAppEventInstanceID int64
573         var e2EventInstanceID int64
574         errorInfo := &ErrorInfo{}
575
576         defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
577
578         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
579                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
580                 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
581
582                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
583                 if trans == nil {
584                         // Send notification to xApp that prosessing of a Subscription Request has failed.
585                         err := fmt.Errorf("Tracking failure")
586                         errorInfo.ErrorCause = err.Error()
587                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
588                         continue
589                 }
590
591                 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
592
593                 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
594
595                 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
596                 trans.Release()
597
598                 if err != nil {
599                         if err.Error() == "TEST: restart event received" {
600                                 // This is just for UT cases. Stop here subscription processing
601                                 return
602                         }
603                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
604                 } else {
605                         e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
606                         restSubscription.AddMd5Sum(md5sum)
607                         xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
608                                 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
609                         c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans, errorInfo)
610                 }
611         }
612 }
613
614 //-------------------------------------------------------------------
615 //
616 //------------------------------------------------------------------
617 func (c *Control) SubscriptionProcessingStartDelay() {
618         if c.UTTesting == true {
619                 // This is temporary fix for the UT problem that notification arrives before subscription response
620                 // Correct fix would be to allow notification come before response and process it correctly
621                 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
622                 <-time.After(time.Millisecond * 50)
623                 xapp.Logger.Debug("Continuing after delay")
624         }
625 }
626
627 //-------------------------------------------------------------------
628 //
629 //------------------------------------------------------------------
630 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
631         restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
632
633         errorInfo := ErrorInfo{}
634
635         err := c.tracker.Track(trans)
636         if err != nil {
637                 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
638                 errorInfo.ErrorCause = err.Error()
639                 err = fmt.Errorf("Tracking failure")
640                 return nil, &errorInfo, err
641         }
642
643         subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
644         if err != nil {
645                 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
646                 return nil, &errorInfo, err
647         }
648
649         //
650         // Wake subs request
651         //
652         subs.OngoingReqCount++
653         go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0)
654         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
655         subs.OngoingReqCount--
656
657         err = nil
658         if event != nil {
659                 switch themsg := event.(type) {
660                 case *e2ap.E2APSubscriptionResponse:
661                         trans.Release()
662                         if c.e2IfState.IsE2ConnectionUp(meid) == true {
663                                 errorInfo = c.e2ap.CheckActionNotAdmittedList(xapp.RIC_SUB_RESP, themsg.ActionNotAdmittedList, c)
664                                 return themsg, &errorInfo, nil
665                         } else {
666                                 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
667                                 c.RemoveSubscriptionFromDb(subs)
668                                 err = fmt.Errorf("E2 interface down")
669                                 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
670                         }
671                 case *e2ap.E2APSubscriptionFailure:
672                         err = fmt.Errorf("RICSubscriptionFailure. E2NodeCause: (Cause:%v, Value %v)", themsg.Cause.Content, themsg.Cause.Value)
673                         errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
674                 case *e2ap.E2APErrorIndication:
675                         err = fmt.Errorf("RICE2RanErrorIndication. E2NodeCause: (Cause:%v, Value %v)", themsg.Cause.Content, themsg.Cause.Value)
676                         errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
677                 case *PackSubscriptionRequestErrortEvent:
678                         err = fmt.Errorf("E2 RICSubscriptionRequest pack failure")
679                         errorInfo = themsg.ErrorInfo
680                 case *SDLWriteErrortEvent:
681                         err = fmt.Errorf("SDL write failure")
682                         errorInfo = themsg.ErrorInfo
683                 case *SubmgrRestartTestEvent:
684                         err = fmt.Errorf("TEST: restart event received")
685                         xapp.Logger.Debug("%s", err)
686                         return nil, &errorInfo, err
687                 default:
688                         err = fmt.Errorf("Unexpected E2 subscription response received")
689                         errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
690                         break
691                 }
692         } else {
693                 // Timer expiry
694                 err = fmt.Errorf("E2 RICSubscriptionResponse timeout")
695                 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
696                 if subs.PolicyUpdate == true {
697                         return nil, &errorInfo, err
698                 }
699         }
700
701         xapp.Logger.Error("XAPP-SubReq E2 subscription failed: %s", idstring(err, trans, subs))
702         // If policy type subscription fails we cannot remove it only internally. Once subscription has been created
703         // successfully, it must be deleted on both sides.
704         if subs.PolicyUpdate == false {
705                 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
706         }
707
708         return nil, &errorInfo, err
709 }
710
711 //-------------------------------------------------------------------
712 //
713 //-------------------------------------------------------------------
714 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
715         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
716
717         // Send notification to xApp that prosessing of a Subscription Request has failed.
718         e2EventInstanceID := (int64)(0)
719         if errorInfo.ErrorSource == "" {
720                 // Submgr is default source of error
721                 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
722         }
723         resp := &models.SubscriptionResponse{
724                 SubscriptionID: restSubId,
725                 SubscriptionInstances: []*models.SubscriptionInstance{
726                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
727                                 ErrorCause:          errorInfo.ErrorCause,
728                                 ErrorSource:         errorInfo.ErrorSource,
729                                 TimeoutType:         errorInfo.TimeoutType,
730                                 XappEventInstanceID: &xAppEventInstanceID},
731                 },
732         }
733         // Mark REST subscription request processed.
734         restSubscription.SetProcessed(err)
735         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
736         if trans != nil {
737                 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
738                         errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
739         } else {
740                 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
741                         errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
742         }
743
744         c.UpdateCounter(cRestSubFailNotifToXapp)
745         err = xapp.Subscription.Notify(resp, *clientEndpoint)
746         if err != nil {
747                 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
748         }
749
750         // E2 is down. Delete completely processed request safely now
751         if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
752                 c.registry.DeleteRESTSubscription(restSubId)
753                 c.RemoveRESTSubscriptionFromDb(*restSubId)
754         }
755 }
756
757 //-------------------------------------------------------------------
758 //
759 //-------------------------------------------------------------------
760 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
761         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
762
763         // Store successfully processed InstanceId for deletion
764         restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
765         restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
766
767         // Send notification to xApp that a Subscription Request has been processed.
768         resp := &models.SubscriptionResponse{
769                 SubscriptionID: restSubId,
770                 SubscriptionInstances: []*models.SubscriptionInstance{
771                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
772                                 ErrorCause:          errorInfo.ErrorCause,
773                                 ErrorSource:         errorInfo.ErrorSource,
774                                 XappEventInstanceID: &xAppEventInstanceID},
775                 },
776         }
777         // Mark REST subscription request processesd.
778         restSubscription.SetProcessed(nil)
779         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
780         xapp.Logger.Debug("Sending successful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
781                 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
782         c.UpdateCounter(cRestSubNotifToXapp)
783         err := xapp.Subscription.Notify(resp, *clientEndpoint)
784         if err != nil {
785                 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
786         }
787
788         // E2 is down. Delete completely processed request safely now
789         if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
790                 c.registry.DeleteRESTSubscription(restSubId)
791                 c.RemoveRESTSubscriptionFromDb(*restSubId)
792         }
793 }
794
795 //-------------------------------------------------------------------
796 //
797 //-------------------------------------------------------------------
798 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
799
800         c.CntRecvMsg++
801         c.UpdateCounter(cRestSubDelReqFromXapp)
802
803         xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
804
805         restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
806         if err != nil {
807                 xapp.Logger.Error("%s", err.Error())
808                 if restSubscription == nil {
809                         // Subscription was not found
810                         c.UpdateCounter(cRestSubDelRespToXapp)
811                         return common.UnsubscribeNoContentCode
812                 } else {
813                         if restSubscription.SubReqOngoing == true {
814                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
815                                 xapp.Logger.Error("%s", err.Error())
816                                 c.UpdateCounter(cRestSubDelFailToXapp)
817                                 return common.UnsubscribeBadRequestCode
818                         } else if restSubscription.SubDelReqOngoing == true {
819                                 // Previous request for same restSubId still ongoing
820                                 c.UpdateCounter(cRestSubDelRespToXapp)
821                                 return common.UnsubscribeNoContentCode
822                         }
823                 }
824         }
825
826         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
827         go func() {
828                 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
829                 for _, instanceId := range restSubscription.InstanceIds {
830                         xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0)
831
832                         if err != nil {
833                                 xapp.Logger.Error("%s", err.Error())
834                         }
835                         xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
836                         restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
837                         restSubscription.DeleteE2InstanceId(instanceId)
838                 }
839                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
840                 c.registry.DeleteRESTSubscription(&restSubId)
841                 c.RemoveRESTSubscriptionFromDb(restSubId)
842         }()
843
844         c.UpdateCounter(cRestSubDelRespToXapp)
845         return common.UnsubscribeNoContentCode
846 }
847
848 //-------------------------------------------------------------------
849 //
850 //-------------------------------------------------------------------
851 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) {
852
853         var xAppEventInstanceID int64
854         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
855         if err != nil {
856                 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
857                         restSubId, instanceId, idstring(err, nil))
858                 return xAppEventInstanceID, nil
859         }
860
861         xAppEventInstanceID = int64(subs.ReqId.Id)
862         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
863         if trans == nil {
864                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
865                 xapp.Logger.Error("%s", err.Error())
866         }
867         defer trans.Release()
868
869         err = c.tracker.Track(trans)
870         if err != nil {
871                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
872                 xapp.Logger.Error("%s", err.Error())
873                 return xAppEventInstanceID, &time.ParseError{}
874         }
875         //
876         // Wake subs delete
877         //
878         subs.OngoingDelCount++
879         go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime)
880         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
881         subs.OngoingDelCount--
882
883         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
884
885         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
886
887         return xAppEventInstanceID, nil
888 }
889
890 //-------------------------------------------------------------------
891 //
892 //-------------------------------------------------------------------
893
894 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
895         params := &xapp.RMRParams{}
896         params.Mtype = trans.GetMtype()
897         params.SubId = int(subs.GetReqId().InstanceId)
898         params.Xid = ""
899         params.Meid = subs.GetMeid()
900         params.Src = ""
901         params.PayloadLen = len(trans.Payload.Buf)
902         params.Payload = trans.Payload.Buf
903         params.Mbuf = nil
904         xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
905         err = c.SendWithRetry(params, false, 5)
906         if err != nil {
907                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
908         }
909         return err
910 }
911
912 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
913
914         params := &xapp.RMRParams{}
915         params.Mtype = trans.GetMtype()
916         params.SubId = int(subs.GetReqId().InstanceId)
917         params.Xid = trans.GetXid()
918         params.Meid = trans.GetMeid()
919         params.Src = ""
920         params.PayloadLen = len(trans.Payload.Buf)
921         params.Payload = trans.Payload.Buf
922         params.Mbuf = nil
923         xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
924         err = c.SendWithRetry(params, false, 5)
925         if err != nil {
926                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
927         }
928         return err
929 }
930
931 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
932         if c.RMRClient == nil {
933                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
934                 xapp.Logger.Error("%s", err.Error())
935                 return
936         }
937         c.CntRecvMsg++
938
939         defer c.RMRClient.Free(msg.Mbuf)
940
941         // xapp-frame might use direct access to c buffer and
942         // when msg.Mbuf is freed, someone might take it into use
943         // and payload data might be invalid inside message handle function
944         //
945         // subscriptions won't load system a lot so there is no
946         // real performance hit by cloning buffer into new go byte slice
947         cPay := append(msg.Payload[:0:0], msg.Payload...)
948         msg.Payload = cPay
949         msg.PayloadLen = len(cPay)
950
951         switch msg.Mtype {
952         case xapp.RIC_SUB_REQ:
953                 go c.handleXAPPSubscriptionRequest(msg)
954         case xapp.RIC_SUB_RESP:
955                 go c.handleE2TSubscriptionResponse(msg)
956         case xapp.RIC_SUB_FAILURE:
957                 go c.handleE2TSubscriptionFailure(msg)
958         case xapp.RIC_SUB_DEL_REQ:
959                 go c.handleXAPPSubscriptionDeleteRequest(msg)
960         case xapp.RIC_SUB_DEL_RESP:
961                 go c.handleE2TSubscriptionDeleteResponse(msg)
962         case xapp.RIC_SUB_DEL_FAILURE:
963                 go c.handleE2TSubscriptionDeleteFailure(msg)
964         case xapp.RIC_SUB_DEL_REQUIRED:
965                 go c.handleE2TSubscriptionDeleteRequired(msg)
966         case xapp.RIC_E2_RAN_ERROR_INDICATION:
967                 go c.handleE2RanErrorIndication(msg)
968         default:
969                 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
970         }
971         return
972 }
973
974 //-------------------------------------------------------------------
975 // handle from XAPP Subscription Request
976 //------------------------------------------------------------------
977 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
978         xapp.Logger.Debug("MSG from XAPP: %s", params.String())
979         c.UpdateCounter(cSubReqFromXapp)
980
981         if c.e2IfState.IsE2ConnectionUp(&params.Meid.RanName) == false {
982                 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
983                 return
984         }
985
986         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
987         if err != nil {
988                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
989                 return
990         }
991
992         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
993         if trans == nil {
994                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
995                 return
996         }
997         defer trans.Release()
998
999         if err = c.tracker.Track(trans); err != nil {
1000                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1001                 return
1002         }
1003
1004         subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
1005         if err != nil {
1006                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1007                 return
1008         }
1009
1010         c.wakeSubscriptionRequest(subs, trans)
1011 }
1012
1013 //-------------------------------------------------------------------
1014 // Wake Subscription Request to E2node
1015 //------------------------------------------------------------------
1016 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
1017
1018         e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(nil)
1019         if err != nil {
1020                 xapp.Logger.Error("c.GetE2SubscriptionDirectives failure: %s", err.Error())
1021         }
1022         subs.OngoingReqCount++
1023         go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms)
1024         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1025         subs.OngoingReqCount--
1026         if event != nil {
1027                 switch themsg := event.(type) {
1028                 case *e2ap.E2APSubscriptionResponse:
1029                         themsg.RequestId.Id = trans.RequestId.Id
1030                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
1031                         if err == nil {
1032                                 trans.Release()
1033                                 c.UpdateCounter(cSubRespToXapp)
1034                                 err := c.rmrSendToXapp("", subs, trans)
1035                                 if err != nil {
1036                                         xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1037                                 }
1038                                 return
1039                         }
1040                 case *e2ap.E2APSubscriptionFailure:
1041                         themsg.RequestId.Id = trans.RequestId.Id
1042                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
1043                         if err == nil {
1044                                 c.UpdateCounter(cSubFailToXapp)
1045                                 c.rmrSendToXapp("", subs, trans)
1046                         }
1047                 default:
1048                         break
1049                 }
1050         }
1051         xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1052 }
1053
1054 //-------------------------------------------------------------------
1055 // handle from XAPP Subscription Delete Request
1056 //------------------------------------------------------------------
1057 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1058         xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1059         c.UpdateCounter(cSubDelReqFromXapp)
1060
1061         if c.e2IfState.IsE2ConnectionUp(&params.Meid.RanName) == false {
1062                 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1063                 return
1064         }
1065
1066         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1067         if err != nil {
1068                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1069                 return
1070         }
1071
1072         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1073         if trans == nil {
1074                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1075                 return
1076         }
1077         defer trans.Release()
1078
1079         err = c.tracker.Track(trans)
1080         if err != nil {
1081                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1082                 return
1083         }
1084
1085         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1086         if err != nil {
1087                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1088                 return
1089         }
1090
1091         //
1092         // Wake subs delete
1093         //
1094         subs.OngoingDelCount++
1095         go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1096         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1097         subs.OngoingDelCount--
1098
1099         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1100
1101         if subs.NoRespToXapp == true {
1102                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1103                 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1104                 return
1105         }
1106
1107         // Whatever is received success, fail or timeout, send successful delete response
1108         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1109         subDelRespMsg.RequestId.Id = trans.RequestId.Id
1110         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1111         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1112         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1113         if err == nil {
1114                 c.UpdateCounter(cSubDelRespToXapp)
1115                 err := c.rmrSendToXapp("", subs, trans)
1116                 if err != nil {
1117                         xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1118                 }
1119         }
1120 }
1121
1122 //-------------------------------------------------------------------
1123 // SUBS CREATE Handling
1124 //-------------------------------------------------------------------
1125 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) {
1126
1127         var event interface{} = nil
1128         var removeSubscriptionFromDb bool = false
1129         trans := c.tracker.NewSubsTransaction(subs)
1130         subs.WaitTransactionTurn(trans)
1131         defer subs.ReleaseTransactionTurn(trans)
1132         defer trans.Release()
1133
1134         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1135
1136         subRfMsg, valid := subs.GetCachedResponse()
1137         if subRfMsg == nil && valid == true {
1138                 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1139                 switch event.(type) {
1140                 case *e2ap.E2APSubscriptionResponse:
1141                         subRfMsg, valid = subs.SetCachedResponse(event, true)
1142                         subs.SubRespRcvd = true
1143                 case *e2ap.E2APSubscriptionFailure:
1144                         if subs.PolicyUpdate == false {
1145                                 subRfMsg, valid = subs.SetCachedResponse(event, false)
1146                         } else {
1147                                 // In policy update case where subscription has already been created successfully in Gnb
1148                                 // we cannot delete subscription internally in submgr
1149                                 subRfMsg, valid = subs.SetCachedResponse(event, true)
1150                         }
1151                         xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1152                 case *e2ap.E2APErrorIndication:
1153                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1154                 case *SubmgrRestartTestEvent:
1155                         // This is used to simulate that no response has been received and after restart, subscriptions are restored from db
1156                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1157                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1158                         parentTrans.SendEvent(subRfMsg, 0)
1159                         return
1160                 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1161                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1162                 default:
1163                         // Timer expiry
1164                         if subs.PolicyUpdate == false {
1165                                 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1166                                 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1167                                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1168                         } else {
1169                                 subRfMsg, valid = subs.SetCachedResponse(nil, true)
1170                         }
1171                 }
1172                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1173         } else {
1174                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1175         }
1176         xapp.Logger.Debug("subs.PolicyUpdate: %v", subs.PolicyUpdate)
1177         xapp.Logger.Debug("subs: %v", subs)
1178
1179         if valid == false {
1180                 removeSubscriptionFromDb = true
1181         }
1182
1183         err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1184         if err != nil {
1185                 valid = false
1186                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1187
1188         }
1189
1190         // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1191         if valid == false {
1192                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1193         }
1194
1195         parentTrans.SendEvent(subRfMsg, 0)
1196 }
1197
1198 //-------------------------------------------------------------------
1199 // SUBS DELETE Handling
1200 //-------------------------------------------------------------------
1201
1202 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) {
1203
1204         trans := c.tracker.NewSubsTransaction(subs)
1205         subs.WaitTransactionTurn(trans)
1206         defer subs.ReleaseTransactionTurn(trans)
1207         defer trans.Release()
1208
1209         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1210
1211         subs.mutex.Lock()
1212
1213         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1214                 subs.valid = false
1215                 subs.mutex.Unlock()
1216                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1217         } else {
1218                 subs.mutex.Unlock()
1219         }
1220
1221         // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1222         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1223         parentTrans.SendEvent(nil, 0)
1224 }
1225
1226 //-------------------------------------------------------------------
1227 // send to E2T Subscription Request
1228 //-------------------------------------------------------------------
1229 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1230         var err error
1231         var event interface{} = nil
1232         var timedOut bool = false
1233         const ricRequestorId = 123
1234
1235         subReqMsg := subs.SubReqMsg
1236         subReqMsg.RequestId = subs.GetReqId().RequestId
1237         subReqMsg.RequestId.Id = ricRequestorId
1238         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1239         if err != nil {
1240                 xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans))
1241                 return &PackSubscriptionRequestErrortEvent{
1242                         ErrorInfo{
1243                                 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1244                                 ErrorCause:  err.Error(),
1245                         },
1246                 }
1247         }
1248
1249         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1250         err = c.WriteSubscriptionToDb(subs)
1251         if err != nil {
1252                 return &SDLWriteErrortEvent{
1253                         ErrorInfo{
1254                                 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1255                                 ErrorCause:  err.Error(),
1256                         },
1257                 }
1258         }
1259
1260         for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1261                 desc := fmt.Sprintf("(retry %d)", retries)
1262                 if retries == 0 {
1263                         c.UpdateCounter(cSubReqToE2)
1264                 } else {
1265                         c.UpdateCounter(cSubReReqToE2)
1266                 }
1267                 err := c.rmrSendToE2T(desc, subs, trans)
1268                 if err != nil {
1269                         xapp.Logger.Error("rmrSendToE2T() failed:%s", err.Error())
1270                 }
1271
1272                 if subs.DoNotWaitSubResp == false {
1273                         event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1274                         if timedOut {
1275                                 c.UpdateCounter(cSubReqTimerExpiry)
1276                                 continue
1277                         }
1278                 } else {
1279                         // Simulating case where subscrition request has been sent but response has not been received before restart
1280                         event = &SubmgrRestartTestEvent{}
1281                         xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1282                 }
1283                 break
1284         }
1285         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1286         return event
1287 }
1288
1289 //-------------------------------------------------------------------
1290 // send to E2T Subscription Delete Request
1291 //-------------------------------------------------------------------
1292
1293 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1294         var err error
1295         var event interface{}
1296         var timedOut bool
1297         const ricRequestorId = 123
1298
1299         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1300         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1301         subDelReqMsg.RequestId.Id = ricRequestorId
1302         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1303         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1304         if err != nil {
1305                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1306                 return event
1307         }
1308
1309         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1310                 desc := fmt.Sprintf("(retry %d)", retries)
1311                 if retries == 0 {
1312                         c.UpdateCounter(cSubDelReqToE2)
1313                 } else {
1314                         c.UpdateCounter(cSubDelReReqToE2)
1315                 }
1316                 err := c.rmrSendToE2T(desc, subs, trans)
1317                 if err != nil {
1318                         xapp.Logger.Error("SUBS-SubDelReq: rmrSendToE2T failure: %s", idstring(err, trans, subs, parentTrans))
1319                 }
1320                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1321                 if timedOut {
1322                         c.UpdateCounter(cSubDelReqTimerExpiry)
1323                         continue
1324                 }
1325                 break
1326         }
1327         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1328         return event
1329 }
1330
1331 //-------------------------------------------------------------------
1332 // handle from E2T Subscription Response
1333 //-------------------------------------------------------------------
1334 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1335         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1336         c.UpdateCounter(cSubRespFromE2)
1337
1338         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1339         if err != nil {
1340                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1341                 return
1342         }
1343         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1344         if err != nil {
1345                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1346                 return
1347         }
1348         trans := subs.GetTransaction()
1349         if trans == nil {
1350                 err = fmt.Errorf("Ongoing transaction not found")
1351                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1352                 return
1353         }
1354         xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans)
1355         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1356         if sendOk == false {
1357                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1358                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1359         }
1360         return
1361 }
1362
1363 //-------------------------------------------------------------------
1364 // handle from E2T Subscription Failure
1365 //-------------------------------------------------------------------
1366 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1367         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1368         c.UpdateCounter(cSubFailFromE2)
1369         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1370         if err != nil {
1371                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1372                 return
1373         }
1374         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1375         if err != nil {
1376                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1377                 return
1378         }
1379         trans := subs.GetTransaction()
1380         if trans == nil {
1381                 err = fmt.Errorf("Ongoing transaction not found")
1382                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1383                 return
1384         }
1385         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1386         if sendOk == false {
1387                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1388                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1389         }
1390         return
1391 }
1392
1393 //-------------------------------------------------------------------
1394 // handle from E2T Subscription Delete Response
1395 //-------------------------------------------------------------------
1396 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) {
1397         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1398         c.UpdateCounter(cSubDelRespFromE2)
1399         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1400         if err != nil {
1401                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1402                 return
1403         }
1404         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1405         if err != nil {
1406                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1407                 return
1408         }
1409         trans := subs.GetTransaction()
1410         if trans == nil {
1411                 err = fmt.Errorf("Ongoing transaction not found")
1412                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1413                 return
1414         }
1415         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1416         if sendOk == false {
1417                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1418                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1419         }
1420         return
1421 }
1422
1423 //-------------------------------------------------------------------
1424 // handle from E2T Subscription Delete Failure
1425 //-------------------------------------------------------------------
1426 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1427         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1428         c.UpdateCounter(cSubDelFailFromE2)
1429         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1430         if err != nil {
1431                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1432                 return
1433         }
1434         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1435         if err != nil {
1436                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1437                 return
1438         }
1439         trans := subs.GetTransaction()
1440         if trans == nil {
1441                 err = fmt.Errorf("Ongoing transaction not found")
1442                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1443                 return
1444         }
1445         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1446         if sendOk == false {
1447                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1448                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1449         }
1450         return
1451 }
1452
1453 //-------------------------------------------------------------------
1454 //
1455 //-------------------------------------------------------------------
1456 func typeofSubsMessage(v interface{}) string {
1457         if v == nil {
1458                 return "NIL"
1459         }
1460         switch v.(type) {
1461         //case *e2ap.E2APSubscriptionRequest:
1462         //      return "SubReq"
1463         case *e2ap.E2APSubscriptionResponse:
1464                 return "SubResp"
1465         case *e2ap.E2APSubscriptionFailure:
1466                 return "SubFail"
1467         //case *e2ap.E2APSubscriptionDeleteRequest:
1468         //      return "SubDelReq"
1469         case *e2ap.E2APSubscriptionDeleteResponse:
1470                 return "SubDelResp"
1471         case *e2ap.E2APSubscriptionDeleteFailure:
1472                 return "SubDelFail"
1473         case *e2ap.E2APErrorIndication:
1474                 return "RicE2RanErrorIndication"
1475         default:
1476                 return "Unknown"
1477         }
1478 }
1479
1480 //-------------------------------------------------------------------
1481 //
1482 //-------------------------------------------------------------------
1483 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1484         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1485         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1486         if err != nil {
1487                 xapp.Logger.Error("%v", err)
1488                 return err
1489         }
1490         return nil
1491 }
1492
1493 //-------------------------------------------------------------------
1494 //
1495 //-------------------------------------------------------------------
1496 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1497
1498         if removeSubscriptionFromDb == true {
1499                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1500                 c.RemoveSubscriptionFromDb(subs)
1501         } else {
1502                 // Update is needed for successful response and merge case here
1503                 if subs.RetryFromXapp == false {
1504                         err := c.WriteSubscriptionToDb(subs)
1505                         return err
1506                 }
1507         }
1508         subs.RetryFromXapp = false
1509         return nil
1510 }
1511
1512 //-------------------------------------------------------------------
1513 //
1514 //-------------------------------------------------------------------
1515 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1516         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1517         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1518         if err != nil {
1519                 xapp.Logger.Error("%v", err)
1520         }
1521 }
1522
1523 //-------------------------------------------------------------------
1524 //
1525 //-------------------------------------------------------------------
1526 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1527         xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1528         err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1529         if err != nil {
1530                 xapp.Logger.Error("%v", err)
1531         }
1532 }
1533
1534 //-------------------------------------------------------------------
1535 //
1536 //-------------------------------------------------------------------
1537 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1538
1539         if removeRestSubscriptionFromDb == true {
1540                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1541                 c.RemoveRESTSubscriptionFromDb(restSubId)
1542         } else {
1543                 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1544         }
1545 }
1546
1547 //-------------------------------------------------------------------
1548 //
1549 //-------------------------------------------------------------------
1550 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1551         xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1552         err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1553         if err != nil {
1554                 xapp.Logger.Error("%v", err)
1555         }
1556 }
1557
1558 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription, e2SubsDelRequired bool) {
1559
1560         if c.UTTesting == true {
1561                 // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests
1562                 c.registry.mutex = new(sync.Mutex)
1563         }
1564
1565         const ricRequestorId = 123
1566         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1567
1568         // Send delete for every endpoint in the subscription
1569         if subs.PolicyUpdate == false {
1570                 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1571                 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1572                 subDelReqMsg.RequestId.Id = ricRequestorId
1573                 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1574                 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1575                 if err != nil {
1576                         xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1577                         return
1578                 }
1579                 for _, endPoint := range subs.EpList.Endpoints {
1580                         params := &xapp.RMRParams{}
1581                         params.Mtype = mType
1582                         params.SubId = int(subs.GetReqId().InstanceId)
1583                         params.Xid = ""
1584                         params.Meid = subs.Meid
1585                         params.Src = endPoint.String()
1586                         params.PayloadLen = len(payload.Buf)
1587                         params.Payload = payload.Buf
1588                         params.Mbuf = nil
1589                         subs.DeleteFromDb = true
1590                         if !e2SubsDelRequired {
1591                                 c.handleXAPPSubscriptionDeleteRequest(params)
1592                         } else {
1593                                 c.SendSubscriptionDeleteReqToE2T(subs, params)
1594                         }
1595                 }
1596         }
1597 }
1598
1599 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1600
1601         fmt.Println("CRESTSubscriptionRequest")
1602
1603         if p == nil {
1604                 return
1605         }
1606
1607         if p.SubscriptionID != "" {
1608                 fmt.Println("  SubscriptionID = ", p.SubscriptionID)
1609         } else {
1610                 fmt.Println("  SubscriptionID = ''")
1611         }
1612
1613         fmt.Printf("  ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1614
1615         if p.ClientEndpoint.HTTPPort != nil {
1616                 fmt.Printf("  ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1617         } else {
1618                 fmt.Println("  ClientEndpoint.HTTPPort = nil")
1619         }
1620
1621         if p.ClientEndpoint.RMRPort != nil {
1622                 fmt.Printf("  ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1623         } else {
1624                 fmt.Println("  ClientEndpoint.RMRPort = nil")
1625         }
1626
1627         if p.Meid != nil {
1628                 fmt.Printf("  Meid = %s\n", *p.Meid)
1629         } else {
1630                 fmt.Println("  Meid = nil")
1631         }
1632
1633         if p.E2SubscriptionDirectives == nil {
1634                 fmt.Println("  E2SubscriptionDirectives = nil")
1635         } else {
1636                 fmt.Println("  E2SubscriptionDirectives")
1637                 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1638                         fmt.Println("    E2RetryCount == nil")
1639                 } else {
1640                         fmt.Printf("    E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1641                 }
1642                 fmt.Printf("    E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1643                 fmt.Printf("    RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1644         }
1645         for _, subscriptionDetail := range p.SubscriptionDetails {
1646                 if p.RANFunctionID != nil {
1647                         fmt.Printf("  RANFunctionID = %v\n", *p.RANFunctionID)
1648                 } else {
1649                         fmt.Println("  RANFunctionID = nil")
1650                 }
1651                 fmt.Printf("  SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1652                 fmt.Printf("  SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1653
1654                 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1655                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1656                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1657                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1658
1659                         if actionToBeSetup.SubsequentAction != nil {
1660                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1661                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1662                         } else {
1663                                 fmt.Println("  SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1664                         }
1665                 }
1666         }
1667 }
1668
1669 //-------------------------------------------------------------------
1670 // handle from E2T Subscription Delete Required
1671 //-------------------------------------------------------------------
1672 func (c *Control) handleE2TSubscriptionDeleteRequired(params *xapp.RMRParams) {
1673         xapp.Logger.Info("MSG from E2T: %s", params.String())
1674         c.UpdateCounter(cSubDelRequFromE2)
1675         subsDelRequMsg, err := c.e2ap.UnpackSubscriptionDeleteRequired(params.Payload)
1676         if err != nil {
1677                 xapp.Logger.Error("MSG-SubDelRequired: %s", idstring(err, params))
1678                 //c.sendE2TErrorIndication(nil)
1679                 return
1680         }
1681         var subscriptions = map[string][]e2ap.E2APSubscriptionDeleteRequired{}
1682         var subDB = []*Subscription{}
1683         for _, subsTobeRemove := range subsDelRequMsg.E2APSubscriptionDeleteRequiredRequests {
1684                 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subsTobeRemove.RequestId.InstanceId})
1685                 if err != nil {
1686                         xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1687                         continue
1688                 }
1689                 // Check if Delete Subscription Already triggered
1690                 if subs.OngoingDelCount > 0 {
1691                         continue
1692                 }
1693                 subDB = append(subDB, subs)
1694                 for _, endpoint := range subs.EpList.Endpoints {
1695                         subscriptions[endpoint.Addr] = append(subscriptions[endpoint.Addr], subsTobeRemove)
1696                 }
1697                 // Sending Subscription Delete Request to E2T
1698                 //      c.SendSubscriptionDeleteReq(subs, true)
1699         }
1700         for _, subsTobeRemove := range subDB {
1701                 // Sending Subscription Delete Request to E2T
1702                 c.SendSubscriptionDeleteReq(subsTobeRemove, true)
1703         }
1704 }
1705
1706 //-------------------------------------------------------------------
1707 // handle from E2T Error Indication
1708 //-------------------------------------------------------------------
1709 func (c *Control) handleE2RanErrorIndication(params *xapp.RMRParams) {
1710         xapp.Logger.Debug("Received ErrorIndication from E2T")
1711         xapp.Logger.Error("MSG from E2T: %s", params.String())
1712
1713         c.UpdateCounter(cErrorIndicationFromE2Node)
1714
1715         errorIndication, err := c.e2ap.UnpackErrorIndicationFromE2Node(params.Payload)
1716         if err != nil {
1717                 xapp.Logger.Error("MSG-ErrorIndication From E2Node %s", idstring(err, params))
1718                 return
1719         }
1720
1721         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{errorIndication.RequestId.InstanceId})
1722         if err != nil {
1723                 xapp.Logger.Error("Unknown/Invalid InstanceId from E2Node. Dropping ErrorIndication from E2Node.")
1724                 xapp.Logger.Error("MSG-ErrorIndication From E2Node: %s", idstring(err, params))
1725                 if errorIndication.IsCausePresent == true {
1726                         xapp.Logger.Debug("Cause is present in received ErrorIndication Message - Content: %v, Value: %v", errorIndication.Cause.Content, errorIndication.Cause.Value)
1727                 }
1728                 return
1729         }
1730
1731         trans := subs.GetTransaction()
1732         if trans == nil {
1733                 err = fmt.Errorf("Ongoing transaction not found")
1734                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1735                 if errorIndication.IsCausePresent == true {
1736                         xapp.Logger.Debug("Cause is present in received ErrorIndication Message - Content: %v, Value: %v", errorIndication.Cause.Content, errorIndication.Cause.Value)
1737                 }
1738                 return
1739         }
1740
1741         if errorIndication.IsCausePresent == true {
1742                 xapp.Logger.Debug("Cause present in ErrorIndication is: Content: %v, Value: %v", errorIndication.Cause.Content, errorIndication.Cause.Value)
1743                 if (errorIndication.Cause.Content == e2ap.E2AP_CauseContent_Misc && (errorIndication.Cause.Value == e2ap.E2AP_CauseValue_CauseMisc_hardware_failure ||
1744                         errorIndication.Cause.Value == e2ap.E2AP_CauseValue_CauseMisc_om_intervention)) ||
1745                         (errorIndication.Cause.Content == e2ap.E2AP_CauseContent_E2node && errorIndication.Cause.Value == e2ap.E2AP_CauseValue_CauseE2node_e2node_component_unknown) {
1746
1747                         sendOk, timedOut := trans.SendEvent(errorIndication, e2tRecvMsgTimeout)
1748                         if sendOk == false {
1749                                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1750                                 xapp.Logger.Error("MSG-ErrorIndication: %s", idstring(err, trans, subs))
1751                         }
1752                 } else {
1753                         xapp.Logger.Debug("Cause present in ErrorIndication is not serious problem. Retrying SubReq Procedure")
1754                 }
1755         }
1756         return
1757 }
1758
1759 //-----------------------------------------------------------------
1760 // Initiate RIC Subscription Delete Request after receiving
1761 // RIC Subscription Delete Required from E2T
1762 //-----------------------------------------------------------------
1763 func (c *Control) SendSubscriptionDeleteReqToE2T(subs *Subscription, params *xapp.RMRParams) {
1764         xapp.Logger.Debug("MSG TO E2T: %s", params.String())
1765         c.UpdateCounter(cSubDelReqToE2)
1766
1767         if c.e2IfState.IsE2ConnectionUp(&params.Meid.RanName) == false {
1768                 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1769                 return
1770         }
1771
1772         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subs.ReqId.RequestId, params.Meid)
1773         if trans == nil {
1774                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1775                 return
1776         }
1777         defer trans.Release()
1778
1779         err := c.tracker.Track(trans)
1780         if err != nil {
1781                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1782                 return
1783         }
1784
1785         //
1786         // Wake subs delete
1787         //
1788         subs.OngoingDelCount++
1789         go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1790         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1791         subs.OngoingDelCount--
1792
1793         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1794
1795         if subs.NoRespToXapp == true {
1796                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1797                 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1798                 return
1799         }
1800 }