848b916a270e6464efb9fd0a39218f3ad63d5aba
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
34         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
35         httptransport "github.com/go-openapi/runtime/client"
36         "github.com/go-openapi/strfmt"
37         "github.com/gorilla/mux"
38         "github.com/segmentio/ksuid"
39         "github.com/spf13/viper"
40 )
41
42 //-----------------------------------------------------------------------------
43 //
44 //-----------------------------------------------------------------------------
45
// idstring builds a single, space-separated identification string for log
// messages out of the given entries, followed by the optional error.
//
// Every non-nil entry contributes its String() value, a nil entry contributes
// the placeholder "(NIL)" and a non-nil err is appended last as "err(<text>)".
// All parts are separated by exactly one space, also when a nil entry comes
// first. An empty string is returned when there is nothing to render.
func idstring(err error, entries ...fmt.Stringer) string {
        parts := make([]string, 0, len(entries)+1)
        for _, entry := range entries {
                if entry == nil {
                        parts = append(parts, "(NIL)")
                        continue
                }
                parts = append(parts, entry.String())
        }
        if err != nil {
                parts = append(parts, "err("+err.Error()+")")
        }
        return strings.Join(parts, " ")
}
63
64 //-----------------------------------------------------------------------------
65 //
66 //-----------------------------------------------------------------------------
67
// Package-level runtime configuration. All of these are (re)loaded by
// Control.ReadConfigParameters from the "controls.*" configuration keys.
var (
        // Timeouts and wait times.
        e2tSubReqTimeout    time.Duration
        e2tSubDelReqTime    time.Duration
        e2tRecvMsgTimeout   time.Duration
        waitRouteCleanup_ms time.Duration

        // Maximum number of send attempts.
        e2tMaxSubReqTryCount    uint64 // Initial try + retry
        e2tMaxSubDelReqTryCount uint64 // Initial try + retry

        // Database start-up behaviour; the two string flags hold "true"/"false".
        readSubsFromDb string
        dbRetryForever string
        dbTryCount     int
)
77
// Control is the central object of the subscription manager. It embeds the
// xapp-frame RMR client and holds the registry, transaction tracker,
// duplicate-request control and database handles used by the handlers in
// this file.
type Control struct {
        // Assigned from xapp.Rmr by ReadyCB when it is still nil.
        *xapp.RMRClient
        // Used to fill the E2AP subscription request list from REST parameters.
        e2ap *E2ap
        // Holds the E2 and REST subscriptions (loaded from the db at start-up).
        registry *Registry
        // Creates and tracks xApp transactions.
        tracker *Tracker
        // md5sum based detection of re-transmitted REST subscription requests.
        restDuplicateCtrl *DuplicateCtrl
        // Created with CreateSdl() in NewControl.
        e2SubsDb Sdlnterface
        // Created with CreateRESTSdl() in NewControl.
        restSubsDb Sdlnterface
        // Incremented by the REST handlers for every received request.
        CntRecvMsg uint64
        // Passed on to Registry.AssignToSubscription.
        // NOTE(review): presumably a test-only switch - confirm in the registry.
        ResetTestFlag bool
        // Counter group registered under the name "SUBMGR".
        Counters map[string]xapp.Counter
        // Set to 4 in NewControl and refreshed from the xapp logger in
        // ReadConfigParameters; values above 2 enable REST request printing.
        LoggerLevel int
        // When true, SubscriptionProcessingStartDelay waits 50 ms.
        UTTesting bool
}
92
// RMRMeid groups the PLMN id, eNB id and RAN name as plain strings.
// NOTE(review): the code visible in this part of the file uses xapp.RMRMeid
// rather than this type - confirm whether it is still referenced.
type RMRMeid struct {
        PlmnID  string
        EnbID   string
        RanName string
}
98
// SubmgrRestartTestEvent and SubmgrRestartUpEvent are empty marker types.
// NOTE(review): presumably delivered as transaction events in restart tests -
// confirm against their users.
type SubmgrRestartTestEvent struct{}
type SubmgrRestartUpEvent struct{}
// PackSubscriptionRequestErrortEvent is the event handleSubscriptionRequest
// receives when packing of the E2 SubscriptionRequest failed. It carries the
// error details that are reported back to the xApp.
type PackSubscriptionRequestErrortEvent struct {
        ErrorInfo ErrorInfo
}

// SetEvent stores a copy of *errorInfo in the event. errorInfo must not be nil.
func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
        p.ErrorInfo = *errorInfo
}
108
// SDLWriteErrortEvent is the event handleSubscriptionRequest receives when an
// SDL write failed. It carries the error details that are reported back to
// the xApp.
type SDLWriteErrortEvent struct {
        ErrorInfo ErrorInfo
}

