49b7968428661dce37f445fb8804481bf465a843
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "sync"
26         "time"
27
28         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
29         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
30         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
33         httptransport "github.com/go-openapi/runtime/client"
34         "github.com/go-openapi/strfmt"
35         "github.com/segmentio/ksuid"
36         "github.com/spf13/viper"
37 )
38
39 //-----------------------------------------------------------------------------
40 //
41 //-----------------------------------------------------------------------------
42
// idstring builds a single, space-separated identification string for log
// messages from the given entries and an optional error.
//
// Each non-nil entry contributes its String() value and each nil entry
// contributes the literal "(NIL)". When err is non-nil, "err(<message>)" is
// appended as the last element. An empty string is returned when there are no
// entries and no error.
//
// Only nil interface values are detected as nil; an interface wrapping a nil
// pointer is passed to String() as-is.
func idstring(err error, entries ...fmt.Stringer) string {
	retval := ""
	filler := ""
	for _, entry := range entries {
		if entry != nil {
			retval += filler + entry.String()
		} else {
			retval += filler + "(NIL)"
		}
		// Every element, including a "(NIL)" placeholder, has to be separated
		// from whatever follows it.
		filler = " "
	}
	if err != nil {
		retval += filler + "err(" + err.Error() + ")"
	}
	return retval
}
60
61 //-----------------------------------------------------------------------------
62 //
63 //-----------------------------------------------------------------------------
64
// Package-level configuration values. All of them are (re)loaded from the
// "controls.*" viper keys by Control.ReadConfigParameters, which also applies
// a hard coded default whenever a key yields its zero value.

// e2tSubReqTimeout is loaded from "controls.e2tSubReqTimeout_ms" and used as
// the default E2TimeoutTimerValue in GetE2SubscriptionDirectives.
var e2tSubReqTimeout time.Duration

// e2tSubDelReqTime is loaded from "controls.e2tSubDelReqTime_ms".
var e2tSubDelReqTime time.Duration

// e2tRecvMsgTimeout is loaded from "controls.e2tRecvMsgTimeout_ms".
var e2tRecvMsgTimeout time.Duration

// waitRouteCleanup_ms is loaded from "controls.waitRouteCleanup_ms"; despite
// the name it holds a time.Duration, not a millisecond count.
var waitRouteCleanup_ms time.Duration
var e2tMaxSubReqTryCount uint64    // Initial try + retry
var e2tMaxSubDelReqTryCount uint64 // Initial try + retry

// checkE2State is a string flag loaded from "controls.checkE2State".
var checkE2State string

// readSubsFromDb is a string flag; when it equals "true" NewControl reads the
// stored E2 and REST subscriptions from the database.
var readSubsFromDb string

// dbRetryForever is a string flag; when it equals "true" the database read
// loops in ReadE2Subscriptions/ReadRESTSubscriptions retry without limit.
var dbRetryForever string

// dbTryCount bounds the number of database read attempts when dbRetryForever
// is not "true".
var dbTryCount int
75
// Control is the central object of the subscription manager. It embeds the
// xapp RMR client and holds the registry, transaction tracker, duplicate
// request controller, E2 interface state and database handles that
// NewControl creates.
type Control struct {
        // Embedded RMR client; assigned from xapp.Rmr in ReadyCB when still nil.
        *xapp.RMRClient
        e2ap              *E2ap
        registry          *Registry
        tracker           *Tracker
        restDuplicateCtrl *DuplicateCtrl
        e2IfState         *E2IfState
        e2IfStateDb       XappRnibInterface
        e2SubsDb          Sdlnterface
        restSubsDb        Sdlnterface
        // CntRecvMsg is incremented by the REST handlers for every request.
        CntRecvMsg        uint64
        // ResetTestFlag is handed to Registry.AssignToSubscription.
        ResetTestFlag     bool
        // Counters is the metric counter group registered under "SUBMGR".
        Counters          map[string]xapp.Counter
        // LoggerLevel mirrors xapp.Logger.GetLevel(); refreshed in ReadConfigParameters.
        LoggerLevel       int
        // UTTesting enables the start delay in SubscriptionProcessingStartDelay.
        UTTesting         bool
}
92
// RMRMeid groups the PLMN ID, eNB ID and RAN name of a managed entity as
// strings.
// NOTE(review): the code visible here uses xapp.RMRMeid rather than this
// type — confirm whether it is still referenced elsewhere.
type RMRMeid struct {
        PlmnID  string
        EnbID   string
        RanName string
}
98
// SubmgrRestartTestEvent is a payload-free transaction event. When
// handleSubscriptionRequest receives it, processing stops with the error
// "TEST: restart event received"; that path is marked as unit-test only.
type SubmgrRestartTestEvent struct{}

