1bcffc6d8f843839557b4c737429cfc75c6ccee5
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
34         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
35         httptransport "github.com/go-openapi/runtime/client"
36         "github.com/go-openapi/strfmt"
37         "github.com/gorilla/mux"
38         "github.com/segmentio/ksuid"
39         "github.com/spf13/viper"
40 )
41
42 //-----------------------------------------------------------------------------
43 //
44 //-----------------------------------------------------------------------------
45
46 func idstring(err error, entries ...fmt.Stringer) string {
47         var retval string = ""
48         var filler string = ""
49         for _, entry := range entries {
50                 if entry != nil {
51                         retval += filler + entry.String()
52                         filler = " "
53                 } else {
54                         retval += filler + "(NIL)"
55                 }
56         }
57         if err != nil {
58                 retval += filler + "err(" + err.Error() + ")"
59                 filler = " "
60         }
61         return retval
62 }
63
64 //-----------------------------------------------------------------------------
65 //
66 //-----------------------------------------------------------------------------
67
68 var e2tSubReqTimeout time.Duration
69 var e2tSubDelReqTime time.Duration
70 var e2tRecvMsgTimeout time.Duration
71 var waitRouteCleanup_ms time.Duration
72 var e2tMaxSubReqTryCount uint64    // Initial try + retry
73 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
74 var readSubsFromDb string
75 var restDuplicateCtrl duplicateCtrl
76 var dbRetryForever string
77 var dbTryCount int
78
79 type Control struct {
80         *xapp.RMRClient
81         e2ap          *E2ap
82         registry      *Registry
83         tracker       *Tracker
84         e2SubsDb      Sdlnterface
85         restSubsDb    Sdlnterface
86         CntRecvMsg    uint64
87         ResetTestFlag bool
88         Counters      map[string]xapp.Counter
89         LoggerLevel   int
90         UTTesting     bool
91 }
92
93 type RMRMeid struct {
94         PlmnID  string
95         EnbID   string
96         RanName string
97 }
98
99 type SubmgrRestartTestEvent struct{}
100 type SubmgrRestartUpEvent struct{}
101 type PackSubscriptionRequestErrortEvent struct {
102         ErrorInfo ErrorInfo
103 }
104
105 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
106         p.ErrorInfo = *errorInfo
107 }
108
109 type SDLWriteErrortEvent struct {
110         ErrorInfo ErrorInfo
111 }
112
113 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
114         s.ErrorInfo = *errorInfo
115 }
116
117 func init() {
118         xapp.Logger.Debug("SUBMGR")
119         viper.AutomaticEnv()
120         viper.SetEnvPrefix("submgr")
121         viper.AllowEmptyEnv(true)
122 }
123
124 func NewControl() *Control {
125
126         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
127         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
128
129         registry := new(Registry)
130         registry.Initialize()
131         registry.rtmgrClient = &rtmgrClient
132
133         tracker := new(Tracker)
134         tracker.Init()
135
136         c := &Control{e2ap: new(E2ap),
137                 registry:    registry,
138                 tracker:     tracker,
139                 e2SubsDb:    CreateSdl(),
140                 restSubsDb:  CreateRESTSdl(),
141                 Counters:    xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
142                 LoggerLevel: 3,
143         }
144         c.ReadConfigParameters("")
145
146         // Register REST handler for testing support
147         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
148         xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
149         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
150
151         go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
152
153         if readSubsFromDb == "false" {
154                 return c
155         }
156
157         restDuplicateCtrl.Init()
158
159         // Read subscriptions from db
160         c.ReadE2Subscriptions()
161         c.ReadRESTSubscriptions()
162         return c
163 }
164
165 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
166         subscriptions, _ := c.registry.QueryHandler()
167         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
168 }
169
170 //-------------------------------------------------------------------
171 //
172 //-------------------------------------------------------------------
173 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
174         xapp.Logger.Debug("GetAllRestSubscriptions() called")
175         response := c.registry.GetAllRestSubscriptions()
176         w.Write(response)
177 }
178
179 //-------------------------------------------------------------------
180 //
181 //-------------------------------------------------------------------
182 func (c *Control) ReadE2Subscriptions() error {
183         var err error
184         var subIds []uint32
185         var register map[uint32]*Subscription
186         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
187                 xapp.Logger.Debug("Reading E2 subscriptions from db")
188                 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
189                 if err != nil {
190                         xapp.Logger.Error("%v", err)
191                         <-time.After(1 * time.Second)
192                 } else {
193                         c.registry.subIds = subIds
194                         c.registry.register = register
195                         c.HandleUncompletedSubscriptions(register)
196                         return nil
197                 }
198         }
199         xapp.Logger.Debug("Continuing without retring")
200         return err
201 }
202
203 //-------------------------------------------------------------------
204 //
205 //-------------------------------------------------------------------
206 func (c *Control) ReadRESTSubscriptions() error {
207         var err error
208         var restSubscriptions map[string]*RESTSubscription
209         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
210                 xapp.Logger.Debug("Reading REST subscriptions from db")
211                 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
212                 if err != nil {
213                         xapp.Logger.Error("%v", err)
214                         <-time.After(1 * time.Second)
215                 } else {
216                         c.registry.restSubscriptions = restSubscriptions
217                         return nil
218                 }
219         }
220         xapp.Logger.Debug("Continuing without retring")
221         return err
222 }
223
224 //-------------------------------------------------------------------
225 //
226 //-------------------------------------------------------------------
227 func (c *Control) ReadConfigParameters(f string) {
228
229         c.LoggerLevel = int(xapp.Logger.GetLevel())
230         xapp.Logger.Debug("LoggerLevel %v", c.LoggerLevel)
231
232         // viper.GetDuration returns nanoseconds
233         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
234         if e2tSubReqTimeout == 0 {
235                 e2tSubReqTimeout = 2000 * 1000000
236         }
237         xapp.Logger.Debug("e2tSubReqTimeout %v", e2tSubReqTimeout)
238
239         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
240         if e2tSubDelReqTime == 0 {
241                 e2tSubDelReqTime = 2000 * 1000000
242         }
243         xapp.Logger.Debug("e2tSubDelReqTime %v", e2tSubDelReqTime)
244         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
245         if e2tRecvMsgTimeout == 0 {
246                 e2tRecvMsgTimeout = 2000 * 1000000
247         }
248         xapp.Logger.Debug("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
249
250         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
251         if e2tMaxSubReqTryCount == 0 {
252                 e2tMaxSubReqTryCount = 1
253         }
254         xapp.Logger.Debug("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
255
256         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
257         if e2tMaxSubDelReqTryCount == 0 {
258                 e2tMaxSubDelReqTryCount = 1
259         }
260         xapp.Logger.Debug("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
261
262         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
263         if readSubsFromDb == "" {
264                 readSubsFromDb = "true"
265         }
266         xapp.Logger.Debug("readSubsFromDb %v", readSubsFromDb)
267
268         dbTryCount = viper.GetInt("controls.dbTryCount")
269         if dbTryCount == 0 {
270                 dbTryCount = 200
271         }
272         xapp.Logger.Debug("dbTryCount %v", dbTryCount)
273
274         dbRetryForever = viper.GetString("controls.dbRetryForever")
275         if dbRetryForever == "" {
276                 dbRetryForever = "true"
277         }
278         xapp.Logger.Debug("dbRetryForever %v", dbRetryForever)
279
280         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
281         // value 100ms used currently only in unittests.
282         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
283         if waitRouteCleanup_ms == 0 {
284                 waitRouteCleanup_ms = 5000 * 1000000
285         }
286         xapp.Logger.Debug("waitRouteCleanup %v", waitRouteCleanup_ms)
287 }
288
289 //-------------------------------------------------------------------
290 //
291 //-------------------------------------------------------------------
292 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
293
294         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
295         for subId, subs := range register {
296                 if subs.SubRespRcvd == false {
297                         // If policy subscription has already been made successfully unsuccessful update should not be deleted.
298                         if subs.PolicyUpdate == false {
299                                 subs.NoRespToXapp = true
300                                 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
301                                 c.SendSubscriptionDeleteReq(subs)
302                         }
303                 }
304         }
305 }
306
307 func (c *Control) ReadyCB(data interface{}) {
308         if c.RMRClient == nil {
309                 c.RMRClient = xapp.Rmr
310         }
311 }
312
313 func (c *Control) Run() {
314         xapp.SetReadyCB(c.ReadyCB, nil)
315         xapp.AddConfigChangeListener(c.ReadConfigParameters)
316         xapp.Run(c)
317 }
318
319 //-------------------------------------------------------------------
320 //
321 //-------------------------------------------------------------------
322 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
323
324         var restSubId string
325         var restSubscription *RESTSubscription
326         var err error
327
328         prevRestSubsId, exists := restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
329         if p.SubscriptionID == "" {
330                 // Subscription does not contain REST subscription Id
331                 if exists {
332                         restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
333                         if restSubscription != nil {
334                                 // Subscription not found
335                                 restSubId = prevRestSubsId
336                                 if err == nil {
337                                         xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
338                                 } else {
339                                         xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
340                                 }
341                         } else {
342                                 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
343                                 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
344                         }
345                 }
346
347                 if restSubscription == nil {
348                         restSubId = ksuid.New().String()
349                         restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
350                 }
351         } else {
352                 // Subscription contains REST subscription Id
353                 restSubId = p.SubscriptionID
354
355                 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
356                 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
357                 if err != nil {
358                         // Subscription with id in REST request does not exist
359                         xapp.Logger.Error("%s", err.Error())
360                         c.UpdateCounter(cRestSubFailToXapp)
361                         return nil, "", err
362                 }
363
364                 if !exists {
365                         xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
366                 } else {
367                         xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
368                 }
369         }
370
371         return restSubscription, restSubId, nil
372 }
373
374 //-------------------------------------------------------------------
375 //
376 //-------------------------------------------------------------------
377 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
378
379         c.CntRecvMsg++
380         c.UpdateCounter(cRestSubReqFromXapp)
381
382         subResp := models.SubscriptionResponse{}
383         p := params.(*models.SubscriptionParams)
384
385         if c.LoggerLevel > 2 {
386                 c.PrintRESTSubscriptionRequest(p)
387         }
388
389         if p.ClientEndpoint == nil {
390                 err := fmt.Errorf("ClientEndpoint == nil")
391                 xapp.Logger.Error("%v", err)
392                 c.UpdateCounter(cRestSubFailToXapp)
393                 return nil, common.SubscribeBadRequestCode
394         }
395
396         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
397         if err != nil {
398                 xapp.Logger.Error("%s", err.Error())
399                 c.UpdateCounter(cRestSubFailToXapp)
400                 return nil, common.SubscribeBadRequestCode
401         }
402
403         md5sum, err := CalculateRequestMd5sum(params)
404         if err != nil {
405                 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
406         }
407
408         restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
409         if err != nil {
410                 xapp.Logger.Error("Subscription with id in REST request does not exist")
411                 return nil, common.SubscribeNotFoundCode
412         }
413
414         subResp.SubscriptionID = &restSubId
415         subReqList := e2ap.SubscriptionRequestList{}
416         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
417         if err != nil {
418                 xapp.Logger.Error("%s", err.Error())
419                 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
420                 c.registry.DeleteRESTSubscription(&restSubId)
421                 c.UpdateCounter(cRestSubFailToXapp)
422                 return nil, common.SubscribeBadRequestCode
423         }
424
425         duplicate := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
426         if duplicate {
427                 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
428                 xapp.Logger.Debug("%s", err)
429                 c.UpdateCounter(cRestSubRespToXapp)
430                 return &subResp, common.SubscribeCreatedCode
431         }
432
433         c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
434         e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
435         if err != nil {
436                 xapp.Logger.Error("%s", err)
437                 return nil, common.SubscribeBadRequestCode
438         }
439         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
440
441         c.UpdateCounter(cRestSubRespToXapp)
442         return &subResp, common.SubscribeCreatedCode
443 }
444
445 //-------------------------------------------------------------------
446 //
447 //-------------------------------------------------------------------
448 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
449
450         e2SubscriptionDirectives := &E2SubscriptionDirectives{}
451         if p == nil || p.E2SubscriptionDirectives == nil {
452                 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
453                 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
454                 e2SubscriptionDirectives.CreateRMRRoute = true
455                 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
456         } else {
457                 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
458                         e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
459                 } else {
460                         return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
461                 }
462                 if p.E2SubscriptionDirectives.E2RetryCount == nil {
463                         xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
464                         e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
465                 } else {
466                         if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
467                                 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
468                         } else {
469                                 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
470                         }
471                 }
472                 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
473         }
474         xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
475         xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
476         xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
477         return e2SubscriptionDirectives, nil
478 }
479
480 //-------------------------------------------------------------------
481 //
482 //-------------------------------------------------------------------
483
484 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
485         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
486
487         c.SubscriptionProcessingStartDelay()
488         xapp.Logger.Debug("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
489
490         var xAppEventInstanceID int64
491         var e2EventInstanceID int64
492         errorInfo := &ErrorInfo{}
493
494         defer restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
495
496         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
497                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
498                 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
499
500                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
501                 if trans == nil {
502                         // Send notification to xApp that prosessing of a Subscription Request has failed.
503                         err := fmt.Errorf("Tracking failure")
504                         errorInfo.ErrorCause = err.Error()
505                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
506                         continue
507                 }
508
509                 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
510
511                 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
512
513                 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
514
515                 if err != nil {
516                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
517                 } else {
518                         e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
519                         restSubscription.AddMd5Sum(md5sum)
520                         xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
521                                 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
522                         c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
523                 }
524                 trans.Release()
525         }
526 }
527
528 //-------------------------------------------------------------------
529 //
530 //------------------------------------------------------------------
531 func (c *Control) SubscriptionProcessingStartDelay() {
532         if c.UTTesting == true {
533                 // This is temporary fix for the UT problem that notification arrives before subscription response
534                 // Correct fix would be to allow notification come before response and process it correctly
535                 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
536                 <-time.After(time.Millisecond * 50)
537                 xapp.Logger.Debug("Continuing after delay")
538         }
539 }
540
541 //-------------------------------------------------------------------
542 //
543 //------------------------------------------------------------------
544 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
545         restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
546
547         errorInfo := ErrorInfo{}
548
549         err := c.tracker.Track(trans)
550         if err != nil {
551                 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
552                 errorInfo.ErrorCause = err.Error()
553                 err = fmt.Errorf("Tracking failure")
554                 return nil, &errorInfo, err
555         }
556
557         subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
558         if err != nil {
559                 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
560                 return nil, &errorInfo, err
561         }
562
563         //
564         // Wake subs request
565         //
566         go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
567         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
568
569         err = nil
570         if event != nil {
571                 switch themsg := event.(type) {
572                 case *e2ap.E2APSubscriptionResponse:
573                         trans.Release()
574                         return themsg, &errorInfo, nil
575                 case *e2ap.E2APSubscriptionFailure:
576                         err = fmt.Errorf("E2 SubscriptionFailure received")
577                         errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
578                         return nil, &errorInfo, err
579                 case *PackSubscriptionRequestErrortEvent:
580                         err = fmt.Errorf("E2 SubscriptionRequest pack failure")
581                         return nil, &themsg.ErrorInfo, err
582                 case *SDLWriteErrortEvent:
583                         err = fmt.Errorf("SDL write failure")
584                         return nil, &themsg.ErrorInfo, err
585                 default:
586                         err = fmt.Errorf("Unexpected E2 subscription response received")
587                         errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
588                         break
589                 }
590         } else {
591                 err = fmt.Errorf("E2 subscription response timeout")
592                 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
593                 if subs.PolicyUpdate == true {
594                         return nil, &errorInfo, err
595                 }
596         }
597
598         xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
599         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
600         return nil, &errorInfo, err
601 }
602
603 //-------------------------------------------------------------------
604 //
605 //-------------------------------------------------------------------
606 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
607         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
608
609         // Send notification to xApp that prosessing of a Subscription Request has failed.
610         e2EventInstanceID := (int64)(0)
611         if errorInfo.ErrorSource == "" {
612                 // Submgr is default source of error
613                 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
614         }
615         resp := &models.SubscriptionResponse{
616                 SubscriptionID: restSubId,
617                 SubscriptionInstances: []*models.SubscriptionInstance{
618                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
619                                 ErrorCause:          errorInfo.ErrorCause,
620                                 ErrorSource:         errorInfo.ErrorSource,
621                                 TimeoutType:         errorInfo.TimeoutType,
622                                 XappEventInstanceID: &xAppEventInstanceID},
623                 },
624         }
625         // Mark REST subscription request processed.
626         restSubscription.SetProcessed(err)
627         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
628         if trans != nil {
629                 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
630                         errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
631         } else {
632                 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
633                         errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
634         }
635
636         c.UpdateCounter(cRestSubFailNotifToXapp)
637         xapp.Subscription.Notify(resp, *clientEndpoint)
638 }
639
640 //-------------------------------------------------------------------
641 //
642 //-------------------------------------------------------------------
643 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
644         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
645
646         // Store successfully processed InstanceId for deletion
647         restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
648         restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
649
650         // Send notification to xApp that a Subscription Request has been processed.
651         resp := &models.SubscriptionResponse{
652                 SubscriptionID: restSubId,
653                 SubscriptionInstances: []*models.SubscriptionInstance{
654                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
655                                 ErrorCause:          "",
656                                 XappEventInstanceID: &xAppEventInstanceID},
657                 },
658         }
659         // Mark REST subscription request processesd.
660         restSubscription.SetProcessed(nil)
661         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
662         xapp.Logger.Debug("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
663                 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
664
665         c.UpdateCounter(cRestSubNotifToXapp)
666         xapp.Subscription.Notify(resp, *clientEndpoint)
667 }
668
669 //-------------------------------------------------------------------
670 //
671 //-------------------------------------------------------------------
672 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
673
674         c.CntRecvMsg++
675         c.UpdateCounter(cRestSubDelReqFromXapp)
676
677         xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
678
679         restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
680         if err != nil {
681                 xapp.Logger.Error("%s", err.Error())
682                 if restSubscription == nil {
683                         // Subscription was not found
684                         return common.UnsubscribeNoContentCode
685                 } else {
686                         if restSubscription.SubReqOngoing == true {
687                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
688                                 xapp.Logger.Error("%s", err.Error())
689                                 return common.UnsubscribeBadRequestCode
690                         } else if restSubscription.SubDelReqOngoing == true {
691                                 // Previous request for same restSubId still ongoing
692                                 return common.UnsubscribeBadRequestCode
693                         }
694                 }
695         }
696
697         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
698         go func() {
699                 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
700                 for _, instanceId := range restSubscription.InstanceIds {
701                         xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
702
703                         if err != nil {
704                                 xapp.Logger.Error("%s", err.Error())
705                         }
706                         xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
707                         restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
708                         restSubscription.DeleteE2InstanceId(instanceId)
709                 }
710                 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
711                 c.registry.DeleteRESTSubscription(&restSubId)
712                 c.RemoveRESTSubscriptionFromDb(restSubId)
713         }()
714
715         c.UpdateCounter(cRestSubDelRespToXapp)
716
717         return common.UnsubscribeNoContentCode
718 }
719
720 //-------------------------------------------------------------------
721 //
722 //-------------------------------------------------------------------
723 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
724
725         var xAppEventInstanceID int64
726         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
727         if err != nil {
728                 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
729                         restSubId, instanceId, idstring(err, nil))
730                 return xAppEventInstanceID, nil
731         }
732
733         xAppEventInstanceID = int64(subs.ReqId.Id)
734         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
735         if trans == nil {
736                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
737                 xapp.Logger.Error("%s", err.Error())
738         }
739         defer trans.Release()
740
741         err = c.tracker.Track(trans)
742         if err != nil {
743                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
744                 xapp.Logger.Error("%s", err.Error())
745                 return xAppEventInstanceID, &time.ParseError{}
746         }
747         //
748         // Wake subs delete
749         //
750         go c.handleSubscriptionDelete(subs, trans)
751         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
752
753         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
754
755         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
756
757         return xAppEventInstanceID, nil
758 }
759
760 //-------------------------------------------------------------------
761 //
762 //-------------------------------------------------------------------
763 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
764         xapp.Logger.Debug("RESTQueryHandler() called")
765
766         c.CntRecvMsg++
767
768         return c.registry.QueryHandler()
769 }
770
771 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
772         xapp.Logger.Debug("RESTTestRestHandler() called")
773
774         pathParams := mux.Vars(r)
775         s := pathParams["testId"]
776
777         // This can be used to delete single subscription from db
778         if contains := strings.Contains(s, "deletesubid="); contains == true {
779                 var splits = strings.Split(s, "=")
780                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
781                         xapp.Logger.Debug("RemoveSubscriptionFromSdl() called. subId = %v", subId)
782                         c.RemoveSubscriptionFromSdl(uint32(subId))
783                         return
784                 }
785         }
786
787         // This can be used to remove all subscriptions db from
788         if s == "emptydb" {
789                 xapp.Logger.Debug("RemoveAllSubscriptionsFromSdl() called")
790                 c.RemoveAllSubscriptionsFromSdl()
791                 c.RemoveAllRESTSubscriptionsFromSdl()
792                 return
793         }
794
795         // This is meant to cause submgr's restart in testing
796         if s == "restart" {
797                 xapp.Logger.Debug("os.Exit(1) called")
798                 os.Exit(1)
799         }
800
801         xapp.Logger.Debug("Unsupported rest command received %s", s)
802 }
803
804 //-------------------------------------------------------------------
805 //
806 //-------------------------------------------------------------------
807
808 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
809         params := &xapp.RMRParams{}
810         params.Mtype = trans.GetMtype()
811         params.SubId = int(subs.GetReqId().InstanceId)
812         params.Xid = ""
813         params.Meid = subs.GetMeid()
814         params.Src = ""
815         params.PayloadLen = len(trans.Payload.Buf)
816         params.Payload = trans.Payload.Buf
817         params.Mbuf = nil
818         xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
819         err = c.SendWithRetry(params, false, 5)
820         if err != nil {
821                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
822         }
823         return err
824 }
825
826 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
827
828         params := &xapp.RMRParams{}
829         params.Mtype = trans.GetMtype()
830         params.SubId = int(subs.GetReqId().InstanceId)
831         params.Xid = trans.GetXid()
832         params.Meid = trans.GetMeid()
833         params.Src = ""
834         params.PayloadLen = len(trans.Payload.Buf)
835         params.Payload = trans.Payload.Buf
836         params.Mbuf = nil
837         xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
838         err = c.SendWithRetry(params, false, 5)
839         if err != nil {
840                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
841         }
842         return err
843 }
844
845 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
846         if c.RMRClient == nil {
847                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
848                 xapp.Logger.Error("%s", err.Error())
849                 return
850         }
851         c.CntRecvMsg++
852
853         defer c.RMRClient.Free(msg.Mbuf)
854
855         // xapp-frame might use direct access to c buffer and
856         // when msg.Mbuf is freed, someone might take it into use
857         // and payload data might be invalid inside message handle function
858         //
859         // subscriptions won't load system a lot so there is no
860         // real performance hit by cloning buffer into new go byte slice
861         cPay := append(msg.Payload[:0:0], msg.Payload...)
862         msg.Payload = cPay
863         msg.PayloadLen = len(cPay)
864
865         switch msg.Mtype {
866         case xapp.RIC_SUB_REQ:
867                 go c.handleXAPPSubscriptionRequest(msg)
868         case xapp.RIC_SUB_RESP:
869                 go c.handleE2TSubscriptionResponse(msg)
870         case xapp.RIC_SUB_FAILURE:
871                 go c.handleE2TSubscriptionFailure(msg)
872         case xapp.RIC_SUB_DEL_REQ:
873                 go c.handleXAPPSubscriptionDeleteRequest(msg)
874         case xapp.RIC_SUB_DEL_RESP:
875                 go c.handleE2TSubscriptionDeleteResponse(msg)
876         case xapp.RIC_SUB_DEL_FAILURE:
877                 go c.handleE2TSubscriptionDeleteFailure(msg)
878         default:
879                 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
880         }
881         return
882 }
883
884 //-------------------------------------------------------------------
885 // handle from XAPP Subscription Request
886 //------------------------------------------------------------------
887 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
888         xapp.Logger.Debug("MSG from XAPP: %s", params.String())
889         c.UpdateCounter(cSubReqFromXapp)
890
891         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
892         if err != nil {
893                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
894                 return
895         }
896
897         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
898         if trans == nil {
899                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
900                 return
901         }
902         defer trans.Release()
903
904         if err = c.tracker.Track(trans); err != nil {
905                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
906                 return
907         }
908
909         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
910         subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
911         if err != nil {
912                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
913                 return
914         }
915
916         c.wakeSubscriptionRequest(subs, trans)
917 }
918
919 //-------------------------------------------------------------------
920 // Wake Subscription Request to E2node
921 //------------------------------------------------------------------
922 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
923
924         e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil)
925         go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
926         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
927         var err error
928         if event != nil {
929                 switch themsg := event.(type) {
930                 case *e2ap.E2APSubscriptionResponse:
931                         themsg.RequestId.Id = trans.RequestId.Id
932                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
933                         if err == nil {
934                                 trans.Release()
935                                 c.UpdateCounter(cSubRespToXapp)
936                                 c.rmrSendToXapp("", subs, trans)
937                                 return
938                         }
939                 case *e2ap.E2APSubscriptionFailure:
940                         themsg.RequestId.Id = trans.RequestId.Id
941                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
942                         if err == nil {
943                                 c.UpdateCounter(cSubFailToXapp)
944                                 c.rmrSendToXapp("", subs, trans)
945                         }
946                 default:
947                         break
948                 }
949         }
950         xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
951         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
952 }
953
954 //-------------------------------------------------------------------
955 // handle from XAPP Subscription Delete Request
956 //------------------------------------------------------------------
957 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
958         xapp.Logger.Debug("MSG from XAPP: %s", params.String())
959         c.UpdateCounter(cSubDelReqFromXapp)
960
961         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
962         if err != nil {
963                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
964                 return
965         }
966
967         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
968         if trans == nil {
969                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
970                 return
971         }
972         defer trans.Release()
973
974         err = c.tracker.Track(trans)
975         if err != nil {
976                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
977                 return
978         }
979
980         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
981         if err != nil {
982                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
983                 return
984         }
985
986         //
987         // Wake subs delete
988         //
989         go c.handleSubscriptionDelete(subs, trans)
990         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
991
992         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
993
994         if subs.NoRespToXapp == true {
995                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
996                 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
997                 return
998         }
999
1000         // Whatever is received success, fail or timeout, send successful delete response
1001         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1002         subDelRespMsg.RequestId.Id = trans.RequestId.Id
1003         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1004         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1005         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1006         if err == nil {
1007                 c.UpdateCounter(cSubDelRespToXapp)
1008                 c.rmrSendToXapp("", subs, trans)
1009         }
1010
1011         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
1012         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
1013 }
1014
1015 //-------------------------------------------------------------------
1016 // SUBS CREATE Handling
1017 //-------------------------------------------------------------------
1018 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) {
1019
1020         var event interface{} = nil
1021         var removeSubscriptionFromDb bool = false
1022         trans := c.tracker.NewSubsTransaction(subs)
1023         subs.WaitTransactionTurn(trans)
1024         defer subs.ReleaseTransactionTurn(trans)
1025         defer trans.Release()
1026
1027         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1028
1029         subRfMsg, valid := subs.GetCachedResponse()
1030         if subRfMsg == nil && valid == true {
1031                 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1032                 switch event.(type) {
1033                 case *e2ap.E2APSubscriptionResponse:
1034                         subRfMsg, valid = subs.SetCachedResponse(event, true)
1035                         subs.SubRespRcvd = true
1036                 case *e2ap.E2APSubscriptionFailure:
1037                         removeSubscriptionFromDb = true
1038                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1039                         xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1040                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1041                 case *SubmgrRestartTestEvent:
1042                         // This simulates that no response has been received and after restart subscriptions are restored from db
1043                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1044                 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1045                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1046                 default:
1047                         if subs.PolicyUpdate == false {
1048                                 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1049                                 removeSubscriptionFromDb = true
1050                                 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1051                                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1052                         }
1053                 }
1054                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1055         } else {
1056                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1057         }
1058
1059         err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1060         if err != nil {
1061                 subRfMsg, valid = subs.SetCachedResponse(event, false)
1062                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1063         }
1064
1065         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1066         if valid == false {
1067                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1068         }
1069
1070         parentTrans.SendEvent(subRfMsg, 0)
1071 }
1072
1073 //-------------------------------------------------------------------
1074 // SUBS DELETE Handling
1075 //-------------------------------------------------------------------
1076
1077 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
1078
1079         trans := c.tracker.NewSubsTransaction(subs)
1080         subs.WaitTransactionTurn(trans)
1081         defer subs.ReleaseTransactionTurn(trans)
1082         defer trans.Release()
1083
1084         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1085
1086         subs.mutex.Lock()
1087
1088         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1089                 subs.valid = false
1090                 subs.mutex.Unlock()
1091                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1092         } else {
1093                 subs.mutex.Unlock()
1094         }
1095         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1096         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
1097         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
1098         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1099         c.registry.UpdateSubscriptionToDb(subs, c)
1100         parentTrans.SendEvent(nil, 0)
1101 }
1102
1103 //-------------------------------------------------------------------
1104 // send to E2T Subscription Request
1105 //-------------------------------------------------------------------
1106 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1107         var err error
1108         var event interface{} = nil
1109         var timedOut bool = false
1110         const ricRequestorId = 123
1111
1112         subReqMsg := subs.SubReqMsg
1113         subReqMsg.RequestId = subs.GetReqId().RequestId
1114         subReqMsg.RequestId.Id = ricRequestorId
1115         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1116         if err != nil {
1117                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
1118                 return &PackSubscriptionRequestErrortEvent{
1119                         ErrorInfo{
1120                                 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1121                                 ErrorCause:  err.Error(),
1122                         },
1123                 }
1124         }
1125
1126         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1127         err = c.WriteSubscriptionToDb(subs)
1128         if err != nil {
1129                 return &SDLWriteErrortEvent{
1130                         ErrorInfo{
1131                                 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1132                                 ErrorCause:  err.Error(),
1133                         },
1134                 }
1135         }
1136
1137         for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1138                 desc := fmt.Sprintf("(retry %d)", retries)
1139                 if retries == 0 {
1140                         c.UpdateCounter(cSubReqToE2)
1141                 } else {
1142                         c.UpdateCounter(cSubReReqToE2)
1143                 }
1144                 c.rmrSendToE2T(desc, subs, trans)
1145                 if subs.DoNotWaitSubResp == false {
1146                         event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1147                         if timedOut {
1148                                 c.UpdateCounter(cSubReqTimerExpiry)
1149                                 continue
1150                         }
1151                 } else {
1152                         // Simulating case where subscrition request has been sent but response has not been received before restart
1153                         event = &SubmgrRestartTestEvent{}
1154                         xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1155                 }
1156                 break
1157         }
1158         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1159         return event
1160 }
1161
1162 //-------------------------------------------------------------------
1163 // send to E2T Subscription Delete Request
1164 //-------------------------------------------------------------------
1165
1166 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1167         var err error
1168         var event interface{}
1169         var timedOut bool
1170         const ricRequestorId = 123
1171
1172         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1173         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1174         subDelReqMsg.RequestId.Id = ricRequestorId
1175         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1176         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1177         if err != nil {
1178                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1179                 return event
1180         }
1181
1182         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1183                 desc := fmt.Sprintf("(retry %d)", retries)
1184                 if retries == 0 {
1185                         c.UpdateCounter(cSubDelReqToE2)
1186                 } else {
1187                         c.UpdateCounter(cSubDelReReqToE2)
1188                 }
1189                 c.rmrSendToE2T(desc, subs, trans)
1190                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1191                 if timedOut {
1192                         c.UpdateCounter(cSubDelReqTimerExpiry)
1193                         continue
1194                 }
1195                 break
1196         }
1197         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1198         return event
1199 }
1200
1201 //-------------------------------------------------------------------
1202 // handle from E2T Subscription Response
1203 //-------------------------------------------------------------------
1204 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1205         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1206         c.UpdateCounter(cSubRespFromE2)
1207
1208         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1209         if err != nil {
1210                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1211                 return
1212         }
1213         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1214         if err != nil {
1215                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1216                 return
1217         }
1218         trans := subs.GetTransaction()
1219         if trans == nil {
1220                 err = fmt.Errorf("Ongoing transaction not found")
1221                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1222                 return
1223         }
1224         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1225         if sendOk == false {
1226                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1227                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1228         }
1229         return
1230 }
1231
1232 //-------------------------------------------------------------------
1233 // handle from E2T Subscription Failure
1234 //-------------------------------------------------------------------
1235 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1236         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1237         c.UpdateCounter(cSubFailFromE2)
1238         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1239         if err != nil {
1240                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1241                 return
1242         }
1243         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1244         if err != nil {
1245                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1246                 return
1247         }
1248         trans := subs.GetTransaction()
1249         if trans == nil {
1250                 err = fmt.Errorf("Ongoing transaction not found")
1251                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1252                 return
1253         }
1254         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1255         if sendOk == false {
1256                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1257                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1258         }
1259         return
1260 }
1261
1262 //-------------------------------------------------------------------
1263 // handle from E2T Subscription Delete Response
1264 //-------------------------------------------------------------------
1265 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1266         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1267         c.UpdateCounter(cSubDelRespFromE2)
1268         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1269         if err != nil {
1270                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1271                 return
1272         }
1273         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1274         if err != nil {
1275                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1276                 return
1277         }
1278         trans := subs.GetTransaction()
1279         if trans == nil {
1280                 err = fmt.Errorf("Ongoing transaction not found")
1281                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1282                 return
1283         }
1284         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1285         if sendOk == false {
1286                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1287                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1288         }
1289         return
1290 }
1291
1292 //-------------------------------------------------------------------
1293 // handle from E2T Subscription Delete Failure
1294 //-------------------------------------------------------------------
1295 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1296         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1297         c.UpdateCounter(cSubDelFailFromE2)
1298         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1299         if err != nil {
1300                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1301                 return
1302         }
1303         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1304         if err != nil {
1305                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1306                 return
1307         }
1308         trans := subs.GetTransaction()
1309         if trans == nil {
1310                 err = fmt.Errorf("Ongoing transaction not found")
1311                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1312                 return
1313         }
1314         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1315         if sendOk == false {
1316                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1317                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1318         }
1319         return
1320 }
1321
1322 //-------------------------------------------------------------------
1323 //
1324 //-------------------------------------------------------------------
1325 func typeofSubsMessage(v interface{}) string {
1326         if v == nil {
1327                 return "NIL"
1328         }
1329         switch v.(type) {
1330         //case *e2ap.E2APSubscriptionRequest:
1331         //      return "SubReq"
1332         case *e2ap.E2APSubscriptionResponse:
1333                 return "SubResp"
1334         case *e2ap.E2APSubscriptionFailure:
1335                 return "SubFail"
1336         //case *e2ap.E2APSubscriptionDeleteRequest:
1337         //      return "SubDelReq"
1338         case *e2ap.E2APSubscriptionDeleteResponse:
1339                 return "SubDelResp"
1340         case *e2ap.E2APSubscriptionDeleteFailure:
1341                 return "SubDelFail"
1342         default:
1343                 return "Unknown"
1344         }
1345 }
1346
1347 //-------------------------------------------------------------------
1348 //
1349 //-------------------------------------------------------------------
1350 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1351         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1352         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1353         if err != nil {
1354                 xapp.Logger.Error("%v", err)
1355                 return err
1356         }
1357         return nil
1358 }
1359
1360 //-------------------------------------------------------------------
1361 //
1362 //-------------------------------------------------------------------
1363 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1364
1365         if removeSubscriptionFromDb == true {
1366                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1367                 c.RemoveSubscriptionFromDb(subs)
1368         } else {
1369                 // Update is needed for successful response and merge case here
1370                 if subs.RetryFromXapp == false {
1371                         err := c.WriteSubscriptionToDb(subs)
1372                         return err
1373                 }
1374         }
1375         subs.RetryFromXapp = false
1376         return nil
1377 }
1378
1379 //-------------------------------------------------------------------
1380 //
1381 //-------------------------------------------------------------------
1382 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1383         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1384         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1385         if err != nil {
1386                 xapp.Logger.Error("%v", err)
1387         }
1388 }
1389
1390 //-------------------------------------------------------------------
1391 //
1392 //-------------------------------------------------------------------
1393 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1394         xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1395         err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1396         if err != nil {
1397                 xapp.Logger.Error("%v", err)
1398         }
1399 }
1400
1401 //-------------------------------------------------------------------
1402 //
1403 //-------------------------------------------------------------------
1404 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1405
1406         if removeRestSubscriptionFromDb == true {
1407                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1408                 c.RemoveRESTSubscriptionFromDb(restSubId)
1409         } else {
1410                 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1411         }
1412 }
1413
1414 //-------------------------------------------------------------------
1415 //
1416 //-------------------------------------------------------------------
1417 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1418         xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1419         err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1420         if err != nil {
1421                 xapp.Logger.Error("%v", err)
1422         }
1423 }
1424
1425 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1426
1427         const ricRequestorId = 123
1428         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1429
1430         // Send delete for every endpoint in the subscription
1431         if subs.PolicyUpdate == false {
1432                 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1433                 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1434                 subDelReqMsg.RequestId.Id = ricRequestorId
1435                 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1436                 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1437                 if err != nil {
1438                         xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1439                         return
1440                 }
1441                 for _, endPoint := range subs.EpList.Endpoints {
1442                         params := &xapp.RMRParams{}
1443                         params.Mtype = mType
1444                         params.SubId = int(subs.GetReqId().InstanceId)
1445                         params.Xid = ""
1446                         params.Meid = subs.Meid
1447                         params.Src = endPoint.String()
1448                         params.PayloadLen = len(payload.Buf)
1449                         params.Payload = payload.Buf
1450                         params.Mbuf = nil
1451                         subs.DeleteFromDb = true
1452                         c.handleXAPPSubscriptionDeleteRequest(params)
1453                 }
1454         }
1455 }
1456
1457 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1458
1459         fmt.Println("CRESTSubscriptionRequest")
1460
1461         if p == nil {
1462                 return
1463         }
1464
1465         if p.SubscriptionID != "" {
1466                 fmt.Println("  SubscriptionID = ", p.SubscriptionID)
1467         } else {
1468                 fmt.Println("  SubscriptionID = ''")
1469         }
1470
1471         fmt.Printf("  ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1472
1473         if p.ClientEndpoint.HTTPPort != nil {
1474                 fmt.Printf("  ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1475         } else {
1476                 fmt.Println("  ClientEndpoint.HTTPPort = nil")
1477         }
1478
1479         if p.ClientEndpoint.RMRPort != nil {
1480                 fmt.Printf("  ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1481         } else {
1482                 fmt.Println("  ClientEndpoint.RMRPort = nil")
1483         }
1484
1485         if p.Meid != nil {
1486                 fmt.Printf("  Meid = %s\n", *p.Meid)
1487         } else {
1488                 fmt.Println("  Meid = nil")
1489         }
1490
1491         if p.E2SubscriptionDirectives == nil {
1492                 fmt.Println("  E2SubscriptionDirectives = nil")
1493         } else {
1494                 fmt.Println("  E2SubscriptionDirectives")
1495                 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1496                         fmt.Println("    E2RetryCount == nil")
1497                 } else {
1498                         fmt.Printf("    E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1499                 }
1500                 fmt.Printf("    E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1501                 fmt.Printf("    RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1502         }
1503         for _, subscriptionDetail := range p.SubscriptionDetails {
1504                 if p.RANFunctionID != nil {
1505                         fmt.Printf("  RANFunctionID = %v\n", *p.RANFunctionID)
1506                 } else {
1507                         fmt.Println("  RANFunctionID = nil")
1508                 }
1509                 fmt.Printf("  SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1510                 fmt.Printf("  SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1511
1512                 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1513                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1514                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1515                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1516
1517                         if actionToBeSetup.SubsequentAction != nil {
1518                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1519                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1520                         } else {
1521                                 fmt.Println("  SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1522                         }
1523                 }
1524         }
1525 }