Subscription REST interface update
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
34         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
35         httptransport "github.com/go-openapi/runtime/client"
36         "github.com/go-openapi/strfmt"
37         "github.com/gorilla/mux"
38         "github.com/segmentio/ksuid"
39         "github.com/spf13/viper"
40 )
41
42 //-----------------------------------------------------------------------------
43 //
44 //-----------------------------------------------------------------------------
45
46 func idstring(err error, entries ...fmt.Stringer) string {
47         var retval string = ""
48         var filler string = ""
49         for _, entry := range entries {
50                 if entry != nil {
51                         retval += filler + entry.String()
52                         filler = " "
53                 } else {
54                         retval += filler + "(NIL)"
55                 }
56         }
57         if err != nil {
58                 retval += filler + "err(" + err.Error() + ")"
59                 filler = " "
60         }
61         return retval
62 }
63
64 //-----------------------------------------------------------------------------
65 //
66 //-----------------------------------------------------------------------------
67
68 var e2tSubReqTimeout time.Duration
69 var e2tSubDelReqTime time.Duration
70 var e2tRecvMsgTimeout time.Duration
71 var waitRouteCleanup_ms time.Duration
72 var e2tMaxSubReqTryCount uint64    // Initial try + retry
73 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
74 var readSubsFromDb string
75 var restDuplicateCtrl duplicateCtrl
76 var dbRetryForever string
77 var dbTryCount int
78
79 type Control struct {
80         *xapp.RMRClient
81         e2ap          *E2ap
82         registry      *Registry
83         tracker       *Tracker
84         e2SubsDb      Sdlnterface
85         restSubsDb    Sdlnterface
86         CntRecvMsg    uint64
87         ResetTestFlag bool
88         Counters      map[string]xapp.Counter
89         LoggerLevel   int
90 }
91
92 type RMRMeid struct {
93         PlmnID  string
94         EnbID   string
95         RanName string
96 }
97
98 type SubmgrRestartTestEvent struct{}
99 type SubmgrRestartUpEvent struct{}
100 type PackSubscriptionRequestErrortEvent struct {
101         ErrorInfo ErrorInfo
102 }
103
104 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
105         p.ErrorInfo = *errorInfo
106 }
107
108 type SDLWriteErrortEvent struct {
109         ErrorInfo ErrorInfo
110 }
111
112 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
113         s.ErrorInfo = *errorInfo
114 }
115
116 func init() {
117         xapp.Logger.Debug("SUBMGR")
118         viper.AutomaticEnv()
119         viper.SetEnvPrefix("submgr")
120         viper.AllowEmptyEnv(true)
121 }
122
123 func NewControl() *Control {
124
125         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
126         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
127
128         registry := new(Registry)
129         registry.Initialize()
130         registry.rtmgrClient = &rtmgrClient
131
132         tracker := new(Tracker)
133         tracker.Init()
134
135         c := &Control{e2ap: new(E2ap),
136                 registry:    registry,
137                 tracker:     tracker,
138                 e2SubsDb:    CreateSdl(),
139                 restSubsDb:  CreateRESTSdl(),
140                 Counters:    xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
141                 LoggerLevel: 3,
142         }
143         c.ReadConfigParameters("")
144
145         // Register REST handler for testing support
146         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
147         xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
148         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
149
150         go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
151
152         if readSubsFromDb == "false" {
153                 return c
154         }
155
156         restDuplicateCtrl.Init()
157
158         // Read subscriptions from db
159         c.ReadE2Subscriptions()
160         c.ReadRESTSubscriptions()
161         return c
162 }
163
164 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
165         subscriptions, _ := c.registry.QueryHandler()
166         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
167 }
168
169 //-------------------------------------------------------------------
170 //
171 //-------------------------------------------------------------------
172 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
173         xapp.Logger.Debug("GetAllRestSubscriptions() called")
174         response := c.registry.GetAllRestSubscriptions()
175         w.Write(response)
176 }
177
178 //-------------------------------------------------------------------
179 //
180 //-------------------------------------------------------------------
181 func (c *Control) ReadE2Subscriptions() error {
182         var err error
183         var subIds []uint32
184         var register map[uint32]*Subscription
185         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
186                 xapp.Logger.Debug("Reading E2 subscriptions from db")
187                 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
188                 if err != nil {
189                         xapp.Logger.Error("%v", err)
190                         <-time.After(1 * time.Second)
191                 } else {
192                         c.registry.subIds = subIds
193                         c.registry.register = register
194                         c.HandleUncompletedSubscriptions(register)
195                         return nil
196                 }
197         }
198         xapp.Logger.Debug("Continuing without retring")
199         return err
200 }
201
202 //-------------------------------------------------------------------
203 //
204 //-------------------------------------------------------------------
205 func (c *Control) ReadRESTSubscriptions() error {
206         var err error
207         var restSubscriptions map[string]*RESTSubscription
208         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
209                 xapp.Logger.Debug("Reading REST subscriptions from db")
210                 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
211                 if err != nil {
212                         xapp.Logger.Error("%v", err)
213                         <-time.After(1 * time.Second)
214                 } else {
215                         c.registry.restSubscriptions = restSubscriptions
216                         return nil
217                 }
218         }
219         xapp.Logger.Debug("Continuing without retring")
220         return err
221 }
222
223 //-------------------------------------------------------------------
224 //
225 //-------------------------------------------------------------------
226 func (c *Control) ReadConfigParameters(f string) {
227
228         c.LoggerLevel = int(xapp.Logger.GetLevel())
229         xapp.Logger.Debug("LoggerLevel %v", c.LoggerLevel)
230
231         // viper.GetDuration returns nanoseconds
232         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
233         if e2tSubReqTimeout == 0 {
234                 e2tSubReqTimeout = 2000 * 1000000
235         }
236         xapp.Logger.Debug("e2tSubReqTimeout %v", e2tSubReqTimeout)
237
238         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
239         if e2tSubDelReqTime == 0 {
240                 e2tSubDelReqTime = 2000 * 1000000
241         }
242         xapp.Logger.Debug("e2tSubDelReqTime %v", e2tSubDelReqTime)
243         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
244         if e2tRecvMsgTimeout == 0 {
245                 e2tRecvMsgTimeout = 2000 * 1000000
246         }
247         xapp.Logger.Debug("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
248
249         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
250         if e2tMaxSubReqTryCount == 0 {
251                 e2tMaxSubReqTryCount = 1
252         }
253         xapp.Logger.Debug("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
254
255         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
256         if e2tMaxSubDelReqTryCount == 0 {
257                 e2tMaxSubDelReqTryCount = 1
258         }
259         xapp.Logger.Debug("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
260
261         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
262         if readSubsFromDb == "" {
263                 readSubsFromDb = "true"
264         }
265         xapp.Logger.Debug("readSubsFromDb %v", readSubsFromDb)
266
267         dbTryCount = viper.GetInt("controls.dbTryCount")
268         if dbTryCount == 0 {
269                 dbTryCount = 200
270         }
271         xapp.Logger.Debug("dbTryCount %v", dbTryCount)
272
273         dbRetryForever = viper.GetString("controls.dbRetryForever")
274         if dbRetryForever == "" {
275                 dbRetryForever = "true"
276         }
277         xapp.Logger.Debug("dbRetryForever %v", dbRetryForever)
278
279         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
280         // value 100ms used currently only in unittests.
281         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
282         if waitRouteCleanup_ms == 0 {
283                 waitRouteCleanup_ms = 5000 * 1000000
284         }
285         xapp.Logger.Debug("waitRouteCleanup %v", waitRouteCleanup_ms)
286 }
287
288 //-------------------------------------------------------------------
289 //
290 //-------------------------------------------------------------------
291 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
292
293         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
294         for subId, subs := range register {
295                 if subs.SubRespRcvd == false {
296                         // If policy subscription has already been made successfully unsuccessful update should not be deleted.
297                         if subs.PolicyUpdate == false {
298                                 subs.NoRespToXapp = true
299                                 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
300                                 c.SendSubscriptionDeleteReq(subs)
301                         }
302                 }
303         }
304 }
305
306 func (c *Control) ReadyCB(data interface{}) {
307         if c.RMRClient == nil {
308                 c.RMRClient = xapp.Rmr
309         }
310 }
311
312 func (c *Control) Run() {
313         xapp.SetReadyCB(c.ReadyCB, nil)
314         xapp.AddConfigChangeListener(c.ReadConfigParameters)
315         xapp.Run(c)
316 }
317
318 //-------------------------------------------------------------------
319 //
320 //-------------------------------------------------------------------
321 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
322
323         var restSubId string
324         var restSubscription *RESTSubscription
325         var err error
326
327         prevRestSubsId, exists := restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
328         if p.SubscriptionID == "" {
329                 // Subscription does not contain REST subscription Id
330                 if exists {
331                         restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
332                         if restSubscription != nil {
333                                 // Subscription not found
334                                 restSubId = prevRestSubsId
335                                 if err == nil {
336                                         xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
337                                 } else {
338                                         xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
339                                 }
340                         } else {
341                                 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
342                                 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
343                         }
344                 }
345
346                 if restSubscription == nil {
347                         restSubId = ksuid.New().String()
348                         restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
349                 }
350         } else {
351                 // Subscription contains REST subscription Id
352                 restSubId = p.SubscriptionID
353
354                 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
355                 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
356                 if err != nil {
357                         // Subscription with id in REST request does not exist
358                         xapp.Logger.Error("%s", err.Error())
359                         c.UpdateCounter(cRestSubFailToXapp)
360                         return nil, "", err
361                 }
362
363                 if !exists {
364                         xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
365                 } else {
366                         xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
367                 }
368         }
369
370         return restSubscription, restSubId, nil
371 }
372
373 //-------------------------------------------------------------------
374 //
375 //-------------------------------------------------------------------
376 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
377
378         c.CntRecvMsg++
379         c.UpdateCounter(cRestSubReqFromXapp)
380
381         subResp := models.SubscriptionResponse{}
382         p := params.(*models.SubscriptionParams)
383
384         if c.LoggerLevel > 2 {
385                 c.PrintRESTSubscriptionRequest(p)
386         }
387
388         if p.ClientEndpoint == nil {
389                 err := fmt.Errorf("ClientEndpoint == nil")
390                 xapp.Logger.Error("%v", err)
391                 c.UpdateCounter(cRestSubFailToXapp)
392                 return nil, common.SubscribeBadRequestCode
393         }
394
395         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
396         if err != nil {
397                 xapp.Logger.Error("%s", err.Error())
398                 c.UpdateCounter(cRestSubFailToXapp)
399                 return nil, common.SubscribeBadRequestCode
400         }
401
402         md5sum, err := CalculateRequestMd5sum(params)
403         if err != nil {
404                 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
405         }
406
407         restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
408         if err != nil {
409                 xapp.Logger.Error("Subscription with id in REST request does not exist")
410                 return nil, common.SubscribeNotFoundCode
411         }
412
413         subResp.SubscriptionID = &restSubId
414         subReqList := e2ap.SubscriptionRequestList{}
415         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
416         if err != nil {
417                 xapp.Logger.Error("%s", err.Error())
418                 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
419                 c.registry.DeleteRESTSubscription(&restSubId)
420                 c.UpdateCounter(cRestSubFailToXapp)
421                 return nil, common.SubscribeBadRequestCode
422         }
423
424         duplicate := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
425         if duplicate {
426                 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
427                 xapp.Logger.Debug("%s", err)
428                 c.UpdateCounter(cRestSubRespToXapp)
429                 return &subResp, common.SubscribeCreatedCode
430         }
431
432         c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
433         e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
434         if err != nil {
435                 xapp.Logger.Error("%s", err)
436                 return nil, common.SubscribeBadRequestCode
437         }
438         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
439
440         c.UpdateCounter(cRestSubRespToXapp)
441         return &subResp, common.SubscribeCreatedCode
442 }
443
444 //-------------------------------------------------------------------
445 //
446 //-------------------------------------------------------------------
447 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
448
449         e2SubscriptionDirectives := &E2SubscriptionDirectives{}
450         if p == nil || p.E2SubscriptionDirectives == nil {
451                 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
452                 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
453                 e2SubscriptionDirectives.CreateRMRRoute = true
454                 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
455         } else {
456                 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
457                         e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
458                 } else {
459                         return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
460                 }
461                 if p.E2SubscriptionDirectives.E2RetryCount == nil {
462                         xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
463                         e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
464                 } else {
465                         if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
466                                 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
467                         } else {
468                                 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
469                         }
470                 }
471                 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
472         }
473         xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
474         xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
475         xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
476         return e2SubscriptionDirectives, nil
477 }
478
479 //-------------------------------------------------------------------
480 //
481 //-------------------------------------------------------------------
482
483 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
484         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
485
486         xapp.Logger.Debug("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
487
488         var xAppEventInstanceID int64
489         var e2EventInstanceID int64
490         errorInfo := &ErrorInfo{}
491
492         defer restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
493
494         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
495                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
496                 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
497
498                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
499                 if trans == nil {
500                         // Send notification to xApp that prosessing of a Subscription Request has failed.
501                         err := fmt.Errorf("Tracking failure")
502                         errorInfo.ErrorCause = err.Error()
503                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
504                         continue
505                 }
506
507                 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
508
509                 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
510
511                 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
512
513                 if err != nil {
514                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
515                 } else {
516                         e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
517                         restSubscription.AddMd5Sum(md5sum)
518                         xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
519                                 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
520                         c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
521                 }
522                 trans.Release()
523         }
524 }
525
526 //-------------------------------------------------------------------
527 //
528 //------------------------------------------------------------------
529 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
530         restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
531
532         errorInfo := ErrorInfo{}
533
534         err := c.tracker.Track(trans)
535         if err != nil {
536                 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
537                 errorInfo.ErrorCause = err.Error()
538                 err = fmt.Errorf("Tracking failure")
539                 return nil, &errorInfo, err
540         }
541
542         subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
543         if err != nil {
544                 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
545                 return nil, &errorInfo, err
546         }
547
548         //
549         // Wake subs request
550         //
551         go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
552         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
553
554         err = nil
555         if event != nil {
556                 switch themsg := event.(type) {
557                 case *e2ap.E2APSubscriptionResponse:
558                         trans.Release()
559                         return themsg, &errorInfo, nil
560                 case *e2ap.E2APSubscriptionFailure:
561                         err = fmt.Errorf("E2 SubscriptionFailure received")
562                         errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
563                         return nil, &errorInfo, err
564                 case *PackSubscriptionRequestErrortEvent:
565                         err = fmt.Errorf("E2 SubscriptionRequest pack failure")
566                         return nil, &themsg.ErrorInfo, err
567                 case *SDLWriteErrortEvent:
568                         err = fmt.Errorf("SDL write failure")
569                         return nil, &themsg.ErrorInfo, err
570                 default:
571                         err = fmt.Errorf("Unexpected E2 subscription response received")
572                         errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
573                         break
574                 }
575         } else {
576                 err = fmt.Errorf("E2 subscription response timeout")
577                 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
578                 if subs.PolicyUpdate == true {
579                         return nil, &errorInfo, err
580                 }
581         }
582
583         xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
584         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
585         return nil, &errorInfo, err
586 }
587
588 //-------------------------------------------------------------------
589 //
590 //-------------------------------------------------------------------
591 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
592         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
593
594         // Send notification to xApp that prosessing of a Subscription Request has failed.
595         e2EventInstanceID := (int64)(0)
596         if errorInfo.ErrorSource == "" {
597                 // Submgr is default source of error
598                 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
599         }
600         resp := &models.SubscriptionResponse{
601                 SubscriptionID: restSubId,
602                 SubscriptionInstances: []*models.SubscriptionInstance{
603                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
604                                 ErrorCause:          errorInfo.ErrorCause,
605                                 ErrorSource:         errorInfo.ErrorSource,
606                                 TimeoutType:         errorInfo.TimeoutType,
607                                 XappEventInstanceID: &xAppEventInstanceID},
608                 },
609         }
610         // Mark REST subscription request processed.
611         restSubscription.SetProcessed(err)
612         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
613         if trans != nil {
614                 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
615                         errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
616         } else {
617                 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
618                         errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
619         }
620
621         c.UpdateCounter(cRestSubFailNotifToXapp)
622         xapp.Subscription.Notify(resp, *clientEndpoint)
623 }
624
625 //-------------------------------------------------------------------
626 //
627 //-------------------------------------------------------------------
628 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
629         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
630
631         // Store successfully processed InstanceId for deletion
632         restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
633         restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
634
635         // Send notification to xApp that a Subscription Request has been processed.
636         resp := &models.SubscriptionResponse{
637                 SubscriptionID: restSubId,
638                 SubscriptionInstances: []*models.SubscriptionInstance{
639                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
640                                 ErrorCause:          "",
641                                 XappEventInstanceID: &xAppEventInstanceID},
642                 },
643         }
644         // Mark REST subscription request processesd.
645         restSubscription.SetProcessed(nil)
646         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
647         xapp.Logger.Debug("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
648                 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
649
650         c.UpdateCounter(cRestSubNotifToXapp)
651         xapp.Subscription.Notify(resp, *clientEndpoint)
652 }
653
654 //-------------------------------------------------------------------
655 //
656 //-------------------------------------------------------------------
657 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
658
659         c.CntRecvMsg++
660         c.UpdateCounter(cRestSubDelReqFromXapp)
661
662         xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
663
664         restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
665         if err != nil {
666                 xapp.Logger.Error("%s", err.Error())
667                 if restSubscription == nil {
668                         // Subscription was not found
669                         return common.UnsubscribeNoContentCode
670                 } else {
671                         if restSubscription.SubReqOngoing == true {
672                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
673                                 xapp.Logger.Error("%s", err.Error())
674                                 return common.UnsubscribeBadRequestCode
675                         } else if restSubscription.SubDelReqOngoing == true {
676                                 // Previous request for same restSubId still ongoing
677                                 return common.UnsubscribeBadRequestCode
678                         }
679                 }
680         }
681
682         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
683         go func() {
684                 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
685                 for _, instanceId := range restSubscription.InstanceIds {
686                         xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
687
688                         if err != nil {
689                                 xapp.Logger.Error("%s", err.Error())
690                         }
691                         xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
692                         restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
693                         restSubscription.DeleteE2InstanceId(instanceId)
694                 }
695                 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
696                 c.registry.DeleteRESTSubscription(&restSubId)
697                 c.RemoveRESTSubscriptionFromDb(restSubId)
698         }()
699
700         c.UpdateCounter(cRestSubDelRespToXapp)
701
702         return common.UnsubscribeNoContentCode
703 }
704
705 //-------------------------------------------------------------------
706 //
707 //-------------------------------------------------------------------
708 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
709
710         var xAppEventInstanceID int64
711         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
712         if err != nil {
713                 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
714                         restSubId, instanceId, idstring(err, nil))
715                 return xAppEventInstanceID, nil
716         }
717
718         xAppEventInstanceID = int64(subs.ReqId.Id)
719         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
720         if trans == nil {
721                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
722                 xapp.Logger.Error("%s", err.Error())
723         }
724         defer trans.Release()
725
726         err = c.tracker.Track(trans)
727         if err != nil {
728                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
729                 xapp.Logger.Error("%s", err.Error())
730                 return xAppEventInstanceID, &time.ParseError{}
731         }
732         //
733         // Wake subs delete
734         //
735         go c.handleSubscriptionDelete(subs, trans)
736         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
737
738         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
739
740         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
741
742         return xAppEventInstanceID, nil
743 }
744
745 //-------------------------------------------------------------------
746 //
747 //-------------------------------------------------------------------
748 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
749         xapp.Logger.Debug("RESTQueryHandler() called")
750
751         c.CntRecvMsg++
752
753         return c.registry.QueryHandler()
754 }
755
756 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
757         xapp.Logger.Debug("RESTTestRestHandler() called")
758
759         pathParams := mux.Vars(r)
760         s := pathParams["testId"]
761
762         // This can be used to delete single subscription from db
763         if contains := strings.Contains(s, "deletesubid="); contains == true {
764                 var splits = strings.Split(s, "=")
765                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
766                         xapp.Logger.Debug("RemoveSubscriptionFromSdl() called. subId = %v", subId)
767                         c.RemoveSubscriptionFromSdl(uint32(subId))
768                         return
769                 }
770         }
771
772         // This can be used to remove all subscriptions db from
773         if s == "emptydb" {
774                 xapp.Logger.Debug("RemoveAllSubscriptionsFromSdl() called")
775                 c.RemoveAllSubscriptionsFromSdl()
776                 c.RemoveAllRESTSubscriptionsFromSdl()
777                 return
778         }
779
780         // This is meant to cause submgr's restart in testing
781         if s == "restart" {
782                 xapp.Logger.Debug("os.Exit(1) called")
783                 os.Exit(1)
784         }
785
786         xapp.Logger.Debug("Unsupported rest command received %s", s)
787 }
788
789 //-------------------------------------------------------------------
790 //
791 //-------------------------------------------------------------------
792
793 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
794         params := &xapp.RMRParams{}
795         params.Mtype = trans.GetMtype()
796         params.SubId = int(subs.GetReqId().InstanceId)
797         params.Xid = ""
798         params.Meid = subs.GetMeid()
799         params.Src = ""
800         params.PayloadLen = len(trans.Payload.Buf)
801         params.Payload = trans.Payload.Buf
802         params.Mbuf = nil
803         xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
804         err = c.SendWithRetry(params, false, 5)
805         if err != nil {
806                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
807         }
808         return err
809 }
810
811 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
812
813         params := &xapp.RMRParams{}
814         params.Mtype = trans.GetMtype()
815         params.SubId = int(subs.GetReqId().InstanceId)
816         params.Xid = trans.GetXid()
817         params.Meid = trans.GetMeid()
818         params.Src = ""
819         params.PayloadLen = len(trans.Payload.Buf)
820         params.Payload = trans.Payload.Buf
821         params.Mbuf = nil
822         xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
823         err = c.SendWithRetry(params, false, 5)
824         if err != nil {
825                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
826         }
827         return err
828 }
829
830 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
831         if c.RMRClient == nil {
832                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
833                 xapp.Logger.Error("%s", err.Error())
834                 return
835         }
836         c.CntRecvMsg++
837
838         defer c.RMRClient.Free(msg.Mbuf)
839
840         // xapp-frame might use direct access to c buffer and
841         // when msg.Mbuf is freed, someone might take it into use
842         // and payload data might be invalid inside message handle function
843         //
844         // subscriptions won't load system a lot so there is no
845         // real performance hit by cloning buffer into new go byte slice
846         cPay := append(msg.Payload[:0:0], msg.Payload...)
847         msg.Payload = cPay
848         msg.PayloadLen = len(cPay)
849
850         switch msg.Mtype {
851         case xapp.RIC_SUB_REQ:
852                 go c.handleXAPPSubscriptionRequest(msg)
853         case xapp.RIC_SUB_RESP:
854                 go c.handleE2TSubscriptionResponse(msg)
855         case xapp.RIC_SUB_FAILURE:
856                 go c.handleE2TSubscriptionFailure(msg)
857         case xapp.RIC_SUB_DEL_REQ:
858                 go c.handleXAPPSubscriptionDeleteRequest(msg)
859         case xapp.RIC_SUB_DEL_RESP:
860                 go c.handleE2TSubscriptionDeleteResponse(msg)
861         case xapp.RIC_SUB_DEL_FAILURE:
862                 go c.handleE2TSubscriptionDeleteFailure(msg)
863         default:
864                 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
865         }
866         return
867 }
868
869 //-------------------------------------------------------------------
870 // handle from XAPP Subscription Request
871 //------------------------------------------------------------------
872 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
873         xapp.Logger.Debug("MSG from XAPP: %s", params.String())
874         c.UpdateCounter(cSubReqFromXapp)
875
876         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
877         if err != nil {
878                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
879                 return
880         }
881
882         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
883         if trans == nil {
884                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
885                 return
886         }
887         defer trans.Release()
888
889         if err = c.tracker.Track(trans); err != nil {
890                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
891                 return
892         }
893
894         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
895         subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
896         if err != nil {
897                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
898                 return
899         }
900
901         c.wakeSubscriptionRequest(subs, trans)
902 }
903
904 //-------------------------------------------------------------------
905 // Wake Subscription Request to E2node
906 //------------------------------------------------------------------
907 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
908
909         e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil)
910         go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
911         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
912         var err error
913         if event != nil {
914                 switch themsg := event.(type) {
915                 case *e2ap.E2APSubscriptionResponse:
916                         themsg.RequestId.Id = trans.RequestId.Id
917                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
918                         if err == nil {
919                                 trans.Release()
920                                 c.UpdateCounter(cSubRespToXapp)
921                                 c.rmrSendToXapp("", subs, trans)
922                                 return
923                         }
924                 case *e2ap.E2APSubscriptionFailure:
925                         themsg.RequestId.Id = trans.RequestId.Id
926                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
927                         if err == nil {
928                                 c.UpdateCounter(cSubFailToXapp)
929                                 c.rmrSendToXapp("", subs, trans)
930                         }
931                 default:
932                         break
933                 }
934         }
935         xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
936         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
937 }
938
939 //-------------------------------------------------------------------
940 // handle from XAPP Subscription Delete Request
941 //------------------------------------------------------------------
942 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
943         xapp.Logger.Debug("MSG from XAPP: %s", params.String())
944         c.UpdateCounter(cSubDelReqFromXapp)
945
946         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
947         if err != nil {
948                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
949                 return
950         }
951
952         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
953         if trans == nil {
954                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
955                 return
956         }
957         defer trans.Release()
958
959         err = c.tracker.Track(trans)
960         if err != nil {
961                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
962                 return
963         }
964
965         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
966         if err != nil {
967                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
968                 return
969         }
970
971         //
972         // Wake subs delete
973         //
974         go c.handleSubscriptionDelete(subs, trans)
975         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
976
977         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
978
979         if subs.NoRespToXapp == true {
980                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
981                 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
982                 return
983         }
984
985         // Whatever is received success, fail or timeout, send successful delete response
986         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
987         subDelRespMsg.RequestId.Id = trans.RequestId.Id
988         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
989         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
990         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
991         if err == nil {
992                 c.UpdateCounter(cSubDelRespToXapp)
993                 c.rmrSendToXapp("", subs, trans)
994         }
995
996         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
997         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
998 }
999
1000 //-------------------------------------------------------------------
1001 // SUBS CREATE Handling
1002 //-------------------------------------------------------------------
1003 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) {
1004
1005         var event interface{} = nil
1006         var removeSubscriptionFromDb bool = false
1007         trans := c.tracker.NewSubsTransaction(subs)
1008         subs.WaitTransactionTurn(trans)
1009         defer subs.ReleaseTransactionTurn(trans)
1010         defer trans.Release()
1011
1012         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1013
1014         subRfMsg, valid := subs.GetCachedResponse()
1015         if subRfMsg == nil && valid == true {
1016                 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1017                 switch event.(type) {
1018                 case *e2ap.E2APSubscriptionResponse:
1019                         subRfMsg, valid = subs.SetCachedResponse(event, true)
1020                         subs.SubRespRcvd = true
1021                 case *e2ap.E2APSubscriptionFailure:
1022                         removeSubscriptionFromDb = true
1023                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1024                         xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1025                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1026                 case *SubmgrRestartTestEvent:
1027                         // This simulates that no response has been received and after restart subscriptions are restored from db
1028                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1029                 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1030                         subRfMsg, valid = subs.SetCachedResponse(event, false)
1031                 default:
1032                         if subs.PolicyUpdate == false {
1033                                 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1034                                 removeSubscriptionFromDb = true
1035                                 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1036                                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1037                         }
1038                 }
1039                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1040         } else {
1041                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1042         }
1043
1044         err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1045         if err != nil {
1046                 subRfMsg, valid = subs.SetCachedResponse(event, false)
1047                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1048         }
1049
1050         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1051         if valid == false {
1052                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1053         }
1054
1055         parentTrans.SendEvent(subRfMsg, 0)
1056 }
1057
1058 //-------------------------------------------------------------------
1059 // SUBS DELETE Handling
1060 //-------------------------------------------------------------------
1061
1062 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
1063
1064         trans := c.tracker.NewSubsTransaction(subs)
1065         subs.WaitTransactionTurn(trans)
1066         defer subs.ReleaseTransactionTurn(trans)
1067         defer trans.Release()
1068
1069         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1070
1071         subs.mutex.Lock()
1072
1073         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1074                 subs.valid = false
1075                 subs.mutex.Unlock()
1076                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1077         } else {
1078                 subs.mutex.Unlock()
1079         }
1080         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1081         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
1082         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
1083         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1084         c.registry.UpdateSubscriptionToDb(subs, c)
1085         parentTrans.SendEvent(nil, 0)
1086 }
1087
1088 //-------------------------------------------------------------------
1089 // send to E2T Subscription Request
1090 //-------------------------------------------------------------------
1091 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1092         var err error
1093         var event interface{} = nil
1094         var timedOut bool = false
1095         const ricRequestorId = 123
1096
1097         subReqMsg := subs.SubReqMsg
1098         subReqMsg.RequestId = subs.GetReqId().RequestId
1099         subReqMsg.RequestId.Id = ricRequestorId
1100         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1101         if err != nil {
1102                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
1103                 return &PackSubscriptionRequestErrortEvent{
1104                         ErrorInfo{
1105                                 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1106                                 ErrorCause:  err.Error(),
1107                         },
1108                 }
1109         }
1110
1111         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1112         err = c.WriteSubscriptionToDb(subs)
1113         if err != nil {
1114                 return &SDLWriteErrortEvent{
1115                         ErrorInfo{
1116                                 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1117                                 ErrorCause:  err.Error(),
1118                         },
1119                 }
1120         }
1121
1122         for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1123                 desc := fmt.Sprintf("(retry %d)", retries)
1124                 if retries == 0 {
1125                         c.UpdateCounter(cSubReqToE2)
1126                 } else {
1127                         c.UpdateCounter(cSubReReqToE2)
1128                 }
1129                 c.rmrSendToE2T(desc, subs, trans)
1130                 if subs.DoNotWaitSubResp == false {
1131                         event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1132                         if timedOut {
1133                                 c.UpdateCounter(cSubReqTimerExpiry)
1134                                 continue
1135                         }
1136                 } else {
1137                         // Simulating case where subscrition request has been sent but response has not been received before restart
1138                         event = &SubmgrRestartTestEvent{}
1139                         xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1140                 }
1141                 break
1142         }
1143         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1144         return event
1145 }
1146
1147 //-------------------------------------------------------------------
1148 // send to E2T Subscription Delete Request
1149 //-------------------------------------------------------------------
1150
1151 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1152         var err error
1153         var event interface{}
1154         var timedOut bool
1155         const ricRequestorId = 123
1156
1157         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1158         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1159         subDelReqMsg.RequestId.Id = ricRequestorId
1160         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1161         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1162         if err != nil {
1163                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1164                 return event
1165         }
1166
1167         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1168                 desc := fmt.Sprintf("(retry %d)", retries)
1169                 if retries == 0 {
1170                         c.UpdateCounter(cSubDelReqToE2)
1171                 } else {
1172                         c.UpdateCounter(cSubDelReReqToE2)
1173                 }
1174                 c.rmrSendToE2T(desc, subs, trans)
1175                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1176                 if timedOut {
1177                         c.UpdateCounter(cSubDelReqTimerExpiry)
1178                         continue
1179                 }
1180                 break
1181         }
1182         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1183         return event
1184 }
1185
1186 //-------------------------------------------------------------------
1187 // handle from E2T Subscription Response
1188 //-------------------------------------------------------------------
1189 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1190         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1191         c.UpdateCounter(cSubRespFromE2)
1192
1193         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1194         if err != nil {
1195                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1196                 return
1197         }
1198         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1199         if err != nil {
1200                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1201                 return
1202         }
1203         trans := subs.GetTransaction()
1204         if trans == nil {
1205                 err = fmt.Errorf("Ongoing transaction not found")
1206                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1207                 return
1208         }
1209         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1210         if sendOk == false {
1211                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1212                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1213         }
1214         return
1215 }
1216
1217 //-------------------------------------------------------------------
1218 // handle from E2T Subscription Failure
1219 //-------------------------------------------------------------------
1220 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1221         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1222         c.UpdateCounter(cSubFailFromE2)
1223         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1224         if err != nil {
1225                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1226                 return
1227         }
1228         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1229         if err != nil {
1230                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1231                 return
1232         }
1233         trans := subs.GetTransaction()
1234         if trans == nil {
1235                 err = fmt.Errorf("Ongoing transaction not found")
1236                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1237                 return
1238         }
1239         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1240         if sendOk == false {
1241                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1242                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1243         }
1244         return
1245 }
1246
1247 //-------------------------------------------------------------------
1248 // handle from E2T Subscription Delete Response
1249 //-------------------------------------------------------------------
1250 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1251         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1252         c.UpdateCounter(cSubDelRespFromE2)
1253         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1254         if err != nil {
1255                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1256                 return
1257         }
1258         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1259         if err != nil {
1260                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1261                 return
1262         }
1263         trans := subs.GetTransaction()
1264         if trans == nil {
1265                 err = fmt.Errorf("Ongoing transaction not found")
1266                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1267                 return
1268         }
1269         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1270         if sendOk == false {
1271                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1272                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1273         }
1274         return
1275 }
1276
1277 //-------------------------------------------------------------------
1278 // handle from E2T Subscription Delete Failure
1279 //-------------------------------------------------------------------
1280 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1281         xapp.Logger.Debug("MSG from E2T: %s", params.String())
1282         c.UpdateCounter(cSubDelFailFromE2)
1283         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1284         if err != nil {
1285                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1286                 return
1287         }
1288         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1289         if err != nil {
1290                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1291                 return
1292         }
1293         trans := subs.GetTransaction()
1294         if trans == nil {
1295                 err = fmt.Errorf("Ongoing transaction not found")
1296                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1297                 return
1298         }
1299         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1300         if sendOk == false {
1301                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1302                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1303         }
1304         return
1305 }
1306
1307 //-------------------------------------------------------------------
1308 //
1309 //-------------------------------------------------------------------
1310 func typeofSubsMessage(v interface{}) string {
1311         if v == nil {
1312                 return "NIL"
1313         }
1314         switch v.(type) {
1315         //case *e2ap.E2APSubscriptionRequest:
1316         //      return "SubReq"
1317         case *e2ap.E2APSubscriptionResponse:
1318                 return "SubResp"
1319         case *e2ap.E2APSubscriptionFailure:
1320                 return "SubFail"
1321         //case *e2ap.E2APSubscriptionDeleteRequest:
1322         //      return "SubDelReq"
1323         case *e2ap.E2APSubscriptionDeleteResponse:
1324                 return "SubDelResp"
1325         case *e2ap.E2APSubscriptionDeleteFailure:
1326                 return "SubDelFail"
1327         default:
1328                 return "Unknown"
1329         }
1330 }
1331
1332 //-------------------------------------------------------------------
1333 //
1334 //-------------------------------------------------------------------
1335 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1336         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1337         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1338         if err != nil {
1339                 xapp.Logger.Error("%v", err)
1340                 return err
1341         }
1342         return nil
1343 }
1344
1345 //-------------------------------------------------------------------
1346 //
1347 //-------------------------------------------------------------------
1348 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1349
1350         if removeSubscriptionFromDb == true {
1351                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1352                 c.RemoveSubscriptionFromDb(subs)
1353         } else {
1354                 // Update is needed for successful response and merge case here
1355                 if subs.RetryFromXapp == false {
1356                         err := c.WriteSubscriptionToDb(subs)
1357                         return err
1358                 }
1359         }
1360         subs.RetryFromXapp = false
1361         return nil
1362 }
1363
1364 //-------------------------------------------------------------------
1365 //
1366 //-------------------------------------------------------------------
1367 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1368         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1369         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1370         if err != nil {
1371                 xapp.Logger.Error("%v", err)
1372         }
1373 }
1374
1375 //-------------------------------------------------------------------
1376 //
1377 //-------------------------------------------------------------------
1378 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1379         xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1380         err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1381         if err != nil {
1382                 xapp.Logger.Error("%v", err)
1383         }
1384 }
1385
1386 //-------------------------------------------------------------------
1387 //
1388 //-------------------------------------------------------------------
1389 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1390
1391         if removeRestSubscriptionFromDb == true {
1392                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1393                 c.RemoveRESTSubscriptionFromDb(restSubId)
1394         } else {
1395                 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1396         }
1397 }
1398
1399 //-------------------------------------------------------------------
1400 //
1401 //-------------------------------------------------------------------
1402 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1403         xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1404         err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1405         if err != nil {
1406                 xapp.Logger.Error("%v", err)
1407         }
1408 }
1409
1410 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1411
1412         const ricRequestorId = 123
1413         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1414
1415         // Send delete for every endpoint in the subscription
1416         if subs.PolicyUpdate == false {
1417                 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1418                 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1419                 subDelReqMsg.RequestId.Id = ricRequestorId
1420                 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1421                 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1422                 if err != nil {
1423                         xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1424                         return
1425                 }
1426                 for _, endPoint := range subs.EpList.Endpoints {
1427                         params := &xapp.RMRParams{}
1428                         params.Mtype = mType
1429                         params.SubId = int(subs.GetReqId().InstanceId)
1430                         params.Xid = ""
1431                         params.Meid = subs.Meid
1432                         params.Src = endPoint.String()
1433                         params.PayloadLen = len(payload.Buf)
1434                         params.Payload = payload.Buf
1435                         params.Mbuf = nil
1436                         subs.DeleteFromDb = true
1437                         c.handleXAPPSubscriptionDeleteRequest(params)
1438                 }
1439         }
1440 }
1441
1442 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1443
1444         fmt.Println("CRESTSubscriptionRequest")
1445
1446         if p == nil {
1447                 return
1448         }
1449
1450         if p.SubscriptionID != "" {
1451                 fmt.Println("  SubscriptionID = ", p.SubscriptionID)
1452         } else {
1453                 fmt.Println("  SubscriptionID = ''")
1454         }
1455
1456         fmt.Printf("  ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1457
1458         if p.ClientEndpoint.HTTPPort != nil {
1459                 fmt.Printf("  ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1460         } else {
1461                 fmt.Println("  ClientEndpoint.HTTPPort = nil")
1462         }
1463
1464         if p.ClientEndpoint.RMRPort != nil {
1465                 fmt.Printf("  ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1466         } else {
1467                 fmt.Println("  ClientEndpoint.RMRPort = nil")
1468         }
1469
1470         if p.Meid != nil {
1471                 fmt.Printf("  Meid = %s\n", *p.Meid)
1472         } else {
1473                 fmt.Println("  Meid = nil")
1474         }
1475
1476         if p.E2SubscriptionDirectives == nil {
1477                 fmt.Println("  E2SubscriptionDirectives = nil")
1478         } else {
1479                 fmt.Println("  E2SubscriptionDirectives")
1480                 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1481                         fmt.Println("    E2RetryCount == nil")
1482                 } else {
1483                         fmt.Printf("    E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1484                 }
1485                 fmt.Printf("    E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1486                 fmt.Printf("    RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1487         }
1488         for _, subscriptionDetail := range p.SubscriptionDetails {
1489                 if p.RANFunctionID != nil {
1490                         fmt.Printf("  RANFunctionID = %v\n", *p.RANFunctionID)
1491                 } else {
1492                         fmt.Println("  RANFunctionID = nil")
1493                 }
1494                 fmt.Printf("  SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1495                 fmt.Printf("  SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1496
1497                 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1498                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1499                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1500                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1501
1502                         if actionToBeSetup.SubsequentAction != nil {
1503                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1504                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1505                         } else {
1506                                 fmt.Println("  SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1507                         }
1508                 }
1509         }
1510 }