// SubmgrRestartUpEvent is a payload-free event type.
// NOTE(review): presumably the counterpart of SubmgrRestartTestEvent; its use
// is not visible in this part of the file.
type SubmgrRestartUpEvent struct{}
101 type PackSubscriptionRequestErrortEvent struct {
102         ErrorInfo ErrorInfo
103 }
104
105 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
106         p.ErrorInfo = *errorInfo
107 }
108
109 type SDLWriteErrortEvent struct {
110         ErrorInfo ErrorInfo
111 }
112
113 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
114         s.ErrorInfo = *errorInfo
115 }
116
// init logs a "SUBMGR" debug line and configures viper to read environment
// variables automatically, using the "submgr" prefix and treating variables
// that are set but empty as valid values.
func init() {
        xapp.Logger.Debug("SUBMGR")
        viper.AutomaticEnv()
        viper.SetEnvPrefix("submgr")
        viper.AllowEmptyEnv(true)
}
123
// NewControl builds and returns a fully wired Control instance.
//
// It creates the routing manager client from the "rtmgr.*" viper settings,
// initialises the registry, tracker, duplicate controller and E2 interface
// state, loads the configuration parameters, registers the REST routes,
// optionally restores subscriptions from the database and finally starts the
// xapp subscription REST listener in a goroutine.
func NewControl() *Control {

        // Routing manager REST client: host, port and base URL come from configuration.
        transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
        rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}

        registry := new(Registry)
        registry.Initialize()
        registry.rtmgrClient = &rtmgrClient

        tracker := new(Tracker)
        tracker.Init()

        restDuplicateCtrl := new(DuplicateCtrl)
        restDuplicateCtrl.Init()

        e2IfState := new(E2IfState)

        c := &Control{e2ap: new(E2ap),
                registry:          registry,
                tracker:           tracker,
                restDuplicateCtrl: restDuplicateCtrl,
                e2IfState:         e2IfState,
                e2IfStateDb:       CreateXappRnibIfInstance(),
                e2SubsDb:          CreateSdl(),
                restSubsDb:        CreateRESTSdl(),
                Counters:          xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
                LoggerLevel:       1,
        }

        // e2IfState needs the Control instance, so it is initialised after c exists.
        e2IfState.Init(c)
        // Must run before the readSubsFromDb check below: it sets that package-level flag.
        c.ReadConfigParameters("")

        // Register REST handler for testing support
        xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
        xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
        xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")

        xapp.Resource.InjectRoute("/ric/v1/get_all_e2nodes", c.GetAllE2Nodes, "GET")
        xapp.Resource.InjectRoute("/ric/v1/get_e2node_rest_subscriptions/{ranName}", c.GetAllE2NodeRestSubscriptions, "GET")

        xapp.Resource.InjectRoute("/ric/v1/get_all_xapps", c.GetAllXapps, "GET")
        xapp.Resource.InjectRoute("/ric/v1/get_xapp_rest_restsubscriptions/{xappServiceName}", c.GetAllXappRestSubscriptions, "GET")
        xapp.Resource.InjectRoute("/ric/v1/get_e2subscriptions/{restId}", c.GetE2Subscriptions, "GET")

        xapp.Resource.InjectRoute("/ric/v1/delete_all_e2node_subscriptions/{ranName}", c.DeleteAllE2nodeSubscriptions, "DELETE")
        xapp.Resource.InjectRoute("/ric/v1/delete_all_xapp_subscriptions/{xappServiceName}", c.DeleteAllXappSubscriptions, "DELETE")

        if readSubsFromDb == "true" {
                // Read subscriptions from db
                // Failures are only logged; construction continues without the stored data.
                err := c.ReadE2Subscriptions()
                if err != nil {
                        xapp.Logger.Error("ReadE2Subscriptions() failed %s", err.Error())
                }
                err = c.ReadRESTSubscriptions()
                if err != nil {
                        xapp.Logger.Error("ReadRESTSubscriptions() failed %s", err.Error())
                }
        }

        // Serve the REST subscription API (subscribe, query, delete) in the background.
        go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
        return c
}
186
187 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
188         subscriptions, err := c.registry.QueryHandler()
189         if err != nil {
190                 xapp.Logger.Error("QueryHandler() failed %s", err.Error())
191         }
192
193         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
194 }
195
196 //-------------------------------------------------------------------
197 //
198 //-------------------------------------------------------------------
199 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
200         xapp.Logger.Debug("RESTQueryHandler() called")
201
202         c.CntRecvMsg++
203
204         return c.registry.QueryHandler()
205 }
206
207 //-------------------------------------------------------------------
208 //
209 //-------------------------------------------------------------------
210 func (c *Control) ReadE2Subscriptions() error {
211         var err error
212         var subIds []uint32
213         var register map[uint32]*Subscription
214         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
215                 xapp.Logger.Debug("Reading E2 subscriptions from db")
216                 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
217                 if err != nil {
218                         xapp.Logger.Error("%v", err)
219                         <-time.After(1 * time.Second)
220                 } else {
221                         c.registry.subIds = subIds
222                         c.registry.register = register
223                         go c.HandleUncompletedSubscriptions(register)
224                         return nil
225                 }
226         }
227         xapp.Logger.Debug("Continuing without retring")
228         return err
229 }
230
231 //-------------------------------------------------------------------
232 //
233 //-------------------------------------------------------------------
234 func (c *Control) ReadRESTSubscriptions() error {
235
236         xapp.Logger.Debug("ReadRESTSubscriptions()")
237         var err error
238         var restSubscriptions map[string]*RESTSubscription
239         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
240                 xapp.Logger.Debug("Reading REST subscriptions from db")
241                 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
242                 if err != nil {
243                         xapp.Logger.Error("%v", err)
244                         <-time.After(1 * time.Second)
245                 } else {
246                         // Fix REST subscriptions ongoing status after restart
247                         for restSubId, restSubscription := range restSubscriptions {
248                                 restSubscription.SubReqOngoing = false
249                                 restSubscription.SubDelReqOngoing = false
250                                 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubscription)
251                                 if err != nil {
252                                         xapp.Logger.Error("WriteRESTSubscriptionToSdl() failed:%s", err.Error())
253                                 }
254                         }
255                         c.registry.restSubscriptions = restSubscriptions
256                         return nil
257                 }
258         }
259         xapp.Logger.Debug("Continuing without retring")
260         return err
261 }
262
263 //-------------------------------------------------------------------
264 //
265 //-------------------------------------------------------------------
266 func (c *Control) ReadConfigParameters(f string) {
267
268         xapp.Logger.Debug("ReadConfigParameters")
269
270         c.LoggerLevel = int(xapp.Logger.GetLevel())
271         xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel)
272         c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
273
274         // viper.GetDuration returns nanoseconds
275         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
276         if e2tSubReqTimeout == 0 {
277                 e2tSubReqTimeout = 2000 * 1000000
278                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
279         }
280         xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
281
282         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
283         if e2tSubDelReqTime == 0 {
284                 e2tSubDelReqTime = 2000 * 1000000
285                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
286         }
287         xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
288
289         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
290         if e2tRecvMsgTimeout == 0 {
291                 e2tRecvMsgTimeout = 2000 * 1000000
292                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
293         }
294         xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
295
296         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
297         if e2tMaxSubReqTryCount == 0 {
298                 e2tMaxSubReqTryCount = 1
299                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
300         }
301         xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
302
303         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
304         if e2tMaxSubDelReqTryCount == 0 {
305                 e2tMaxSubDelReqTryCount = 1
306                 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
307         }
308         xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
309
310         checkE2State = viper.GetString("controls.checkE2State")
311         if checkE2State == "" {
312                 checkE2State = "true"
313                 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
314         }
315         xapp.Logger.Debug("checkE2State= %v", checkE2State)
316
317         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
318         if readSubsFromDb == "" {
319                 readSubsFromDb = "true"
320                 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
321         }
322         xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
323
324         dbTryCount = viper.GetInt("controls.dbTryCount")
325         if dbTryCount == 0 {
326                 dbTryCount = 200
327                 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
328         }
329         xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
330
331         dbRetryForever = viper.GetString("controls.dbRetryForever")
332         if dbRetryForever == "" {
333                 dbRetryForever = "true"
334                 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
335         }
336         xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
337
338         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
339         // value 100ms used currently only in unittests.
340         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
341         if waitRouteCleanup_ms == 0 {
342                 waitRouteCleanup_ms = 5000 * 1000000
343                 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
344         }
345         xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
346 }
347
348 //-------------------------------------------------------------------
349 //
350 //-------------------------------------------------------------------
351 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
352
353         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
354         for subId, subs := range register {
355                 if subs.SubRespRcvd == false {
356                         // If policy subscription has already been made successfully unsuccessful update should not be deleted.
357                         if subs.PolicyUpdate == false {
358                                 subs.NoRespToXapp = true
359                                 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
360                                 c.SendSubscriptionDeleteReq(subs)
361                         }
362                 }
363         }
364 }
365
366 func (c *Control) ReadyCB(data interface{}) {
367         if c.RMRClient == nil {
368                 c.RMRClient = xapp.Rmr
369         }
370 }
371
// Run registers ReadyCB as the xapp ready callback and ReadConfigParameters
// as a configuration change listener, then hands c over to xapp.Run.
func (c *Control) Run() {
        xapp.SetReadyCB(c.ReadyCB, nil)
        xapp.AddConfigChangeListener(c.ReadConfigParameters)
        xapp.Run(c)
}
377
378 //-------------------------------------------------------------------
379 //
380 //-------------------------------------------------------------------
381 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string, xAppServiceName string) (*RESTSubscription, string, error) {
382
383         var restSubId string
384         var restSubscription *RESTSubscription
385         var err error
386
387         prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
388         if p.SubscriptionID == "" {
389                 // Subscription does not contain REST subscription Id
390                 if exists {
391                         restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
392                         if restSubscription != nil {
393                                 // Subscription not found
394                                 restSubId = prevRestSubsId
395                                 if err == nil {
396                                         xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
397                                 } else {
398                                         xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
399                                 }
400                         } else {
401                                 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
402                                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
403                         }
404                 }
405
406                 if restSubscription == nil {
407                         restSubId = ksuid.New().String()
408                         restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppServiceName, &xAppRmrEndpoint, p.Meid)
409                 }
410         } else {
411                 // Subscription contains REST subscription Id
412                 restSubId = p.SubscriptionID
413
414                 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
415                 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
416                 if err != nil {
417                         // Subscription with id in REST request does not exist
418                         xapp.Logger.Error("%s", err.Error())
419                         c.UpdateCounter(cRestSubFailToXapp)
420                         return nil, "", err
421                 }
422
423                 if !exists {
424                         xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
425                 } else {
426                         xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
427                 }
428         }
429
430         return restSubscription, restSubId, nil
431 }
432
433 //-------------------------------------------------------------------
434 //
435 //-------------------------------------------------------------------
// RESTSubscriptionHandler handles a REST subscription request from an xApp.
//
// params is expected to be a *models.SubscriptionParams. The handler
// validates the request, resolves or creates the REST subscription, builds
// the E2 subscription request messages and starts their processing in a
// goroutine. It returns the response carrying the REST subscription id
// together with one of the common.Subscribe* status codes; on failure the
// response is nil.
func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {

        c.CntRecvMsg++
        c.UpdateCounter(cRestSubReqFromXapp)

        subResp := models.SubscriptionResponse{}
        // NOTE(review): single-value type assertion; panics if params has another type.
        p := params.(*models.SubscriptionParams)

        if c.LoggerLevel > 2 {
                c.PrintRESTSubscriptionRequest(p)
        }

        // Reject the request when the E2 connection for the target is not up.
        // NOTE(review): p.Meid is dereferenced in the log call — confirm it cannot be nil here.
        if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
                xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
                c.UpdateCounter(cRestReqRejDueE2Down)
                return nil, common.SubscribeServiceUnavailableCode
        }

        if p.ClientEndpoint == nil {
                err := fmt.Errorf("ClientEndpoint == nil")
                xapp.Logger.Error("%v", err)
                c.UpdateCounter(cRestSubFailToXapp)
                return nil, common.SubscribeBadRequestCode
        }

        e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
        if err != nil {
                xapp.Logger.Error("%s", err)
                c.UpdateCounter(cRestSubFailToXapp)
                return nil, common.SubscribeBadRequestCode
        }
        _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
        if err != nil {
                xapp.Logger.Error("%s", err.Error())
                c.UpdateCounter(cRestSubFailToXapp)
                return nil, common.SubscribeBadRequestCode
        }

        // A failure to calculate the md5sum is only logged; processing continues
        // with whatever md5sum value was returned.
        md5sum, err := CalculateRequestMd5sum(params)
        if err != nil {
                xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
        }

        restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint, p.ClientEndpoint.Host)
        if err != nil {
                xapp.Logger.Error("Subscription with id in REST request does not exist")
                return nil, common.SubscribeNotFoundCode
        }

        subResp.SubscriptionID = &restSubId
        subReqList := e2ap.SubscriptionRequestList{}
        err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
        if err != nil {
                // Undo the bookkeeping done for this request before rejecting it.
                xapp.Logger.Error("%s", err.Error())
                c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
                c.registry.DeleteRESTSubscription(&restSubId)
                c.UpdateCounter(cRestSubFailToXapp)
                return nil, common.SubscribeBadRequestCode
        }

        // A retransmission of a request that is still being processed is
        // acknowledged directly without starting new processing.
        duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
        if duplicate {
                err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
                xapp.Logger.Debug("%s", err)
                c.registry.DeleteRESTSubscription(&restSubId)
                c.UpdateCounter(cRestSubRespToXapp)
                return &subResp, common.SubscribeCreatedCode
        }

        // Persist the REST subscription, then process the E2 requests
        // asynchronously; the response is returned without waiting for them.
        c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
        go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)

        c.UpdateCounter(cRestSubRespToXapp)
        return &subResp, common.SubscribeCreatedCode
}
511
512 //-------------------------------------------------------------------
513 //
514 //-------------------------------------------------------------------
515 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
516
517         e2SubscriptionDirectives := &E2SubscriptionDirectives{}
518         if p == nil || p.E2SubscriptionDirectives == nil {
519                 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
520                 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
521                 e2SubscriptionDirectives.CreateRMRRoute = true
522                 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
523         } else {
524                 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
525                         e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
526                 } else {
527                         return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
528                 }
529                 if p.E2SubscriptionDirectives.E2RetryCount == nil {
530                         xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
531                         e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
532                 } else {
533                         if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
534                                 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
535                         } else {
536                                 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
537                         }
538                 }
539                 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
540         }
541         xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
542         xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
543         xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
544         return e2SubscriptionDirectives, nil
545 }
546
547 //-------------------------------------------------------------------
548 //
549 //-------------------------------------------------------------------
550
// processSubscriptionRequests processes, one after another, every E2
// subscription request in subReqList that belongs to the REST subscription
// restSubscription, and notifies the xApp about each outcome through
// sendSuccesfullResponseNotification or sendUnsuccesfullResponseNotification.
//
// It is started as a goroutine by RESTSubscriptionHandler. When the function
// returns, md5sum is recorded for *restSubId via the deferred
// SetMd5sumFromLastOkRequest call.
func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
        clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {

        c.SubscriptionProcessingStartDelay()
        xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests))

        var xAppEventInstanceID int64
        var e2EventInstanceID int64
        // This errorInfo is used only on the tracking-failure path below; later
        // in the loop body it is shadowed by the value returned from
        // handleSubscriptionRequest.
        errorInfo := &ErrorInfo{}

        // The arguments are evaluated here, the call itself runs at function return.
        defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)

        for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
                subReqMsg := subReqList.E2APSubscriptionRequests[index]
                xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)

                trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
                if trans == nil {
                        // Send notification to xApp that prosessing of a Subscription Request has failed.
                        // NOTE(review): trans is nil when it is passed on here.
                        err := fmt.Errorf("Tracking failure")
                        errorInfo.ErrorCause = err.Error()
                        c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
                        continue
                }

                xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))

                // Declares a new errorInfo scoped to this loop iteration.
                subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)

                xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
                // NOTE(review): trans is still used for logging and notifications after Release().
                trans.Release()

                if err != nil {
                        if err.Error() == "TEST: restart event received" {
                                // This is just for UT cases. Stop here subscription processing
                                return
                        }
                        c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
                } else {
                        e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
                        restSubscription.AddMd5Sum(md5sum)
                        xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
                                index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
                        c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans, errorInfo)
                }
        }
}
598
599 //-------------------------------------------------------------------
600 //
601 //------------------------------------------------------------------
602 func (c *Control) SubscriptionProcessingStartDelay() {
603         if c.UTTesting == true {
604                 // This is temporary fix for the UT problem that notification arrives before subscription response
605                 // Correct fix would be to allow notification come before response and process it correctly
606                 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
607                 <-time.After(time.Millisecond * 50)
608                 xapp.Logger.Debug("Continuing after delay")
609         }
610 }
611
612 //-------------------------------------------------------------------
613 //
614 //------------------------------------------------------------------
615 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
616         restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
617
618         errorInfo := ErrorInfo{}
619
620         err := c.tracker.Track(trans)
621         if err != nil {
622                 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
623                 errorInfo.ErrorCause = err.Error()
624                 err = fmt.Errorf("Tracking failure")
625                 return nil, &errorInfo, err
626         }
627
628         subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
629         if err != nil {
630                 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
631                 return nil, &errorInfo, err
632         }
633
634         //
635         // Wake subs request
636         //
637         subs.OngoingReqCount++
638         go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0)
639         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
640         subs.OngoingReqCount--
641
642         err = nil
643         if event != nil {
644                 switch themsg := event.(type) {
645                 case *e2ap.E2APSubscriptionResponse:
646                         trans.Release()
647                         if c.e2IfState.IsE2ConnectionUp(meid) == true {
648                                 errorInfo = c.e2ap.CheckActionNotAdmittedList(xapp.RIC_SUB_RESP, themsg.ActionNotAdmittedList, c)
649                                 return themsg, &errorInfo, nil
650                         } else {
651                                 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
652                                 c.RemoveSubscriptionFromDb(subs)
653                                 err = fmt.Errorf("E2 interface down")
654                                 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
655                         }
656                 case *e2ap.E2APSubscriptionFailure:
657                         err = fmt.Errorf("RICSubscriptionFailure. E2NodeCause: (Cause:%v, Value %v)", themsg.Cause.Content, themsg.Cause.Value)
658                         errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
659                 case *PackSubscriptionRequestErrortEvent:
660                         err = fmt.Errorf("E2 RICSubscriptionRequest pack failure")
661                         errorInfo = themsg.ErrorInfo
662                 case *SDLWriteErrortEvent:
663                         err = fmt.Errorf("SDL write failure")
664                         errorInfo = themsg.ErrorInfo
665                 case *SubmgrRestartTestEvent:
666                         err = fmt.Errorf("TEST: restart event received")
667                         xapp.Logger.Debug("%s", err)
668                         return nil, &errorInfo, err
669                 default:
670                         err = fmt.Errorf("Unexpected E2 subscription response received")
671                         errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
672                         break
673                 }
674         } else {
675                 // Timer expiry
676                 err = fmt.Errorf("E2 RICSubscriptionResponse timeout")
677                 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
678                 if subs.PolicyUpdate == true {
679                         return nil, &errorInfo, err
680                 }
681         }
682
683         xapp.Logger.Error("XAPP-SubReq E2 subscription failed: %s", idstring(err, trans, subs))
684         err2 := c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
685         if err2 != nil {
686                 xapp.Logger.Error("RemoveFromSubscription failed: %s", err2.Error())
687         }
688         return nil, &errorInfo, err
689 }
690
691 //-------------------------------------------------------------------
692 //
693 //-------------------------------------------------------------------
694 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
695         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
696
697         // Send notification to xApp that prosessing of a Subscription Request has failed.
698         e2EventInstanceID := (int64)(0)
699         if errorInfo.ErrorSource == "" {
700                 // Submgr is default source of error
701                 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
702         }
703         resp := &models.SubscriptionResponse{
704                 SubscriptionID: restSubId,
705                 SubscriptionInstances: []*models.SubscriptionInstance{
706                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
707                                 ErrorCause:          errorInfo.ErrorCause,
708                                 ErrorSource:         errorInfo.ErrorSource,
709                                 TimeoutType:         errorInfo.TimeoutType,
710                                 XappEventInstanceID: &xAppEventInstanceID},
711                 },
712         }
713         // Mark REST subscription request processed.
714         restSubscription.SetProcessed(err)
715         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
716         if trans != nil {
717                 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
718                         errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
719         } else {
720                 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
721                         errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
722         }
723
724         c.UpdateCounter(cRestSubFailNotifToXapp)
725         err = xapp.Subscription.Notify(resp, *clientEndpoint)
726         if err != nil {
727                 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
728         }
729
730         // E2 is down. Delete completely processed request safely now
731         if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
732                 c.registry.DeleteRESTSubscription(restSubId)
733                 c.RemoveRESTSubscriptionFromDb(*restSubId)
734         }
735 }
736
737 //-------------------------------------------------------------------
738 //
739 //-------------------------------------------------------------------
740 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
741         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
742
743         // Store successfully processed InstanceId for deletion
744         restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
745         restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
746
747         // Send notification to xApp that a Subscription Request has been processed.
748         resp := &models.SubscriptionResponse{
749                 SubscriptionID: restSubId,
750                 SubscriptionInstances: []*models.SubscriptionInstance{
751                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
752                                 ErrorCause:          errorInfo.ErrorCause,
753                                 ErrorSource:         errorInfo.ErrorSource,
754                                 XappEventInstanceID: &xAppEventInstanceID},
755                 },
756         }
757         // Mark REST subscription request processesd.
758         restSubscription.SetProcessed(nil)
759         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
760         xapp.Logger.Debug("Sending successful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
761                 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
762         c.UpdateCounter(cRestSubNotifToXapp)
763         err := xapp.Subscription.Notify(resp, *clientEndpoint)
764         if err != nil {
765                 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
766         }
767
768         // E2 is down. Delete completely processed request safely now
769         if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
770                 c.registry.DeleteRESTSubscription(restSubId)
771                 c.RemoveRESTSubscriptionFromDb(*restSubId)
772         }
773 }
774
775 //-------------------------------------------------------------------
776 //
777 //-------------------------------------------------------------------
778 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
779
780         c.CntRecvMsg++
781         c.UpdateCounter(cRestSubDelReqFromXapp)
782
783         xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
784
785         restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
786         if err != nil {
787                 xapp.Logger.Error("%s", err.Error())
788                 if restSubscription == nil {
789                         // Subscription was not found
790                         c.UpdateCounter(cRestSubDelRespToXapp)
791                         return common.UnsubscribeNoContentCode
792                 } else {
793                         if restSubscription.SubReqOngoing == true {
794                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
795                                 xapp.Logger.Error("%s", err.Error())
796                                 c.UpdateCounter(cRestSubDelFailToXapp)
797                                 return common.UnsubscribeBadRequestCode
798                         } else if restSubscription.SubDelReqOngoing == true {
799                                 // Previous request for same restSubId still ongoing
800                                 c.UpdateCounter(cRestSubDelRespToXapp)
801                                 return common.UnsubscribeNoContentCode
802                         }
803                 }
804         }
805
806         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
807         go func() {
808                 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
809                 for _, instanceId := range restSubscription.InstanceIds {
810                         xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0)
811
812                         if err != nil {
813                                 xapp.Logger.Error("%s", err.Error())
814                         }
815                         xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
816                         restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
817                         restSubscription.DeleteE2InstanceId(instanceId)
818                 }
819                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
820                 c.registry.DeleteRESTSubscription(&restSubId)
821                 c.RemoveRESTSubscriptionFromDb(restSubId)
822         }()
823
824         c.UpdateCounter(cRestSubDelRespToXapp)
825         return common.UnsubscribeNoContentCode
826 }
827
828 //-------------------------------------------------------------------
829 //
830 //-------------------------------------------------------------------
831 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) {
832
833         var xAppEventInstanceID int64
834         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
835         if err != nil {
836                 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
837                         restSubId, instanceId, idstring(err, nil))
838                 return xAppEventInstanceID, nil
839         }
840
841         xAppEventInstanceID = int64(subs.ReqId.Id)
842         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
843         if trans == nil {
844                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
845                 xapp.Logger.Error("%s", err.Error())
846         }
847         defer trans.Release()
848
849         err = c.tracker.Track(trans)
850         if err != nil {
851                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
852                 xapp.Logger.Error("%s", err.Error())
853                 return xAppEventInstanceID, &time.ParseError{}
854         }
855         //
856         // Wake subs delete
857         //
858         subs.OngoingDelCount++
859         go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime)
860         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
861         subs.OngoingDelCount--
862
863         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
864
865         err = c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
866         if err != nil {
867                 xapp.Logger.Error("XAPP-SubDelReq %s:", idstring(fmt.Errorf("RemoveFromSubscription faliled"), trans, subs))
868         }
869
870         return xAppEventInstanceID, nil
871 }
872
873 //-------------------------------------------------------------------
874 //
875 //-------------------------------------------------------------------
876
877 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
878         params := &xapp.RMRParams{}
879         params.Mtype = trans.GetMtype()
880         params.SubId = int(subs.GetReqId().InstanceId)
881         params.Xid = ""
882         params.Meid = subs.GetMeid()
883         params.Src = ""
884         params.PayloadLen = len(trans.Payload.Buf)
885         params.Payload = trans.Payload.Buf
886         params.Mbuf = nil
887         xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
888         err = c.SendWithRetry(params, false, 5)
889         if err != nil {
890                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
891         }
892         return err
893 }
894
895 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
896
897         params := &xapp.RMRParams{}
898         params.Mtype = trans.GetMtype()
899         params.SubId = int(subs.GetReqId().InstanceId)
900         params.Xid = trans.GetXid()
901         params.Meid = trans.GetMeid()
902         params.Src = ""
903         params.PayloadLen = len(trans.Payload.Buf)
904         params.Payload = trans.Payload.Buf
905         params.Mbuf = nil
906         xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
907         err = c.SendWithRetry(params, false, 5)
908         if err != nil {
909                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
910         }
911         return err
912 }
913
914 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
915         if c.RMRClient == nil {
916                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
917                 xapp.Logger.Error("%s", err.Error())
918                 return
919         }
920         c.CntRecvMsg++
921
922         defer c.RMRClient.Free(msg.Mbuf)
923
924         // xapp-frame might use direct access to c buffer and
925         // when msg.Mbuf is freed, someone might take it into use
926         // and payload data might be invalid inside message handle function
927         //
928         // subscriptions won't load system a lot so there is no
929         // real performance hit by cloning buffer into new go byte slice
930         cPay := append(msg.Payload[:0:0], msg.Payload...)
931         msg.Payload = cPay
932         msg.PayloadLen = len(cPay)
933
934         switch msg.Mtype {
935         case xapp.RIC_SUB_REQ:
936                 go c.handleXAPPSubscriptionRequest(msg)
937         case xapp.RIC_SUB_RESP:
938                 go c.handleE2TSubscriptionResponse(msg)
939         case xapp.RIC_SUB_FAILURE:
940                 go c.handleE2TSubscriptionFailure(msg)
941         case xapp.RIC_SUB_DEL_REQ:
942                 go c.handleXAPPSubscriptionDeleteRequest(msg)
943         case xapp.RIC_SUB_DEL_RESP:
944                 go c.handleE2TSubscriptionDeleteResponse(msg)
945         case xapp.RIC_SUB_DEL_FAILURE:
946                 go c.handleE2TSubscriptionDeleteFailure(msg)
947         default:
948                 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
949         }
950         return
951 }
952
953 //-------------------------------------------------------------------
954 // handle from XAPP Subscription Request
955 //------------------------------------------------------------------
956 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
957         xapp.Logger.Debug("MSG from XAPP: %s", params.String())
958         c.UpdateCounter(cSubReqFromXapp)
959
960         if c.e2IfState.IsE2ConnectionUp(&params.Meid.RanName) == false {
961                 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
962                 return
963         }
964
965         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
966         if err != nil {
967                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
968                 return
969         }
970
971         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
972         if trans == nil {
973                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
974                 return
975         }
976         defer trans.Release()
977
978         if err = c.tracker.Track(trans); err != nil {
979                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
980                 return
981         }
982
983         subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
984         if err != nil {
985                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
986                 return
987         }
988
989         c.wakeSubscriptionRequest(subs, trans)
990 }
991
992 //-------------------------------------------------------------------
993 // Wake Subscription Request to E2node
994 //------------------------------------------------------------------
995 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
996
997         e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil)
998         subs.OngoingReqCount++
999         go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms)
1000         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1001         subs.OngoingReqCount--
1002         var err error
1003         if event != nil {
1004                 switch themsg := event.(type) {
1005                 case *e2ap.E2APSubscriptionResponse:
1006                         themsg.RequestId.Id = trans.RequestId.Id
1007                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
1008                         if err == nil {
1009                                 trans.Release()
1010                                 c.UpdateCounter(cSubRespToXapp)
1011                                 err := c.rmrSendToXapp("", subs, trans)
1012                                 if err != nil {
1013                                         xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1014                                 }
1015                                 return
1016                         }
1017                 case *e2ap.E2APSubscriptionFailure:
1018                         themsg.RequestId.Id = trans.RequestId.Id
1019                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
1020                         if err == nil {
1021                                 c.UpdateCounter(cSubFailToXapp)
1022                                 c.rmrSendToXapp("", subs, trans)
1023                         }
1024                 default:
1025                         break
1026                 }
1027         }
1028         xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1029 }
1030
1031 //-------------------------------------------------------------------
1032 // handle from XAPP Subscription Delete Request
1033 //------------------------------------------------------------------
1034 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1035         xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1036         c.UpdateCounter(cSubDelReqFromXapp)
1037
1038         if c.e2IfState.IsE2ConnectionUp(&params.Meid.RanName) == false {
1039                 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1040                 return
1041         }
1042
1043         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1044         if err != nil {
1045                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1046                 return
1047         }
1048
1049         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1050         if trans == nil {
1051                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1052                 return
1053         }
1054         defer trans.Release()
1055
1056         err = c.tracker.Track(trans)
1057         if err != nil {
1058                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1059                 return
1060         }
1061
1062         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1063         if err != nil {
1064                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1065                 return
1066         }
1067
1068         //
1069         // Wake subs delete
1070         //
1071         subs.OngoingDelCount++
1072         go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1073         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1074         subs.OngoingDelCount--
1075
1076         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1077
1078         if subs.NoRespToXapp == true {
1079                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1080                 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1081                 return
1082         }
1083
1084         // Whatever is received success, fail or timeout, send successful delete response
1085         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1086         subDelRespMsg.RequestId.Id = trans.RequestId.Id
1087         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1088         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1089         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1090         if err == nil {
1091                 c.UpdateCounter(cSubDelRespToXapp)
1092                 err := c.rmrSendToXapp("", subs, trans)
1093                 if err != nil {
1094                         xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1095                 }
1096         }
1097 }
1098
1099 //-------------------------------------------------------------------
1100 // SUBS CREATE Handling
1101 //-------------------------------------------------------------------
// handleSubscriptionCreate runs the subscription-side part of a create
// procedure for parentTrans and reports the outcome back through
// parentTrans.SendEvent.
//
// It creates a subscription transaction and waits for its turn on subs. When
// GetCachedResponse returns no message and reports the subscription as valid,
// the request is sent via sendE2TSubscriptionRequest and the result is stored
// with SetCachedResponse; otherwise the cached response is used as is. When
// the resulting state is not valid, the subscription is flagged for removal in
// the UpdateSubscriptionInDB call and parentTrans is removed from the
// subscription in the registry, using waitRouteCleanupTime.
func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) {

	var event interface{} = nil
	var removeSubscriptionFromDb bool = false
	trans := c.tracker.NewSubsTransaction(subs)
	subs.WaitTransactionTurn(trans)
	// Deferred calls run in reverse order: the transaction is released first,
	// then the turn on the subscription is given up.
	defer subs.ReleaseTransactionTurn(trans)
	defer trans.Release()

	xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))

	subRfMsg, valid := subs.GetCachedResponse()
	if subRfMsg == nil && valid == true {
		// Nothing cached yet: send the request and cache whatever comes back.
		event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
		switch event.(type) {
		case *e2ap.E2APSubscriptionResponse:
			subRfMsg, valid = subs.SetCachedResponse(event, true)
			subs.SubRespRcvd = true
		case *e2ap.E2APSubscriptionFailure:
			subRfMsg, valid = subs.SetCachedResponse(event, false)
			xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
		case *SubmgrRestartTestEvent:
			// This is used to simulate that no response has been received and after restart, subscriptions are restored from db
			xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
			subRfMsg, valid = subs.SetCachedResponse(event, false)
			// Report straight back; the DB update and registry removal below are skipped on this path.
			parentTrans.SendEvent(subRfMsg, 0)
			return
		case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
			subRfMsg, valid = subs.SetCachedResponse(event, false)
		default:
			// Timer expiry
			if subs.PolicyUpdate == false {
				// Not a policy update: cache "no response / not valid" and send a delete request.
				xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
				subRfMsg, valid = subs.SetCachedResponse(nil, false)
				c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
			} else {
				// Policy update: cache "no response" but keep the subscription valid.
				subRfMsg, valid = subs.SetCachedResponse(nil, true)
			}
		}
		xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
	} else {
		xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
	}
	if valid == false {
		removeSubscriptionFromDb = true
	}

	err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
	if err != nil {
		// The DB update failed: treat the subscription as not valid and send a delete request.
		valid = false
		c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)

	}

	// Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
	if valid == false {
		err = c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
		if err != nil {
			xapp.Logger.Error("RemoveFromSubscription() failed:%s", err.Error())
		}
	}

	// Deliver the final (possibly nil) message to the waiting xApp-side transaction.
	parentTrans.SendEvent(subRfMsg, 0)
}
1166
1167 //-------------------------------------------------------------------
1168 // SUBS DELETE Handling
1169 //-------------------------------------------------------------------
1170
1171 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) {
1172
1173         trans := c.tracker.NewSubsTransaction(subs)
1174         subs.WaitTransactionTurn(trans)
1175         defer subs.ReleaseTransactionTurn(trans)
1176         defer trans.Release()
1177
1178         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1179
1180         subs.mutex.Lock()
1181
1182         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1183                 subs.valid = false
1184                 subs.mutex.Unlock()
1185                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1186         } else {
1187                 subs.mutex.Unlock()
1188         }
1189
1190         // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1191         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1192         parentTrans.SendEvent(nil, 0)
1193 }
1194
1195 //-------------------------------------------------------------------
1196 // send to E2T Subscription Request
1197 //-------------------------------------------------------------------
1198 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1199         var err error
1200         var event interface{} = nil
1201         var timedOut bool = false
1202         const ricRequestorId = 123
1203
1204         subReqMsg := subs.SubReqMsg
1205         subReqMsg.RequestId = subs.GetReqId().RequestId
1206         subReqMsg.RequestId.Id = ricRequestorId
1207         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1208         if err != nil {
1209                 xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans))
1210                 return &PackSubscriptionRequestErrortEvent{
1211                         ErrorInfo{
1212                                 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1213                                 ErrorCause:  err.Error(),
1214                         },
1215                 }
1216         }
1217
1218         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1219         err = c.WriteSubscriptionToDb(subs)
1220         if err != nil {
1221                 return &SDLWriteErrortEvent{
1222                         ErrorInfo{
1223                                 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1224                                 ErrorCause:  err.Error(),
1225                         },
1226                 }
1227         }
1228
1229         for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1230                 desc := fmt.Sprintf("(retry %d)", retries)
1231                 if retries == 0 {
1232                         c.UpdateCounter(cSubReqToE2)
1233                 } else {
1234                         c.UpdateCounter(cSubReReqToE2)
1235                 }
1236                 err := c.rmrSendToE2T(desc, subs, trans)
1237                 if err != nil {
1238                         xapp.Logger.Error("rmrSendToE2T() failed:%s", err.Error())
1239                 }
1240
1241                 if subs.DoNotWaitSubResp == false {
1242                         event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1243                         if timedOut {
1244                                 c.UpdateCounter(cSubReqTimerExpiry)
1245                                 continue
1246                         }
1247                 } else {
1248                         // Simulating case where subscrition request has been sent but response has not been received before restart
1249                         event = &SubmgrRestartTestEvent{}
1250                         xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1251                 }
1252                 break
1253         }
1254         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1255         return event
1256 }
1257
1258 //-------------------------------------------------------------------
1259 // send to E2T Subscription Delete Request
1260 //-------------------------------------------------------------------
1261
1262 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1263         var err error
1264         var event interface{}
1265         var timedOut bool
1266         const ricRequestorId = 123
1267
1268         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1269         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1270         subDelReqMsg.RequestId.Id = ricRequestorId
1271         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1272         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1273         if err != nil {
1274                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1275                 return event
1276         }
1277
1278         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1279                 desc := fmt.Sprintf("(retry %d)", retries)
1280                 if retries == 0 {
1281                         c.UpdateCounter(cSubDelReqToE2)
1282                 } else {
1283                         c.UpdateCounter(cSubDelReReqToE2)
1284                 }
1285                 err := c.rmrSendToE2T(desc, subs, trans)
1286                 if err != nil {
1287                         xapp.Logger.Error("SUBS-SubDelReq: rmrSendToE2T failure: %s", idstring(err, trans, subs, parentTrans))
1288                 }
1289                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1290                 if timedOut {
1291                         c.UpdateCounter(cSubDelReqTimerExpiry)
1292                         continue
1293                 }
1294                 break
1295         }
1296         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1297         return event
1298 }
1299
1300 //-------------------------------------------------------------------
1301 // handle from E2T Subscription Response
1302 //-------------------------------------------------------------------
1303 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1304         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1305         c.UpdateCounter(cSubRespFromE2)
1306
1307         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1308         if err != nil {
1309                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1310                 return
1311         }
1312         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1313         if err != nil {
1314                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1315                 return
1316         }
1317         trans := subs.GetTransaction()
1318         if trans == nil {
1319                 err = fmt.Errorf("Ongoing transaction not found")
1320                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1321                 return
1322         }
1323         xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans)
1324         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1325         if sendOk == false {
1326                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1327                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1328         }
1329         return
1330 }
1331
1332 //-------------------------------------------------------------------
1333 // handle from E2T Subscription Failure
1334 //-------------------------------------------------------------------
1335 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1336         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1337         c.UpdateCounter(cSubFailFromE2)
1338         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1339         if err != nil {
1340                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1341                 return
1342         }
1343         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1344         if err != nil {
1345                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1346                 return
1347         }
1348         trans := subs.GetTransaction()
1349         if trans == nil {
1350                 err = fmt.Errorf("Ongoing transaction not found")
1351                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1352                 return
1353         }
1354         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1355         if sendOk == false {
1356                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1357                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1358         }
1359         return
1360 }
1361
1362 //-------------------------------------------------------------------
1363 // handle from E2T Subscription Delete Response
1364 //-------------------------------------------------------------------
1365 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) {
1366         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1367         c.UpdateCounter(cSubDelRespFromE2)
1368         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1369         if err != nil {
1370                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1371                 return
1372         }
1373         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1374         if err != nil {
1375                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1376                 return
1377         }
1378         trans := subs.GetTransaction()
1379         if trans == nil {
1380                 err = fmt.Errorf("Ongoing transaction not found")
1381                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1382                 return
1383         }
1384         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1385         if sendOk == false {
1386                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1387                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1388         }
1389         return
1390 }
1391
1392 //-------------------------------------------------------------------
1393 // handle from E2T Subscription Delete Failure
1394 //-------------------------------------------------------------------
1395 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1396         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1397         c.UpdateCounter(cSubDelFailFromE2)
1398         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1399         if err != nil {
1400                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1401                 return
1402         }
1403         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1404         if err != nil {
1405                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1406                 return
1407         }
1408         trans := subs.GetTransaction()
1409         if trans == nil {
1410                 err = fmt.Errorf("Ongoing transaction not found")
1411                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1412                 return
1413         }
1414         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1415         if sendOk == false {
1416                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1417                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1418         }
1419         return
1420 }
1421
1422 //-------------------------------------------------------------------
1423 //
1424 //-------------------------------------------------------------------
1425 func typeofSubsMessage(v interface{}) string {
1426         if v == nil {
1427                 return "NIL"
1428         }
1429         switch v.(type) {
1430         //case *e2ap.E2APSubscriptionRequest:
1431         //      return "SubReq"
1432         case *e2ap.E2APSubscriptionResponse:
1433                 return "SubResp"
1434         case *e2ap.E2APSubscriptionFailure:
1435                 return "SubFail"
1436         //case *e2ap.E2APSubscriptionDeleteRequest:
1437         //      return "SubDelReq"
1438         case *e2ap.E2APSubscriptionDeleteResponse:
1439                 return "SubDelResp"
1440         case *e2ap.E2APSubscriptionDeleteFailure:
1441                 return "SubDelFail"
1442         default:
1443                 return "Unknown"
1444         }
1445 }
1446
1447 //-------------------------------------------------------------------
1448 //
1449 //-------------------------------------------------------------------
1450 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1451         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1452         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1453         if err != nil {
1454                 xapp.Logger.Error("%v", err)
1455                 return err
1456         }
1457         return nil
1458 }
1459
1460 //-------------------------------------------------------------------
1461 //
1462 //-------------------------------------------------------------------
1463 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1464
1465         if removeSubscriptionFromDb == true {
1466                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1467                 c.RemoveSubscriptionFromDb(subs)
1468         } else {
1469                 // Update is needed for successful response and merge case here
1470                 if subs.RetryFromXapp == false {
1471                         err := c.WriteSubscriptionToDb(subs)
1472                         return err
1473                 }
1474         }
1475         subs.RetryFromXapp = false
1476         return nil
1477 }
1478
1479 //-------------------------------------------------------------------
1480 //
1481 //-------------------------------------------------------------------
1482 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1483         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1484         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1485         if err != nil {
1486                 xapp.Logger.Error("%v", err)
1487         }
1488 }
1489
1490 //-------------------------------------------------------------------
1491 //
1492 //-------------------------------------------------------------------
1493 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1494         xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1495         err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1496         if err != nil {
1497                 xapp.Logger.Error("%v", err)
1498         }
1499 }
1500
1501 //-------------------------------------------------------------------
1502 //
1503 //-------------------------------------------------------------------
1504 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1505
1506         if removeRestSubscriptionFromDb == true {
1507                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1508                 c.RemoveRESTSubscriptionFromDb(restSubId)
1509         } else {
1510                 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1511         }
1512 }
1513
1514 //-------------------------------------------------------------------
1515 //
1516 //-------------------------------------------------------------------
1517 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1518         xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1519         err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1520         if err != nil {
1521                 xapp.Logger.Error("%v", err)
1522         }
1523 }
1524
1525 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1526
1527         if c.UTTesting == true {
1528                 // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests
1529                 c.registry.mutex = new(sync.Mutex)
1530         }
1531
1532         const ricRequestorId = 123
1533         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1534
1535         // Send delete for every endpoint in the subscription
1536         if subs.PolicyUpdate == false {
1537                 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1538                 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1539                 subDelReqMsg.RequestId.Id = ricRequestorId
1540                 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1541                 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1542                 if err != nil {
1543                         xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1544                         return
1545                 }
1546                 for _, endPoint := range subs.EpList.Endpoints {
1547                         params := &xapp.RMRParams{}
1548                         params.Mtype = mType
1549                         params.SubId = int(subs.GetReqId().InstanceId)
1550                         params.Xid = ""
1551                         params.Meid = subs.Meid
1552                         params.Src = endPoint.String()
1553                         params.PayloadLen = len(payload.Buf)
1554                         params.Payload = payload.Buf
1555                         params.Mbuf = nil
1556                         subs.DeleteFromDb = true
1557                         c.handleXAPPSubscriptionDeleteRequest(params)
1558                 }
1559         }
1560 }
1561
1562 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1563
1564         fmt.Println("CRESTSubscriptionRequest")
1565
1566         if p == nil {
1567                 return
1568         }
1569
1570         if p.SubscriptionID != "" {
1571                 fmt.Println("  SubscriptionID = ", p.SubscriptionID)
1572         } else {
1573                 fmt.Println("  SubscriptionID = ''")
1574         }
1575
1576         fmt.Printf("  ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1577
1578         if p.ClientEndpoint.HTTPPort != nil {
1579                 fmt.Printf("  ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1580         } else {
1581                 fmt.Println("  ClientEndpoint.HTTPPort = nil")
1582         }
1583
1584         if p.ClientEndpoint.RMRPort != nil {
1585                 fmt.Printf("  ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1586         } else {
1587                 fmt.Println("  ClientEndpoint.RMRPort = nil")
1588         }
1589
1590         if p.Meid != nil {
1591                 fmt.Printf("  Meid = %s\n", *p.Meid)
1592         } else {
1593                 fmt.Println("  Meid = nil")
1594         }
1595
1596         if p.E2SubscriptionDirectives == nil {
1597                 fmt.Println("  E2SubscriptionDirectives = nil")
1598         } else {
1599                 fmt.Println("  E2SubscriptionDirectives")
1600                 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1601                         fmt.Println("    E2RetryCount == nil")
1602                 } else {
1603                         fmt.Printf("    E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1604                 }
1605                 fmt.Printf("    E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1606                 fmt.Printf("    RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1607         }
1608         for _, subscriptionDetail := range p.SubscriptionDetails {
1609                 if p.RANFunctionID != nil {
1610                         fmt.Printf("  RANFunctionID = %v\n", *p.RANFunctionID)
1611                 } else {
1612                         fmt.Println("  RANFunctionID = nil")
1613                 }
1614                 fmt.Printf("  SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1615                 fmt.Printf("  SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1616
1617                 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1618                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1619                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1620                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1621
1622                         if actionToBeSetup.SubsequentAction != nil {
1623                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1624                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1625                         } else {
1626                                 fmt.Println("  SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1627                         }
1628                 }
1629         }
1630 }