c74b1a5165480211e34697591aec209e0618ca1a
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/segmentio/ksuid"
38         "github.com/spf13/viper"
39 )
40
41 //-----------------------------------------------------------------------------
42 //
43 //-----------------------------------------------------------------------------
44
45 func idstring(err error, entries ...fmt.Stringer) string {
46         var retval string = ""
47         var filler string = ""
48         for _, entry := range entries {
49                 retval += filler + entry.String()
50                 filler = " "
51         }
52         if err != nil {
53                 retval += filler + "err(" + err.Error() + ")"
54                 filler = " "
55         }
56         return retval
57 }
58
59 //-----------------------------------------------------------------------------
60 //
61 //-----------------------------------------------------------------------------
62
63 var e2tSubReqTimeout time.Duration
64 var e2tSubDelReqTime time.Duration
65 var e2tRecvMsgTimeout time.Duration
66 var waitRouteCleanup_ms time.Duration
67 var e2tMaxSubReqTryCount uint64    // Initial try + retry
68 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
69 var readSubsFromDb string
70 var restDuplicateCtrl duplicateCtrl
71
72 type Control struct {
73         *xapp.RMRClient
74         e2ap          *E2ap
75         registry      *Registry
76         tracker       *Tracker
77         e2SubsDb      Sdlnterface
78         restSubsDb    Sdlnterface
79         CntRecvMsg    uint64
80         ResetTestFlag bool
81         Counters      map[string]xapp.Counter
82         LoggerLevel   uint32
83 }
84
85 type RMRMeid struct {
86         PlmnID  string
87         EnbID   string
88         RanName string
89 }
90
91 type SubmgrRestartTestEvent struct{}
92 type SubmgrRestartUpEvent struct{}
93
94 func init() {
95         xapp.Logger.Info("SUBMGR")
96         viper.AutomaticEnv()
97         viper.SetEnvPrefix("submgr")
98         viper.AllowEmptyEnv(true)
99 }
100
101 func NewControl() *Control {
102
103         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
104         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
105
106         registry := new(Registry)
107         registry.Initialize()
108         registry.rtmgrClient = &rtmgrClient
109
110         tracker := new(Tracker)
111         tracker.Init()
112
113         c := &Control{e2ap: new(E2ap),
114                 registry:    registry,
115                 tracker:     tracker,
116                 e2SubsDb:    CreateSdl(),
117                 restSubsDb:  CreateRESTSdl(),
118                 Counters:    xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
119                 LoggerLevel: 3,
120         }
121         c.ReadConfigParameters("")
122
123         // Register REST handler for testing support
124         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
125         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
126
127         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
128
129         if readSubsFromDb == "false" {
130                 return c
131         }
132
133         restDuplicateCtrl.Init()
134
135         // Read subscriptions from db
136         xapp.Logger.Info("Reading subscriptions from db")
137         subIds, register, err := c.ReadAllSubscriptionsFromSdl()
138         if err != nil {
139                 xapp.Logger.Error("%v", err)
140         } else {
141                 c.registry.subIds = subIds
142                 c.registry.register = register
143                 c.HandleUncompletedSubscriptions(register)
144         }
145
146         restSubscriptions, err := c.ReadAllRESTSubscriptionsFromSdl()
147         if err != nil {
148                 xapp.Logger.Error("%v", err)
149         } else {
150                 c.registry.restSubscriptions = restSubscriptions
151         }
152         return c
153 }
154
155 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
156         subscriptions, _ := c.registry.QueryHandler()
157         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
158 }
159
160 //-------------------------------------------------------------------
161 //
162 //-------------------------------------------------------------------
163 func (c *Control) ReadConfigParameters(f string) {
164
165         // viper.GetDuration returns nanoseconds
166         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
167         if e2tSubReqTimeout == 0 {
168                 e2tSubReqTimeout = 2000 * 1000000
169         }
170         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
171         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
172         if e2tSubDelReqTime == 0 {
173                 e2tSubDelReqTime = 2000 * 1000000
174         }
175         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
176         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
177         if e2tRecvMsgTimeout == 0 {
178                 e2tRecvMsgTimeout = 2000 * 1000000
179         }
180         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
181
182         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
183         // value 100ms used currently only in unittests.
184         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
185         if waitRouteCleanup_ms == 0 {
186                 waitRouteCleanup_ms = 5000 * 1000000
187         }
188         xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
189
190         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
191         if e2tMaxSubReqTryCount == 0 {
192                 e2tMaxSubReqTryCount = 1
193         }
194         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
195
196         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
197         if e2tMaxSubDelReqTryCount == 0 {
198                 e2tMaxSubDelReqTryCount = 1
199         }
200         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
201
202         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
203         if readSubsFromDb == "" {
204                 readSubsFromDb = "true"
205         }
206         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
207         c.LoggerLevel = viper.GetUint32("logger.level")
208         if c.LoggerLevel == 0 {
209                 c.LoggerLevel = 3
210         }
211 }
212
213 //-------------------------------------------------------------------
214 //
215 //-------------------------------------------------------------------
216 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
217
218         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
219         for subId, subs := range register {
220                 if subs.SubRespRcvd == false {
221                         subs.NoRespToXapp = true
222                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
223                         c.SendSubscriptionDeleteReq(subs)
224                 }
225         }
226 }
227
228 func (c *Control) ReadyCB(data interface{}) {
229         if c.RMRClient == nil {
230                 c.RMRClient = xapp.Rmr
231         }
232 }
233
234 func (c *Control) Run() {
235         xapp.SetReadyCB(c.ReadyCB, nil)
236         xapp.AddConfigChangeListener(c.ReadConfigParameters)
237         xapp.Run(c)
238 }
239
240 //-------------------------------------------------------------------
241 //
242 //-------------------------------------------------------------------
243 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
244
245         c.CntRecvMsg++
246         c.UpdateCounter(cRestSubReqFromXapp)
247
248         subResp := models.SubscriptionResponse{}
249         p := params.(*models.SubscriptionParams)
250
251         if c.LoggerLevel > 2 {
252                 c.PrintRESTSubscriptionRequest(p)
253         }
254
255         if p.ClientEndpoint == nil {
256                 xapp.Logger.Error("ClientEndpoint == nil")
257                 c.UpdateCounter(cRestSubFailToXapp)
258                 return nil, fmt.Errorf("")
259         }
260
261         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
262         if err != nil {
263                 xapp.Logger.Error("%s", err.Error())
264                 c.UpdateCounter(cRestSubFailToXapp)
265                 return nil, err
266         }
267         var restSubId string
268         var restSubscription *RESTSubscription
269         if p.SubscriptionID == "" {
270                 restSubId = ksuid.New().String()
271                 restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
272                 if err != nil {
273                         xapp.Logger.Error("%s", err.Error())
274                         c.UpdateCounter(cRestSubFailToXapp)
275                         return nil, err
276                 }
277
278         } else {
279                 restSubId = p.SubscriptionID
280                 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
281                 if err != nil {
282                         xapp.Logger.Error("%s", err.Error())
283                         c.UpdateCounter(cRestSubFailToXapp)
284                         return nil, err
285                 }
286         }
287
288         subResp.SubscriptionID = &restSubId
289         subReqList := e2ap.SubscriptionRequestList{}
290         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
291         if err != nil {
292                 xapp.Logger.Error("%s", err.Error())
293                 c.registry.DeleteRESTSubscription(&restSubId)
294                 c.UpdateCounter(cRestSubFailToXapp)
295                 return nil, err
296         }
297
298         err, duplicate, md5sum := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, params)
299
300         if err != nil {
301                 // We were unable to detect whether this request was duplicate or not, proceed
302                 xapp.Logger.Info("%s - proceeding with the request", err.Error())
303         } else {
304                 if duplicate {
305                         if *p.SubscriptionDetails[0].ActionToBeSetupList[0].ActionType == "report" {
306                                 xapp.Logger.Info("Retransmission blocker dropped for report typer of request")
307                                 c.UpdateCounter(cRestSubRespToXapp)
308                                 return &subResp, nil
309                         }
310                 }
311                 restSubscription.Md5sum = md5sum
312         }
313
314         c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
315
316         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint)
317
318         c.UpdateCounter(cRestSubRespToXapp)
319         return &subResp, nil
320 }
321
322 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
323         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
324
325         // Send notification to xApp that prosessing of a Subscription Request has failed.
326         e2EventInstanceID := (int64)(0)
327         errorCause := err.Error()
328         resp := &models.SubscriptionResponse{
329                 SubscriptionID: restSubId,
330                 SubscriptionInstances: []*models.SubscriptionInstance{
331                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
332                                 ErrorCause:          &errorCause,
333                                 XappEventInstanceID: &xAppEventInstanceID},
334                 },
335         }
336         // Mark REST subscription request processed.
337         restSubscription.SetProcessed()
338         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
339         if trans != nil {
340                 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
341                         errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
342         } else {
343                 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
344                         errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
345         }
346
347         c.UpdateCounter(cRestSubFailNotifToXapp)
348         xapp.Subscription.Notify(resp, *clientEndpoint)
349 }
350
351 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
352         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
353
354         // Store successfully processed InstanceId for deletion
355         restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
356         restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
357
358         // Send notification to xApp that a Subscription Request has been processed.
359         resp := &models.SubscriptionResponse{
360                 SubscriptionID: restSubId,
361                 SubscriptionInstances: []*models.SubscriptionInstance{
362                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
363                                 ErrorCause:          nil,
364                                 XappEventInstanceID: &xAppEventInstanceID},
365                 },
366         }
367         // Mark REST subscription request processesd.
368         restSubscription.SetProcessed()
369         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
370         xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
371                 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
372
373         c.UpdateCounter(cRestSubNotifToXapp)
374         xapp.Subscription.Notify(resp, *clientEndpoint)
375 }
376
377 //-------------------------------------------------------------------
378 //
379 //-------------------------------------------------------------------
380
381 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
382         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string) {
383
384         xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
385
386         var xAppEventInstanceID int64
387         var e2EventInstanceID int64
388
389         defer restDuplicateCtrl.TransactionComplete(restSubscription.Md5sum)
390
391         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
392                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
393                 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
394
395                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
396                 if trans == nil {
397                         // Send notification to xApp that prosessing of a Subscription Request has failed.
398                         err := fmt.Errorf("Tracking failure")
399                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
400                         continue
401                 }
402
403                 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
404
405                 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
406                 if err != nil {
407                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
408                 } else {
409                         e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
410                         xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
411                                 index, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
412                         c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
413                 }
414                 trans.Release()
415         }
416 }
417
418 //-------------------------------------------------------------------
419 //
420 //------------------------------------------------------------------
421 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
422         restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
423
424         err := c.tracker.Track(trans)
425         if err != nil {
426                 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
427                 err = fmt.Errorf("Tracking failure")
428                 return nil, err
429         }
430
431         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
432         if err != nil {
433                 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
434                 return nil, err
435         }
436
437         //
438         // Wake subs request
439         //
440         go c.handleSubscriptionCreate(subs, trans)
441         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
442
443         err = nil
444         if event != nil {
445                 switch themsg := event.(type) {
446                 case *e2ap.E2APSubscriptionResponse:
447                         trans.Release()
448                         return themsg, nil
449                 case *e2ap.E2APSubscriptionFailure:
450                         err = fmt.Errorf("E2 SubscriptionFailure received")
451                         return nil, err
452                 default:
453                         err = fmt.Errorf("unexpected E2 subscription response received")
454                         break
455                 }
456         } else {
457                 err = fmt.Errorf("E2 subscription response timeout")
458         }
459
460         xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
461         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
462         return nil, err
463 }
464
465 //-------------------------------------------------------------------
466 //
467 //-------------------------------------------------------------------
468 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
469
470         c.CntRecvMsg++
471         c.UpdateCounter(cRestSubDelReqFromXapp)
472
473         xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
474
475         restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
476         if err != nil {
477                 xapp.Logger.Error("%s", err.Error())
478                 if restSubscription == nil {
479                         // Subscription was not found
480                         return nil
481                 } else {
482                         if restSubscription.SubReqOngoing == true {
483                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
484                                 xapp.Logger.Error("%s", err.Error())
485                                 return err
486                         } else if restSubscription.SubDelReqOngoing == true {
487                                 // Previous request for same restSubId still ongoing
488                                 return nil
489                         }
490                 }
491         }
492
493         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
494         go func() {
495                 for _, instanceId := range restSubscription.InstanceIds {
496                         xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
497
498                         if err != nil {
499                                 xapp.Logger.Error("%s", err.Error())
500                                 //return err
501                         }
502                         xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
503                         restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
504                         restSubscription.DeleteE2InstanceId(instanceId)
505                 }
506                 c.registry.DeleteRESTSubscription(&restSubId)
507                 c.RemoveRESTSubscriptionFromDb(restSubId)
508         }()
509
510         c.UpdateCounter(cRestSubDelRespToXapp)
511
512         return nil
513 }
514
515 //-------------------------------------------------------------------
516 //
517 //-------------------------------------------------------------------
518 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
519
520         var xAppEventInstanceID int64
521         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
522         if err != nil {
523                 xapp.Logger.Info("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
524                         restSubId, instanceId, idstring(err, nil))
525                 return xAppEventInstanceID, nil
526         }
527
528         xAppEventInstanceID = int64(subs.ReqId.Id)
529         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
530         if trans == nil {
531                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
532                 xapp.Logger.Error("%s", err.Error())
533         }
534         defer trans.Release()
535
536         err = c.tracker.Track(trans)
537         if err != nil {
538                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
539                 xapp.Logger.Error("%s", err.Error())
540                 return xAppEventInstanceID, &time.ParseError{}
541         }
542         //
543         // Wake subs delete
544         //
545         go c.handleSubscriptionDelete(subs, trans)
546         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
547
548         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
549
550         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
551
552         return xAppEventInstanceID, nil
553 }
554
555 //-------------------------------------------------------------------
556 //
557 //-------------------------------------------------------------------
558 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
559         xapp.Logger.Info("QueryHandler() called")
560
561         c.CntRecvMsg++
562
563         return c.registry.QueryHandler()
564 }
565
566 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
567         xapp.Logger.Info("TestRestHandler() called")
568
569         pathParams := mux.Vars(r)
570         s := pathParams["testId"]
571
572         // This can be used to delete single subscription from db
573         if contains := strings.Contains(s, "deletesubid="); contains == true {
574                 var splits = strings.Split(s, "=")
575                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
576                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
577                         c.RemoveSubscriptionFromSdl(uint32(subId))
578                         return
579                 }
580         }
581
582         // This can be used to remove all subscriptions db from
583         if s == "emptydb" {
584                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
585                 c.RemoveAllSubscriptionsFromSdl()
586                 c.RemoveAllRESTSubscriptionsFromSdl()
587                 return
588         }
589
590         // This is meant to cause submgr's restart in testing
591         if s == "restart" {
592                 xapp.Logger.Info("os.Exit(1) called")
593                 os.Exit(1)
594         }
595
596         xapp.Logger.Info("Unsupported rest command received %s", s)
597 }
598
599 //-------------------------------------------------------------------
600 //
601 //-------------------------------------------------------------------
602
603 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
604         params := &xapp.RMRParams{}
605         params.Mtype = trans.GetMtype()
606         params.SubId = int(subs.GetReqId().InstanceId)
607         params.Xid = ""
608         params.Meid = subs.GetMeid()
609         params.Src = ""
610         params.PayloadLen = len(trans.Payload.Buf)
611         params.Payload = trans.Payload.Buf
612         params.Mbuf = nil
613         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
614         err = c.SendWithRetry(params, false, 5)
615         if err != nil {
616                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
617         }
618         return err
619 }
620
621 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
622
623         params := &xapp.RMRParams{}
624         params.Mtype = trans.GetMtype()
625         params.SubId = int(subs.GetReqId().InstanceId)
626         params.Xid = trans.GetXid()
627         params.Meid = trans.GetMeid()
628         params.Src = ""
629         params.PayloadLen = len(trans.Payload.Buf)
630         params.Payload = trans.Payload.Buf
631         params.Mbuf = nil
632         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
633         err = c.SendWithRetry(params, false, 5)
634         if err != nil {
635                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
636         }
637         return err
638 }
639
640 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
641         if c.RMRClient == nil {
642                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
643                 xapp.Logger.Error("%s", err.Error())
644                 return
645         }
646         c.CntRecvMsg++
647
648         defer c.RMRClient.Free(msg.Mbuf)
649
650         // xapp-frame might use direct access to c buffer and
651         // when msg.Mbuf is freed, someone might take it into use
652         // and payload data might be invalid inside message handle function
653         //
654         // subscriptions won't load system a lot so there is no
655         // real performance hit by cloning buffer into new go byte slice
656         cPay := append(msg.Payload[:0:0], msg.Payload...)
657         msg.Payload = cPay
658         msg.PayloadLen = len(cPay)
659
660         switch msg.Mtype {
661         case xapp.RIC_SUB_REQ:
662                 go c.handleXAPPSubscriptionRequest(msg)
663         case xapp.RIC_SUB_RESP:
664                 go c.handleE2TSubscriptionResponse(msg)
665         case xapp.RIC_SUB_FAILURE:
666                 go c.handleE2TSubscriptionFailure(msg)
667         case xapp.RIC_SUB_DEL_REQ:
668                 go c.handleXAPPSubscriptionDeleteRequest(msg)
669         case xapp.RIC_SUB_DEL_RESP:
670                 go c.handleE2TSubscriptionDeleteResponse(msg)
671         case xapp.RIC_SUB_DEL_FAILURE:
672                 go c.handleE2TSubscriptionDeleteFailure(msg)
673         default:
674                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
675         }
676         return
677 }
678
679 //-------------------------------------------------------------------
680 // handle from XAPP Subscription Request
681 //------------------------------------------------------------------
682 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
683         xapp.Logger.Info("MSG from XAPP: %s", params.String())
684         c.UpdateCounter(cSubReqFromXapp)
685
686         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
687         if err != nil {
688                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
689                 return
690         }
691
692         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
693         if trans == nil {
694                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
695                 return
696         }
697         defer trans.Release()
698
699         if err = c.tracker.Track(trans); err != nil {
700                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
701                 return
702         }
703
704         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
705         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
706         if err != nil {
707                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
708                 return
709         }
710
711         c.wakeSubscriptionRequest(subs, trans)
712 }
713
714 //-------------------------------------------------------------------
715 // Wake Subscription Request to E2node
716 //------------------------------------------------------------------
717 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
718
719         go c.handleSubscriptionCreate(subs, trans)
720         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
721         var err error
722         if event != nil {
723                 switch themsg := event.(type) {
724                 case *e2ap.E2APSubscriptionResponse:
725                         themsg.RequestId.Id = trans.RequestId.Id
726                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
727                         if err == nil {
728                                 trans.Release()
729                                 c.UpdateCounter(cSubRespToXapp)
730                                 c.rmrSendToXapp("", subs, trans)
731                                 return
732                         }
733                 case *e2ap.E2APSubscriptionFailure:
734                         themsg.RequestId.Id = trans.RequestId.Id
735                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
736                         if err == nil {
737                                 c.UpdateCounter(cSubFailToXapp)
738                                 c.rmrSendToXapp("", subs, trans)
739                         }
740                 default:
741                         break
742                 }
743         }
744         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
745         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
746 }
747
748 //-------------------------------------------------------------------
749 // handle from XAPP Subscription Delete Request
750 //------------------------------------------------------------------
751 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
752         xapp.Logger.Info("MSG from XAPP: %s", params.String())
753         c.UpdateCounter(cSubDelReqFromXapp)
754
755         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
756         if err != nil {
757                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
758                 return
759         }
760
761         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
762         if trans == nil {
763                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
764                 return
765         }
766         defer trans.Release()
767
768         err = c.tracker.Track(trans)
769         if err != nil {
770                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
771                 return
772         }
773
774         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
775         if err != nil {
776                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
777                 return
778         }
779
780         //
781         // Wake subs delete
782         //
783         go c.handleSubscriptionDelete(subs, trans)
784         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
785
786         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
787
788         if subs.NoRespToXapp == true {
789                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
790                 return
791         }
792
793         // Whatever is received success, fail or timeout, send successful delete response
794         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
795         subDelRespMsg.RequestId.Id = trans.RequestId.Id
796         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
797         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
798         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
799         if err == nil {
800                 c.UpdateCounter(cSubDelRespToXapp)
801                 c.rmrSendToXapp("", subs, trans)
802         }
803
804         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
805         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
806 }
807
808 //-------------------------------------------------------------------
809 // SUBS CREATE Handling
810 //-------------------------------------------------------------------
811 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
812
813         var removeSubscriptionFromDb bool = false
814         trans := c.tracker.NewSubsTransaction(subs)
815         subs.WaitTransactionTurn(trans)
816         defer subs.ReleaseTransactionTurn(trans)
817         defer trans.Release()
818
819         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
820
821         subRfMsg, valid := subs.GetCachedResponse()
822         if subRfMsg == nil && valid == true {
823                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
824                 switch event.(type) {
825                 case *e2ap.E2APSubscriptionResponse:
826                         subRfMsg, valid = subs.SetCachedResponse(event, true)
827                         subs.SubRespRcvd = true
828                 case *e2ap.E2APSubscriptionFailure:
829                         removeSubscriptionFromDb = true
830                         subRfMsg, valid = subs.SetCachedResponse(event, false)
831                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
832                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
833                 case *SubmgrRestartTestEvent:
834                         // This simulates that no response has been received and after restart subscriptions are restored from db
835                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
836                         return
837                 default:
838                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
839                         removeSubscriptionFromDb = true
840                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
841                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
842                 }
843                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
844         } else {
845                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
846         }
847
848         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
849         if valid == false {
850                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
851         }
852
853         c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
854         parentTrans.SendEvent(subRfMsg, 0)
855 }
856
857 //-------------------------------------------------------------------
858 // SUBS DELETE Handling
859 //-------------------------------------------------------------------
860
861 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
862
863         trans := c.tracker.NewSubsTransaction(subs)
864         subs.WaitTransactionTurn(trans)
865         defer subs.ReleaseTransactionTurn(trans)
866         defer trans.Release()
867
868         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
869
870         subs.mutex.Lock()
871
872         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
873                 subs.valid = false
874                 subs.mutex.Unlock()
875                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
876         } else {
877                 subs.mutex.Unlock()
878         }
879         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
880         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
881         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
882         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
883         c.registry.UpdateSubscriptionToDb(subs, c)
884         parentTrans.SendEvent(nil, 0)
885 }
886
887 //-------------------------------------------------------------------
888 // send to E2T Subscription Request
889 //-------------------------------------------------------------------
890 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
891         var err error
892         var event interface{} = nil
893         var timedOut bool = false
894         const ricRequestorId = 123
895
896         subReqMsg := subs.SubReqMsg
897         subReqMsg.RequestId = subs.GetReqId().RequestId
898         subReqMsg.RequestId.Id = ricRequestorId
899         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
900         if err != nil {
901                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
902                 return event
903         }
904
905         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
906         c.WriteSubscriptionToDb(subs)
907
908         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
909                 desc := fmt.Sprintf("(retry %d)", retries)
910                 if retries == 0 {
911                         c.UpdateCounter(cSubReqToE2)
912                 } else {
913                         c.UpdateCounter(cSubReReqToE2)
914                 }
915                 c.rmrSendToE2T(desc, subs, trans)
916                 if subs.DoNotWaitSubResp == false {
917                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
918                         if timedOut {
919                                 c.UpdateCounter(cSubReqTimerExpiry)
920                                 continue
921                         }
922                 } else {
923                         // Simulating case where subscrition request has been sent but response has not been received before restart
924                         event = &SubmgrRestartTestEvent{}
925                 }
926                 break
927         }
928         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
929         return event
930 }
931
932 //-------------------------------------------------------------------
933 // send to E2T Subscription Delete Request
934 //-------------------------------------------------------------------
935
936 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
937         var err error
938         var event interface{}
939         var timedOut bool
940         const ricRequestorId = 123
941
942         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
943         subDelReqMsg.RequestId = subs.GetReqId().RequestId
944         subDelReqMsg.RequestId.Id = ricRequestorId
945         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
946         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
947         if err != nil {
948                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
949                 return event
950         }
951
952         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
953                 desc := fmt.Sprintf("(retry %d)", retries)
954                 if retries == 0 {
955                         c.UpdateCounter(cSubDelReqToE2)
956                 } else {
957                         c.UpdateCounter(cSubDelReReqToE2)
958                 }
959                 c.rmrSendToE2T(desc, subs, trans)
960                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
961                 if timedOut {
962                         c.UpdateCounter(cSubDelReqTimerExpiry)
963                         continue
964                 }
965                 break
966         }
967         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
968         return event
969 }
970
971 //-------------------------------------------------------------------
972 // handle from E2T Subscription Response
973 //-------------------------------------------------------------------
974 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
975         xapp.Logger.Info("MSG from E2T: %s", params.String())
976         c.UpdateCounter(cSubRespFromE2)
977
978         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
979         if err != nil {
980                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
981                 return
982         }
983         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
984         if err != nil {
985                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
986                 return
987         }
988         trans := subs.GetTransaction()
989         if trans == nil {
990                 err = fmt.Errorf("Ongoing transaction not found")
991                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
992                 return
993         }
994         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
995         if sendOk == false {
996                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
997                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
998         }
999         return
1000 }
1001
1002 //-------------------------------------------------------------------
1003 // handle from E2T Subscription Failure
1004 //-------------------------------------------------------------------
1005 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1006         xapp.Logger.Info("MSG from E2T: %s", params.String())
1007         c.UpdateCounter(cSubFailFromE2)
1008         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1009         if err != nil {
1010                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1011                 return
1012         }
1013         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1014         if err != nil {
1015                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1016                 return
1017         }
1018         trans := subs.GetTransaction()
1019         if trans == nil {
1020                 err = fmt.Errorf("Ongoing transaction not found")
1021                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1022                 return
1023         }
1024         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1025         if sendOk == false {
1026                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1027                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1028         }
1029         return
1030 }
1031
1032 //-------------------------------------------------------------------
1033 // handle from E2T Subscription Delete Response
1034 //-------------------------------------------------------------------
1035 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1036         xapp.Logger.Info("MSG from E2T: %s", params.String())
1037         c.UpdateCounter(cSubDelRespFromE2)
1038         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1039         if err != nil {
1040                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1041                 return
1042         }
1043         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1044         if err != nil {
1045                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1046                 return
1047         }
1048         trans := subs.GetTransaction()
1049         if trans == nil {
1050                 err = fmt.Errorf("Ongoing transaction not found")
1051                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1052                 return
1053         }
1054         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1055         if sendOk == false {
1056                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1057                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1058         }
1059         return
1060 }
1061
1062 //-------------------------------------------------------------------
1063 // handle from E2T Subscription Delete Failure
1064 //-------------------------------------------------------------------
1065 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1066         xapp.Logger.Info("MSG from E2T: %s", params.String())
1067         c.UpdateCounter(cSubDelFailFromE2)
1068         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1069         if err != nil {
1070                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1071                 return
1072         }
1073         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1074         if err != nil {
1075                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1076                 return
1077         }
1078         trans := subs.GetTransaction()
1079         if trans == nil {
1080                 err = fmt.Errorf("Ongoing transaction not found")
1081                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1082                 return
1083         }
1084         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1085         if sendOk == false {
1086                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1087                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1088         }
1089         return
1090 }
1091
1092 //-------------------------------------------------------------------
1093 //
1094 //-------------------------------------------------------------------
1095 func typeofSubsMessage(v interface{}) string {
1096         if v == nil {
1097                 return "NIL"
1098         }
1099         switch v.(type) {
1100         //case *e2ap.E2APSubscriptionRequest:
1101         //      return "SubReq"
1102         case *e2ap.E2APSubscriptionResponse:
1103                 return "SubResp"
1104         case *e2ap.E2APSubscriptionFailure:
1105                 return "SubFail"
1106         //case *e2ap.E2APSubscriptionDeleteRequest:
1107         //      return "SubDelReq"
1108         case *e2ap.E2APSubscriptionDeleteResponse:
1109                 return "SubDelResp"
1110         case *e2ap.E2APSubscriptionDeleteFailure:
1111                 return "SubDelFail"
1112         default:
1113                 return "Unknown"
1114         }
1115 }
1116
1117 //-------------------------------------------------------------------
1118 //
1119 //-------------------------------------------------------------------
1120 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1121         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1122         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1123         if err != nil {
1124                 xapp.Logger.Error("%v", err)
1125         }
1126 }
1127
1128 //-------------------------------------------------------------------
1129 //
1130 //-------------------------------------------------------------------
1131 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1132
1133         if removeSubscriptionFromDb == true {
1134                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1135                 c.RemoveSubscriptionFromDb(subs)
1136         } else {
1137                 // Update is needed for successful response and merge case here
1138                 if subs.RetryFromXapp == false {
1139                         c.WriteSubscriptionToDb(subs)
1140                 }
1141         }
1142         subs.RetryFromXapp = false
1143 }
1144
1145 //-------------------------------------------------------------------
1146 //
1147 //-------------------------------------------------------------------
1148 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1149         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1150         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1151         if err != nil {
1152                 xapp.Logger.Error("%v", err)
1153         }
1154 }
1155
1156 //-------------------------------------------------------------------
1157 //
1158 //-------------------------------------------------------------------
1159 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1160         xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1161         err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1162         if err != nil {
1163                 xapp.Logger.Error("%v", err)
1164         }
1165 }
1166
1167 //-------------------------------------------------------------------
1168 //
1169 //-------------------------------------------------------------------
1170 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1171
1172         if removeRestSubscriptionFromDb == true {
1173                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1174                 c.RemoveRESTSubscriptionFromDb(restSubId)
1175         } else {
1176                 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1177         }
1178 }
1179
1180 //-------------------------------------------------------------------
1181 //
1182 //-------------------------------------------------------------------
1183 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1184         xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1185         err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1186         if err != nil {
1187                 xapp.Logger.Error("%v", err)
1188         }
1189 }
1190
1191 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1192
1193         const ricRequestorId = 123
1194         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1195
1196         // Send delete for every endpoint in the subscription
1197         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1198         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1199         subDelReqMsg.RequestId.Id = ricRequestorId
1200         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1201         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1202         if err != nil {
1203                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1204                 return
1205         }
1206         for _, endPoint := range subs.EpList.Endpoints {
1207                 params := &xapp.RMRParams{}
1208                 params.Mtype = mType
1209                 params.SubId = int(subs.GetReqId().InstanceId)
1210                 params.Xid = ""
1211                 params.Meid = subs.Meid
1212                 params.Src = endPoint.String()
1213                 params.PayloadLen = len(payload.Buf)
1214                 params.Payload = payload.Buf
1215                 params.Mbuf = nil
1216                 subs.DeleteFromDb = true
1217                 c.handleXAPPSubscriptionDeleteRequest(params)
1218         }
1219 }
1220
1221 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1222
1223         fmt.Println("CRESTSubscriptionRequest")
1224
1225         if p.SubscriptionID != "" {
1226                 fmt.Println("  SubscriptionID = ", p.SubscriptionID)
1227         } else {
1228                 fmt.Println("  SubscriptionID = ''")
1229         }
1230
1231         fmt.Printf("  ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1232
1233         if p.ClientEndpoint.HTTPPort != nil {
1234                 fmt.Printf("  ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1235         } else {
1236                 fmt.Println("  ClientEndpoint.HTTPPort = nil")
1237         }
1238
1239         if p.ClientEndpoint.RMRPort != nil {
1240                 fmt.Printf("  ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1241         } else {
1242                 fmt.Println("  ClientEndpoint.RMRPort = nil")
1243         }
1244
1245         if p.Meid != nil {
1246                 fmt.Printf("  Meid = %s\n", *p.Meid)
1247         } else {
1248                 fmt.Println("  Meid = nil")
1249         }
1250
1251         for _, subscriptionDetail := range p.SubscriptionDetails {
1252                 if p.RANFunctionID != nil {
1253                         fmt.Printf("  RANFunctionID = %v\n", *p.RANFunctionID)
1254                 } else {
1255                         fmt.Println("  RANFunctionID = nil")
1256                 }
1257                 fmt.Printf("  SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1258                 fmt.Printf("  SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1259
1260                 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1261                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1262                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1263                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1264
1265                         if actionToBeSetup.SubsequentAction != nil {
1266                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1267                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1268                         } else {
1269                                 fmt.Println("  SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1270                         }
1271                 }
1272         }
1273 }