Fix for xApp transaction release place
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/segmentio/ksuid"
38         "github.com/spf13/viper"
39 )
40
41 //-----------------------------------------------------------------------------
42 //
43 //-----------------------------------------------------------------------------
44
// idstring renders a compact, space-separated identification string for log
// messages.
//
// Every entry is rendered with its String method; a nil entry is rendered as
// the placeholder "(NIL)". When err is non-nil, "err(<message>)" is appended
// as the last element. With no entries and a nil err the result is "".
func idstring(err error, entries ...fmt.Stringer) string {
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		if entry == nil {
			// The placeholder is an ordinary element, so whatever follows it
			// is separated by a space just like any other entry.
			parts = append(parts, "(NIL)")
			continue
		}
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	return strings.Join(parts, " ")
}
62
63 //-----------------------------------------------------------------------------
64 //
65 //-----------------------------------------------------------------------------
66
67 var e2tSubReqTimeout time.Duration
68 var e2tSubDelReqTime time.Duration
69 var e2tRecvMsgTimeout time.Duration
70 var waitRouteCleanup_ms time.Duration
71 var e2tMaxSubReqTryCount uint64    // Initial try + retry
72 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
73 var readSubsFromDb string
74 var restDuplicateCtrl duplicateCtrl
75 var dbRetryForever string
76 var dbTryCount int
77
// Control is the central object of the subscription manager. It owns the
// subscription registry, the transaction tracker and the database handles,
// and its methods serve as the REST and test handlers registered in
// NewControl.
type Control struct {
        // Embedded RMR client; assigned from xapp.Rmr by ReadyCB when still nil.
        *xapp.RMRClient
        // E2AP helper, used e.g. to build subscription request messages.
        e2ap          *E2ap
        // In-memory store of E2 and REST subscriptions.
        registry      *Registry
        // Creates and tracks xApp transactions.
        tracker       *Tracker
        // Database handle for E2 subscriptions (created with CreateSdl).
        e2SubsDb      Sdlnterface
        // Database handle for REST subscriptions (created with CreateRESTSdl).
        restSubsDb    Sdlnterface
        // Incremented by the subscription, subscription-delete and query handlers.
        CntRecvMsg    uint64
        // Passed through to Registry.AssignToSubscription.
        ResetTestFlag bool
        // Metric counters registered under the "SUBMGR" group.
        Counters      map[string]xapp.Counter
        // Read from "logger.level" (3 when unset); values above 2 make
        // SubscriptionHandler print the incoming REST request.
        LoggerLevel   uint32
        // When true, subscription processing starts after a 50 ms delay
        // (see SubscriptionProcessingStartDelay).
        UTTesting     bool
}
91
// RMRMeid groups three string identifiers: a PLMN id, an eNB id and a RAN
// name.
// NOTE(review): the visible handlers use xapp.RMRMeid rather than this type;
// confirm where this local variant is still needed.
type RMRMeid struct {
        PlmnID  string
        EnbID   string
        RanName string
}
97
// Empty marker types for submgr restart events.
// NOTE(review): neither type is referenced in the visible part of this file;
// presumably they are used as event values elsewhere - confirm.
type (
	SubmgrRestartTestEvent struct{}
	SubmgrRestartUpEvent   struct{}
)
100
// init logs the component name and configures viper's environment variable
// handling for the package: automatic environment lookup, the "submgr"
// prefix, and acceptance of empty environment values.
func init() {
        xapp.Logger.Info("SUBMGR")
        viper.AutomaticEnv()
        viper.SetEnvPrefix("submgr")
        viper.AllowEmptyEnv(true)
}
107
108 func NewControl() *Control {
109
110         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
111         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
112
113         registry := new(Registry)
114         registry.Initialize()
115         registry.rtmgrClient = &rtmgrClient
116
117         tracker := new(Tracker)
118         tracker.Init()
119
120         c := &Control{e2ap: new(E2ap),
121                 registry:    registry,
122                 tracker:     tracker,
123                 e2SubsDb:    CreateSdl(),
124                 restSubsDb:  CreateRESTSdl(),
125                 Counters:    xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
126                 LoggerLevel: 3,
127         }
128         c.ReadConfigParameters("")
129
130         // Register REST handler for testing support
131         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
132         xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
133         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
134
135         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
136
137         if readSubsFromDb == "false" {
138                 return c
139         }
140
141         restDuplicateCtrl.Init()
142
143         // Read subscriptions from db
144         c.ReadE2Subscriptions()
145         c.ReadRESTSubscriptions()
146         return c
147 }
148
149 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
150         subscriptions, _ := c.registry.QueryHandler()
151         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
152 }
153
154 //-------------------------------------------------------------------
155 //
156 //-------------------------------------------------------------------
157 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
158         xapp.Logger.Info("GetAllRestSubscriptions() called")
159         response := c.registry.GetAllRestSubscriptions()
160         w.Write(response)
161 }
162
163 //-------------------------------------------------------------------
164 //
165 //-------------------------------------------------------------------
166 func (c *Control) ReadE2Subscriptions() error {
167         var err error
168         var subIds []uint32
169         var register map[uint32]*Subscription
170         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
171                 xapp.Logger.Info("Reading E2 subscriptions from db")
172                 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
173                 if err != nil {
174                         xapp.Logger.Error("%v", err)
175                         <-time.After(1 * time.Second)
176                 } else {
177                         c.registry.subIds = subIds
178                         c.registry.register = register
179                         c.HandleUncompletedSubscriptions(register)
180                         return nil
181                 }
182         }
183         xapp.Logger.Info("Continuing without retring")
184         return err
185 }
186
187 //-------------------------------------------------------------------
188 //
189 //-------------------------------------------------------------------
190 func (c *Control) ReadRESTSubscriptions() error {
191         var err error
192         var restSubscriptions map[string]*RESTSubscription
193         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
194                 xapp.Logger.Info("Reading REST subscriptions from db")
195                 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
196                 if err != nil {
197                         xapp.Logger.Error("%v", err)
198                         <-time.After(1 * time.Second)
199                 } else {
200                         c.registry.restSubscriptions = restSubscriptions
201                         return nil
202                 }
203         }
204         xapp.Logger.Info("Continuing without retring")
205         return err
206 }
207
208 //-------------------------------------------------------------------
209 //
210 //-------------------------------------------------------------------
211 func (c *Control) ReadConfigParameters(f string) {
212
213         // viper.GetDuration returns nanoseconds
214         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
215         if e2tSubReqTimeout == 0 {
216                 e2tSubReqTimeout = 2000 * 1000000
217         }
218         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
219         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
220         if e2tSubDelReqTime == 0 {
221                 e2tSubDelReqTime = 2000 * 1000000
222         }
223         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
224         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
225         if e2tRecvMsgTimeout == 0 {
226                 e2tRecvMsgTimeout = 2000 * 1000000
227         }
228         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
229
230         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
231         if e2tMaxSubReqTryCount == 0 {
232                 e2tMaxSubReqTryCount = 1
233         }
234         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
235
236         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
237         if e2tMaxSubDelReqTryCount == 0 {
238                 e2tMaxSubDelReqTryCount = 1
239         }
240         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
241
242         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
243         if readSubsFromDb == "" {
244                 readSubsFromDb = "true"
245         }
246         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
247
248         dbTryCount = viper.GetInt("controls.dbTryCount")
249         if dbTryCount == 0 {
250                 dbTryCount = 200
251         }
252         xapp.Logger.Info("dbTryCount %v", dbTryCount)
253
254         dbRetryForever = viper.GetString("controls.dbRetryForever")
255         if dbRetryForever == "" {
256                 dbRetryForever = "true"
257         }
258         xapp.Logger.Info("dbRetryForever %v", dbRetryForever)
259
260         c.LoggerLevel = viper.GetUint32("logger.level")
261         if c.LoggerLevel == 0 {
262                 c.LoggerLevel = 3
263         }
264         xapp.Logger.Info("LoggerLevel %v", c.LoggerLevel)
265
266         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
267         // value 100ms used currently only in unittests.
268         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
269         if waitRouteCleanup_ms == 0 {
270                 waitRouteCleanup_ms = 5000 * 1000000
271         }
272         xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
273 }
274
275 //-------------------------------------------------------------------
276 //
277 //-------------------------------------------------------------------
278 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
279
280         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
281         for subId, subs := range register {
282                 if subs.SubRespRcvd == false {
283                         subs.NoRespToXapp = true
284                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
285                         c.SendSubscriptionDeleteReq(subs)
286                 }
287         }
288 }
289
290 func (c *Control) ReadyCB(data interface{}) {
291         if c.RMRClient == nil {
292                 c.RMRClient = xapp.Rmr
293         }
294 }
295
// Run registers the readiness callback and the configuration change
// listener with the xapp framework and then hands control to xapp.Run with
// c as the argument.
func (c *Control) Run() {
        xapp.SetReadyCB(c.ReadyCB, nil)
        // Re-read the settings whenever the framework reports a config change.
        xapp.AddConfigChangeListener(c.ReadConfigParameters)
        xapp.Run(c)
}
301
302 //-------------------------------------------------------------------
303 //
304 //-------------------------------------------------------------------
305 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
306
307         var restSubId string
308         var restSubscription *RESTSubscription
309         var err error
310
311         prevRestSubsId, exists := restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
312         if p.SubscriptionID == "" {
313                 if exists {
314                         restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
315                         if restSubscription != nil {
316                                 restSubId = prevRestSubsId
317                                 if err == nil {
318                                         xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
319                                 } else {
320                                         xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
321                                 }
322                         } else {
323                                 xapp.Logger.Info("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
324                                 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
325                         }
326                 }
327
328                 if restSubscription == nil {
329                         restSubId = ksuid.New().String()
330                         restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
331                         if err != nil {
332                                 xapp.Logger.Error("%s", err.Error())
333                                 c.UpdateCounter(cRestSubFailToXapp)
334                                 return nil, "", err
335                         }
336                 }
337         } else {
338                 restSubId = p.SubscriptionID
339
340                 xapp.Logger.Info("RestSubscription ID %s provided via REST request", restSubId)
341
342                 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
343                 if err != nil {
344                         xapp.Logger.Error("%s", err.Error())
345                         c.UpdateCounter(cRestSubFailToXapp)
346                         return nil, "", err
347                 }
348
349                 if !exists {
350                         xapp.Logger.Info("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
351                 } else {
352                         xapp.Logger.Info("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
353                 }
354         }
355
356         return restSubscription, restSubId, nil
357 }
358
359 //-------------------------------------------------------------------
360 //
361 //-------------------------------------------------------------------
362 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
363
364         c.CntRecvMsg++
365         c.UpdateCounter(cRestSubReqFromXapp)
366
367         subResp := models.SubscriptionResponse{}
368         p := params.(*models.SubscriptionParams)
369
370         if c.LoggerLevel > 2 {
371                 c.PrintRESTSubscriptionRequest(p)
372         }
373
374         if p.ClientEndpoint == nil {
375                 xapp.Logger.Error("ClientEndpoint == nil")
376                 c.UpdateCounter(cRestSubFailToXapp)
377                 return nil, fmt.Errorf("")
378         }
379
380         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
381         if err != nil {
382                 xapp.Logger.Error("%s", err.Error())
383                 c.UpdateCounter(cRestSubFailToXapp)
384                 return nil, err
385         }
386
387         md5sum, err := CalculateRequestMd5sum(params)
388         if err != nil {
389                 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
390         }
391
392         restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
393         if err != nil {
394                 xapp.Logger.Error("Failed to get/allocate REST subscription")
395                 return nil, err
396         }
397
398         subResp.SubscriptionID = &restSubId
399         subReqList := e2ap.SubscriptionRequestList{}
400         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
401         if err != nil {
402                 xapp.Logger.Error("%s", err.Error())
403                 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
404                 c.registry.DeleteRESTSubscription(&restSubId)
405                 c.UpdateCounter(cRestSubFailToXapp)
406                 return nil, err
407         }
408
409         duplicate := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
410         if duplicate {
411                 xapp.Logger.Info("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
412                 c.UpdateCounter(cRestSubRespToXapp)
413                 return &subResp, nil
414         }
415
416         c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
417
418         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum)
419
420         c.UpdateCounter(cRestSubRespToXapp)
421         return &subResp, nil
422 }
423
424 //-------------------------------------------------------------------
425 //
426 //-------------------------------------------------------------------
427
428 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
429         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string) {
430
431         c.SubscriptionProcessingStartDelay()
432         xapp.Logger.Debug("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
433
434         var xAppEventInstanceID int64
435         var e2EventInstanceID int64
436
437         defer restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
438
439         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
440                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
441                 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
442
443                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
444                 if trans == nil {
445                         // Send notification to xApp that prosessing of a Subscription Request has failed.
446                         err := fmt.Errorf("Tracking failure")
447                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
448                         continue
449                 }
450
451                 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
452
453                 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId)
454
455                 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
456                 trans.Release()
457
458                 if err != nil {
459                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
460                 } else {
461                         e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
462                         restSubscription.AddMd5Sum(md5sum)
463                         xapp.Logger.Info("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
464                                 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
465                         c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
466                 }
467         }
468 }
469
470 //-------------------------------------------------------------------
471 //
472 //------------------------------------------------------------------
473 func (c *Control) SubscriptionProcessingStartDelay() {
474         if c.UTTesting == true {
475                 // This is temporary fix for the UT problem that notification arrives before subscription response
476                 // Correct fix would be to allow notification come before response and process it correctly
477                 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
478                 <-time.After(time.Millisecond * 50)
479                 xapp.Logger.Debug("Continuing after delay")
480         }
481 }
482
483 //-------------------------------------------------------------------
484 //
485 //------------------------------------------------------------------
486 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
487         restSubId string) (*e2ap.E2APSubscriptionResponse, error) {
488
489         err := c.tracker.Track(trans)
490         if err != nil {
491                 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
492                 err = fmt.Errorf("Tracking failure")
493                 return nil, err
494         }
495
496         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
497         if err != nil {
498                 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
499                 return nil, err
500         }
501
502         //
503         // Wake subs request
504         //
505         go c.handleSubscriptionCreate(subs, trans)
506         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
507
508         err = nil
509         if event != nil {
510                 switch themsg := event.(type) {
511                 case *e2ap.E2APSubscriptionResponse:
512                         trans.Release()
513                         return themsg, nil
514                 case *e2ap.E2APSubscriptionFailure:
515                         err = fmt.Errorf("E2 SubscriptionFailure received")
516                         return nil, err
517                 default:
518                         err = fmt.Errorf("unexpected E2 subscription response received")
519                         break
520                 }
521         } else {
522                 err = fmt.Errorf("E2 subscription response timeout")
523         }
524
525         xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
526         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
527         return nil, err
528 }
529
530 //-------------------------------------------------------------------
531 //
532 //-------------------------------------------------------------------
533 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
534         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
535
536         // Send notification to xApp that prosessing of a Subscription Request has failed.
537         e2EventInstanceID := (int64)(0)
538         errorCause := err.Error()
539         resp := &models.SubscriptionResponse{
540                 SubscriptionID: restSubId,
541                 SubscriptionInstances: []*models.SubscriptionInstance{
542                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
543                                 ErrorCause:          &errorCause,
544                                 XappEventInstanceID: &xAppEventInstanceID},
545                 },
546         }
547         // Mark REST subscription request processed.
548         restSubscription.SetProcessed(err)
549         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
550         if trans != nil {
551                 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
552                         errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
553         } else {
554                 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
555                         errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
556         }
557
558         c.UpdateCounter(cRestSubFailNotifToXapp)
559         xapp.Subscription.Notify(resp, *clientEndpoint)
560 }
561
562 //-------------------------------------------------------------------
563 //
564 //-------------------------------------------------------------------
565 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
566         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
567
568         // Store successfully processed InstanceId for deletion
569         restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
570         restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
571
572         // Send notification to xApp that a Subscription Request has been processed.
573         resp := &models.SubscriptionResponse{
574                 SubscriptionID: restSubId,
575                 SubscriptionInstances: []*models.SubscriptionInstance{
576                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
577                                 ErrorCause:          nil,
578                                 XappEventInstanceID: &xAppEventInstanceID},
579                 },
580         }
581         // Mark REST subscription request processesd.
582         restSubscription.SetProcessed(nil)
583         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
584         xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
585                 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
586
587         c.UpdateCounter(cRestSubNotifToXapp)
588         xapp.Subscription.Notify(resp, *clientEndpoint)
589 }
590
591 //-------------------------------------------------------------------
592 //
593 //-------------------------------------------------------------------
594 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
595
596         c.CntRecvMsg++
597         c.UpdateCounter(cRestSubDelReqFromXapp)
598
599         xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
600
601         restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
602         if err != nil {
603                 xapp.Logger.Error("%s", err.Error())
604                 if restSubscription == nil {
605                         // Subscription was not found
606                         return nil
607                 } else {
608                         if restSubscription.SubReqOngoing == true {
609                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
610                                 xapp.Logger.Error("%s", err.Error())
611                                 return err
612                         } else if restSubscription.SubDelReqOngoing == true {
613                                 // Previous request for same restSubId still ongoing
614                                 return nil
615                         }
616                 }
617         }
618
619         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
620         go func() {
621                 xapp.Logger.Info("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
622                 for _, instanceId := range restSubscription.InstanceIds {
623                         xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
624
625                         if err != nil {
626                                 xapp.Logger.Error("%s", err.Error())
627                                 //return err
628                         }
629                         xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
630                         restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
631                         restSubscription.DeleteE2InstanceId(instanceId)
632                 }
633                 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
634                 c.registry.DeleteRESTSubscription(&restSubId)
635                 c.RemoveRESTSubscriptionFromDb(restSubId)
636         }()
637
638         c.UpdateCounter(cRestSubDelRespToXapp)
639
640         return nil
641 }
642
643 //-------------------------------------------------------------------
644 //
645 //-------------------------------------------------------------------
646 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
647
648         var xAppEventInstanceID int64
649         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
650         if err != nil {
651                 xapp.Logger.Info("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
652                         restSubId, instanceId, idstring(err, nil))
653                 return xAppEventInstanceID, nil
654         }
655
656         xAppEventInstanceID = int64(subs.ReqId.Id)
657         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
658         if trans == nil {
659                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
660                 xapp.Logger.Error("%s", err.Error())
661         }
662         defer trans.Release()
663
664         err = c.tracker.Track(trans)
665         if err != nil {
666                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
667                 xapp.Logger.Error("%s", err.Error())
668                 return xAppEventInstanceID, &time.ParseError{}
669         }
670         //
671         // Wake subs delete
672         //
673         go c.handleSubscriptionDelete(subs, trans)
674         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
675
676         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
677
678         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
679
680         return xAppEventInstanceID, nil
681 }
682
683 //-------------------------------------------------------------------
684 //
685 //-------------------------------------------------------------------
686 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
687         xapp.Logger.Info("QueryHandler() called")
688
689         c.CntRecvMsg++
690
691         return c.registry.QueryHandler()
692 }
693
694 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
695         xapp.Logger.Info("TestRestHandler() called")
696
697         pathParams := mux.Vars(r)
698         s := pathParams["testId"]
699
700         // This can be used to delete single subscription from db
701         if contains := strings.Contains(s, "deletesubid="); contains == true {
702                 var splits = strings.Split(s, "=")
703                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
704                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
705                         c.RemoveSubscriptionFromSdl(uint32(subId))
706                         return
707                 }
708         }
709
710         // This can be used to remove all subscriptions db from
711         if s == "emptydb" {
712                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
713                 c.RemoveAllSubscriptionsFromSdl()
714                 c.RemoveAllRESTSubscriptionsFromSdl()
715                 return
716         }
717
718         // This is meant to cause submgr's restart in testing
719         if s == "restart" {
720                 xapp.Logger.Info("os.Exit(1) called")
721                 os.Exit(1)
722         }
723
724         xapp.Logger.Info("Unsupported rest command received %s", s)
725 }
726
727 //-------------------------------------------------------------------
728 //
729 //-------------------------------------------------------------------
730
731 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
732         params := &xapp.RMRParams{}
733         params.Mtype = trans.GetMtype()
734         params.SubId = int(subs.GetReqId().InstanceId)
735         params.Xid = ""
736         params.Meid = subs.GetMeid()
737         params.Src = ""
738         params.PayloadLen = len(trans.Payload.Buf)
739         params.Payload = trans.Payload.Buf
740         params.Mbuf = nil
741         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
742         err = c.SendWithRetry(params, false, 5)
743         if err != nil {
744                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
745         }
746         return err
747 }
748
749 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
750
751         params := &xapp.RMRParams{}
752         params.Mtype = trans.GetMtype()
753         params.SubId = int(subs.GetReqId().InstanceId)
754         params.Xid = trans.GetXid()
755         params.Meid = trans.GetMeid()
756         params.Src = ""
757         params.PayloadLen = len(trans.Payload.Buf)
758         params.Payload = trans.Payload.Buf
759         params.Mbuf = nil
760         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
761         err = c.SendWithRetry(params, false, 5)
762         if err != nil {
763                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
764         }
765         return err
766 }
767
768 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
769         if c.RMRClient == nil {
770                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
771                 xapp.Logger.Error("%s", err.Error())
772                 return
773         }
774         c.CntRecvMsg++
775
776         defer c.RMRClient.Free(msg.Mbuf)
777
778         // xapp-frame might use direct access to c buffer and
779         // when msg.Mbuf is freed, someone might take it into use
780         // and payload data might be invalid inside message handle function
781         //
782         // subscriptions won't load system a lot so there is no
783         // real performance hit by cloning buffer into new go byte slice
784         cPay := append(msg.Payload[:0:0], msg.Payload...)
785         msg.Payload = cPay
786         msg.PayloadLen = len(cPay)
787
788         switch msg.Mtype {
789         case xapp.RIC_SUB_REQ:
790                 go c.handleXAPPSubscriptionRequest(msg)
791         case xapp.RIC_SUB_RESP:
792                 go c.handleE2TSubscriptionResponse(msg)
793         case xapp.RIC_SUB_FAILURE:
794                 go c.handleE2TSubscriptionFailure(msg)
795         case xapp.RIC_SUB_DEL_REQ:
796                 go c.handleXAPPSubscriptionDeleteRequest(msg)
797         case xapp.RIC_SUB_DEL_RESP:
798                 go c.handleE2TSubscriptionDeleteResponse(msg)
799         case xapp.RIC_SUB_DEL_FAILURE:
800                 go c.handleE2TSubscriptionDeleteFailure(msg)
801         default:
802                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
803         }
804         return
805 }
806
807 //-------------------------------------------------------------------
808 // handle from XAPP Subscription Request
809 //------------------------------------------------------------------
810 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
811         xapp.Logger.Info("MSG from XAPP: %s", params.String())
812         c.UpdateCounter(cSubReqFromXapp)
813
814         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
815         if err != nil {
816                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
817                 return
818         }
819
820         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
821         if trans == nil {
822                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
823                 return
824         }
825         defer trans.Release()
826
827         if err = c.tracker.Track(trans); err != nil {
828                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
829                 return
830         }
831
832         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
833         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
834         if err != nil {
835                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
836                 return
837         }
838
839         c.wakeSubscriptionRequest(subs, trans)
840 }
841
842 //-------------------------------------------------------------------
843 // Wake Subscription Request to E2node
844 //------------------------------------------------------------------
845 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
846
847         go c.handleSubscriptionCreate(subs, trans)
848         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
849         var err error
850         if event != nil {
851                 switch themsg := event.(type) {
852                 case *e2ap.E2APSubscriptionResponse:
853                         themsg.RequestId.Id = trans.RequestId.Id
854                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
855                         if err == nil {
856                                 trans.Release()
857                                 c.UpdateCounter(cSubRespToXapp)
858                                 c.rmrSendToXapp("", subs, trans)
859                                 return
860                         }
861                 case *e2ap.E2APSubscriptionFailure:
862                         themsg.RequestId.Id = trans.RequestId.Id
863                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
864                         if err == nil {
865                                 c.UpdateCounter(cSubFailToXapp)
866                                 c.rmrSendToXapp("", subs, trans)
867                         }
868                 default:
869                         break
870                 }
871         }
872         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
873         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
874 }
875
876 //-------------------------------------------------------------------
877 // handle from XAPP Subscription Delete Request
878 //------------------------------------------------------------------
879 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
880         xapp.Logger.Info("MSG from XAPP: %s", params.String())
881         c.UpdateCounter(cSubDelReqFromXapp)
882
883         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
884         if err != nil {
885                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
886                 return
887         }
888
889         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
890         if trans == nil {
891                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
892                 return
893         }
894         defer trans.Release()
895
896         err = c.tracker.Track(trans)
897         if err != nil {
898                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
899                 return
900         }
901
902         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
903         if err != nil {
904                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
905                 return
906         }
907
908         //
909         // Wake subs delete
910         //
911         go c.handleSubscriptionDelete(subs, trans)
912         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
913
914         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
915
916         if subs.NoRespToXapp == true {
917                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
918                 return
919         }
920
921         // Whatever is received success, fail or timeout, send successful delete response
922         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
923         subDelRespMsg.RequestId.Id = trans.RequestId.Id
924         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
925         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
926         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
927         if err == nil {
928                 c.UpdateCounter(cSubDelRespToXapp)
929                 c.rmrSendToXapp("", subs, trans)
930         }
931
932         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
933         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
934 }
935
936 //-------------------------------------------------------------------
937 // SUBS CREATE Handling
938 //-------------------------------------------------------------------
939 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
940
941         var removeSubscriptionFromDb bool = false
942         trans := c.tracker.NewSubsTransaction(subs)
943         subs.WaitTransactionTurn(trans)
944         defer subs.ReleaseTransactionTurn(trans)
945         defer trans.Release()
946
947         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
948
949         subRfMsg, valid := subs.GetCachedResponse()
950         if subRfMsg == nil && valid == true {
951                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
952                 switch event.(type) {
953                 case *e2ap.E2APSubscriptionResponse:
954                         subRfMsg, valid = subs.SetCachedResponse(event, true)
955                         subs.SubRespRcvd = true
956                 case *e2ap.E2APSubscriptionFailure:
957                         removeSubscriptionFromDb = true
958                         subRfMsg, valid = subs.SetCachedResponse(event, false)
959                         xapp.Logger.Info("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
960                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
961                 case *SubmgrRestartTestEvent:
962                         // This simulates that no response has been received and after restart subscriptions are restored from db
963                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
964                         return
965                 default:
966                         xapp.Logger.Info("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
967                         removeSubscriptionFromDb = true
968                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
969                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
970                 }
971                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
972         } else {
973                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
974         }
975
976         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
977         if valid == false {
978                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
979         }
980
981         c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
982         parentTrans.SendEvent(subRfMsg, 0)
983 }
984
985 //-------------------------------------------------------------------
986 // SUBS DELETE Handling
987 //-------------------------------------------------------------------
988
989 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
990
991         trans := c.tracker.NewSubsTransaction(subs)
992         subs.WaitTransactionTurn(trans)
993         defer subs.ReleaseTransactionTurn(trans)
994         defer trans.Release()
995
996         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
997
998         subs.mutex.Lock()
999
1000         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1001                 subs.valid = false
1002                 subs.mutex.Unlock()
1003                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1004         } else {
1005                 subs.mutex.Unlock()
1006         }
1007         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1008         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
1009         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
1010         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1011         c.registry.UpdateSubscriptionToDb(subs, c)
1012         parentTrans.SendEvent(nil, 0)
1013 }
1014
1015 //-------------------------------------------------------------------
1016 // send to E2T Subscription Request
1017 //-------------------------------------------------------------------
1018 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1019         var err error
1020         var event interface{} = nil
1021         var timedOut bool = false
1022         const ricRequestorId = 123
1023
1024         subReqMsg := subs.SubReqMsg
1025         subReqMsg.RequestId = subs.GetReqId().RequestId
1026         subReqMsg.RequestId.Id = ricRequestorId
1027         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1028         if err != nil {
1029                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
1030                 return event
1031         }
1032
1033         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1034         c.WriteSubscriptionToDb(subs)
1035
1036         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
1037                 desc := fmt.Sprintf("(retry %d)", retries)
1038                 if retries == 0 {
1039                         c.UpdateCounter(cSubReqToE2)
1040                 } else {
1041                         c.UpdateCounter(cSubReReqToE2)
1042                 }
1043                 c.rmrSendToE2T(desc, subs, trans)
1044                 if subs.DoNotWaitSubResp == false {
1045                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
1046                         if timedOut {
1047                                 c.UpdateCounter(cSubReqTimerExpiry)
1048                                 continue
1049                         }
1050                 } else {
1051                         // Simulating case where subscrition request has been sent but response has not been received before restart
1052                         event = &SubmgrRestartTestEvent{}
1053                 }
1054                 break
1055         }
1056         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1057         return event
1058 }
1059
1060 //-------------------------------------------------------------------
1061 // send to E2T Subscription Delete Request
1062 //-------------------------------------------------------------------
1063
1064 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1065         var err error
1066         var event interface{}
1067         var timedOut bool
1068         const ricRequestorId = 123
1069
1070         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1071         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1072         subDelReqMsg.RequestId.Id = ricRequestorId
1073         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1074         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1075         if err != nil {
1076                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1077                 return event
1078         }
1079
1080         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1081                 desc := fmt.Sprintf("(retry %d)", retries)
1082                 if retries == 0 {
1083                         c.UpdateCounter(cSubDelReqToE2)
1084                 } else {
1085                         c.UpdateCounter(cSubDelReReqToE2)
1086                 }
1087                 c.rmrSendToE2T(desc, subs, trans)
1088                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1089                 if timedOut {
1090                         c.UpdateCounter(cSubDelReqTimerExpiry)
1091                         continue
1092                 }
1093                 break
1094         }
1095         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1096         return event
1097 }
1098
1099 //-------------------------------------------------------------------
1100 // handle from E2T Subscription Response
1101 //-------------------------------------------------------------------
1102 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1103         xapp.Logger.Info("MSG from E2T: %s", params.String())
1104         c.UpdateCounter(cSubRespFromE2)
1105
1106         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1107         if err != nil {
1108                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1109                 return
1110         }
1111         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1112         if err != nil {
1113                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1114                 return
1115         }
1116         trans := subs.GetTransaction()
1117         if trans == nil {
1118                 err = fmt.Errorf("Ongoing transaction not found")
1119                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1120                 return
1121         }
1122         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1123         if sendOk == false {
1124                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1125                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1126         }
1127         return
1128 }
1129
1130 //-------------------------------------------------------------------
1131 // handle from E2T Subscription Failure
1132 //-------------------------------------------------------------------
1133 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1134         xapp.Logger.Info("MSG from E2T: %s", params.String())
1135         c.UpdateCounter(cSubFailFromE2)
1136         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1137         if err != nil {
1138                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1139                 return
1140         }
1141         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1142         if err != nil {
1143                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1144                 return
1145         }
1146         trans := subs.GetTransaction()
1147         if trans == nil {
1148                 err = fmt.Errorf("Ongoing transaction not found")
1149                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1150                 return
1151         }
1152         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1153         if sendOk == false {
1154                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1155                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1156         }
1157         return
1158 }
1159
1160 //-------------------------------------------------------------------
1161 // handle from E2T Subscription Delete Response
1162 //-------------------------------------------------------------------
1163 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1164         xapp.Logger.Info("MSG from E2T: %s", params.String())
1165         c.UpdateCounter(cSubDelRespFromE2)
1166         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1167         if err != nil {
1168                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1169                 return
1170         }
1171         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1172         if err != nil {
1173                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1174                 return
1175         }
1176         trans := subs.GetTransaction()
1177         if trans == nil {
1178                 err = fmt.Errorf("Ongoing transaction not found")
1179                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1180                 return
1181         }
1182         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1183         if sendOk == false {
1184                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1185                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1186         }
1187         return
1188 }
1189
1190 //-------------------------------------------------------------------
1191 // handle from E2T Subscription Delete Failure
1192 //-------------------------------------------------------------------
1193 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1194         xapp.Logger.Info("MSG from E2T: %s", params.String())
1195         c.UpdateCounter(cSubDelFailFromE2)
1196         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1197         if err != nil {
1198                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1199                 return
1200         }
1201         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1202         if err != nil {
1203                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1204                 return
1205         }
1206         trans := subs.GetTransaction()
1207         if trans == nil {
1208                 err = fmt.Errorf("Ongoing transaction not found")
1209                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1210                 return
1211         }
1212         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1213         if sendOk == false {
1214                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1215                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1216         }
1217         return
1218 }
1219
1220 //-------------------------------------------------------------------
1221 //
1222 //-------------------------------------------------------------------
1223 func typeofSubsMessage(v interface{}) string {
1224         if v == nil {
1225                 return "NIL"
1226         }
1227         switch v.(type) {
1228         //case *e2ap.E2APSubscriptionRequest:
1229         //      return "SubReq"
1230         case *e2ap.E2APSubscriptionResponse:
1231                 return "SubResp"
1232         case *e2ap.E2APSubscriptionFailure:
1233                 return "SubFail"
1234         //case *e2ap.E2APSubscriptionDeleteRequest:
1235         //      return "SubDelReq"
1236         case *e2ap.E2APSubscriptionDeleteResponse:
1237                 return "SubDelResp"
1238         case *e2ap.E2APSubscriptionDeleteFailure:
1239                 return "SubDelFail"
1240         default:
1241                 return "Unknown"
1242         }
1243 }
1244
1245 //-------------------------------------------------------------------
1246 //
1247 //-------------------------------------------------------------------
1248 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1249         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1250         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1251         if err != nil {
1252                 xapp.Logger.Error("%v", err)
1253         }
1254 }
1255
1256 //-------------------------------------------------------------------
1257 //
1258 //-------------------------------------------------------------------
1259 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1260
1261         if removeSubscriptionFromDb == true {
1262                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1263                 c.RemoveSubscriptionFromDb(subs)
1264         } else {
1265                 // Update is needed for successful response and merge case here
1266                 if subs.RetryFromXapp == false {
1267                         c.WriteSubscriptionToDb(subs)
1268                 }
1269         }
1270         subs.RetryFromXapp = false
1271 }
1272
1273 //-------------------------------------------------------------------
1274 //
1275 //-------------------------------------------------------------------
1276 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1277         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1278         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1279         if err != nil {
1280                 xapp.Logger.Error("%v", err)
1281         }
1282 }
1283
1284 //-------------------------------------------------------------------
1285 //
1286 //-------------------------------------------------------------------
1287 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1288         xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1289         err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1290         if err != nil {
1291                 xapp.Logger.Error("%v", err)
1292         }
1293 }
1294
1295 //-------------------------------------------------------------------
1296 //
1297 //-------------------------------------------------------------------
1298 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1299
1300         if removeRestSubscriptionFromDb == true {
1301                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1302                 c.RemoveRESTSubscriptionFromDb(restSubId)
1303         } else {
1304                 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1305         }
1306 }
1307
1308 //-------------------------------------------------------------------
1309 //
1310 //-------------------------------------------------------------------
1311 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1312         xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1313         err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1314         if err != nil {
1315                 xapp.Logger.Error("%v", err)
1316         }
1317 }
1318
1319 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1320
1321         const ricRequestorId = 123
1322         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1323
1324         // Send delete for every endpoint in the subscription
1325         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1326         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1327         subDelReqMsg.RequestId.Id = ricRequestorId
1328         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1329         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1330         if err != nil {
1331                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1332                 return
1333         }
1334         for _, endPoint := range subs.EpList.Endpoints {
1335                 params := &xapp.RMRParams{}
1336                 params.Mtype = mType
1337                 params.SubId = int(subs.GetReqId().InstanceId)
1338                 params.Xid = ""
1339                 params.Meid = subs.Meid
1340                 params.Src = endPoint.String()
1341                 params.PayloadLen = len(payload.Buf)
1342                 params.Payload = payload.Buf
1343                 params.Mbuf = nil
1344                 subs.DeleteFromDb = true
1345                 c.handleXAPPSubscriptionDeleteRequest(params)
1346         }
1347 }
1348
1349 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1350
1351         fmt.Println("CRESTSubscriptionRequest")
1352
1353         if p == nil {
1354                 return
1355         }
1356
1357         if p.SubscriptionID != "" {
1358                 fmt.Println("  SubscriptionID = ", p.SubscriptionID)
1359         } else {
1360                 fmt.Println("  SubscriptionID = ''")
1361         }
1362
1363         fmt.Printf("  ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1364
1365         if p.ClientEndpoint.HTTPPort != nil {
1366                 fmt.Printf("  ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1367         } else {
1368                 fmt.Println("  ClientEndpoint.HTTPPort = nil")
1369         }
1370
1371         if p.ClientEndpoint.RMRPort != nil {
1372                 fmt.Printf("  ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1373         } else {
1374                 fmt.Println("  ClientEndpoint.RMRPort = nil")
1375         }
1376
1377         if p.Meid != nil {
1378                 fmt.Printf("  Meid = %s\n", *p.Meid)
1379         } else {
1380                 fmt.Println("  Meid = nil")
1381         }
1382
1383         for _, subscriptionDetail := range p.SubscriptionDetails {
1384                 if p.RANFunctionID != nil {
1385                         fmt.Printf("  RANFunctionID = %v\n", *p.RANFunctionID)
1386                 } else {
1387                         fmt.Println("  RANFunctionID = nil")
1388                 }
1389                 fmt.Printf("  SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1390                 fmt.Printf("  SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1391
1392                 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1393                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1394                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1395                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1396
1397                         if actionToBeSetup.SubsequentAction != nil {
1398                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1399                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1400                         } else {
1401                                 fmt.Println("  SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1402                         }
1403                 }
1404         }
1405 }