14bf034cc3aa418a919440fbdeede94a4e6415c5
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/segmentio/ksuid"
38         "github.com/spf13/viper"
39 )
40
41 //-----------------------------------------------------------------------------
42 //
43 //-----------------------------------------------------------------------------
44
// idstring renders entries, followed by err when it is non-nil, as a single
// space-separated string intended for log messages.
//
// Every non-nil entry contributes its String() value and every nil entry the
// literal "(NIL)". A non-nil err is appended last as "err(<message>)".
// With no entries and a nil err the result is the empty string.
//
// Only a nil interface value is detected as nil; an interface wrapping a
// typed nil pointer still has its String method invoked.
func idstring(err error, entries ...fmt.Stringer) string {
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		if entry == nil {
			parts = append(parts, "(NIL)")
			continue
		}
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	// Joining guarantees exactly one separator between any two parts,
	// including after a "(NIL)" placeholder.
	return strings.Join(parts, " ")
}
62
63 //-----------------------------------------------------------------------------
64 //
65 //-----------------------------------------------------------------------------
66
67 var e2tSubReqTimeout time.Duration
68 var e2tSubDelReqTime time.Duration
69 var e2tRecvMsgTimeout time.Duration
70 var waitRouteCleanup_ms time.Duration
71 var e2tMaxSubReqTryCount uint64    // Initial try + retry
72 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
73 var readSubsFromDb string
74 var restDuplicateCtrl duplicateCtrl
75 var dbRetryForever string
76 var dbTryCount int
77
// Control is the receiver for the REST and test handlers defined in this
// file and holds the state they share. Instances are built by NewControl.
type Control struct {
	// Embedded RMR client; ReadyCB sets it to xapp.Rmr when it is still nil.
	*xapp.RMRClient
	// Used to fill E2AP subscription request messages from REST requests.
	e2ap *E2ap
	// Registry of E2 and REST subscriptions.
	registry *Registry
	// Creates and tracks xApp transactions.
	tracker *Tracker
	// Store created by CreateSdl().
	e2SubsDb Sdlnterface
	// Store created by CreateRESTSdl().
	restSubsDb Sdlnterface
	// Incremented by the subscription, delete and query handlers.
	CntRecvMsg uint64
	// Passed to Registry.AssignToSubscription when handling a request.
	ResetTestFlag bool
	// Counter group registered under the name "SUBMGR".
	Counters map[string]xapp.Counter
	// Read from "logger.level"; 3 is used when the setting is 0.
	LoggerLevel uint32
}
90
// RMRMeid groups three string identifiers: PlmnID, EnbID and RanName.
// NOTE(review): the code in this file uses xapp.RMRMeid rather than this
// type; confirm whether it is still needed.
type RMRMeid struct {
	PlmnID  string
	EnbID   string
	RanName string
}
96
// SubmgrRestartTestEvent is an empty marker type.
// NOTE(review): presumably used as an event in restart tests — confirm at the use sites.
type SubmgrRestartTestEvent struct{}

