Support for handling unordered IEs in RIC Subscription Response messgae
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "sync"
26         "time"
27
28         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
29         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
30         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
33         httptransport "github.com/go-openapi/runtime/client"
34         "github.com/go-openapi/strfmt"
35         "github.com/segmentio/ksuid"
36         "github.com/spf13/viper"
37 )
38
39 //-----------------------------------------------------------------------------
40 //
41 //-----------------------------------------------------------------------------
42
43 func idstring(err error, entries ...fmt.Stringer) string {
44         var retval string = ""
45         var filler string = ""
46         for _, entry := range entries {
47                 if entry != nil {
48                         retval += filler + entry.String()
49                         filler = " "
50                 } else {
51                         retval += filler + "(NIL)"
52                 }
53         }
54         if err != nil {
55                 retval += filler + "err(" + err.Error() + ")"
56                 filler = " "
57         }
58         return retval
59 }
60
61 //-----------------------------------------------------------------------------
62 //
63 //-----------------------------------------------------------------------------
64
65 var e2tSubReqTimeout time.Duration
66 var e2tSubDelReqTime time.Duration
67 var e2tRecvMsgTimeout time.Duration
68 var waitRouteCleanup_ms time.Duration
69 var e2tMaxSubReqTryCount uint64    // Initial try + retry
70 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
71 var checkE2State string
72 var readSubsFromDb string
73 var dbRetryForever string
74 var dbTryCount int
75 var e2IEOrderCheckValue uint8
76
77 type Control struct {
78         *xapp.RMRClient
79         e2ap              *E2ap
80         registry          *Registry
81         tracker           *Tracker
82         restDuplicateCtrl *DuplicateCtrl
83         e2IfState         *E2IfState
84         e2IfStateDb       XappRnibInterface
85         e2SubsDb          Sdlnterface
86         restSubsDb        Sdlnterface
87         CntRecvMsg        uint64
88         ResetTestFlag     bool
89         Counters          map[string]xapp.Counter
90         LoggerLevel       int
91         UTTesting         bool
92 }
93
94 type RMRMeid struct {
95         PlmnID  string
96         EnbID   string
97         RanName string
98 }
99
100 type SubmgrRestartTestEvent struct{}
101 type SubmgrRestartUpEvent struct{}
102 type PackSubscriptionRequestErrortEvent struct {
103         ErrorInfo ErrorInfo
104 }
105
106 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
107         p.ErrorInfo = *errorInfo
108 }
109
110 type SDLWriteErrortEvent struct {
111         ErrorInfo ErrorInfo
112 }
113
114 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
115         s.ErrorInfo = *errorInfo
116 }
117
118 func init() {
119         xapp.Logger.Debug("SUBMGR")
120         viper.AutomaticEnv()
121         viper.SetEnvPrefix("submgr")
122         viper.AllowEmptyEnv(true)
123 }
124
125 func NewControl() *Control {
126
127         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
128         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
129
130         registry := new(Registry)
131         registry.Initialize()
132         registry.rtmgrClient = &rtmgrClient
133
134         tracker := new(Tracker)
135         tracker.Init()
136
137         restDuplicateCtrl := new(DuplicateCtrl)
138         restDuplicateCtrl.Init()
139
140         e2IfState := new(E2IfState)
141
142         c := &Control{e2ap: new(E2ap),
143                 registry:          registry,
144                 tracker:           tracker,
145                 restDuplicateCtrl: restDuplicateCtrl,
146                 e2IfState:         e2IfState,
147                 e2IfStateDb:       CreateXappRnibIfInstance(),
148                 e2SubsDb:          CreateSdl(),
149                 restSubsDb:        CreateRESTSdl(),
150                 Counters:          xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
151                 LoggerLevel:       1,
152         }
153
154         e2IfState.Init(c)
155         c.ReadConfigParameters("")
156
157         // Register REST handler for testing support
158         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
159         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
160         xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
161
162         xapp.Resource.InjectRoute("/ric/v1/get_all_e2nodes", c.GetAllE2Nodes, "GET")
163         xapp.Resource.InjectRoute("/ric/v1/get_e2node_rest_subscriptions/{ranName}", c.GetAllE2NodeRestSubscriptions, "GET")
164
165         xapp.Resource.InjectRoute("/ric/v1/get_all_xapps", c.GetAllXapps, "GET")
166         xapp.Resource.InjectRoute("/ric/v1/get_xapp_rest_restsubscriptions/{xappServiceName}", c.GetAllXappRestSubscriptions, "GET")
167         xapp.Resource.InjectRoute("/ric/v1/get_e2subscriptions/{restId}", c.GetE2Subscriptions, "GET")
168
169         xapp.Resource.InjectRoute("/ric/v1/delete_all_e2node_subscriptions/{ranName}", c.DeleteAllE2nodeSubscriptions, "DELETE")
170         xapp.Resource.InjectRoute("/ric/v1/delete_all_xapp_subscriptions/{xappServiceName}", c.DeleteAllXappSubscriptions, "DELETE")
171
172         if readSubsFromDb == "true" {
173                 // Read subscriptions from db
174                 err := c.ReadE2Subscriptions()
175                 if err != nil {
176                         xapp.Logger.Error("ReadE2Subscriptions() failed %s", err.Error())
177                 }
178                 err = c.ReadRESTSubscriptions()
179                 if err != nil {
180                         xapp.Logger.Error("ReadRESTSubscriptions() failed %s", err.Error())
181                 }
182         }
183
184         go func() {
185                 err := xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
186                 if err != nil {
187                         xapp.Logger.Error("xapp.Subscription.Listen failure: %s", err.Error())
188                 }
189         }()
190         return c
191 }
192
193 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
194         subscriptions, err := c.registry.QueryHandler()
195         if err != nil {
196                 xapp.Logger.Error("QueryHandler() failed %s", err.Error())
197         }
198
199         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
200 }
201
202 //-------------------------------------------------------------------
203 //
204 //-------------------------------------------------------------------
205 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
206         xapp.Logger.Debug("RESTQueryHandler() called")
207
208         c.CntRecvMsg++
209
210         return c.registry.QueryHandler()
211 }
212
213 //-------------------------------------------------------------------
214 //
215 //-------------------------------------------------------------------
216 func (c *Control) ReadE2Subscriptions() error {
217         var err error
218         var subIds []uint32
219         var register map[uint32]*Subscription
220         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
221                 xapp.Logger.Debug("Reading E2 subscriptions from db")
222                 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
223                 if err != nil {
224                         xapp.Logger.Error("%v", err)
225                         <-time.After(1 * time.Second)
226                 } else {
227                         c.registry.subIds = subIds
228                         c.registry.register = register
229                         go c.HandleUncompletedSubscriptions(register)
230                         return nil
231                 }
232         }
233         xapp.Logger.Debug("Continuing without retring")
234         return err
235 }
236
237 //-------------------------------------------------------------------
238 //
239 //-------------------------------------------------------------------
240 func (c *Control) ReadRESTSubscriptions() error {
241
242         xapp.Logger.Debug("ReadRESTSubscriptions()")
243         var err error
244         var restSubscriptions map[string]*RESTSubscription
245         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
246                 xapp.Logger.Debug("Reading REST subscriptions from db")
247                 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
248                 if err != nil {
249                         xapp.Logger.Error("%v", err)
250                         <-time.After(1 * time.Second)
251                 } else {
252                         // Fix REST subscriptions ongoing status after restart
253                         for restSubId, restSubscription := range restSubscriptions {
254                                 restSubscription.SubReqOngoing = false
255                                 restSubscription.SubDelReqOngoing = false
256                                 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubscription)
257                                 if err != nil {
258                                         xapp.Logger.Error("WriteRESTSubscriptionToSdl() failed:%s", err.Error())
259                                 }
260                         }
261                         c.registry.restSubscriptions = restSubscriptions
262                         return nil
263                 }
264         }
265         xapp.Logger.Debug("Continuing without retring")
266         return err
267 }
268
269 //-------------------------------------------------------------------
270 //
271 //-------------------------------------------------------------------
272 func (c *Control) ReadConfigParameters(f string) {
273
274         xapp.Logger.Debug("ReadConfigParameters")
275
276         c.LoggerLevel = int(xapp.Logger.GetLevel())
277         xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel)
278         c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
279
280         // viper.GetDuration returns nanoseconds
281         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
282         if e2tSubReqTimeout == 0 {
283                 e2tSubReqTimeout = 2000 * 1000000
284                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
285         }
286         xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
287
288         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
289         if e2tSubDelReqTime == 0 {
290                 e2tSubDelReqTime = 2000 * 1000000
291                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
292         }
293         xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
294
295         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
296         if e2tRecvMsgTimeout == 0 {
297                 e2tRecvMsgTimeout = 2000 * 1000000
298                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
299         }
300         xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
301
302         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
303         if e2tMaxSubReqTryCount == 0 {
304                 e2tMaxSubReqTryCount = 1
305                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
306         }
307         xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
308
309         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
310         if e2tMaxSubDelReqTryCount == 0 {
311                 e2tMaxSubDelReqTryCount = 1
312                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
313         }
314         xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
315
316         checkE2State = viper.GetString("controls.checkE2State")
317         if checkE2State == "" {
318                 checkE2State = "true"
319                 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
320         }
321         xapp.Logger.Debug("checkE2State= %v", checkE2State)
322
323         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
324         if readSubsFromDb == "" {
325                 readSubsFromDb = "true"
326                 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
327         }
328         xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
329
330         dbTryCount = viper.GetInt("controls.dbTryCount")
331         if dbTryCount == 0 {
332                 dbTryCount = 200
333                 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
334         }
335         xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
336
337         dbRetryForever = viper.GetString("controls.dbRetryForever")
338         if dbRetryForever == "" {
339                 dbRetryForever = "true"
340                 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
341         }
342         xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
343
344         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
345         // value 100ms used currently only in unittests.
346         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
347         if waitRouteCleanup_ms == 0 {
348                 waitRouteCleanup_ms = 5000 * 1000000
349                 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
350         }
351         xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
352
353         viper.SetDefault("controls.checkE2IEOrder", 1)
354         e2IEOrderCheckValue = uint8(viper.GetUint("controls.checkE2IEOrder"))
355         c.e2ap.SetE2IEOrderCheck(e2IEOrderCheckValue)
356         xapp.Logger.Debug("e2IEOrderCheck= %v", e2IEOrderCheckValue)
357 }
358
359 //-------------------------------------------------------------------
360 //
361 //-------------------------------------------------------------------
362 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
363
364         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
365         for subId, subs := range register {
366                 if subs.SubRespRcvd == false {
367                         // If policy subscription has already been made successfully unsuccessful update should not be deleted.
368                         if subs.PolicyUpdate == false {
369                                 subs.NoRespToXapp = true
370                                 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
371                                 c.SendSubscriptionDeleteReq(subs, false)
372                         }
373                 }
374         }
375 }
376
377 func (c *Control) ReadyCB(data interface{}) {
378         if c.RMRClient == nil {
379                 c.RMRClient = xapp.Rmr
380         }
381 }
382
383 func (c *Control) Run() {
384         xapp.SetReadyCB(c.ReadyCB, nil)
385         xapp.AddConfigChangeListener(c.ReadConfigParameters)
386         xapp.Run(c)
387 }
388
389 //-------------------------------------------------------------------
390 //
391 //-------------------------------------------------------------------
392 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string, xAppServiceName string) (*RESTSubscription, string, error) {
393
394         var restSubId string
395         var restSubscription *RESTSubscription
396         var err error
397
398         prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
399         if p.SubscriptionID == "" {
400                 // Subscription does not contain REST subscription Id
401                 if exists {
402                         restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
403                         if restSubscription != nil {
404                                 // Subscription not found
405                                 restSubId = prevRestSubsId
406                                 if err == nil {
407                                         xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
408                                 } else {
409                                         xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
410                                 }
411                         } else {
412                                 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
413                                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
414                         }
415                 }
416
417                 if restSubscription == nil {
418                         restSubId = ksuid.New().String()
419                         restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppServiceName, &xAppRmrEndpoint, p.Meid)
420                 }
421         } else {
422                 // Subscription contains REST subscription Id
423                 restSubId = p.SubscriptionID
424
425                 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
426                 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
427                 if err != nil {
428                         // Subscription with id in REST request does not exist
429                         xapp.Logger.Error("%s", err.Error())
430                         c.UpdateCounter(cRestSubFailToXapp)
431                         return nil, "", err
432                 }
433
434                 if !exists {
435                         xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
436                 } else {
437                         xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
438                 }
439         }
440
441         return restSubscription, restSubId, nil
442 }
443
444 //-------------------------------------------------------------------
445 //
446 //-------------------------------------------------------------------
447 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
448
449         c.CntRecvMsg++
450         c.UpdateCounter(cRestSubReqFromXapp)
451
452         subResp := models.SubscriptionResponse{}
453         p := params.(*models.SubscriptionParams)
454
455         if c.LoggerLevel > 2 {
456                 c.PrintRESTSubscriptionRequest(p)
457         }
458
459         if c.e2IfState.IsE2ConnectionUp(p.Meid) == false || c.e2IfState.IsE2ConnectionUnderReset(p.Meid) == true {
460                 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
461                         xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
462                 } else if c.e2IfState.IsE2ConnectionUnderReset(p.Meid) == true {
463                         xapp.Logger.Error("E2 Node for ranName %v UNDER RESET", *p.Meid)
464                 }
465                 c.UpdateCounter(cRestReqRejDueE2Down)
466                 return nil, common.SubscribeServiceUnavailableCode
467         }
468
469         if p.ClientEndpoint == nil {
470                 err := fmt.Errorf("ClientEndpoint == nil")
471                 xapp.Logger.Error("%v", err)
472                 c.UpdateCounter(cRestSubFailToXapp)
473                 return nil, common.SubscribeBadRequestCode
474         }
475
476         e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
477         if err != nil {
478                 xapp.Logger.Error("%s", err)
479                 c.UpdateCounter(cRestSubFailToXapp)
480                 return nil, common.SubscribeBadRequestCode
481         }
482         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
483         if err != nil {
484                 xapp.Logger.Error("%s", err.Error())
485                 c.UpdateCounter(cRestSubFailToXapp)
486                 return nil, common.SubscribeBadRequestCode
487         }
488
489         md5sum, err := CalculateRequestMd5sum(params)
490         if err != nil {
491                 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
492         }
493
494         restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint, p.ClientEndpoint.Host)
495         if err != nil {
496                 xapp.Logger.Error("Subscription with id in REST request does not exist")
497                 return nil, common.SubscribeNotFoundCode
498         }
499
500         subResp.SubscriptionID = &restSubId
501         subReqList := e2ap.SubscriptionRequestList{}
502         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
503         if err != nil {
504                 xapp.Logger.Error("%s", err.Error())
505                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
506                 c.registry.DeleteRESTSubscription(&restSubId)
507                 c.UpdateCounter(cRestSubFailToXapp)
508                 return nil, common.SubscribeBadRequestCode
509         }
510
511         duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
512         if duplicate {
513                 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
514                 xapp.Logger.Debug("%s", err)
515                 c.registry.DeleteRESTSubscription(&restSubId)
516                 c.UpdateCounter(cRestSubRespToXapp)
517                 return &subResp, common.SubscribeCreatedCode
518         }
519
520         c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
521         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
522
523         c.UpdateCounter(cRestSubRespToXapp)
524         return &subResp, common.SubscribeCreatedCode
525 }
526
527 //-------------------------------------------------------------------
528 //
529 //-------------------------------------------------------------------
530 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
531
532         e2SubscriptionDirectives := &E2SubscriptionDirectives{}
533         if p == nil || p.E2SubscriptionDirectives == nil {
534                 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
535                 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
536                 e2SubscriptionDirectives.CreateRMRRoute = true
537                 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
538         } else {
539                 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
540                         e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
541                 } else {
542                         return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
543                 }
544                 if p.E2SubscriptionDirectives.E2RetryCount == nil {
545                         xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
546                         e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
547                 } else {
548                         if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
549                                 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
550                         } else {
551                                 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
552                         }
553                 }
554                 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
555         }
556         xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
557         xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
558         xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
559         return e2SubscriptionDirectives, nil
560 }
561
562 //-------------------------------------------------------------------
563 //
564 //-------------------------------------------------------------------
565
566 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
567         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
568
569         c.SubscriptionProcessingStartDelay()
570         xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests))
571
572         var xAppEventInstanceID int64
573         var e2EventInstanceID int64
574         errorInfo := &ErrorInfo{}
575
576         defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
577
578         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
579                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
580                 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
581
582                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
583                 if trans == nil {
584                         // Send notification to xApp that prosessing of a Subscription Request has failed.
585                         err := fmt.Errorf("Tracking failure")
586                         errorInfo.ErrorCause = err.Error()
587                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
588                         continue
589                 }
590
591                 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
592
593                 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
594
595                 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
596                 trans.Release()
597
598                 if err != nil {
599                         if err.Error() == "TEST: restart event received" {
600                                 // This is just for UT cases. Stop here subscription processing
601                                 return
602                         }
603                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
604                 } else {
605                         e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
606                         restSubscription.AddMd5Sum(md5sum)
607                         xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
608                                 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
609                         c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans, errorInfo)
610                 }
611         }
612 }
613
614 //-------------------------------------------------------------------
615 //
616 //------------------------------------------------------------------
617 func (c *Control) SubscriptionProcessingStartDelay() {
618         if c.UTTesting == true {
619                 // This is temporary fix for the UT problem that notification arrives before subscription response
620                 // Correct fix would be to allow notification come before response and process it correctly
621                 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
622                 <-time.After(time.Millisecond * 50)
623                 xapp.Logger.Debug("Continuing after delay")
624         }
625 }
626
627 //-------------------------------------------------------------------
628 //
629 //------------------------------------------------------------------
630 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
631         restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
632
633         errorInfo := ErrorInfo{}
634
635         err := c.tracker.Track(trans)
636         if err != nil {
637                 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
638                 errorInfo.ErrorCause = err.Error()
639                 err = fmt.Errorf("Tracking failure")
640                 return nil, &errorInfo, err
641         }
642
643         subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
644         if err != nil {
645                 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
646                 return nil, &errorInfo, err
647         }
648
649         //
650         // Wake subs request
651         //
652         subs.OngoingReqCount++
653         go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0)
654         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
655         subs.OngoingReqCount--
656
657         err = nil
658         if event != nil {
659                 switch themsg := event.(type) {
660                 case *e2ap.E2APSubscriptionResponse:
661                         trans.Release()
662                         if c.e2IfState.IsE2ConnectionUp(meid) == true {
663                                 errorInfo = c.e2ap.CheckActionNotAdmittedList(xapp.RIC_SUB_RESP, themsg.ActionNotAdmittedList, c)
664                                 return themsg, &errorInfo, nil
665                         } else {
666                                 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
667                                 c.RemoveSubscriptionFromDb(subs)
668                                 err = fmt.Errorf("E2 interface down")
669                                 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
670                         }
671                 case *e2ap.E2APSubscriptionFailure:
672                         err = fmt.Errorf("RICSubscriptionFailure. E2NodeCause: (Cause:%v, Value %v)", themsg.Cause.Content, themsg.Cause.Value)
673                         errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
674                 case *PackSubscriptionRequestErrortEvent:
675                         err = fmt.Errorf("E2 RICSubscriptionRequest pack failure")
676                         errorInfo = themsg.ErrorInfo
677                 case *SDLWriteErrortEvent:
678                         err = fmt.Errorf("SDL write failure")
679                         errorInfo = themsg.ErrorInfo
680                 case *SubmgrRestartTestEvent:
681                         err = fmt.Errorf("TEST: restart event received")
682                         xapp.Logger.Debug("%s", err)
683                         return nil, &errorInfo, err
684                 default:
685                         err = fmt.Errorf("Unexpected E2 subscription response received")
686                         errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
687                         break
688                 }
689         } else {
690                 // Timer expiry
691                 err = fmt.Errorf("E2 RICSubscriptionResponse timeout")
692                 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
693                 if subs.PolicyUpdate == true {
694                         return nil, &errorInfo, err
695                 }
696         }
697
698         xapp.Logger.Error("XAPP-SubReq E2 subscription failed: %s", idstring(err, trans, subs))
699         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
700
701         return nil, &errorInfo, err
702 }
703
704 //-------------------------------------------------------------------
705 //
706 //-------------------------------------------------------------------
707 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
708         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
709
710         // Send notification to xApp that prosessing of a Subscription Request has failed.
711         e2EventInstanceID := (int64)(0)
712         if errorInfo.ErrorSource == "" {
713                 // Submgr is default source of error
714                 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
715         }
716         resp := &models.SubscriptionResponse{
717                 SubscriptionID: restSubId,
718                 SubscriptionInstances: []*models.SubscriptionInstance{
719                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
720                                 ErrorCause:          errorInfo.ErrorCause,
721                                 ErrorSource:         errorInfo.ErrorSource,
722                                 TimeoutType:         errorInfo.TimeoutType,
723                                 XappEventInstanceID: &xAppEventInstanceID},
724                 },
725         }
726         // Mark REST subscription request processed.
727         restSubscription.SetProcessed(err)
728         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
729         if trans != nil {
730                 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
731                         errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
732         } else {
733                 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
734                         errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
735         }
736
737         c.UpdateCounter(cRestSubFailNotifToXapp)
738         err = xapp.Subscription.Notify(resp, *clientEndpoint)
739         if err != nil {
740                 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
741         }
742
743         // E2 is down. Delete completely processed request safely now
744         if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
745                 c.registry.DeleteRESTSubscription(restSubId)
746                 c.RemoveRESTSubscriptionFromDb(*restSubId)
747         }
748 }
749
750 //-------------------------------------------------------------------
751 //
752 //-------------------------------------------------------------------
753 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
754         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
755
756         // Store successfully processed InstanceId for deletion
757         restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
758         restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
759
760         // Send notification to xApp that a Subscription Request has been processed.
761         resp := &models.SubscriptionResponse{
762                 SubscriptionID: restSubId,
763                 SubscriptionInstances: []*models.SubscriptionInstance{
764                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
765                                 ErrorCause:          errorInfo.ErrorCause,
766                                 ErrorSource:         errorInfo.ErrorSource,
767                                 XappEventInstanceID: &xAppEventInstanceID},
768                 },
769         }
770         // Mark REST subscription request processesd.
771         restSubscription.SetProcessed(nil)
772         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
773         xapp.Logger.Debug("Sending successful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
774                 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
775         c.UpdateCounter(cRestSubNotifToXapp)
776         err := xapp.Subscription.Notify(resp, *clientEndpoint)
777         if err != nil {
778                 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
779         }
780
781         // E2 is down. Delete completely processed request safely now
782         if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
783                 c.registry.DeleteRESTSubscription(restSubId)
784                 c.RemoveRESTSubscriptionFromDb(*restSubId)
785         }
786 }
787
788 //-------------------------------------------------------------------
789 //
790 //-------------------------------------------------------------------
791 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
792
793         c.CntRecvMsg++
794         c.UpdateCounter(cRestSubDelReqFromXapp)
795
796         xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
797
798         restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
799         if err != nil {
800                 xapp.Logger.Error("%s", err.Error())
801                 if restSubscription == nil {
802                         // Subscription was not found
803                         c.UpdateCounter(cRestSubDelRespToXapp)
804                         return common.UnsubscribeNoContentCode
805                 } else {
806                         if restSubscription.SubReqOngoing == true {
807                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
808                                 xapp.Logger.Error("%s", err.Error())
809                                 c.UpdateCounter(cRestSubDelFailToXapp)
810                                 return common.UnsubscribeBadRequestCode
811                         } else if restSubscription.SubDelReqOngoing == true {
812                                 // Previous request for same restSubId still ongoing
813                                 c.UpdateCounter(cRestSubDelRespToXapp)
814                                 return common.UnsubscribeNoContentCode
815                         }
816                 }
817         }
818
819         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
820         go func() {
821                 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
822                 for _, instanceId := range restSubscription.InstanceIds {
823                         xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0)
824
825                         if err != nil {
826                                 xapp.Logger.Error("%s", err.Error())
827                         }
828                         xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
829                         restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
830                         restSubscription.DeleteE2InstanceId(instanceId)
831                 }
832                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
833                 c.registry.DeleteRESTSubscription(&restSubId)
834                 c.RemoveRESTSubscriptionFromDb(restSubId)
835         }()
836
837         c.UpdateCounter(cRestSubDelRespToXapp)
838         return common.UnsubscribeNoContentCode
839 }
840
841 //-------------------------------------------------------------------
842 //
843 //-------------------------------------------------------------------
844 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) {
845
846         var xAppEventInstanceID int64
847         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
848         if err != nil {
849                 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
850                         restSubId, instanceId, idstring(err, nil))
851                 return xAppEventInstanceID, nil
852         }
853
854         xAppEventInstanceID = int64(subs.ReqId.Id)
855         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
856         if trans == nil {
857                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
858                 xapp.Logger.Error("%s", err.Error())
859         }
860         defer trans.Release()
861
862         err = c.tracker.Track(trans)
863         if err != nil {
864                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
865                 xapp.Logger.Error("%s", err.Error())
866                 return xAppEventInstanceID, &time.ParseError{}
867         }
868         //
869         // Wake subs delete
870         //
871         subs.OngoingDelCount++
872         go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime)
873         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
874         subs.OngoingDelCount--
875
876         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
877
878         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
879
880         return xAppEventInstanceID, nil
881 }
882
883 //-------------------------------------------------------------------
884 //
885 //-------------------------------------------------------------------
886
887 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
888         params := &xapp.RMRParams{}
889         params.Mtype = trans.GetMtype()
890         params.SubId = int(subs.GetReqId().InstanceId)
891         params.Xid = ""
892         params.Meid = subs.GetMeid()
893         params.Src = ""
894         params.PayloadLen = len(trans.Payload.Buf)
895         params.Payload = trans.Payload.Buf
896         params.Mbuf = nil
897         xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
898         err = c.SendWithRetry(params, false, 5)
899         if err != nil {
900                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
901         }
902         return err
903 }
904
905 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
906
907         params := &xapp.RMRParams{}
908         params.Mtype = trans.GetMtype()
909         params.SubId = int(subs.GetReqId().InstanceId)
910         params.Xid = trans.GetXid()
911         params.Meid = trans.GetMeid()
912         params.Src = ""
913         params.PayloadLen = len(trans.Payload.Buf)
914         params.Payload = trans.Payload.Buf
915         params.Mbuf = nil
916         xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
917         err = c.SendWithRetry(params, false, 5)
918         if err != nil {
919                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
920         }
921         return err
922 }
923
924 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
925         if c.RMRClient == nil {
926                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
927                 xapp.Logger.Error("%s", err.Error())
928                 return
929         }
930         c.CntRecvMsg++
931
932         defer c.RMRClient.Free(msg.Mbuf)
933
934         // xapp-frame might use direct access to c buffer and
935         // when msg.Mbuf is freed, someone might take it into use
936         // and payload data might be invalid inside message handle function
937         //
938         // subscriptions won't load system a lot so there is no
939         // real performance hit by cloning buffer into new go byte slice
940         cPay := append(msg.Payload[:0:0], msg.Payload...)
941         msg.Payload = cPay
942         msg.PayloadLen = len(cPay)
943
944         switch msg.Mtype {
945         case xapp.RIC_SUB_REQ:
946                 go c.handleXAPPSubscriptionRequest(msg)
947         case xapp.RIC_SUB_RESP:
948                 go c.handleE2TSubscriptionResponse(msg)
949         case xapp.RIC_SUB_FAILURE:
950                 go c.handleE2TSubscriptionFailure(msg)
951         case xapp.RIC_SUB_DEL_REQ:
952                 go c.handleXAPPSubscriptionDeleteRequest(msg)
953         case xapp.RIC_SUB_DEL_RESP:
954                 go c.handleE2TSubscriptionDeleteResponse(msg)
955         case xapp.RIC_SUB_DEL_FAILURE:
956                 go c.handleE2TSubscriptionDeleteFailure(msg)
957         case xapp.RIC_SUB_DEL_REQUIRED:
958                 go c.handleE2TSubscriptionDeleteRequired(msg)
959         default:
960                 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
961         }
962         return
963 }
964
965 //-------------------------------------------------------------------
966 // handle from XAPP Subscription Request
967 //------------------------------------------------------------------
968 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
969         xapp.Logger.Debug("MSG from XAPP: %s", params.String())
970         c.UpdateCounter(cSubReqFromXapp)
971
972         if c.e2IfState.IsE2ConnectionUp(&params.Meid.RanName) == false {
973                 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
974                 return
975         }
976
977         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
978         if err != nil {
979                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
980                 return
981         }
982
983         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
984         if trans == nil {
985                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
986                 return
987         }
988         defer trans.Release()
989
990         if err = c.tracker.Track(trans); err != nil {
991                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
992                 return
993         }
994
995         subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
996         if err != nil {
997                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
998                 return
999         }
1000
1001         c.wakeSubscriptionRequest(subs, trans)
1002 }
1003
1004 //-------------------------------------------------------------------
1005 // Wake Subscription Request to E2node
1006 //------------------------------------------------------------------
1007 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
1008
1009         e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(nil)
1010         if err != nil {
1011                 xapp.Logger.Error("c.GetE2SubscriptionDirectives failure: %s", err.Error())
1012         }
1013         subs.OngoingReqCount++
1014         go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms)
1015         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1016         subs.OngoingReqCount--
1017         if event != nil {
1018                 switch themsg := event.(type) {
1019                 case *e2ap.E2APSubscriptionResponse:
1020                         themsg.RequestId.Id = trans.RequestId.Id
1021                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
1022                         if err == nil {
1023                                 trans.Release()
1024                                 c.UpdateCounter(cSubRespToXapp)
1025                                 err := c.rmrSendToXapp("", subs, trans)
1026                                 if err != nil {
1027                                         xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1028                                 }
1029                                 return
1030                         }
1031                 case *e2ap.E2APSubscriptionFailure:
1032                         themsg.RequestId.Id = trans.RequestId.Id
1033                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
1034                         if err == nil {
1035                                 c.UpdateCounter(cSubFailToXapp)
1036                                 c.rmrSendToXapp("", subs, trans)
1037                         }
1038                 default:
1039                         break
1040                 }
1041         }
1042         xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1043 }
1044
1045 //-------------------------------------------------------------------
1046 // handle from XAPP Subscription Delete Request
1047 //------------------------------------------------------------------
1048 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1049         xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1050         c.UpdateCounter(cSubDelReqFromXapp)
1051
1052         if c.e2IfState.IsE2ConnectionUp(&params.Meid.RanName) == false {
1053                 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1054                 return
1055         }
1056
1057         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1058         if err != nil {
1059                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1060                 return
1061         }
1062
1063         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1064         if trans == nil {
1065                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1066                 return
1067         }
1068         defer trans.Release()
1069
1070         err = c.tracker.Track(trans)
1071         if err != nil {
1072                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1073                 return
1074         }
1075
1076         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1077         if err != nil {
1078                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1079                 return
1080         }
1081
1082         //
1083         // Wake subs delete
1084         //
1085         subs.OngoingDelCount++
1086         go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1087         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1088         subs.OngoingDelCount--
1089
1090         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1091
1092         if subs.NoRespToXapp == true {
1093                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1094                 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1095                 return
1096         }
1097
1098         // Whatever is received success, fail or timeout, send successful delete response
1099         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1100         subDelRespMsg.RequestId.Id = trans.RequestId.Id
1101         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1102         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1103         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1104         if err == nil {
1105                 c.UpdateCounter(cSubDelRespToXapp)
1106                 err := c.rmrSendToXapp("", subs, trans)
1107                 if err != nil {
1108                         xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1109                 }
1110         }
1111 }
1112
1113 //-------------------------------------------------------------------
1114 // SUBS CREATE Handling
1115 //-------------------------------------------------------------------
1116 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) {
1117
1118         var event interface{} = nil
1119         var removeSubscriptionFromDb bool = false
1120         trans := c.tracker.NewSubsTransaction(subs)
1121         subs.WaitTransactionTurn(trans)
1122         defer subs.ReleaseTransactionTurn(trans)
1123         defer trans.Release()
1124
1125         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1126
1127         subRfMsg, valid := subs.GetCachedResponse()
1128         if subRfMsg == nil && valid == true {
1129                 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1130                 switch event.(type) {
1131                 case *e2ap.E2APSubscriptionResponse:
1132                         subRfMsg, valid = subs.SetCachedResponse(event, true)
1133                         subs.SubRespRcvd = true
1134                 case *e2ap.E2APSubscriptionFailure:
1135                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1136                         xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1137                 case *SubmgrRestartTestEvent:
1138                         // This is used to simulate that no response has been received and after restart, subscriptions are restored from db
1139                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1140                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1141                         parentTrans.SendEvent(subRfMsg, 0)
1142                         return
1143                 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1144                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1145                 default:
1146                         // Timer expiry
1147                         if subs.PolicyUpdate == false {
1148                                 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1149                                 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1150                                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1151                         } else {
1152                                 subRfMsg, valid = subs.SetCachedResponse(nil, true)
1153                         }
1154                 }
1155                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1156         } else {
1157                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1158         }
1159         if valid == false {
1160                 removeSubscriptionFromDb = true
1161         }
1162
1163         err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1164         if err != nil {
1165                 valid = false
1166                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1167
1168         }
1169
1170         // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1171         if valid == false {
1172                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1173         }
1174
1175         parentTrans.SendEvent(subRfMsg, 0)
1176 }
1177
1178 //-------------------------------------------------------------------
1179 // SUBS DELETE Handling
1180 //-------------------------------------------------------------------
1181
1182 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) {
1183
1184         trans := c.tracker.NewSubsTransaction(subs)
1185         subs.WaitTransactionTurn(trans)
1186         defer subs.ReleaseTransactionTurn(trans)
1187         defer trans.Release()
1188
1189         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1190
1191         subs.mutex.Lock()
1192
1193         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1194                 subs.valid = false
1195                 subs.mutex.Unlock()
1196                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1197         } else {
1198                 subs.mutex.Unlock()
1199         }
1200
1201         // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1202         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1203         parentTrans.SendEvent(nil, 0)
1204 }
1205
1206 //-------------------------------------------------------------------
1207 // send to E2T Subscription Request
1208 //-------------------------------------------------------------------
1209 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1210         var err error
1211         var event interface{} = nil
1212         var timedOut bool = false
1213         const ricRequestorId = 123
1214
1215         subReqMsg := subs.SubReqMsg
1216         subReqMsg.RequestId = subs.GetReqId().RequestId
1217         subReqMsg.RequestId.Id = ricRequestorId
1218         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1219         if err != nil {
1220                 xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans))
1221                 return &PackSubscriptionRequestErrortEvent{
1222                         ErrorInfo{
1223                                 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1224                                 ErrorCause:  err.Error(),
1225                         },
1226                 }
1227         }
1228
1229         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1230         err = c.WriteSubscriptionToDb(subs)
1231         if err != nil {
1232                 return &SDLWriteErrortEvent{
1233                         ErrorInfo{
1234                                 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1235                                 ErrorCause:  err.Error(),
1236                         },
1237                 }
1238         }
1239
1240         for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1241                 desc := fmt.Sprintf("(retry %d)", retries)
1242                 if retries == 0 {
1243                         c.UpdateCounter(cSubReqToE2)
1244                 } else {
1245                         c.UpdateCounter(cSubReReqToE2)
1246                 }
1247                 err := c.rmrSendToE2T(desc, subs, trans)
1248                 if err != nil {
1249                         xapp.Logger.Error("rmrSendToE2T() failed:%s", err.Error())
1250                 }
1251
1252                 if subs.DoNotWaitSubResp == false {
1253                         event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1254                         if timedOut {
1255                                 c.UpdateCounter(cSubReqTimerExpiry)
1256                                 continue
1257                         }
1258                 } else {
1259                         // Simulating case where subscrition request has been sent but response has not been received before restart
1260                         event = &SubmgrRestartTestEvent{}
1261                         xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1262                 }
1263                 break
1264         }
1265         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1266         return event
1267 }
1268
1269 //-------------------------------------------------------------------
1270 // send to E2T Subscription Delete Request
1271 //-------------------------------------------------------------------
1272
1273 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1274         var err error
1275         var event interface{}
1276         var timedOut bool
1277         const ricRequestorId = 123
1278
1279         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1280         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1281         subDelReqMsg.RequestId.Id = ricRequestorId
1282         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1283         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1284         if err != nil {
1285                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1286                 return event
1287         }
1288
1289         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1290                 desc := fmt.Sprintf("(retry %d)", retries)
1291                 if retries == 0 {
1292                         c.UpdateCounter(cSubDelReqToE2)
1293                 } else {
1294                         c.UpdateCounter(cSubDelReReqToE2)
1295                 }
1296                 err := c.rmrSendToE2T(desc, subs, trans)
1297                 if err != nil {
1298                         xapp.Logger.Error("SUBS-SubDelReq: rmrSendToE2T failure: %s", idstring(err, trans, subs, parentTrans))
1299                 }
1300                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1301                 if timedOut {
1302                         c.UpdateCounter(cSubDelReqTimerExpiry)
1303                         continue
1304                 }
1305                 break
1306         }
1307         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1308         return event
1309 }
1310
1311 //-------------------------------------------------------------------
1312 // handle from E2T Subscription Response
1313 //-------------------------------------------------------------------
1314 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1315         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1316         c.UpdateCounter(cSubRespFromE2)
1317
1318         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1319         if err != nil {
1320                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1321                 return
1322         }
1323         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1324         if err != nil {
1325                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1326                 return
1327         }
1328         trans := subs.GetTransaction()
1329         if trans == nil {
1330                 err = fmt.Errorf("Ongoing transaction not found")
1331                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1332                 return
1333         }
1334         xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans)
1335         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1336         if sendOk == false {
1337                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1338                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1339         }
1340         return
1341 }
1342
1343 //-------------------------------------------------------------------
1344 // handle from E2T Subscription Failure
1345 //-------------------------------------------------------------------
1346 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1347         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1348         c.UpdateCounter(cSubFailFromE2)
1349         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1350         if err != nil {
1351                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1352                 return
1353         }
1354         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1355         if err != nil {
1356                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1357                 return
1358         }
1359         trans := subs.GetTransaction()
1360         if trans == nil {
1361                 err = fmt.Errorf("Ongoing transaction not found")
1362                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1363                 return
1364         }
1365         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1366         if sendOk == false {
1367                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1368                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1369         }
1370         return
1371 }
1372
1373 //-------------------------------------------------------------------
1374 // handle from E2T Subscription Delete Response
1375 //-------------------------------------------------------------------
1376 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) {
1377         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1378         c.UpdateCounter(cSubDelRespFromE2)
1379         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1380         if err != nil {
1381                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1382                 return
1383         }
1384         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1385         if err != nil {
1386                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1387                 return
1388         }
1389         trans := subs.GetTransaction()
1390         if trans == nil {
1391                 err = fmt.Errorf("Ongoing transaction not found")
1392                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1393                 return
1394         }
1395         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1396         if sendOk == false {
1397                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1398                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1399         }
1400         return
1401 }
1402
1403 //-------------------------------------------------------------------
1404 // handle from E2T Subscription Delete Failure
1405 //-------------------------------------------------------------------
1406 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1407         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1408         c.UpdateCounter(cSubDelFailFromE2)
1409         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1410         if err != nil {
1411                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1412                 return
1413         }
1414         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1415         if err != nil {
1416                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1417                 return
1418         }
1419         trans := subs.GetTransaction()
1420         if trans == nil {
1421                 err = fmt.Errorf("Ongoing transaction not found")
1422                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1423                 return
1424         }
1425         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1426         if sendOk == false {
1427                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1428                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1429         }
1430         return
1431 }
1432
1433 //-------------------------------------------------------------------
1434 //
1435 //-------------------------------------------------------------------
1436 func typeofSubsMessage(v interface{}) string {
1437         if v == nil {
1438                 return "NIL"
1439         }
1440         switch v.(type) {
1441         //case *e2ap.E2APSubscriptionRequest:
1442         //      return "SubReq"
1443         case *e2ap.E2APSubscriptionResponse:
1444                 return "SubResp"
1445         case *e2ap.E2APSubscriptionFailure:
1446                 return "SubFail"
1447         //case *e2ap.E2APSubscriptionDeleteRequest:
1448         //      return "SubDelReq"
1449         case *e2ap.E2APSubscriptionDeleteResponse:
1450                 return "SubDelResp"
1451         case *e2ap.E2APSubscriptionDeleteFailure:
1452                 return "SubDelFail"
1453         default:
1454                 return "Unknown"
1455         }
1456 }
1457
1458 //-------------------------------------------------------------------
1459 //
1460 //-------------------------------------------------------------------
1461 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1462         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1463         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1464         if err != nil {
1465                 xapp.Logger.Error("%v", err)
1466                 return err
1467         }
1468         return nil
1469 }
1470
1471 //-------------------------------------------------------------------
1472 //
1473 //-------------------------------------------------------------------
1474 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1475
1476         if removeSubscriptionFromDb == true {
1477                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1478                 c.RemoveSubscriptionFromDb(subs)
1479         } else {
1480                 // Update is needed for successful response and merge case here
1481                 if subs.RetryFromXapp == false {
1482                         err := c.WriteSubscriptionToDb(subs)
1483                         return err
1484                 }
1485         }
1486         subs.RetryFromXapp = false
1487         return nil
1488 }
1489
1490 //-------------------------------------------------------------------
1491 //
1492 //-------------------------------------------------------------------
1493 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1494         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1495         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1496         if err != nil {
1497                 xapp.Logger.Error("%v", err)
1498         }
1499 }
1500
1501 //-------------------------------------------------------------------
1502 //
1503 //-------------------------------------------------------------------
1504 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1505         xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1506         err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1507         if err != nil {
1508                 xapp.Logger.Error("%v", err)
1509         }
1510 }
1511
1512 //-------------------------------------------------------------------
1513 //
1514 //-------------------------------------------------------------------
1515 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1516
1517         if removeRestSubscriptionFromDb == true {
1518                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1519                 c.RemoveRESTSubscriptionFromDb(restSubId)
1520         } else {
1521                 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1522         }
1523 }
1524
1525 //-------------------------------------------------------------------
1526 //
1527 //-------------------------------------------------------------------
1528 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1529         xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1530         err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1531         if err != nil {
1532                 xapp.Logger.Error("%v", err)
1533         }
1534 }
1535
1536 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription, e2SubsDelRequired bool) {
1537
1538         if c.UTTesting == true {
1539                 // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests
1540                 c.registry.mutex = new(sync.Mutex)
1541         }
1542
1543         const ricRequestorId = 123
1544         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1545
1546         // Send delete for every endpoint in the subscription
1547         if subs.PolicyUpdate == false {
1548                 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1549                 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1550                 subDelReqMsg.RequestId.Id = ricRequestorId
1551                 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1552                 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1553                 if err != nil {
1554                         xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1555                         return
1556                 }
1557                 for _, endPoint := range subs.EpList.Endpoints {
1558                         params := &xapp.RMRParams{}
1559                         params.Mtype = mType
1560                         params.SubId = int(subs.GetReqId().InstanceId)
1561                         params.Xid = ""
1562                         params.Meid = subs.Meid
1563                         params.Src = endPoint.String()
1564                         params.PayloadLen = len(payload.Buf)
1565                         params.Payload = payload.Buf
1566                         params.Mbuf = nil
1567                         subs.DeleteFromDb = true
1568                         if !e2SubsDelRequired {
1569                                 c.handleXAPPSubscriptionDeleteRequest(params)
1570                         } else {
1571                                 c.SendSubscriptionDeleteReqToE2T(subs, params)
1572                         }
1573                 }
1574         }
1575 }
1576
1577 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1578
1579         fmt.Println("CRESTSubscriptionRequest")
1580
1581         if p == nil {
1582                 return
1583         }
1584
1585         if p.SubscriptionID != "" {
1586                 fmt.Println("  SubscriptionID = ", p.SubscriptionID)
1587         } else {
1588                 fmt.Println("  SubscriptionID = ''")
1589         }
1590
1591         fmt.Printf("  ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1592
1593         if p.ClientEndpoint.HTTPPort != nil {
1594                 fmt.Printf("  ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1595         } else {
1596                 fmt.Println("  ClientEndpoint.HTTPPort = nil")
1597         }
1598
1599         if p.ClientEndpoint.RMRPort != nil {
1600                 fmt.Printf("  ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1601         } else {
1602                 fmt.Println("  ClientEndpoint.RMRPort = nil")
1603         }
1604
1605         if p.Meid != nil {
1606                 fmt.Printf("  Meid = %s\n", *p.Meid)
1607         } else {
1608                 fmt.Println("  Meid = nil")
1609         }
1610
1611         if p.E2SubscriptionDirectives == nil {
1612                 fmt.Println("  E2SubscriptionDirectives = nil")
1613         } else {
1614                 fmt.Println("  E2SubscriptionDirectives")
1615                 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1616                         fmt.Println("    E2RetryCount == nil")
1617                 } else {
1618                         fmt.Printf("    E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1619                 }
1620                 fmt.Printf("    E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1621                 fmt.Printf("    RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1622         }
1623         for _, subscriptionDetail := range p.SubscriptionDetails {
1624                 if p.RANFunctionID != nil {
1625                         fmt.Printf("  RANFunctionID = %v\n", *p.RANFunctionID)
1626                 } else {
1627                         fmt.Println("  RANFunctionID = nil")
1628                 }
1629                 fmt.Printf("  SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1630                 fmt.Printf("  SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1631
1632                 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1633                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1634                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1635                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1636
1637                         if actionToBeSetup.SubsequentAction != nil {
1638                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1639                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1640                         } else {
1641                                 fmt.Println("  SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1642                         }
1643                 }
1644         }
1645 }
1646
1647 //-------------------------------------------------------------------
1648 // handle from E2T Subscription Delete Required
1649 //-------------------------------------------------------------------
1650 func (c *Control) handleE2TSubscriptionDeleteRequired(params *xapp.RMRParams) {
1651         xapp.Logger.Info("MSG from E2T: %s", params.String())
1652         c.UpdateCounter(cSubDelRequFromE2)
1653         subsDelRequMsg, err := c.e2ap.UnpackSubscriptionDeleteRequired(params.Payload)
1654         if err != nil {
1655                 xapp.Logger.Error("MSG-SubDelRequired: %s", idstring(err, params))
1656                 //c.sendE2TErrorIndication(nil)
1657                 return
1658         }
1659         var subscriptions = map[string][]e2ap.E2APSubscriptionDeleteRequired{}
1660         var subDB = []*Subscription{}
1661         for _, subsTobeRemove := range subsDelRequMsg.E2APSubscriptionDeleteRequiredRequests {
1662                 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subsTobeRemove.RequestId.InstanceId})
1663                 if err != nil {
1664                         xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1665                         continue
1666                 }
1667                 // Check if Delete Subscription Already triggered
1668                 if subs.OngoingDelCount > 0 {
1669                         continue
1670                 }
1671                 subDB = append(subDB, subs)
1672                 for _, endpoint := range subs.EpList.Endpoints {
1673                         subscriptions[endpoint.Addr] = append(subscriptions[endpoint.Addr], subsTobeRemove)
1674                 }
1675                 // Sending Subscription Delete Request to E2T
1676                 //      c.SendSubscriptionDeleteReq(subs, true)
1677         }
1678         for _, subsTobeRemove := range subDB {
1679                 // Sending Subscription Delete Request to E2T
1680                 c.SendSubscriptionDeleteReq(subsTobeRemove, true)
1681         }
1682 }
1683
1684 //-----------------------------------------------------------------
1685 // Initiate RIC Subscription Delete Request after receiving
1686 // RIC Subscription Delete Required from E2T
1687 //-----------------------------------------------------------------
1688 func (c *Control) SendSubscriptionDeleteReqToE2T(subs *Subscription, params *xapp.RMRParams) {
1689         xapp.Logger.Debug("MSG TO E2T: %s", params.String())
1690         c.UpdateCounter(cSubDelReqToE2)
1691
1692         if c.e2IfState.IsE2ConnectionUp(&params.Meid.RanName) == false {
1693                 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1694                 return
1695         }
1696
1697         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subs.ReqId.RequestId, params.Meid)
1698         if trans == nil {
1699                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1700                 return
1701         }
1702         defer trans.Release()
1703
1704         err := c.tracker.Track(trans)
1705         if err != nil {
1706                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1707                 return
1708         }
1709
1710         //
1711         // Wake subs delete
1712         //
1713         subs.OngoingDelCount++
1714         go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1715         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1716         subs.OngoingDelCount--
1717
1718         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1719
1720         if subs.NoRespToXapp == true {
1721                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1722                 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1723                 return
1724         }
1725 }