// SetEvent stores a copy of *errorInfo in the event. errorInfo must not be nil.
func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
        s.ErrorInfo = *errorInfo
}
116
// init sets up viper for this package: environment variables are consulted
// automatically, the environment prefix is set to "submgr", and empty
// environment variables are allowed.
func init() {
        xapp.Logger.Debug("SUBMGR")
        viper.AutomaticEnv()
        viper.SetEnvPrefix("submgr")
        viper.AllowEmptyEnv(true)
}
123
// NewControl creates the subscription manager controller.
//
// It builds the routing manager client from the "rtmgr.*" configuration,
// initializes the registry, the transaction tracker and the duplicate control,
// creates the database handles, reads the configuration parameters and
// registers the extra REST routes. Unless readSubsFromDb is "false" it then
// loads the stored E2 and REST subscriptions and starts the REST subscription
// listener in its own goroutine.
func NewControl() *Control {

        transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
        rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}

        registry := new(Registry)
        registry.Initialize()
        registry.rtmgrClient = &rtmgrClient

        tracker := new(Tracker)
        tracker.Init()

        restDuplicateCtrl := new(DuplicateCtrl)
        restDuplicateCtrl.Init()

        c := &Control{e2ap: new(E2ap),
                registry:          registry,
                tracker:           tracker,
                restDuplicateCtrl: restDuplicateCtrl,
                e2SubsDb:          CreateSdl(),
                restSubsDb:        CreateRESTSdl(),
                Counters:          xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
                LoggerLevel:       4,
        }
        // Must run before readSubsFromDb is evaluated below.
        c.ReadConfigParameters("")

        // Register REST handler for testing support
        xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
        xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
        xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")

        // NOTE(review): this early return also skips xapp.Subscription.Listen
        // below, so no REST subscription listener is started here when
        // readSubsFromDb is "false" - confirm that this is intended.
        if readSubsFromDb == "false" {
                return c
        }

        // Read subscriptions from db
        // The returned errors are not checked here; both functions log every
        // failed attempt themselves.
        c.ReadE2Subscriptions()
        c.ReadRESTSubscriptions()

        go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)

        return c
}
167
168 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
169         subscriptions, _ := c.registry.QueryHandler()
170         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
171 }
172
173 //-------------------------------------------------------------------
174 //
175 //-------------------------------------------------------------------
176 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
177         xapp.Logger.Debug("GetAllRestSubscriptions() called")
178         response := c.registry.GetAllRestSubscriptions()
179         w.Write(response)
180 }
181
182 //-------------------------------------------------------------------
183 //
184 //-------------------------------------------------------------------
185 func (c *Control) ReadE2Subscriptions() error {
186         var err error
187         var subIds []uint32
188         var register map[uint32]*Subscription
189         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
190                 xapp.Logger.Debug("Reading E2 subscriptions from db")
191                 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
192                 if err != nil {
193                         xapp.Logger.Error("%v", err)
194                         <-time.After(1 * time.Second)
195                 } else {
196                         c.registry.subIds = subIds
197                         c.registry.register = register
198                         c.HandleUncompletedSubscriptions(register)
199                         return nil
200                 }
201         }
202         xapp.Logger.Debug("Continuing without retring")
203         return err
204 }
205
206 //-------------------------------------------------------------------
207 //
208 //-------------------------------------------------------------------
209 func (c *Control) ReadRESTSubscriptions() error {
210         var err error
211         var restSubscriptions map[string]*RESTSubscription
212         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
213                 xapp.Logger.Debug("Reading REST subscriptions from db")
214                 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
215                 if err != nil {
216                         xapp.Logger.Error("%v", err)
217                         <-time.After(1 * time.Second)
218                 } else {
219                         c.registry.restSubscriptions = restSubscriptions
220                         return nil
221                 }
222         }
223         xapp.Logger.Debug("Continuing without retring")
224         return err
225 }
226
227 //-------------------------------------------------------------------
228 //
229 //-------------------------------------------------------------------
230 func (c *Control) ReadConfigParameters(f string) {
231
232         c.LoggerLevel = int(xapp.Logger.GetLevel())
233         xapp.Logger.Debug("LoggerLevel %v", c.LoggerLevel)
234
235         // viper.GetDuration returns nanoseconds
236         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
237         if e2tSubReqTimeout == 0 {
238                 e2tSubReqTimeout = 2000 * 1000000
239         }
240         xapp.Logger.Debug("e2tSubReqTimeout %v", e2tSubReqTimeout)
241
242         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
243         if e2tSubDelReqTime == 0 {
244                 e2tSubDelReqTime = 2000 * 1000000
245         }
246         xapp.Logger.Debug("e2tSubDelReqTime %v", e2tSubDelReqTime)
247         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
248         if e2tRecvMsgTimeout == 0 {
249                 e2tRecvMsgTimeout = 2000 * 1000000
250         }
251         xapp.Logger.Debug("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
252
253         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
254         if e2tMaxSubReqTryCount == 0 {
255                 e2tMaxSubReqTryCount = 1
256         }
257         xapp.Logger.Debug("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
258
259         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
260         if e2tMaxSubDelReqTryCount == 0 {
261                 e2tMaxSubDelReqTryCount = 1
262         }
263         xapp.Logger.Debug("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
264
265         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
266         if readSubsFromDb == "" {
267                 readSubsFromDb = "true"
268         }
269         xapp.Logger.Debug("readSubsFromDb %v", readSubsFromDb)
270
271         dbTryCount = viper.GetInt("controls.dbTryCount")
272         if dbTryCount == 0 {
273                 dbTryCount = 200
274         }
275         xapp.Logger.Debug("dbTryCount %v", dbTryCount)
276
277         dbRetryForever = viper.GetString("controls.dbRetryForever")
278         if dbRetryForever == "" {
279                 dbRetryForever = "true"
280         }
281         xapp.Logger.Debug("dbRetryForever %v", dbRetryForever)
282
283         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
284         // value 100ms used currently only in unittests.
285         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
286         if waitRouteCleanup_ms == 0 {
287                 waitRouteCleanup_ms = 5000 * 1000000
288         }
289         xapp.Logger.Debug("waitRouteCleanup %v", waitRouteCleanup_ms)
290 }
291
292 //-------------------------------------------------------------------
293 //
294 //-------------------------------------------------------------------
295 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
296
297         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
298         for subId, subs := range register {
299                 if subs.SubRespRcvd == false {
300                         // If policy subscription has already been made successfully unsuccessful update should not be deleted.
301                         if subs.PolicyUpdate == false {
302                                 subs.NoRespToXapp = true
303                                 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
304                                 c.SendSubscriptionDeleteReq(subs)
305                         }
306                 }
307         }
308 }
309
310 func (c *Control) ReadyCB(data interface{}) {
311         if c.RMRClient == nil {
312                 c.RMRClient = xapp.Rmr
313         }
314 }
315
// Run registers the ready callback and the configuration change listener and
// then hands the controller over to xapp.Run.
func (c *Control) Run() {
        xapp.SetReadyCB(c.ReadyCB, nil)
        xapp.AddConfigChangeListener(c.ReadConfigParameters)
        xapp.Run(c)
}
321
322 //-------------------------------------------------------------------
323 //
324 //-------------------------------------------------------------------
325 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
326
327         var restSubId string
328         var restSubscription *RESTSubscription
329         var err error
330
331         prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
332         if p.SubscriptionID == "" {
333                 // Subscription does not contain REST subscription Id
334                 if exists {
335                         restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
336                         if restSubscription != nil {
337                                 // Subscription not found
338                                 restSubId = prevRestSubsId
339                                 if err == nil {
340                                         xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
341                                 } else {
342                                         xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
343                                 }
344                         } else {
345                                 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
346                                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
347                         }
348                 }
349
350                 if restSubscription == nil {
351                         restSubId = ksuid.New().String()
352                         restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
353                 }
354         } else {
355                 // Subscription contains REST subscription Id
356                 restSubId = p.SubscriptionID
357
358                 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
359                 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
360                 if err != nil {
361                         // Subscription with id in REST request does not exist
362                         xapp.Logger.Error("%s", err.Error())
363                         c.UpdateCounter(cRestSubFailToXapp)
364                         return nil, "", err
365                 }
366
367                 if !exists {
368                         xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
369                 } else {
370                         xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
371                 }
372         }
373
374         return restSubscription, restSubId, nil
375 }
376
377 //-------------------------------------------------------------------
378 //
379 //-------------------------------------------------------------------
380 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
381
382         c.CntRecvMsg++
383         c.UpdateCounter(cRestSubReqFromXapp)
384
385         subResp := models.SubscriptionResponse{}
386         p := params.(*models.SubscriptionParams)
387
388         if c.LoggerLevel > 2 {
389                 c.PrintRESTSubscriptionRequest(p)
390         }
391
392         if p.ClientEndpoint == nil {
393                 err := fmt.Errorf("ClientEndpoint == nil")
394                 xapp.Logger.Error("%v", err)
395                 c.UpdateCounter(cRestSubFailToXapp)
396                 return nil, common.SubscribeBadRequestCode
397         }
398
399         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
400         if err != nil {
401                 xapp.Logger.Error("%s", err.Error())
402                 c.UpdateCounter(cRestSubFailToXapp)
403                 return nil, common.SubscribeBadRequestCode
404         }
405
406         md5sum, err := CalculateRequestMd5sum(params)
407         if err != nil {
408                 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
409         }
410
411         restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
412         if err != nil {
413                 xapp.Logger.Error("Subscription with id in REST request does not exist")
414                 return nil, common.SubscribeNotFoundCode
415         }
416
417         subResp.SubscriptionID = &restSubId
418         subReqList := e2ap.SubscriptionRequestList{}
419         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
420         if err != nil {
421                 xapp.Logger.Error("%s", err.Error())
422                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
423                 c.registry.DeleteRESTSubscription(&restSubId)
424                 c.UpdateCounter(cRestSubFailToXapp)
425                 return nil, common.SubscribeBadRequestCode
426         }
427
428         duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
429         if duplicate {
430                 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
431                 xapp.Logger.Debug("%s", err)
432                 c.UpdateCounter(cRestSubRespToXapp)
433                 return &subResp, common.SubscribeCreatedCode
434         }
435
436         c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
437         e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
438         if err != nil {
439                 xapp.Logger.Error("%s", err)
440                 return nil, common.SubscribeBadRequestCode
441         }
442         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
443
444         c.UpdateCounter(cRestSubRespToXapp)
445         return &subResp, common.SubscribeCreatedCode
446 }
447
448 //-------------------------------------------------------------------
449 //
450 //-------------------------------------------------------------------
451 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
452
453         e2SubscriptionDirectives := &E2SubscriptionDirectives{}
454         if p == nil || p.E2SubscriptionDirectives == nil {
455                 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
456                 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
457                 e2SubscriptionDirectives.CreateRMRRoute = true
458                 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
459         } else {
460                 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
461                         e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
462                 } else {
463                         return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
464                 }
465                 if p.E2SubscriptionDirectives.E2RetryCount == nil {
466                         xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
467                         e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
468                 } else {
469                         if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
470                                 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
471                         } else {
472                                 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
473                         }
474                 }
475                 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
476         }
477         xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
478         xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
479         xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
480         return e2SubscriptionDirectives, nil
481 }
482
483 //-------------------------------------------------------------------
484 //
485 //-------------------------------------------------------------------
486
487 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
488         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
489
490         c.SubscriptionProcessingStartDelay()
491         xapp.Logger.Debug("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
492
493         var xAppEventInstanceID int64
494         var e2EventInstanceID int64
495         errorInfo := &ErrorInfo{}
496
497         defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
498
499         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
500                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
501                 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
502
503                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
504                 if trans == nil {
505                         // Send notification to xApp that prosessing of a Subscription Request has failed.
506                         err := fmt.Errorf("Tracking failure")
507                         errorInfo.ErrorCause = err.Error()
508                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
509                         continue
510                 }
511
512                 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
513
514                 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
515
516                 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
517                 trans.Release()
518
519                 if err != nil {
520                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
521                 } else {
522                         e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
523                         restSubscription.AddMd5Sum(md5sum)
524                         xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
525                                 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
526                         c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
527                 }
528         }
529 }
530
531 //-------------------------------------------------------------------
532 //
533 //------------------------------------------------------------------
534 func (c *Control) SubscriptionProcessingStartDelay() {
535         if c.UTTesting == true {
536                 // This is temporary fix for the UT problem that notification arrives before subscription response
537                 // Correct fix would be to allow notification come before response and process it correctly
538                 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
539                 <-time.After(time.Millisecond * 50)
540                 xapp.Logger.Debug("Continuing after delay")
541         }
542 }
543
544 //-------------------------------------------------------------------
545 //
546 //------------------------------------------------------------------
547 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
548         restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
549
550         errorInfo := ErrorInfo{}
551
552         err := c.tracker.Track(trans)
553         if err != nil {
554                 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
555                 errorInfo.ErrorCause = err.Error()
556                 err = fmt.Errorf("Tracking failure")
557                 return nil, &errorInfo, err
558         }
559
560         subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
561         if err != nil {
562                 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
563                 return nil, &errorInfo, err
564         }
565
566         //
567         // Wake subs request
568         //
569         go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
570         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
571
572         err = nil
573         if event != nil {
574                 switch themsg := event.(type) {
575                 case *e2ap.E2APSubscriptionResponse:
576                         trans.Release()
577                         return themsg, &errorInfo, nil
578                 case *e2ap.E2APSubscriptionFailure:
579                         err = fmt.Errorf("E2 SubscriptionFailure received")
580                         errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
581                         return nil, &errorInfo, err
582                 case *PackSubscriptionRequestErrortEvent:
583                         err = fmt.Errorf("E2 SubscriptionRequest pack failure")
584                         return nil, &themsg.ErrorInfo, err
585                 case *SDLWriteErrortEvent:
586                         err = fmt.Errorf("SDL write failure")
587                         return nil, &themsg.ErrorInfo, err
588                 default:
589                         err = fmt.Errorf("Unexpected E2 subscription response received")
590                         errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
591                         break
592                 }
593         } else {
594                 err = fmt.Errorf("E2 subscription response timeout")
595                 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
596                 if subs.PolicyUpdate == true {
597                         return nil, &errorInfo, err
598                 }
599         }
600
601         xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
602         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
603         return nil, &errorInfo, err
604 }
605
606 //-------------------------------------------------------------------
607 //
608 //-------------------------------------------------------------------
609 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
610         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
611
612         // Send notification to xApp that prosessing of a Subscription Request has failed.
613         e2EventInstanceID := (int64)(0)
614         if errorInfo.ErrorSource == "" {
615                 // Submgr is default source of error
616                 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
617         }
618         resp := &models.SubscriptionResponse{
619                 SubscriptionID: restSubId,
620                 SubscriptionInstances: []*models.SubscriptionInstance{
621                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
622                                 ErrorCause:          errorInfo.ErrorCause,
623                                 ErrorSource:         errorInfo.ErrorSource,
624                                 TimeoutType:         errorInfo.TimeoutType,
625                                 XappEventInstanceID: &xAppEventInstanceID},
626                 },
627         }
628         // Mark REST subscription request processed.
629         restSubscription.SetProcessed(err)
630         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
631         if trans != nil {
632                 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
633                         errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
634         } else {
635                 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
636                         errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
637         }
638
639         c.UpdateCounter(cRestSubFailNotifToXapp)
640         xapp.Subscription.Notify(resp, *clientEndpoint)
641 }
642
643 //-------------------------------------------------------------------
644 //
645 //-------------------------------------------------------------------
646 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
647         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
648
649         // Store successfully processed InstanceId for deletion
650         restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
651         restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
652
653         // Send notification to xApp that a Subscription Request has been processed.
654         resp := &models.SubscriptionResponse{
655                 SubscriptionID: restSubId,
656                 SubscriptionInstances: []*models.SubscriptionInstance{
657                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
658                                 ErrorCause:          "",
659                                 XappEventInstanceID: &xAppEventInstanceID},
660                 },
661         }
662         // Mark REST subscription request processesd.
663         restSubscription.SetProcessed(nil)
664         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
665         xapp.Logger.Debug("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
666                 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
667
668         c.UpdateCounter(cRestSubNotifToXapp)
669         xapp.Subscription.Notify(resp, *clientEndpoint)
670 }
671
672 //-------------------------------------------------------------------
673 //
674 //-------------------------------------------------------------------
675 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
676
677         c.CntRecvMsg++
678         c.UpdateCounter(cRestSubDelReqFromXapp)
679
680         xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
681
682         restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
683         if err != nil {
684                 xapp.Logger.Error("%s", err.Error())
685                 if restSubscription == nil {
686                         // Subscription was not found
687                         return common.UnsubscribeNoContentCode
688                 } else {
689                         if restSubscription.SubReqOngoing == true {
690                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
691                                 xapp.Logger.Error("%s", err.Error())
692                                 return common.UnsubscribeBadRequestCode
693                         } else if restSubscription.SubDelReqOngoing == true {
694                                 // Previous request for same restSubId still ongoing
695                                 return common.UnsubscribeBadRequestCode
696                         }
697                 }
698         }
699
700         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
701         go func() {
702                 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
703                 for _, instanceId := range restSubscription.InstanceIds {
704                         xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
705
706                         if err != nil {
707                                 xapp.Logger.Error("%s", err.Error())
708                         }
709                         xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
710                         restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
711                         restSubscription.DeleteE2InstanceId(instanceId)
712                 }
713                 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
714                 c.registry.DeleteRESTSubscription(&restSubId)
715                 c.RemoveRESTSubscriptionFromDb(restSubId)
716         }()
717
718         c.UpdateCounter(cRestSubDelRespToXapp)
719
720         return common.UnsubscribeNoContentCode
721 }
722
723 //-------------------------------------------------------------------
724 //
725 //-------------------------------------------------------------------
726 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
727
728         var xAppEventInstanceID int64
729         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
730         if err != nil {
731                 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
732                         restSubId, instanceId, idstring(err, nil))
733                 return xAppEventInstanceID, nil
734         }
735
736         xAppEventInstanceID = int64(subs.ReqId.Id)
737         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
738         if trans == nil {
739                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
740                 xapp.Logger.Error("%s", err.Error())
741         }
742         defer trans.Release()
743
744         err = c.tracker.Track(trans)
745         if err != nil {
746                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
747                 xapp.Logger.Error("%s", err.Error())
748                 return xAppEventInstanceID, &time.ParseError{}
749         }
750         //
751         // Wake subs delete
752         //
753         go c.handleSubscriptionDelete(subs, trans)
754         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
755
756         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
757
758         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
759
760         return xAppEventInstanceID, nil
761 }
762
763 //-------------------------------------------------------------------
764 //
765 //-------------------------------------------------------------------
766 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
767         xapp.Logger.Debug("RESTQueryHandler() called")
768
769         c.CntRecvMsg++
770
771         return c.registry.QueryHandler()
772 }
773
774 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
775         xapp.Logger.Debug("RESTTestRestHandler() called")
776
777         pathParams := mux.Vars(r)
778         s := pathParams["testId"]
779
780         // This can be used to delete single subscription from db
781         if contains := strings.Contains(s, "deletesubid="); contains == true {
782                 var splits = strings.Split(s, "=")
783                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
784                         xapp.Logger.Debug("RemoveSubscriptionFromSdl() called. subId = %v", subId)
785                         c.RemoveSubscriptionFromSdl(uint32(subId))
786                         return
787                 }
788         }
789
790         // This can be used to remove all subscriptions db from
791         if s == "emptydb" {
792                 xapp.Logger.Debug("RemoveAllSubscriptionsFromSdl() called")
793                 c.RemoveAllSubscriptionsFromSdl()
794                 c.RemoveAllRESTSubscriptionsFromSdl()
795                 return
796         }
797
798         // This is meant to cause submgr's restart in testing
799         if s == "restart" {
800                 xapp.Logger.Debug("os.Exit(1) called")
801                 os.Exit(1)
802         }
803
804         xapp.Logger.Debug("Unsupported rest command received %s", s)
805 }
806
807 //-------------------------------------------------------------------
808 //
809 //-------------------------------------------------------------------
810
811 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
812         params := &xapp.RMRParams{}
813         params.Mtype = trans.GetMtype()
814         params.SubId = int(subs.GetReqId().InstanceId)
815         params.Xid = ""
816         params.Meid = subs.GetMeid()
817         params.Src = ""
818         params.PayloadLen = len(trans.Payload.Buf)
819         params.Payload = trans.Payload.Buf
820         params.Mbuf = nil
821         xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
822         err = c.SendWithRetry(params, false, 5)
823         if err != nil {
824                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
825         }
826         return err
827 }
828
829 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
830
831         params := &xapp.RMRParams{}
832         params.Mtype = trans.GetMtype()
833         params.SubId = int(subs.GetReqId().InstanceId)
834         params.Xid = trans.GetXid()
835         params.Meid = trans.GetMeid()
836         params.Src = ""
837         params.PayloadLen = len(trans.Payload.Buf)
838         params.Payload = trans.Payload.Buf
839         params.Mbuf = nil
840         xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
841         err = c.SendWithRetry(params, false, 5)
842         if err != nil {
843                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
844         }
845         return err
846 }
847
848 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
849         if c.RMRClient == nil {
850                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
851                 xapp.Logger.Error("%s", err.Error())
852                 return
853         }
854         c.CntRecvMsg++
855
856         defer c.RMRClient.Free(msg.Mbuf)
857
858         // xapp-frame might use direct access to c buffer and
859         // when msg.Mbuf is freed, someone might take it into use
860         // and payload data might be invalid inside message handle function
861         //
862         // subscriptions won't load system a lot so there is no
863         // real performance hit by cloning buffer into new go byte slice
864         cPay := append(msg.Payload[:0:0], msg.Payload...)
865         msg.Payload = cPay
866         msg.PayloadLen = len(cPay)
867
868         switch msg.Mtype {
869         case xapp.RIC_SUB_REQ:
870                 go c.handleXAPPSubscriptionRequest(msg)
871         case xapp.RIC_SUB_RESP:
872                 go c.handleE2TSubscriptionResponse(msg)
873         case xapp.RIC_SUB_FAILURE:
874                 go c.handleE2TSubscriptionFailure(msg)
875         case xapp.RIC_SUB_DEL_REQ:
876                 go c.handleXAPPSubscriptionDeleteRequest(msg)
877         case xapp.RIC_SUB_DEL_RESP:
878                 go c.handleE2TSubscriptionDeleteResponse(msg)
879         case xapp.RIC_SUB_DEL_FAILURE:
880                 go c.handleE2TSubscriptionDeleteFailure(msg)
881         default:
882                 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
883         }
884         return
885 }
886
887 //-------------------------------------------------------------------
888 // handle from XAPP Subscription Request
889 //------------------------------------------------------------------
890 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
891         xapp.Logger.Debug("MSG from XAPP: %s", params.String())
892         c.UpdateCounter(cSubReqFromXapp)
893
894         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
895         if err != nil {
896                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
897                 return
898         }
899
900         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
901         if trans == nil {
902                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
903                 return
904         }
905         defer trans.Release()
906
907         if err = c.tracker.Track(trans); err != nil {
908                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
909                 return
910         }
911
912         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
913         subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
914         if err != nil {
915                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
916                 return
917         }
918
919         c.wakeSubscriptionRequest(subs, trans)
920 }
921
922 //-------------------------------------------------------------------
923 // Wake Subscription Request to E2node
924 //------------------------------------------------------------------
925 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
926
927         e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil)
928         go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
929         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
930         var err error
931         if event != nil {
932                 switch themsg := event.(type) {
933                 case *e2ap.E2APSubscriptionResponse:
934                         themsg.RequestId.Id = trans.RequestId.Id
935                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
936                         if err == nil {
937                                 trans.Release()
938                                 c.UpdateCounter(cSubRespToXapp)
939                                 c.rmrSendToXapp("", subs, trans)
940                                 return
941                         }
942                 case *e2ap.E2APSubscriptionFailure:
943                         themsg.RequestId.Id = trans.RequestId.Id
944                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
945                         if err == nil {
946                                 c.UpdateCounter(cSubFailToXapp)
947                                 c.rmrSendToXapp("", subs, trans)
948                         }
949                 default:
950                         break
951                 }
952         }
953         xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
954         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
955 }
956
957 //-------------------------------------------------------------------
958 // handle from XAPP Subscription Delete Request
959 //------------------------------------------------------------------
960 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
961         xapp.Logger.Debug("MSG from XAPP: %s", params.String())
962         c.UpdateCounter(cSubDelReqFromXapp)
963
964         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
965         if err != nil {
966                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
967                 return
968         }
969
970         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
971         if trans == nil {
972                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
973                 return
974         }
975         defer trans.Release()
976
977         err = c.tracker.Track(trans)
978         if err != nil {
979                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
980                 return
981         }
982
983         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
984         if err != nil {
985                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
986                 return
987         }
988
989         //
990         // Wake subs delete
991         //
992         go c.handleSubscriptionDelete(subs, trans)
993         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
994
995         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
996
997         if subs.NoRespToXapp == true {
998                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
999                 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1000                 return
1001         }
1002
1003         // Whatever is received success, fail or timeout, send successful delete response
1004         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1005         subDelRespMsg.RequestId.Id = trans.RequestId.Id
1006         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1007         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1008         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1009         if err == nil {
1010                 c.UpdateCounter(cSubDelRespToXapp)
1011                 c.rmrSendToXapp("", subs, trans)
1012         }
1013
1014         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
1015         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
1016 }
1017
1018 //-------------------------------------------------------------------
1019 // SUBS CREATE Handling
1020 //-------------------------------------------------------------------
1021 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) {
1022
1023         var event interface{} = nil
1024         var removeSubscriptionFromDb bool = false
1025         trans := c.tracker.NewSubsTransaction(subs)
1026         subs.WaitTransactionTurn(trans)
1027         defer subs.ReleaseTransactionTurn(trans)
1028         defer trans.Release()
1029
1030         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1031
1032         subRfMsg, valid := subs.GetCachedResponse()
1033         if subRfMsg == nil && valid == true {
1034                 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1035                 switch event.(type) {
1036                 case *e2ap.E2APSubscriptionResponse:
1037                         subRfMsg, valid = subs.SetCachedResponse(event, true)
1038                         subs.SubRespRcvd = true
1039                 case *e2ap.E2APSubscriptionFailure:
1040                         removeSubscriptionFromDb = true
1041                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1042                         xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1043                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1044                 case *SubmgrRestartTestEvent:
1045                         // This simulates that no response has been received and after restart subscriptions are restored from db
1046                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1047                 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1048                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1049                 default:
1050                         if subs.PolicyUpdate == false {
1051                                 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1052                                 removeSubscriptionFromDb = true
1053                                 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1054                                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1055                         }
1056                 }
1057                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1058         } else {
1059                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1060         }
1061
1062         err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1063         if err != nil {
1064                 subRfMsg, valid = subs.SetCachedResponse(event, false)
1065                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1066         }
1067
1068         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1069         if valid == false {
1070                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1071         }
1072
1073         parentTrans.SendEvent(subRfMsg, 0)
1074 }
1075
1076 //-------------------------------------------------------------------
1077 // SUBS DELETE Handling
1078 //-------------------------------------------------------------------
1079
1080 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
1081
1082         trans := c.tracker.NewSubsTransaction(subs)
1083         subs.WaitTransactionTurn(trans)
1084         defer subs.ReleaseTransactionTurn(trans)
1085         defer trans.Release()
1086
1087         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1088
1089         subs.mutex.Lock()
1090
1091         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1092                 subs.valid = false
1093                 subs.mutex.Unlock()
1094                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1095         } else {
1096                 subs.mutex.Unlock()
1097         }
1098         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1099         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
1100         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
1101         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1102         c.registry.UpdateSubscriptionToDb(subs, c)
1103         parentTrans.SendEvent(nil, 0)
1104 }
1105
1106 //-------------------------------------------------------------------
1107 // send to E2T Subscription Request
1108 //-------------------------------------------------------------------
1109 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1110         var err error
1111         var event interface{} = nil
1112         var timedOut bool = false
1113         const ricRequestorId = 123
1114
1115         subReqMsg := subs.SubReqMsg
1116         subReqMsg.RequestId = subs.GetReqId().RequestId
1117         subReqMsg.RequestId.Id = ricRequestorId
1118         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1119         if err != nil {
1120                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
1121                 return &PackSubscriptionRequestErrortEvent{
1122                         ErrorInfo{
1123                                 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1124                                 ErrorCause:  err.Error(),
1125                         },
1126                 }
1127         }
1128
1129         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1130         err = c.WriteSubscriptionToDb(subs)
1131         if err != nil {
1132                 return &SDLWriteErrortEvent{
1133                         ErrorInfo{
1134                                 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1135                                 ErrorCause:  err.Error(),
1136                         },
1137                 }
1138         }
1139
1140         for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1141                 desc := fmt.Sprintf("(retry %d)", retries)
1142                 if retries == 0 {
1143                         c.UpdateCounter(cSubReqToE2)
1144                 } else {
1145                         c.UpdateCounter(cSubReReqToE2)
1146                 }
1147                 c.rmrSendToE2T(desc, subs, trans)
1148                 if subs.DoNotWaitSubResp == false {
1149                         event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1150                         if timedOut {
1151                                 c.UpdateCounter(cSubReqTimerExpiry)
1152                                 continue
1153                         }
1154                 } else {
1155                         // Simulating case where subscrition request has been sent but response has not been received before restart
1156                         event = &SubmgrRestartTestEvent{}
1157                         xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1158                 }
1159                 break
1160         }
1161         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1162         return event
1163 }
1164
1165 //-------------------------------------------------------------------
1166 // send to E2T Subscription Delete Request
1167 //-------------------------------------------------------------------
1168
1169 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1170         var err error
1171         var event interface{}
1172         var timedOut bool
1173         const ricRequestorId = 123
1174
1175         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1176         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1177         subDelReqMsg.RequestId.Id = ricRequestorId
1178         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1179         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1180         if err != nil {
1181                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1182                 return event
1183         }
1184
1185         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1186                 desc := fmt.Sprintf("(retry %d)", retries)
1187                 if retries == 0 {
1188                         c.UpdateCounter(cSubDelReqToE2)
1189                 } else {
1190                         c.UpdateCounter(cSubDelReReqToE2)
1191                 }
1192                 c.rmrSendToE2T(desc, subs, trans)
1193                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1194                 if timedOut {
1195                         c.UpdateCounter(cSubDelReqTimerExpiry)
1196                         continue
1197                 }
1198                 break
1199         }
1200         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1201         return event
1202 }
1203
1204 //-------------------------------------------------------------------
1205 // handle from E2T Subscription Response
1206 //-------------------------------------------------------------------
1207 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1208         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1209         c.UpdateCounter(cSubRespFromE2)
1210
1211         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1212         if err != nil {
1213                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1214                 return
1215         }
1216         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1217         if err != nil {
1218                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1219                 return
1220         }
1221         trans := subs.GetTransaction()
1222         if trans == nil {
1223                 err = fmt.Errorf("Ongoing transaction not found")
1224                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1225                 return
1226         }
1227         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1228         if sendOk == false {
1229                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1230                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1231         }
1232         return
1233 }
1234
1235 //-------------------------------------------------------------------
1236 // handle from E2T Subscription Failure
1237 //-------------------------------------------------------------------
1238 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1239         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1240         c.UpdateCounter(cSubFailFromE2)
1241         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1242         if err != nil {
1243                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1244                 return
1245         }
1246         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1247         if err != nil {
1248                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1249                 return
1250         }
1251         trans := subs.GetTransaction()
1252         if trans == nil {
1253                 err = fmt.Errorf("Ongoing transaction not found")
1254                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1255                 return
1256         }
1257         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1258         if sendOk == false {
1259                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1260                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1261         }
1262         return
1263 }
1264
1265 //-------------------------------------------------------------------
1266 // handle from E2T Subscription Delete Response
1267 //-------------------------------------------------------------------
1268 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1269         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1270         c.UpdateCounter(cSubDelRespFromE2)
1271         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1272         if err != nil {
1273                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1274                 return
1275         }
1276         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1277         if err != nil {
1278                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1279                 return
1280         }
1281         trans := subs.GetTransaction()
1282         if trans == nil {
1283                 err = fmt.Errorf("Ongoing transaction not found")
1284                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1285                 return
1286         }
1287         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1288         if sendOk == false {
1289                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1290                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1291         }
1292         return
1293 }
1294
1295 //-------------------------------------------------------------------
1296 // handle from E2T Subscription Delete Failure
1297 //-------------------------------------------------------------------
1298 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1299         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1300         c.UpdateCounter(cSubDelFailFromE2)
1301         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1302         if err != nil {
1303                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1304                 return
1305         }
1306         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1307         if err != nil {
1308                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1309                 return
1310         }
1311         trans := subs.GetTransaction()
1312         if trans == nil {
1313                 err = fmt.Errorf("Ongoing transaction not found")
1314                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1315                 return
1316         }
1317         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1318         if sendOk == false {
1319                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1320                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1321         }
1322         return
1323 }
1324
1325 //-------------------------------------------------------------------
1326 //
1327 //-------------------------------------------------------------------
1328 func typeofSubsMessage(v interface{}) string {
1329         if v == nil {
1330                 return "NIL"
1331         }
1332         switch v.(type) {
1333         //case *e2ap.E2APSubscriptionRequest:
1334         //      return "SubReq"
1335         case *e2ap.E2APSubscriptionResponse:
1336                 return "SubResp"
1337         case *e2ap.E2APSubscriptionFailure:
1338                 return "SubFail"
1339         //case *e2ap.E2APSubscriptionDeleteRequest:
1340         //      return "SubDelReq"
1341         case *e2ap.E2APSubscriptionDeleteResponse:
1342                 return "SubDelResp"
1343         case *e2ap.E2APSubscriptionDeleteFailure:
1344                 return "SubDelFail"
1345         default:
1346                 return "Unknown"
1347         }
1348 }
1349
1350 //-------------------------------------------------------------------
1351 //
1352 //-------------------------------------------------------------------
1353 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1354         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1355         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1356         if err != nil {
1357                 xapp.Logger.Error("%v", err)
1358                 return err
1359         }
1360         return nil
1361 }
1362
1363 //-------------------------------------------------------------------
1364 //
1365 //-------------------------------------------------------------------
1366 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1367
1368         if removeSubscriptionFromDb == true {
1369                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1370                 c.RemoveSubscriptionFromDb(subs)
1371         } else {
1372                 // Update is needed for successful response and merge case here
1373                 if subs.RetryFromXapp == false {
1374                         err := c.WriteSubscriptionToDb(subs)
1375                         return err
1376                 }
1377         }
1378         subs.RetryFromXapp = false
1379         return nil
1380 }
1381
1382 //-------------------------------------------------------------------
1383 //
1384 //-------------------------------------------------------------------
1385 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1386         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1387         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1388         if err != nil {
1389                 xapp.Logger.Error("%v", err)
1390         }
1391 }
1392
1393 //-------------------------------------------------------------------
1394 //
1395 //-------------------------------------------------------------------
1396 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1397         xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1398         err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1399         if err != nil {
1400                 xapp.Logger.Error("%v", err)
1401         }
1402 }
1403
1404 //-------------------------------------------------------------------
1405 //
1406 //-------------------------------------------------------------------
1407 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1408
1409         if removeRestSubscriptionFromDb == true {
1410                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1411                 c.RemoveRESTSubscriptionFromDb(restSubId)
1412         } else {
1413                 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1414         }
1415 }
1416
1417 //-------------------------------------------------------------------
1418 //
1419 //-------------------------------------------------------------------
1420 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1421         xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1422         err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1423         if err != nil {
1424                 xapp.Logger.Error("%v", err)
1425         }
1426 }
1427
1428 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1429
1430         const ricRequestorId = 123
1431         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1432
1433         // Send delete for every endpoint in the subscription
1434         if subs.PolicyUpdate == false {
1435                 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1436                 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1437                 subDelReqMsg.RequestId.Id = ricRequestorId
1438                 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1439                 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1440                 if err != nil {
1441                         xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1442                         return
1443                 }
1444                 for _, endPoint := range subs.EpList.Endpoints {
1445                         params := &xapp.RMRParams{}
1446                         params.Mtype = mType
1447                         params.SubId = int(subs.GetReqId().InstanceId)
1448                         params.Xid = ""
1449                         params.Meid = subs.Meid
1450                         params.Src = endPoint.String()
1451                         params.PayloadLen = len(payload.Buf)
1452                         params.Payload = payload.Buf
1453                         params.Mbuf = nil
1454                         subs.DeleteFromDb = true
1455                         c.handleXAPPSubscriptionDeleteRequest(params)
1456                 }
1457         }
1458 }
1459
1460 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1461
1462         fmt.Println("CRESTSubscriptionRequest")
1463
1464         if p == nil {
1465                 return
1466         }
1467
1468         if p.SubscriptionID != "" {
1469                 fmt.Println("  SubscriptionID = ", p.SubscriptionID)
1470         } else {
1471                 fmt.Println("  SubscriptionID = ''")
1472         }
1473
1474         fmt.Printf("  ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1475
1476         if p.ClientEndpoint.HTTPPort != nil {
1477                 fmt.Printf("  ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1478         } else {
1479                 fmt.Println("  ClientEndpoint.HTTPPort = nil")
1480         }
1481
1482         if p.ClientEndpoint.RMRPort != nil {
1483                 fmt.Printf("  ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1484         } else {
1485                 fmt.Println("  ClientEndpoint.RMRPort = nil")
1486         }
1487
1488         if p.Meid != nil {
1489                 fmt.Printf("  Meid = %s\n", *p.Meid)
1490         } else {
1491                 fmt.Println("  Meid = nil")
1492         }
1493
1494         if p.E2SubscriptionDirectives == nil {
1495                 fmt.Println("  E2SubscriptionDirectives = nil")
1496         } else {
1497                 fmt.Println("  E2SubscriptionDirectives")
1498                 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1499                         fmt.Println("    E2RetryCount == nil")
1500                 } else {
1501                         fmt.Printf("    E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1502                 }
1503                 fmt.Printf("    E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1504                 fmt.Printf("    RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1505         }
1506         for _, subscriptionDetail := range p.SubscriptionDetails {
1507                 if p.RANFunctionID != nil {
1508                         fmt.Printf("  RANFunctionID = %v\n", *p.RANFunctionID)
1509                 } else {
1510                         fmt.Println("  RANFunctionID = nil")
1511                 }
1512                 fmt.Printf("  SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1513                 fmt.Printf("  SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1514
1515                 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1516                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1517                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1518                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1519
1520                         if actionToBeSetup.SubsequentAction != nil {
1521                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1522                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1523                         } else {
1524                                 fmt.Println("  SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1525                         }
1526                 }
1527         }
1528 }