// SubmgrRestartUpEvent is an empty marker type.
// NOTE(review): presumably signals that submgr is up after a restart — confirm at the use sites.
type SubmgrRestartUpEvent struct{}
99
// init logs the component name and configures viper's environment variable
// handling: automatic env lookup, the "submgr" prefix, and empty environment
// variables allowed.
func init() {
	xapp.Logger.Info("SUBMGR")
	viper.AutomaticEnv()
	viper.SetEnvPrefix("submgr")
	viper.AllowEmptyEnv(true)
}
106
107 func NewControl() *Control {
108
109         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
110         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
111
112         registry := new(Registry)
113         registry.Initialize()
114         registry.rtmgrClient = &rtmgrClient
115
116         tracker := new(Tracker)
117         tracker.Init()
118
119         c := &Control{e2ap: new(E2ap),
120                 registry:    registry,
121                 tracker:     tracker,
122                 e2SubsDb:    CreateSdl(),
123                 restSubsDb:  CreateRESTSdl(),
124                 Counters:    xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
125                 LoggerLevel: 3,
126         }
127         c.ReadConfigParameters("")
128
129         // Register REST handler for testing support
130         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
131         xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
132         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
133
134         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
135
136         if readSubsFromDb == "false" {
137                 return c
138         }
139
140         restDuplicateCtrl.Init()
141
142         // Read subscriptions from db
143         c.ReadE2Subscriptions()
144         c.ReadRESTSubscriptions()
145
146         /*
147                 xapp.Logger.Info("Reading subscriptions from db")
148                 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
149                 if err != nil {
150                         xapp.Logger.Error("%v", err)
151                 } else {
152                         c.registry.subIds = subIds
153                         c.registry.register = register
154                         c.HandleUncompletedSubscriptions(register)
155                 }
156
157                 restSubscriptions, err := c.ReadAllRESTSubscriptionsFromSdl()
158                 if err != nil {
159                         xapp.Logger.Error("%v", err)
160                 } else {
161                         c.registry.restSubscriptions = restSubscriptions
162                 }
163         */
164         return c
165 }
166
167 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
168         subscriptions, _ := c.registry.QueryHandler()
169         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
170 }
171
172 //-------------------------------------------------------------------
173 //
174 //-------------------------------------------------------------------
175 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
176         xapp.Logger.Info("GetAllRestSubscriptions() called")
177         response := c.registry.GetAllRestSubscriptions()
178         w.Write(response)
179 }
180
181 //-------------------------------------------------------------------
182 //
183 //-------------------------------------------------------------------
184 func (c *Control) ReadE2Subscriptions() error {
185         var err error
186         var subIds []uint32
187         var register map[uint32]*Subscription
188         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
189                 xapp.Logger.Info("Reading E2 subscriptions from db")
190                 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
191                 if err != nil {
192                         xapp.Logger.Error("%v", err)
193                         <-time.After(1 * time.Second)
194                 } else {
195                         c.registry.subIds = subIds
196                         c.registry.register = register
197                         c.HandleUncompletedSubscriptions(register)
198                         return nil
199                 }
200         }
201         xapp.Logger.Info("Continuing without retring")
202         return err
203 }
204
205 //-------------------------------------------------------------------
206 //
207 //-------------------------------------------------------------------
208 func (c *Control) ReadRESTSubscriptions() error {
209         var err error
210         var restSubscriptions map[string]*RESTSubscription
211         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
212                 xapp.Logger.Info("Reading REST subscriptions from db")
213                 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
214                 if err != nil {
215                         xapp.Logger.Error("%v", err)
216                         <-time.After(1 * time.Second)
217                 } else {
218                         c.registry.restSubscriptions = restSubscriptions
219                         return nil
220                 }
221         }
222         xapp.Logger.Info("Continuing without retring")
223         return err
224 }
225
226 //-------------------------------------------------------------------
227 //
228 //-------------------------------------------------------------------
229 func (c *Control) ReadConfigParameters(f string) {
230
231         // viper.GetDuration returns nanoseconds
232         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
233         if e2tSubReqTimeout == 0 {
234                 e2tSubReqTimeout = 2000 * 1000000
235         }
236         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
237         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
238         if e2tSubDelReqTime == 0 {
239                 e2tSubDelReqTime = 2000 * 1000000
240         }
241         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
242         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
243         if e2tRecvMsgTimeout == 0 {
244                 e2tRecvMsgTimeout = 2000 * 1000000
245         }
246         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
247
248         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
249         if e2tMaxSubReqTryCount == 0 {
250                 e2tMaxSubReqTryCount = 1
251         }
252         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
253
254         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
255         if e2tMaxSubDelReqTryCount == 0 {
256                 e2tMaxSubDelReqTryCount = 1
257         }
258         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
259
260         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
261         if readSubsFromDb == "" {
262                 readSubsFromDb = "true"
263         }
264         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
265
266         dbTryCount = viper.GetInt("controls.dbTryCount")
267         if dbTryCount == 0 {
268                 dbTryCount = 200
269         }
270         xapp.Logger.Info("dbTryCount %v", dbTryCount)
271
272         dbRetryForever = viper.GetString("controls.dbRetryForever")
273         if dbRetryForever == "" {
274                 dbRetryForever = "true"
275         }
276         xapp.Logger.Info("dbRetryForever %v", dbRetryForever)
277
278         c.LoggerLevel = viper.GetUint32("logger.level")
279         if c.LoggerLevel == 0 {
280                 c.LoggerLevel = 3
281         }
282         xapp.Logger.Info("LoggerLevel %v", c.LoggerLevel)
283
284         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
285         // value 100ms used currently only in unittests.
286         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
287         if waitRouteCleanup_ms == 0 {
288                 waitRouteCleanup_ms = 5000 * 1000000
289         }
290         xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
291 }
292
293 //-------------------------------------------------------------------
294 //
295 //-------------------------------------------------------------------
296 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
297
298         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
299         for subId, subs := range register {
300                 if subs.SubRespRcvd == false {
301                         subs.NoRespToXapp = true
302                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
303                         c.SendSubscriptionDeleteReq(subs)
304                 }
305         }
306 }
307
308 func (c *Control) ReadyCB(data interface{}) {
309         if c.RMRClient == nil {
310                 c.RMRClient = xapp.Rmr
311         }
312 }
313
// Run registers ReadyCB as the ready callback and ReadConfigParameters as a
// config change listener, then hands control to xapp.Run.
func (c *Control) Run() {
	xapp.SetReadyCB(c.ReadyCB, nil)
	xapp.AddConfigChangeListener(c.ReadConfigParameters)
	xapp.Run(c)
}
319
320 //-------------------------------------------------------------------
321 //
322 //-------------------------------------------------------------------
323 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
324
325         var restSubId string
326         var restSubscription *RESTSubscription
327         var err error
328
329         prevRestSubsId, exists := restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
330         if p.SubscriptionID == "" {
331                 if exists {
332                         restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
333                         if restSubscription != nil {
334                                 restSubId = prevRestSubsId
335                                 if err == nil {
336                                         xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
337                                 } else {
338                                         xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
339                                 }
340                         } else {
341                                 xapp.Logger.Info("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
342                                 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
343                         }
344                 }
345
346                 if restSubscription == nil {
347                         restSubId = ksuid.New().String()
348                         restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
349                         if err != nil {
350                                 xapp.Logger.Error("%s", err.Error())
351                                 c.UpdateCounter(cRestSubFailToXapp)
352                                 return nil, "", err
353                         }
354                 }
355         } else {
356                 restSubId = p.SubscriptionID
357
358                 xapp.Logger.Info("RestSubscription ID %s provided via REST request", restSubId)
359
360                 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
361                 if err != nil {
362                         xapp.Logger.Error("%s", err.Error())
363                         c.UpdateCounter(cRestSubFailToXapp)
364                         return nil, "", err
365                 }
366
367                 if !exists {
368                         xapp.Logger.Info("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
369                 } else {
370                         xapp.Logger.Info("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
371                 }
372         }
373
374         return restSubscription, restSubId, nil
375 }
376
377 //-------------------------------------------------------------------
378 //
379 //-------------------------------------------------------------------
380 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
381
382         c.CntRecvMsg++
383         c.UpdateCounter(cRestSubReqFromXapp)
384
385         subResp := models.SubscriptionResponse{}
386         p := params.(*models.SubscriptionParams)
387
388         if c.LoggerLevel > 2 {
389                 c.PrintRESTSubscriptionRequest(p)
390         }
391
392         if p.ClientEndpoint == nil {
393                 xapp.Logger.Error("ClientEndpoint == nil")
394                 c.UpdateCounter(cRestSubFailToXapp)
395                 return nil, fmt.Errorf("")
396         }
397
398         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
399         if err != nil {
400                 xapp.Logger.Error("%s", err.Error())
401                 c.UpdateCounter(cRestSubFailToXapp)
402                 return nil, err
403         }
404
405         md5sum, err := CalculateRequestMd5sum(params)
406         if err != nil {
407                 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
408         }
409
410         restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
411         if err != nil {
412                 xapp.Logger.Error("Failed to get/allocate REST subscription")
413                 return nil, err
414         }
415
416         subResp.SubscriptionID = &restSubId
417         subReqList := e2ap.SubscriptionRequestList{}
418         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
419         if err != nil {
420                 xapp.Logger.Error("%s", err.Error())
421                 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
422                 c.registry.DeleteRESTSubscription(&restSubId)
423                 c.UpdateCounter(cRestSubFailToXapp)
424                 return nil, err
425         }
426
427         duplicate := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
428         if duplicate {
429                 xapp.Logger.Info("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
430                 c.UpdateCounter(cRestSubRespToXapp)
431                 return &subResp, nil
432         }
433
434         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum)
435
436         c.UpdateCounter(cRestSubRespToXapp)
437         return &subResp, nil
438 }
439
440 //-------------------------------------------------------------------
441 //
442 //-------------------------------------------------------------------
443
444 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
445         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string) {
446
447         xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
448
449         var xAppEventInstanceID int64
450         var e2EventInstanceID int64
451
452         defer restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
453
454         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
455                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
456                 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
457
458                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
459                 if trans == nil {
460                         // Send notification to xApp that prosessing of a Subscription Request has failed.
461                         err := fmt.Errorf("Tracking failure")
462                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
463                         continue
464                 }
465
466                 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
467
468                 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId)
469
470                 xapp.Logger.Info("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
471
472                 if err != nil {
473                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
474                 } else {
475                         e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
476                         restSubscription.AddMd5Sum(md5sum)
477                         xapp.Logger.Info("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
478                                 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
479                         c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
480                 }
481                 trans.Release()
482         }
483 }
484
485 //-------------------------------------------------------------------
486 //
487 //------------------------------------------------------------------
488 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
489         restSubId string) (*e2ap.E2APSubscriptionResponse, error) {
490
491         err := c.tracker.Track(trans)
492         if err != nil {
493                 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
494                 err = fmt.Errorf("Tracking failure")
495                 return nil, err
496         }
497
498         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
499         if err != nil {
500                 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
501                 return nil, err
502         }
503
504         //
505         // Wake subs request
506         //
507         go c.handleSubscriptionCreate(subs, trans)
508         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
509
510         err = nil
511         if event != nil {
512                 switch themsg := event.(type) {
513                 case *e2ap.E2APSubscriptionResponse:
514                         trans.Release()
515                         return themsg, nil
516                 case *e2ap.E2APSubscriptionFailure:
517                         err = fmt.Errorf("E2 SubscriptionFailure received")
518                         return nil, err
519                 default:
520                         err = fmt.Errorf("unexpected E2 subscription response received")
521                         break
522                 }
523         } else {
524                 err = fmt.Errorf("E2 subscription response timeout")
525         }
526
527         xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
528         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
529         return nil, err
530 }
531
532 //-------------------------------------------------------------------
533 //
534 //-------------------------------------------------------------------
535 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
536         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
537
538         // Send notification to xApp that prosessing of a Subscription Request has failed.
539         e2EventInstanceID := (int64)(0)
540         errorCause := err.Error()
541         resp := &models.SubscriptionResponse{
542                 SubscriptionID: restSubId,
543                 SubscriptionInstances: []*models.SubscriptionInstance{
544                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
545                                 ErrorCause:          &errorCause,
546                                 XappEventInstanceID: &xAppEventInstanceID},
547                 },
548         }
549         // Mark REST subscription request processed.
550         restSubscription.SetProcessed(err)
551         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
552         if trans != nil {
553                 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
554                         errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
555         } else {
556                 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
557                         errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
558         }
559
560         c.UpdateCounter(cRestSubFailNotifToXapp)
561         xapp.Subscription.Notify(resp, *clientEndpoint)
562 }
563
564 //-------------------------------------------------------------------
565 //
566 //-------------------------------------------------------------------
567 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
568         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
569
570         // Store successfully processed InstanceId for deletion
571         restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
572         restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
573
574         // Send notification to xApp that a Subscription Request has been processed.
575         resp := &models.SubscriptionResponse{
576                 SubscriptionID: restSubId,
577                 SubscriptionInstances: []*models.SubscriptionInstance{
578                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
579                                 ErrorCause:          nil,
580                                 XappEventInstanceID: &xAppEventInstanceID},
581                 },
582         }
583         // Mark REST subscription request processesd.
584         restSubscription.SetProcessed(nil)
585         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
586         xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
587                 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
588
589         c.UpdateCounter(cRestSubNotifToXapp)
590         xapp.Subscription.Notify(resp, *clientEndpoint)
591 }
592
593 //-------------------------------------------------------------------
594 //
595 //-------------------------------------------------------------------
596 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
597
598         c.CntRecvMsg++
599         c.UpdateCounter(cRestSubDelReqFromXapp)
600
601         xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
602
603         restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
604         if err != nil {
605                 xapp.Logger.Error("%s", err.Error())
606                 if restSubscription == nil {
607                         // Subscription was not found
608                         return nil
609                 } else {
610                         if restSubscription.SubReqOngoing == true {
611                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
612                                 xapp.Logger.Error("%s", err.Error())
613                                 return err
614                         } else if restSubscription.SubDelReqOngoing == true {
615                                 // Previous request for same restSubId still ongoing
616                                 return nil
617                         }
618                 }
619         }
620
621         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
622         go func() {
623                 xapp.Logger.Info("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
624                 for _, instanceId := range restSubscription.InstanceIds {
625                         xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
626
627                         if err != nil {
628                                 xapp.Logger.Error("%s", err.Error())
629                                 //return err
630                         }
631                         xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
632                         restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
633                         restSubscription.DeleteE2InstanceId(instanceId)
634                 }
635                 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
636                 c.registry.DeleteRESTSubscription(&restSubId)
637                 c.RemoveRESTSubscriptionFromDb(restSubId)
638         }()
639
640         c.UpdateCounter(cRestSubDelRespToXapp)
641
642         return nil
643 }
644
645 //-------------------------------------------------------------------
646 //
647 //-------------------------------------------------------------------
648 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
649
650         var xAppEventInstanceID int64
651         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
652         if err != nil {
653                 xapp.Logger.Info("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
654                         restSubId, instanceId, idstring(err, nil))
655                 return xAppEventInstanceID, nil
656         }
657
658         xAppEventInstanceID = int64(subs.ReqId.Id)
659         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
660         if trans == nil {
661                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
662                 xapp.Logger.Error("%s", err.Error())
663         }
664         defer trans.Release()
665
666         err = c.tracker.Track(trans)
667         if err != nil {
668                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
669                 xapp.Logger.Error("%s", err.Error())
670                 return xAppEventInstanceID, &time.ParseError{}
671         }
672         //
673         // Wake subs delete
674         //
675         go c.handleSubscriptionDelete(subs, trans)
676         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
677
678         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
679
680         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
681
682         return xAppEventInstanceID, nil
683 }
684
685 //-------------------------------------------------------------------
686 //
687 //-------------------------------------------------------------------
688 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
689         xapp.Logger.Info("QueryHandler() called")
690
691         c.CntRecvMsg++
692
693         return c.registry.QueryHandler()
694 }
695
696 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
697         xapp.Logger.Info("TestRestHandler() called")
698
699         pathParams := mux.Vars(r)
700         s := pathParams["testId"]
701
702         // This can be used to delete single subscription from db
703         if contains := strings.Contains(s, "deletesubid="); contains == true {
704                 var splits = strings.Split(s, "=")
705                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
706                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
707                         c.RemoveSubscriptionFromSdl(uint32(subId))
708                         return
709                 }
710         }
711
712         // This can be used to remove all subscriptions db from
713         if s == "emptydb" {
714                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
715                 c.RemoveAllSubscriptionsFromSdl()
716                 c.RemoveAllRESTSubscriptionsFromSdl()
717                 return
718         }
719
720         // This is meant to cause submgr's restart in testing
721         if s == "restart" {
722                 xapp.Logger.Info("os.Exit(1) called")
723                 os.Exit(1)
724         }
725
726         xapp.Logger.Info("Unsupported rest command received %s", s)
727 }
728
729 //-------------------------------------------------------------------
730 //
731 //-------------------------------------------------------------------
732
733 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
734         params := &xapp.RMRParams{}
735         params.Mtype = trans.GetMtype()
736         params.SubId = int(subs.GetReqId().InstanceId)
737         params.Xid = ""
738         params.Meid = subs.GetMeid()
739         params.Src = ""
740         params.PayloadLen = len(trans.Payload.Buf)
741         params.Payload = trans.Payload.Buf
742         params.Mbuf = nil
743         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
744         err = c.SendWithRetry(params, false, 5)
745         if err != nil {
746                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
747         }
748         return err
749 }
750
751 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
752
753         params := &xapp.RMRParams{}
754         params.Mtype = trans.GetMtype()
755         params.SubId = int(subs.GetReqId().InstanceId)
756         params.Xid = trans.GetXid()
757         params.Meid = trans.GetMeid()
758         params.Src = ""
759         params.PayloadLen = len(trans.Payload.Buf)
760         params.Payload = trans.Payload.Buf
761         params.Mbuf = nil
762         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
763         err = c.SendWithRetry(params, false, 5)
764         if err != nil {
765                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
766         }
767         return err
768 }
769
770 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
771         if c.RMRClient == nil {
772                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
773                 xapp.Logger.Error("%s", err.Error())
774                 return
775         }
776         c.CntRecvMsg++
777
778         defer c.RMRClient.Free(msg.Mbuf)
779
780         // xapp-frame might use direct access to c buffer and
781         // when msg.Mbuf is freed, someone might take it into use
782         // and payload data might be invalid inside message handle function
783         //
784         // subscriptions won't load system a lot so there is no
785         // real performance hit by cloning buffer into new go byte slice
786         cPay := append(msg.Payload[:0:0], msg.Payload...)
787         msg.Payload = cPay
788         msg.PayloadLen = len(cPay)
789
790         switch msg.Mtype {
791         case xapp.RIC_SUB_REQ:
792                 go c.handleXAPPSubscriptionRequest(msg)
793         case xapp.RIC_SUB_RESP:
794                 go c.handleE2TSubscriptionResponse(msg)
795         case xapp.RIC_SUB_FAILURE:
796                 go c.handleE2TSubscriptionFailure(msg)
797         case xapp.RIC_SUB_DEL_REQ:
798                 go c.handleXAPPSubscriptionDeleteRequest(msg)
799         case xapp.RIC_SUB_DEL_RESP:
800                 go c.handleE2TSubscriptionDeleteResponse(msg)
801         case xapp.RIC_SUB_DEL_FAILURE:
802                 go c.handleE2TSubscriptionDeleteFailure(msg)
803         default:
804                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
805         }
806         return
807 }
808
809 //-------------------------------------------------------------------
810 // handle from XAPP Subscription Request
811 //------------------------------------------------------------------
812 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
813         xapp.Logger.Info("MSG from XAPP: %s", params.String())
814         c.UpdateCounter(cSubReqFromXapp)
815
816         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
817         if err != nil {
818                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
819                 return
820         }
821
822         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
823         if trans == nil {
824                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
825                 return
826         }
827         defer trans.Release()
828
829         if err = c.tracker.Track(trans); err != nil {
830                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
831                 return
832         }
833
834         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
835         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
836         if err != nil {
837                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
838                 return
839         }
840
841         c.wakeSubscriptionRequest(subs, trans)
842 }
843
844 //-------------------------------------------------------------------
845 // Wake Subscription Request to E2node
846 //------------------------------------------------------------------
847 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
848
849         go c.handleSubscriptionCreate(subs, trans)
850         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
851         var err error
852         if event != nil {
853                 switch themsg := event.(type) {
854                 case *e2ap.E2APSubscriptionResponse:
855                         themsg.RequestId.Id = trans.RequestId.Id
856                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
857                         if err == nil {
858                                 trans.Release()
859                                 c.UpdateCounter(cSubRespToXapp)
860                                 c.rmrSendToXapp("", subs, trans)
861                                 return
862                         }
863                 case *e2ap.E2APSubscriptionFailure:
864                         themsg.RequestId.Id = trans.RequestId.Id
865                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
866                         if err == nil {
867                                 c.UpdateCounter(cSubFailToXapp)
868                                 c.rmrSendToXapp("", subs, trans)
869                         }
870                 default:
871                         break
872                 }
873         }
874         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
875         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
876 }
877
878 //-------------------------------------------------------------------
879 // handle from XAPP Subscription Delete Request
880 //------------------------------------------------------------------
881 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
882         xapp.Logger.Info("MSG from XAPP: %s", params.String())
883         c.UpdateCounter(cSubDelReqFromXapp)
884
885         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
886         if err != nil {
887                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
888                 return
889         }
890
891         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
892         if trans == nil {
893                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
894                 return
895         }
896         defer trans.Release()
897
898         err = c.tracker.Track(trans)
899         if err != nil {
900                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
901                 return
902         }
903
904         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
905         if err != nil {
906                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
907                 return
908         }
909
910         //
911         // Wake subs delete
912         //
913         go c.handleSubscriptionDelete(subs, trans)
914         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
915
916         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
917
918         if subs.NoRespToXapp == true {
919                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
920                 return
921         }
922
923         // Whatever is received success, fail or timeout, send successful delete response
924         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
925         subDelRespMsg.RequestId.Id = trans.RequestId.Id
926         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
927         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
928         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
929         if err == nil {
930                 c.UpdateCounter(cSubDelRespToXapp)
931                 c.rmrSendToXapp("", subs, trans)
932         }
933
934         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
935         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
936 }
937
938 //-------------------------------------------------------------------
939 // SUBS CREATE Handling
940 //-------------------------------------------------------------------
941 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
942
943         var removeSubscriptionFromDb bool = false
944         trans := c.tracker.NewSubsTransaction(subs)
945         subs.WaitTransactionTurn(trans)
946         defer subs.ReleaseTransactionTurn(trans)
947         defer trans.Release()
948
949         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
950
951         subRfMsg, valid := subs.GetCachedResponse()
952         if subRfMsg == nil && valid == true {
953                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
954                 switch event.(type) {
955                 case *e2ap.E2APSubscriptionResponse:
956                         subRfMsg, valid = subs.SetCachedResponse(event, true)
957                         subs.SubRespRcvd = true
958                 case *e2ap.E2APSubscriptionFailure:
959                         removeSubscriptionFromDb = true
960                         subRfMsg, valid = subs.SetCachedResponse(event, false)
961                         xapp.Logger.Info("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
962                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
963                 case *SubmgrRestartTestEvent:
964                         // This simulates that no response has been received and after restart subscriptions are restored from db
965                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
966                         return
967                 default:
968                         xapp.Logger.Info("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
969                         removeSubscriptionFromDb = true
970                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
971                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
972                 }
973                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
974         } else {
975                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
976         }
977
978         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
979         if valid == false {
980                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
981         }
982
983         c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
984         parentTrans.SendEvent(subRfMsg, 0)
985 }
986
987 //-------------------------------------------------------------------
988 // SUBS DELETE Handling
989 //-------------------------------------------------------------------
990
991 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
992
993         trans := c.tracker.NewSubsTransaction(subs)
994         subs.WaitTransactionTurn(trans)
995         defer subs.ReleaseTransactionTurn(trans)
996         defer trans.Release()
997
998         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
999
1000         subs.mutex.Lock()
1001
1002         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1003                 subs.valid = false
1004                 subs.mutex.Unlock()
1005                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1006         } else {
1007                 subs.mutex.Unlock()
1008         }
1009         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1010         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
1011         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
1012         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1013         c.registry.UpdateSubscriptionToDb(subs, c)
1014         parentTrans.SendEvent(nil, 0)
1015 }
1016
1017 //-------------------------------------------------------------------
1018 // send to E2T Subscription Request
1019 //-------------------------------------------------------------------
1020 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1021         var err error
1022         var event interface{} = nil
1023         var timedOut bool = false
1024         const ricRequestorId = 123
1025
1026         subReqMsg := subs.SubReqMsg
1027         subReqMsg.RequestId = subs.GetReqId().RequestId
1028         subReqMsg.RequestId.Id = ricRequestorId
1029         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1030         if err != nil {
1031                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
1032                 return event
1033         }
1034
1035         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1036         c.WriteSubscriptionToDb(subs)
1037
1038         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
1039                 desc := fmt.Sprintf("(retry %d)", retries)
1040                 if retries == 0 {
1041                         c.UpdateCounter(cSubReqToE2)
1042                 } else {
1043                         c.UpdateCounter(cSubReReqToE2)
1044                 }
1045                 c.rmrSendToE2T(desc, subs, trans)
1046                 if subs.DoNotWaitSubResp == false {
1047                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
1048                         if timedOut {
1049                                 c.UpdateCounter(cSubReqTimerExpiry)
1050                                 continue
1051                         }
1052                 } else {
1053                         // Simulating case where subscrition request has been sent but response has not been received before restart
1054                         event = &SubmgrRestartTestEvent{}
1055                 }
1056                 break
1057         }
1058         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1059         return event
1060 }
1061
1062 //-------------------------------------------------------------------
1063 // send to E2T Subscription Delete Request
1064 //-------------------------------------------------------------------
1065
1066 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1067         var err error
1068         var event interface{}
1069         var timedOut bool
1070         const ricRequestorId = 123
1071
1072         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1073         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1074         subDelReqMsg.RequestId.Id = ricRequestorId
1075         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1076         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1077         if err != nil {
1078                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1079                 return event
1080         }
1081
1082         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1083                 desc := fmt.Sprintf("(retry %d)", retries)
1084                 if retries == 0 {
1085                         c.UpdateCounter(cSubDelReqToE2)
1086                 } else {
1087                         c.UpdateCounter(cSubDelReReqToE2)
1088                 }
1089                 c.rmrSendToE2T(desc, subs, trans)
1090                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1091                 if timedOut {
1092                         c.UpdateCounter(cSubDelReqTimerExpiry)
1093                         continue
1094                 }
1095                 break
1096         }
1097         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1098         return event
1099 }
1100
1101 //-------------------------------------------------------------------
1102 // handle from E2T Subscription Response
1103 //-------------------------------------------------------------------
1104 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1105         xapp.Logger.Info("MSG from E2T: %s", params.String())
1106         c.UpdateCounter(cSubRespFromE2)
1107
1108         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1109         if err != nil {
1110                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1111                 return
1112         }
1113         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1114         if err != nil {
1115                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1116                 return
1117         }
1118         trans := subs.GetTransaction()
1119         if trans == nil {
1120                 err = fmt.Errorf("Ongoing transaction not found")
1121                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1122                 return
1123         }
1124         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1125         if sendOk == false {
1126                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1127                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1128         }
1129         return
1130 }
1131
1132 //-------------------------------------------------------------------
1133 // handle from E2T Subscription Failure
1134 //-------------------------------------------------------------------
1135 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1136         xapp.Logger.Info("MSG from E2T: %s", params.String())
1137         c.UpdateCounter(cSubFailFromE2)
1138         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1139         if err != nil {
1140                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1141                 return
1142         }
1143         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1144         if err != nil {
1145                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1146                 return
1147         }
1148         trans := subs.GetTransaction()
1149         if trans == nil {
1150                 err = fmt.Errorf("Ongoing transaction not found")
1151                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1152                 return
1153         }
1154         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1155         if sendOk == false {
1156                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1157                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1158         }
1159         return
1160 }
1161
1162 //-------------------------------------------------------------------
1163 // handle from E2T Subscription Delete Response
1164 //-------------------------------------------------------------------
1165 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1166         xapp.Logger.Info("MSG from E2T: %s", params.String())
1167         c.UpdateCounter(cSubDelRespFromE2)
1168         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1169         if err != nil {
1170                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1171                 return
1172         }
1173         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1174         if err != nil {
1175                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1176                 return
1177         }
1178         trans := subs.GetTransaction()
1179         if trans == nil {
1180                 err = fmt.Errorf("Ongoing transaction not found")
1181                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1182                 return
1183         }
1184         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1185         if sendOk == false {
1186                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1187                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1188         }
1189         return
1190 }
1191
1192 //-------------------------------------------------------------------
1193 // handle from E2T Subscription Delete Failure
1194 //-------------------------------------------------------------------
1195 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1196         xapp.Logger.Info("MSG from E2T: %s", params.String())
1197         c.UpdateCounter(cSubDelFailFromE2)
1198         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1199         if err != nil {
1200                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1201                 return
1202         }
1203         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1204         if err != nil {
1205                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1206                 return
1207         }
1208         trans := subs.GetTransaction()
1209         if trans == nil {
1210                 err = fmt.Errorf("Ongoing transaction not found")
1211                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1212                 return
1213         }
1214         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1215         if sendOk == false {
1216                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1217                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1218         }
1219         return
1220 }
1221
1222 //-------------------------------------------------------------------
1223 //
1224 //-------------------------------------------------------------------
1225 func typeofSubsMessage(v interface{}) string {
1226         if v == nil {
1227                 return "NIL"
1228         }
1229         switch v.(type) {
1230         //case *e2ap.E2APSubscriptionRequest:
1231         //      return "SubReq"
1232         case *e2ap.E2APSubscriptionResponse:
1233                 return "SubResp"
1234         case *e2ap.E2APSubscriptionFailure:
1235                 return "SubFail"
1236         //case *e2ap.E2APSubscriptionDeleteRequest:
1237         //      return "SubDelReq"
1238         case *e2ap.E2APSubscriptionDeleteResponse:
1239                 return "SubDelResp"
1240         case *e2ap.E2APSubscriptionDeleteFailure:
1241                 return "SubDelFail"
1242         default:
1243                 return "Unknown"
1244         }
1245 }
1246
1247 //-------------------------------------------------------------------
1248 //
1249 //-------------------------------------------------------------------
1250 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1251         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1252         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1253         if err != nil {
1254                 xapp.Logger.Error("%v", err)
1255         }
1256 }
1257
1258 //-------------------------------------------------------------------
1259 //
1260 //-------------------------------------------------------------------
1261 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1262
1263         if removeSubscriptionFromDb == true {
1264                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1265                 c.RemoveSubscriptionFromDb(subs)
1266         } else {
1267                 // Update is needed for successful response and merge case here
1268                 if subs.RetryFromXapp == false {
1269                         c.WriteSubscriptionToDb(subs)
1270                 }
1271         }
1272         subs.RetryFromXapp = false
1273 }
1274
1275 //-------------------------------------------------------------------
1276 //
1277 //-------------------------------------------------------------------
1278 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1279         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1280         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1281         if err != nil {
1282                 xapp.Logger.Error("%v", err)
1283         }
1284 }
1285
1286 //-------------------------------------------------------------------
1287 //
1288 //-------------------------------------------------------------------
1289 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1290         xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1291         err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1292         if err != nil {
1293                 xapp.Logger.Error("%v", err)
1294         }
1295 }
1296
1297 //-------------------------------------------------------------------
1298 //
1299 //-------------------------------------------------------------------
1300 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1301
1302         if removeRestSubscriptionFromDb == true {
1303                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1304                 c.RemoveRESTSubscriptionFromDb(restSubId)
1305         } else {
1306                 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1307         }
1308 }
1309
1310 //-------------------------------------------------------------------
1311 //
1312 //-------------------------------------------------------------------
1313 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1314         xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1315         err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1316         if err != nil {
1317                 xapp.Logger.Error("%v", err)
1318         }
1319 }
1320
1321 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1322
1323         const ricRequestorId = 123
1324         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1325
1326         // Send delete for every endpoint in the subscription
1327         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1328         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1329         subDelReqMsg.RequestId.Id = ricRequestorId
1330         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1331         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1332         if err != nil {
1333                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1334                 return
1335         }
1336         for _, endPoint := range subs.EpList.Endpoints {
1337                 params := &xapp.RMRParams{}
1338                 params.Mtype = mType
1339                 params.SubId = int(subs.GetReqId().InstanceId)
1340                 params.Xid = ""
1341                 params.Meid = subs.Meid
1342                 params.Src = endPoint.String()
1343                 params.PayloadLen = len(payload.Buf)
1344                 params.Payload = payload.Buf
1345                 params.Mbuf = nil
1346                 subs.DeleteFromDb = true
1347                 c.handleXAPPSubscriptionDeleteRequest(params)
1348         }
1349 }
1350
1351 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1352
1353         fmt.Println("CRESTSubscriptionRequest")
1354
1355         if p == nil {
1356                 return
1357         }
1358
1359         if p.SubscriptionID != "" {
1360                 fmt.Println("  SubscriptionID = ", p.SubscriptionID)
1361         } else {
1362                 fmt.Println("  SubscriptionID = ''")
1363         }
1364
1365         fmt.Printf("  ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1366
1367         if p.ClientEndpoint.HTTPPort != nil {
1368                 fmt.Printf("  ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1369         } else {
1370                 fmt.Println("  ClientEndpoint.HTTPPort = nil")
1371         }
1372
1373         if p.ClientEndpoint.RMRPort != nil {
1374                 fmt.Printf("  ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1375         } else {
1376                 fmt.Println("  ClientEndpoint.RMRPort = nil")
1377         }
1378
1379         if p.Meid != nil {
1380                 fmt.Printf("  Meid = %s\n", *p.Meid)
1381         } else {
1382                 fmt.Println("  Meid = nil")
1383         }
1384
1385         for _, subscriptionDetail := range p.SubscriptionDetails {
1386                 if p.RANFunctionID != nil {
1387                         fmt.Printf("  RANFunctionID = %v\n", *p.RANFunctionID)
1388                 } else {
1389                         fmt.Println("  RANFunctionID = nil")
1390                 }
1391                 fmt.Printf("  SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1392                 fmt.Printf("  SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1393
1394                 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1395                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1396                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1397                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1398
1399                         if actionToBeSetup.SubsequentAction != nil {
1400                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1401                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1402                         } else {
1403                                 fmt.Println("  SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1404                         }
1405                 }
1406         }
1407 }