Restore SDL write at the beginning of subscription create
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/segmentio/ksuid"
38         "github.com/spf13/viper"
39 )
40
41 //-----------------------------------------------------------------------------
42 //
43 //-----------------------------------------------------------------------------
44
// idstring builds a single-line, space-separated identification string for
// use in log messages.
//
// Each entry contributes the result of its String method; an entry that is a
// nil interface contributes the literal "(NIL)". A non-nil err is appended
// last as "err(<message>)". Every element, including "(NIL)", is separated
// from its neighbours by exactly one space.
//
// Only an untyped nil entry is recognised as nil: a typed nil pointer wrapped
// in the interface still has its String method invoked.
func idstring(err error, entries ...fmt.Stringer) string {
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		if entry == nil {
			parts = append(parts, "(NIL)")
			continue
		}
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	return strings.Join(parts, " ")
}
62
63 //-----------------------------------------------------------------------------
64 //
65 //-----------------------------------------------------------------------------
66
67 var e2tSubReqTimeout time.Duration
68 var e2tSubDelReqTime time.Duration
69 var e2tRecvMsgTimeout time.Duration
70 var waitRouteCleanup_ms time.Duration
71 var e2tMaxSubReqTryCount uint64    // Initial try + retry
72 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
73 var readSubsFromDb string
74 var restDuplicateCtrl duplicateCtrl
75 var dbRetryForever string
76 var dbTryCount int
77
// Control is the subscription manager's central object. Its methods serve as
// the REST subscription/query/delete handlers and as the config-change and
// ready callbacks registered with the xapp framework.
type Control struct {
        // Embedded RMR client; left nil by NewControl and assigned from xapp.Rmr
        // in ReadyCB.
        *xapp.RMRClient
        // Used to fill E2AP subscription request messages from REST parameters.
        e2ap          *E2ap
        // Holds the E2 subscription register, the subscription ids and the REST
        // subscriptions (populated from the database at start-up).
        registry      *Registry
        // Creates and tracks xApp transactions.
        tracker       *Tracker
        // Database handle created with CreateSdl().
        e2SubsDb      Sdlnterface
        // Database handle created with CreateRESTSdl().
        restSubsDb    Sdlnterface
        // Incremented by the REST subscription, delete and query handlers.
        CntRecvMsg    uint64
        // Passed through to registry.AssignToSubscription.
        // NOTE(review): presumably only set by tests - confirm.
        ResetTestFlag bool
        // Counter group registered under the name "SUBMGR".
        Counters      map[string]xapp.Counter
        // Read from "logger.level" (default 3); values above 2 make
        // SubscriptionHandler print the incoming REST request.
        LoggerLevel   uint32
}
90
// RMRMeid groups three string identifiers: a PLMN id, an eNB id and a RAN
// name.
// NOTE(review): the code visible in this file uses xapp.RMRMeid rather than
// this type - confirm whether it is still referenced elsewhere.
type RMRMeid struct {
        PlmnID  string
        EnbID   string
        RanName string
}
96
// SubmgrRestartTestEvent and SubmgrRestartUpEvent are empty marker types.
// NOTE(review): judging by the names they signal restart-related events in
// tests; they are not used in the part of the file reviewed here - confirm.
type SubmgrRestartTestEvent struct{}
type SubmgrRestartUpEvent struct{}
99
// init logs a "SUBMGR" start-up marker and configures viper's environment
// handling: automatic environment lookup, the "submgr" prefix, and empty
// environment variables treated as set.
func init() {
        xapp.Logger.Info("SUBMGR")
        viper.AutomaticEnv()
        viper.SetEnvPrefix("submgr")
        viper.AllowEmptyEnv(true)
}
106
107 func NewControl() *Control {
108
109         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
110         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
111
112         registry := new(Registry)
113         registry.Initialize()
114         registry.rtmgrClient = &rtmgrClient
115
116         tracker := new(Tracker)
117         tracker.Init()
118
119         c := &Control{e2ap: new(E2ap),
120                 registry:    registry,
121                 tracker:     tracker,
122                 e2SubsDb:    CreateSdl(),
123                 restSubsDb:  CreateRESTSdl(),
124                 Counters:    xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
125                 LoggerLevel: 3,
126         }
127         c.ReadConfigParameters("")
128
129         // Register REST handler for testing support
130         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
131         xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
132         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
133
134         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
135
136         if readSubsFromDb == "false" {
137                 return c
138         }
139
140         restDuplicateCtrl.Init()
141
142         // Read subscriptions from db
143         c.ReadE2Subscriptions()
144         c.ReadRESTSubscriptions()
145
146         /*
147                 xapp.Logger.Info("Reading subscriptions from db")
148                 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
149                 if err != nil {
150                         xapp.Logger.Error("%v", err)
151                 } else {
152                         c.registry.subIds = subIds
153                         c.registry.register = register
154                         c.HandleUncompletedSubscriptions(register)
155                 }
156
157                 restSubscriptions, err := c.ReadAllRESTSubscriptionsFromSdl()
158                 if err != nil {
159                         xapp.Logger.Error("%v", err)
160                 } else {
161                         c.registry.restSubscriptions = restSubscriptions
162                 }
163         */
164         return c
165 }
166
167 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
168         subscriptions, _ := c.registry.QueryHandler()
169         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
170 }
171
172 //-------------------------------------------------------------------
173 //
174 //-------------------------------------------------------------------
175 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
176         xapp.Logger.Info("GetAllRestSubscriptions() called")
177         response := c.registry.GetAllRestSubscriptions()
178         w.Write(response)
179 }
180
181 //-------------------------------------------------------------------
182 //
183 //-------------------------------------------------------------------
184 func (c *Control) ReadE2Subscriptions() error {
185         var err error
186         var subIds []uint32
187         var register map[uint32]*Subscription
188         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
189                 xapp.Logger.Info("Reading E2 subscriptions from db")
190                 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
191                 if err != nil {
192                         xapp.Logger.Error("%v", err)
193                         <-time.After(1 * time.Second)
194                 } else {
195                         c.registry.subIds = subIds
196                         c.registry.register = register
197                         c.HandleUncompletedSubscriptions(register)
198                         return nil
199                 }
200         }
201         xapp.Logger.Info("Continuing without retring")
202         return err
203 }
204
205 //-------------------------------------------------------------------
206 //
207 //-------------------------------------------------------------------
208 func (c *Control) ReadRESTSubscriptions() error {
209         var err error
210         var restSubscriptions map[string]*RESTSubscription
211         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
212                 xapp.Logger.Info("Reading REST subscriptions from db")
213                 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
214                 if err != nil {
215                         xapp.Logger.Error("%v", err)
216                         <-time.After(1 * time.Second)
217                 } else {
218                         c.registry.restSubscriptions = restSubscriptions
219                         return nil
220                 }
221         }
222         xapp.Logger.Info("Continuing without retring")
223         return err
224 }
225
226 //-------------------------------------------------------------------
227 //
228 //-------------------------------------------------------------------
229 func (c *Control) ReadConfigParameters(f string) {
230
231         // viper.GetDuration returns nanoseconds
232         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
233         if e2tSubReqTimeout == 0 {
234                 e2tSubReqTimeout = 2000 * 1000000
235         }
236         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
237         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
238         if e2tSubDelReqTime == 0 {
239                 e2tSubDelReqTime = 2000 * 1000000
240         }
241         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
242         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
243         if e2tRecvMsgTimeout == 0 {
244                 e2tRecvMsgTimeout = 2000 * 1000000
245         }
246         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
247
248         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
249         if e2tMaxSubReqTryCount == 0 {
250                 e2tMaxSubReqTryCount = 1
251         }
252         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
253
254         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
255         if e2tMaxSubDelReqTryCount == 0 {
256                 e2tMaxSubDelReqTryCount = 1
257         }
258         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
259
260         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
261         if readSubsFromDb == "" {
262                 readSubsFromDb = "true"
263         }
264         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
265
266         dbTryCount = viper.GetInt("controls.dbTryCount")
267         if dbTryCount == 0 {
268                 dbTryCount = 200
269         }
270         xapp.Logger.Info("dbTryCount %v", dbTryCount)
271
272         dbRetryForever = viper.GetString("controls.dbRetryForever")
273         if dbRetryForever == "" {
274                 dbRetryForever = "true"
275         }
276         xapp.Logger.Info("dbRetryForever %v", dbRetryForever)
277
278         c.LoggerLevel = viper.GetUint32("logger.level")
279         if c.LoggerLevel == 0 {
280                 c.LoggerLevel = 3
281         }
282         xapp.Logger.Info("LoggerLevel %v", c.LoggerLevel)
283
284         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
285         // value 100ms used currently only in unittests.
286         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
287         if waitRouteCleanup_ms == 0 {
288                 waitRouteCleanup_ms = 5000 * 1000000
289         }
290         xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
291 }
292
293 //-------------------------------------------------------------------
294 //
295 //-------------------------------------------------------------------
296 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
297
298         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
299         for subId, subs := range register {
300                 if subs.SubRespRcvd == false {
301                         subs.NoRespToXapp = true
302                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
303                         c.SendSubscriptionDeleteReq(subs)
304                 }
305         }
306 }
307
308 func (c *Control) ReadyCB(data interface{}) {
309         if c.RMRClient == nil {
310                 c.RMRClient = xapp.Rmr
311         }
312 }
313
// Run registers ReadyCB as the xapp ready callback and ReadConfigParameters
// as the config-change listener, then hands control to xapp.Run with c as
// the argument.
func (c *Control) Run() {
        xapp.SetReadyCB(c.ReadyCB, nil)
        xapp.AddConfigChangeListener(c.ReadConfigParameters)
        xapp.Run(c)
}
319
320 //-------------------------------------------------------------------
321 //
322 //-------------------------------------------------------------------
323 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
324
325         var restSubId string
326         var restSubscription *RESTSubscription
327         var err error
328
329         prevRestSubsId, exists := restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
330         if p.SubscriptionID == "" {
331                 if exists {
332                         restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
333                         if restSubscription != nil {
334                                 restSubId = prevRestSubsId
335                                 if err == nil {
336                                         xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
337                                 } else {
338                                         xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
339                                 }
340                         } else {
341                                 xapp.Logger.Info("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
342                                 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
343                         }
344                 }
345
346                 if restSubscription == nil {
347                         restSubId = ksuid.New().String()
348                         restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
349                         if err != nil {
350                                 xapp.Logger.Error("%s", err.Error())
351                                 c.UpdateCounter(cRestSubFailToXapp)
352                                 return nil, "", err
353                         }
354                 }
355         } else {
356                 restSubId = p.SubscriptionID
357
358                 xapp.Logger.Info("RestSubscription ID %s provided via REST request", restSubId)
359
360                 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
361                 if err != nil {
362                         xapp.Logger.Error("%s", err.Error())
363                         c.UpdateCounter(cRestSubFailToXapp)
364                         return nil, "", err
365                 }
366
367                 if !exists {
368                         xapp.Logger.Info("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
369                 } else {
370                         xapp.Logger.Info("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
371                 }
372         }
373
374         return restSubscription, restSubId, nil
375 }
376
377 //-------------------------------------------------------------------
378 //
379 //-------------------------------------------------------------------
380 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
381
382         c.CntRecvMsg++
383         c.UpdateCounter(cRestSubReqFromXapp)
384
385         subResp := models.SubscriptionResponse{}
386         p := params.(*models.SubscriptionParams)
387
388         if c.LoggerLevel > 2 {
389                 c.PrintRESTSubscriptionRequest(p)
390         }
391
392         if p.ClientEndpoint == nil {
393                 xapp.Logger.Error("ClientEndpoint == nil")
394                 c.UpdateCounter(cRestSubFailToXapp)
395                 return nil, fmt.Errorf("")
396         }
397
398         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
399         if err != nil {
400                 xapp.Logger.Error("%s", err.Error())
401                 c.UpdateCounter(cRestSubFailToXapp)
402                 return nil, err
403         }
404
405         md5sum, err := CalculateRequestMd5sum(params)
406         if err != nil {
407                 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
408         }
409
410         restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
411         if err != nil {
412                 xapp.Logger.Error("Failed to get/allocate REST subscription")
413                 return nil, err
414         }
415
416         subResp.SubscriptionID = &restSubId
417         subReqList := e2ap.SubscriptionRequestList{}
418         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
419         if err != nil {
420                 xapp.Logger.Error("%s", err.Error())
421                 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
422                 c.registry.DeleteRESTSubscription(&restSubId)
423                 c.UpdateCounter(cRestSubFailToXapp)
424                 return nil, err
425         }
426
427         duplicate := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
428         if duplicate {
429                 xapp.Logger.Info("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
430                 c.UpdateCounter(cRestSubRespToXapp)
431                 return &subResp, nil
432         }
433
434         c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
435
436         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum)
437
438         c.UpdateCounter(cRestSubRespToXapp)
439         return &subResp, nil
440 }
441
442 //-------------------------------------------------------------------
443 //
444 //-------------------------------------------------------------------
445
446 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
447         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string) {
448
449         xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
450
451         var xAppEventInstanceID int64
452         var e2EventInstanceID int64
453
454         defer restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
455
456         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
457                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
458                 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
459
460                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
461                 if trans == nil {
462                         // Send notification to xApp that prosessing of a Subscription Request has failed.
463                         err := fmt.Errorf("Tracking failure")
464                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
465                         continue
466                 }
467
468                 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
469
470                 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId)
471
472                 xapp.Logger.Info("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
473
474                 if err != nil {
475                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
476                 } else {
477                         e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
478                         restSubscription.AddMd5Sum(md5sum)
479                         xapp.Logger.Info("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
480                                 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
481                         c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
482                 }
483                 trans.Release()
484         }
485 }
486
487 //-------------------------------------------------------------------
488 //
489 //------------------------------------------------------------------
490 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
491         restSubId string) (*e2ap.E2APSubscriptionResponse, error) {
492
493         err := c.tracker.Track(trans)
494         if err != nil {
495                 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
496                 err = fmt.Errorf("Tracking failure")
497                 return nil, err
498         }
499
500         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
501         if err != nil {
502                 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
503                 return nil, err
504         }
505
506         //
507         // Wake subs request
508         //
509         go c.handleSubscriptionCreate(subs, trans)
510         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
511
512         err = nil
513         if event != nil {
514                 switch themsg := event.(type) {
515                 case *e2ap.E2APSubscriptionResponse:
516                         trans.Release()
517                         return themsg, nil
518                 case *e2ap.E2APSubscriptionFailure:
519                         err = fmt.Errorf("E2 SubscriptionFailure received")
520                         return nil, err
521                 default:
522                         err = fmt.Errorf("unexpected E2 subscription response received")
523                         break
524                 }
525         } else {
526                 err = fmt.Errorf("E2 subscription response timeout")
527         }
528
529         xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
530         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
531         return nil, err
532 }
533
534 //-------------------------------------------------------------------
535 //
536 //-------------------------------------------------------------------
537 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
538         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
539
540         // Send notification to xApp that prosessing of a Subscription Request has failed.
541         e2EventInstanceID := (int64)(0)
542         errorCause := err.Error()
543         resp := &models.SubscriptionResponse{
544                 SubscriptionID: restSubId,
545                 SubscriptionInstances: []*models.SubscriptionInstance{
546                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
547                                 ErrorCause:          &errorCause,
548                                 XappEventInstanceID: &xAppEventInstanceID},
549                 },
550         }
551         // Mark REST subscription request processed.
552         restSubscription.SetProcessed(err)
553         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
554         if trans != nil {
555                 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
556                         errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
557         } else {
558                 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
559                         errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
560         }
561
562         c.UpdateCounter(cRestSubFailNotifToXapp)
563         xapp.Subscription.Notify(resp, *clientEndpoint)
564 }
565
566 //-------------------------------------------------------------------
567 //
568 //-------------------------------------------------------------------
569 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
570         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
571
572         // Store successfully processed InstanceId for deletion
573         restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
574         restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
575
576         // Send notification to xApp that a Subscription Request has been processed.
577         resp := &models.SubscriptionResponse{
578                 SubscriptionID: restSubId,
579                 SubscriptionInstances: []*models.SubscriptionInstance{
580                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
581                                 ErrorCause:          nil,
582                                 XappEventInstanceID: &xAppEventInstanceID},
583                 },
584         }
585         // Mark REST subscription request processesd.
586         restSubscription.SetProcessed(nil)
587         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
588         xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
589                 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
590
591         c.UpdateCounter(cRestSubNotifToXapp)
592         xapp.Subscription.Notify(resp, *clientEndpoint)
593 }
594
595 //-------------------------------------------------------------------
596 //
597 //-------------------------------------------------------------------
598 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
599
600         c.CntRecvMsg++
601         c.UpdateCounter(cRestSubDelReqFromXapp)
602
603         xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
604
605         restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
606         if err != nil {
607                 xapp.Logger.Error("%s", err.Error())
608                 if restSubscription == nil {
609                         // Subscription was not found
610                         return nil
611                 } else {
612                         if restSubscription.SubReqOngoing == true {
613                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
614                                 xapp.Logger.Error("%s", err.Error())
615                                 return err
616                         } else if restSubscription.SubDelReqOngoing == true {
617                                 // Previous request for same restSubId still ongoing
618                                 return nil
619                         }
620                 }
621         }
622
623         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
624         go func() {
625                 xapp.Logger.Info("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
626                 for _, instanceId := range restSubscription.InstanceIds {
627                         xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
628
629                         if err != nil {
630                                 xapp.Logger.Error("%s", err.Error())
631                                 //return err
632                         }
633                         xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
634                         restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
635                         restSubscription.DeleteE2InstanceId(instanceId)
636                 }
637                 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
638                 c.registry.DeleteRESTSubscription(&restSubId)
639                 c.RemoveRESTSubscriptionFromDb(restSubId)
640         }()
641
642         c.UpdateCounter(cRestSubDelRespToXapp)
643
644         return nil
645 }
646
647 //-------------------------------------------------------------------
648 //
649 //-------------------------------------------------------------------
650 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
651
652         var xAppEventInstanceID int64
653         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
654         if err != nil {
655                 xapp.Logger.Info("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
656                         restSubId, instanceId, idstring(err, nil))
657                 return xAppEventInstanceID, nil
658         }
659
660         xAppEventInstanceID = int64(subs.ReqId.Id)
661         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
662         if trans == nil {
663                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
664                 xapp.Logger.Error("%s", err.Error())
665         }
666         defer trans.Release()
667
668         err = c.tracker.Track(trans)
669         if err != nil {
670                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
671                 xapp.Logger.Error("%s", err.Error())
672                 return xAppEventInstanceID, &time.ParseError{}
673         }
674         //
675         // Wake subs delete
676         //
677         go c.handleSubscriptionDelete(subs, trans)
678         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
679
680         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
681
682         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
683
684         return xAppEventInstanceID, nil
685 }
686
687 //-------------------------------------------------------------------
688 //
689 //-------------------------------------------------------------------
690 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
691         xapp.Logger.Info("QueryHandler() called")
692
693         c.CntRecvMsg++
694
695         return c.registry.QueryHandler()
696 }
697
698 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
699         xapp.Logger.Info("TestRestHandler() called")
700
701         pathParams := mux.Vars(r)
702         s := pathParams["testId"]
703
704         // This can be used to delete single subscription from db
705         if contains := strings.Contains(s, "deletesubid="); contains == true {
706                 var splits = strings.Split(s, "=")
707                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
708                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
709                         c.RemoveSubscriptionFromSdl(uint32(subId))
710                         return
711                 }
712         }
713
714         // This can be used to remove all subscriptions db from
715         if s == "emptydb" {
716                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
717                 c.RemoveAllSubscriptionsFromSdl()
718                 c.RemoveAllRESTSubscriptionsFromSdl()
719                 return
720         }
721
722         // This is meant to cause submgr's restart in testing
723         if s == "restart" {
724                 xapp.Logger.Info("os.Exit(1) called")
725                 os.Exit(1)
726         }
727
728         xapp.Logger.Info("Unsupported rest command received %s", s)
729 }
730
731 //-------------------------------------------------------------------
732 //
733 //-------------------------------------------------------------------
734
735 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
736         params := &xapp.RMRParams{}
737         params.Mtype = trans.GetMtype()
738         params.SubId = int(subs.GetReqId().InstanceId)
739         params.Xid = ""
740         params.Meid = subs.GetMeid()
741         params.Src = ""
742         params.PayloadLen = len(trans.Payload.Buf)
743         params.Payload = trans.Payload.Buf
744         params.Mbuf = nil
745         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
746         err = c.SendWithRetry(params, false, 5)
747         if err != nil {
748                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
749         }
750         return err
751 }
752
753 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
754
755         params := &xapp.RMRParams{}
756         params.Mtype = trans.GetMtype()
757         params.SubId = int(subs.GetReqId().InstanceId)
758         params.Xid = trans.GetXid()
759         params.Meid = trans.GetMeid()
760         params.Src = ""
761         params.PayloadLen = len(trans.Payload.Buf)
762         params.Payload = trans.Payload.Buf
763         params.Mbuf = nil
764         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
765         err = c.SendWithRetry(params, false, 5)
766         if err != nil {
767                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
768         }
769         return err
770 }
771
772 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
773         if c.RMRClient == nil {
774                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
775                 xapp.Logger.Error("%s", err.Error())
776                 return
777         }
778         c.CntRecvMsg++
779
780         defer c.RMRClient.Free(msg.Mbuf)
781
782         // xapp-frame might use direct access to c buffer and
783         // when msg.Mbuf is freed, someone might take it into use
784         // and payload data might be invalid inside message handle function
785         //
786         // subscriptions won't load system a lot so there is no
787         // real performance hit by cloning buffer into new go byte slice
788         cPay := append(msg.Payload[:0:0], msg.Payload...)
789         msg.Payload = cPay
790         msg.PayloadLen = len(cPay)
791
792         switch msg.Mtype {
793         case xapp.RIC_SUB_REQ:
794                 go c.handleXAPPSubscriptionRequest(msg)
795         case xapp.RIC_SUB_RESP:
796                 go c.handleE2TSubscriptionResponse(msg)
797         case xapp.RIC_SUB_FAILURE:
798                 go c.handleE2TSubscriptionFailure(msg)
799         case xapp.RIC_SUB_DEL_REQ:
800                 go c.handleXAPPSubscriptionDeleteRequest(msg)
801         case xapp.RIC_SUB_DEL_RESP:
802                 go c.handleE2TSubscriptionDeleteResponse(msg)
803         case xapp.RIC_SUB_DEL_FAILURE:
804                 go c.handleE2TSubscriptionDeleteFailure(msg)
805         default:
806                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
807         }
808         return
809 }
810
811 //-------------------------------------------------------------------
812 // handle from XAPP Subscription Request
813 //------------------------------------------------------------------
814 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
815         xapp.Logger.Info("MSG from XAPP: %s", params.String())
816         c.UpdateCounter(cSubReqFromXapp)
817
818         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
819         if err != nil {
820                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
821                 return
822         }
823
824         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
825         if trans == nil {
826                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
827                 return
828         }
829         defer trans.Release()
830
831         if err = c.tracker.Track(trans); err != nil {
832                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
833                 return
834         }
835
836         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
837         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
838         if err != nil {
839                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
840                 return
841         }
842
843         c.wakeSubscriptionRequest(subs, trans)
844 }
845
846 //-------------------------------------------------------------------
847 // Wake Subscription Request to E2node
848 //------------------------------------------------------------------
849 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
850
851         go c.handleSubscriptionCreate(subs, trans)
852         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
853         var err error
854         if event != nil {
855                 switch themsg := event.(type) {
856                 case *e2ap.E2APSubscriptionResponse:
857                         themsg.RequestId.Id = trans.RequestId.Id
858                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
859                         if err == nil {
860                                 trans.Release()
861                                 c.UpdateCounter(cSubRespToXapp)
862                                 c.rmrSendToXapp("", subs, trans)
863                                 return
864                         }
865                 case *e2ap.E2APSubscriptionFailure:
866                         themsg.RequestId.Id = trans.RequestId.Id
867                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
868                         if err == nil {
869                                 c.UpdateCounter(cSubFailToXapp)
870                                 c.rmrSendToXapp("", subs, trans)
871                         }
872                 default:
873                         break
874                 }
875         }
876         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
877         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
878 }
879
880 //-------------------------------------------------------------------
881 // handle from XAPP Subscription Delete Request
882 //------------------------------------------------------------------
883 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
884         xapp.Logger.Info("MSG from XAPP: %s", params.String())
885         c.UpdateCounter(cSubDelReqFromXapp)
886
887         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
888         if err != nil {
889                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
890                 return
891         }
892
893         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
894         if trans == nil {
895                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
896                 return
897         }
898         defer trans.Release()
899
900         err = c.tracker.Track(trans)
901         if err != nil {
902                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
903                 return
904         }
905
906         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
907         if err != nil {
908                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
909                 return
910         }
911
912         //
913         // Wake subs delete
914         //
915         go c.handleSubscriptionDelete(subs, trans)
916         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
917
918         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
919
920         if subs.NoRespToXapp == true {
921                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
922                 return
923         }
924
925         // Whatever is received success, fail or timeout, send successful delete response
926         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
927         subDelRespMsg.RequestId.Id = trans.RequestId.Id
928         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
929         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
930         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
931         if err == nil {
932                 c.UpdateCounter(cSubDelRespToXapp)
933                 c.rmrSendToXapp("", subs, trans)
934         }
935
936         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
937         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
938 }
939
940 //-------------------------------------------------------------------
941 // SUBS CREATE Handling
942 //-------------------------------------------------------------------
943 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
944
945         var removeSubscriptionFromDb bool = false
946         trans := c.tracker.NewSubsTransaction(subs)
947         subs.WaitTransactionTurn(trans)
948         defer subs.ReleaseTransactionTurn(trans)
949         defer trans.Release()
950
951         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
952
953         subRfMsg, valid := subs.GetCachedResponse()
954         if subRfMsg == nil && valid == true {
955                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
956                 switch event.(type) {
957                 case *e2ap.E2APSubscriptionResponse:
958                         subRfMsg, valid = subs.SetCachedResponse(event, true)
959                         subs.SubRespRcvd = true
960                 case *e2ap.E2APSubscriptionFailure:
961                         removeSubscriptionFromDb = true
962                         subRfMsg, valid = subs.SetCachedResponse(event, false)
963                         xapp.Logger.Info("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
964                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
965                 case *SubmgrRestartTestEvent:
966                         // This simulates that no response has been received and after restart subscriptions are restored from db
967                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
968                         return
969                 default:
970                         xapp.Logger.Info("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
971                         removeSubscriptionFromDb = true
972                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
973                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
974                 }
975                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
976         } else {
977                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
978         }
979
980         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
981         if valid == false {
982                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
983         }
984
985         c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
986         parentTrans.SendEvent(subRfMsg, 0)
987 }
988
989 //-------------------------------------------------------------------
990 // SUBS DELETE Handling
991 //-------------------------------------------------------------------
992
993 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
994
995         trans := c.tracker.NewSubsTransaction(subs)
996         subs.WaitTransactionTurn(trans)
997         defer subs.ReleaseTransactionTurn(trans)
998         defer trans.Release()
999
1000         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1001
1002         subs.mutex.Lock()
1003
1004         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1005                 subs.valid = false
1006                 subs.mutex.Unlock()
1007                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1008         } else {
1009                 subs.mutex.Unlock()
1010         }
1011         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1012         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
1013         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
1014         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1015         c.registry.UpdateSubscriptionToDb(subs, c)
1016         parentTrans.SendEvent(nil, 0)
1017 }
1018
1019 //-------------------------------------------------------------------
1020 // send to E2T Subscription Request
1021 //-------------------------------------------------------------------
1022 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1023         var err error
1024         var event interface{} = nil
1025         var timedOut bool = false
1026         const ricRequestorId = 123
1027
1028         subReqMsg := subs.SubReqMsg
1029         subReqMsg.RequestId = subs.GetReqId().RequestId
1030         subReqMsg.RequestId.Id = ricRequestorId
1031         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1032         if err != nil {
1033                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
1034                 return event
1035         }
1036
1037         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1038         c.WriteSubscriptionToDb(subs)
1039
1040         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
1041                 desc := fmt.Sprintf("(retry %d)", retries)
1042                 if retries == 0 {
1043                         c.UpdateCounter(cSubReqToE2)
1044                 } else {
1045                         c.UpdateCounter(cSubReReqToE2)
1046                 }
1047                 c.rmrSendToE2T(desc, subs, trans)
1048                 if subs.DoNotWaitSubResp == false {
1049                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
1050                         if timedOut {
1051                                 c.UpdateCounter(cSubReqTimerExpiry)
1052                                 continue
1053                         }
1054                 } else {
1055                         // Simulating case where subscrition request has been sent but response has not been received before restart
1056                         event = &SubmgrRestartTestEvent{}
1057                 }
1058                 break
1059         }
1060         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1061         return event
1062 }
1063
1064 //-------------------------------------------------------------------
1065 // send to E2T Subscription Delete Request
1066 //-------------------------------------------------------------------
1067
1068 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1069         var err error
1070         var event interface{}
1071         var timedOut bool
1072         const ricRequestorId = 123
1073
1074         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1075         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1076         subDelReqMsg.RequestId.Id = ricRequestorId
1077         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1078         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1079         if err != nil {
1080                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1081                 return event
1082         }
1083
1084         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1085                 desc := fmt.Sprintf("(retry %d)", retries)
1086                 if retries == 0 {
1087                         c.UpdateCounter(cSubDelReqToE2)
1088                 } else {
1089                         c.UpdateCounter(cSubDelReReqToE2)
1090                 }
1091                 c.rmrSendToE2T(desc, subs, trans)
1092                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1093                 if timedOut {
1094                         c.UpdateCounter(cSubDelReqTimerExpiry)
1095                         continue
1096                 }
1097                 break
1098         }
1099         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1100         return event
1101 }
1102
1103 //-------------------------------------------------------------------
1104 // handle from E2T Subscription Response
1105 //-------------------------------------------------------------------
1106 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1107         xapp.Logger.Info("MSG from E2T: %s", params.String())
1108         c.UpdateCounter(cSubRespFromE2)
1109
1110         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1111         if err != nil {
1112                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1113                 return
1114         }
1115         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1116         if err != nil {
1117                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1118                 return
1119         }
1120         trans := subs.GetTransaction()
1121         if trans == nil {
1122                 err = fmt.Errorf("Ongoing transaction not found")
1123                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1124                 return
1125         }
1126         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1127         if sendOk == false {
1128                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1129                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1130         }
1131         return
1132 }
1133
1134 //-------------------------------------------------------------------
1135 // handle from E2T Subscription Failure
1136 //-------------------------------------------------------------------
1137 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1138         xapp.Logger.Info("MSG from E2T: %s", params.String())
1139         c.UpdateCounter(cSubFailFromE2)
1140         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1141         if err != nil {
1142                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1143                 return
1144         }
1145         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1146         if err != nil {
1147                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1148                 return
1149         }
1150         trans := subs.GetTransaction()
1151         if trans == nil {
1152                 err = fmt.Errorf("Ongoing transaction not found")
1153                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1154                 return
1155         }
1156         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1157         if sendOk == false {
1158                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1159                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1160         }
1161         return
1162 }
1163
1164 //-------------------------------------------------------------------
1165 // handle from E2T Subscription Delete Response
1166 //-------------------------------------------------------------------
1167 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1168         xapp.Logger.Info("MSG from E2T: %s", params.String())
1169         c.UpdateCounter(cSubDelRespFromE2)
1170         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1171         if err != nil {
1172                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1173                 return
1174         }
1175         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1176         if err != nil {
1177                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1178                 return
1179         }
1180         trans := subs.GetTransaction()
1181         if trans == nil {
1182                 err = fmt.Errorf("Ongoing transaction not found")
1183                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1184                 return
1185         }
1186         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1187         if sendOk == false {
1188                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1189                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1190         }
1191         return
1192 }
1193
1194 //-------------------------------------------------------------------
1195 // handle from E2T Subscription Delete Failure
1196 //-------------------------------------------------------------------
1197 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1198         xapp.Logger.Info("MSG from E2T: %s", params.String())
1199         c.UpdateCounter(cSubDelFailFromE2)
1200         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1201         if err != nil {
1202                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1203                 return
1204         }
1205         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1206         if err != nil {
1207                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1208                 return
1209         }
1210         trans := subs.GetTransaction()
1211         if trans == nil {
1212                 err = fmt.Errorf("Ongoing transaction not found")
1213                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1214                 return
1215         }
1216         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1217         if sendOk == false {
1218                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1219                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1220         }
1221         return
1222 }
1223
1224 //-------------------------------------------------------------------
1225 //
1226 //-------------------------------------------------------------------
1227 func typeofSubsMessage(v interface{}) string {
1228         if v == nil {
1229                 return "NIL"
1230         }
1231         switch v.(type) {
1232         //case *e2ap.E2APSubscriptionRequest:
1233         //      return "SubReq"
1234         case *e2ap.E2APSubscriptionResponse:
1235                 return "SubResp"
1236         case *e2ap.E2APSubscriptionFailure:
1237                 return "SubFail"
1238         //case *e2ap.E2APSubscriptionDeleteRequest:
1239         //      return "SubDelReq"
1240         case *e2ap.E2APSubscriptionDeleteResponse:
1241                 return "SubDelResp"
1242         case *e2ap.E2APSubscriptionDeleteFailure:
1243                 return "SubDelFail"
1244         default:
1245                 return "Unknown"
1246         }
1247 }
1248
1249 //-------------------------------------------------------------------
1250 //
1251 //-------------------------------------------------------------------
1252 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1253         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1254         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1255         if err != nil {
1256                 xapp.Logger.Error("%v", err)
1257         }
1258 }
1259
1260 //-------------------------------------------------------------------
1261 //
1262 //-------------------------------------------------------------------
1263 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1264
1265         if removeSubscriptionFromDb == true {
1266                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1267                 c.RemoveSubscriptionFromDb(subs)
1268         } else {
1269                 // Update is needed for successful response and merge case here
1270                 if subs.RetryFromXapp == false {
1271                         c.WriteSubscriptionToDb(subs)
1272                 }
1273         }
1274         subs.RetryFromXapp = false
1275 }
1276
1277 //-------------------------------------------------------------------
1278 //
1279 //-------------------------------------------------------------------
1280 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1281         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1282         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1283         if err != nil {
1284                 xapp.Logger.Error("%v", err)
1285         }
1286 }
1287
1288 //-------------------------------------------------------------------
1289 //
1290 //-------------------------------------------------------------------
1291 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1292         xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1293         err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1294         if err != nil {
1295                 xapp.Logger.Error("%v", err)
1296         }
1297 }
1298
1299 //-------------------------------------------------------------------
1300 //
1301 //-------------------------------------------------------------------
1302 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1303
1304         if removeRestSubscriptionFromDb == true {
1305                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1306                 c.RemoveRESTSubscriptionFromDb(restSubId)
1307         } else {
1308                 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1309         }
1310 }
1311
1312 //-------------------------------------------------------------------
1313 //
1314 //-------------------------------------------------------------------
1315 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1316         xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1317         err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1318         if err != nil {
1319                 xapp.Logger.Error("%v", err)
1320         }
1321 }
1322
1323 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1324
1325         const ricRequestorId = 123
1326         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1327
1328         // Send delete for every endpoint in the subscription
1329         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1330         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1331         subDelReqMsg.RequestId.Id = ricRequestorId
1332         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1333         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1334         if err != nil {
1335                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1336                 return
1337         }
1338         for _, endPoint := range subs.EpList.Endpoints {
1339                 params := &xapp.RMRParams{}
1340                 params.Mtype = mType
1341                 params.SubId = int(subs.GetReqId().InstanceId)
1342                 params.Xid = ""
1343                 params.Meid = subs.Meid
1344                 params.Src = endPoint.String()
1345                 params.PayloadLen = len(payload.Buf)
1346                 params.Payload = payload.Buf
1347                 params.Mbuf = nil
1348                 subs.DeleteFromDb = true
1349                 c.handleXAPPSubscriptionDeleteRequest(params)
1350         }
1351 }
1352
1353 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1354
1355         fmt.Println("CRESTSubscriptionRequest")
1356
1357         if p == nil {
1358                 return
1359         }
1360
1361         if p.SubscriptionID != "" {
1362                 fmt.Println("  SubscriptionID = ", p.SubscriptionID)
1363         } else {
1364                 fmt.Println("  SubscriptionID = ''")
1365         }
1366
1367         fmt.Printf("  ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1368
1369         if p.ClientEndpoint.HTTPPort != nil {
1370                 fmt.Printf("  ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1371         } else {
1372                 fmt.Println("  ClientEndpoint.HTTPPort = nil")
1373         }
1374
1375         if p.ClientEndpoint.RMRPort != nil {
1376                 fmt.Printf("  ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1377         } else {
1378                 fmt.Println("  ClientEndpoint.RMRPort = nil")
1379         }
1380
1381         if p.Meid != nil {
1382                 fmt.Printf("  Meid = %s\n", *p.Meid)
1383         } else {
1384                 fmt.Println("  Meid = nil")
1385         }
1386
1387         for _, subscriptionDetail := range p.SubscriptionDetails {
1388                 if p.RANFunctionID != nil {
1389                         fmt.Printf("  RANFunctionID = %v\n", *p.RANFunctionID)
1390                 } else {
1391                         fmt.Println("  RANFunctionID = nil")
1392                 }
1393                 fmt.Printf("  SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1394                 fmt.Printf("  SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1395
1396                 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1397                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1398                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1399                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1400
1401                         if actionToBeSetup.SubsequentAction != nil {
1402                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1403                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1404                         } else {
1405                                 fmt.Println("  SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1406                         }
1407                 }
1408         }
1409 }