3ebcbcd4789280ac83915ddd5f09519f395b94a9
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/segmentio/ksuid"
38         "github.com/spf13/viper"
39 )
40
41 //-----------------------------------------------------------------------------
42 //
43 //-----------------------------------------------------------------------------
44
// idstring builds a single-line, space-separated identification string for
// log messages.
//
// Every element of entries contributes its String() value, in order; a nil
// entry contributes the literal "(NIL)". When err is non-nil, "err(<message>)"
// is appended as the last element. With no entries and a nil err the result
// is the empty string.
func idstring(err error, entries ...fmt.Stringer) string {
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		if entry == nil {
			// Keep a visible placeholder so that positions in the log line stay stable.
			parts = append(parts, "(NIL)")
			continue
		}
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	// Joining guarantees exactly one separator between any two elements,
	// including around "(NIL)" placeholders.
	return strings.Join(parts, " ")
}
62
63 //-----------------------------------------------------------------------------
64 //
65 //-----------------------------------------------------------------------------
66
67 var e2tSubReqTimeout time.Duration
68 var e2tSubDelReqTime time.Duration
69 var e2tRecvMsgTimeout time.Duration
70 var waitRouteCleanup_ms time.Duration
71 var e2tMaxSubReqTryCount uint64    // Initial try + retry
72 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
73 var readSubsFromDb string
74 var restDuplicateCtrl duplicateCtrl
75 var dbRetryForever string
76 var dbTryCount int
77
78 type Control struct {
79         *xapp.RMRClient
80         e2ap          *E2ap
81         registry      *Registry
82         tracker       *Tracker
83         e2SubsDb      Sdlnterface
84         restSubsDb    Sdlnterface
85         CntRecvMsg    uint64
86         ResetTestFlag bool
87         Counters      map[string]xapp.Counter
88         LoggerLevel   uint32
89 }
90
// RMRMeid groups the string identifiers of a managed entity.
//
// NOTE(review): the code visible in this section uses xapp.RMRMeid rather
// than this type; confirm where this local definition is still needed.
type RMRMeid struct {
	PlmnID  string // PLMN identifier
	EnbID   string // eNB identifier
	RanName string // RAN name
}
96
// Empty marker types for restart-related events.
//
// NOTE(review): their producers and consumers are not visible in this
// section of the file.
type (
	SubmgrRestartTestEvent struct{}
	SubmgrRestartUpEvent   struct{}
)
99
100 func init() {
101         xapp.Logger.Info("SUBMGR")
102         viper.AutomaticEnv()
103         viper.SetEnvPrefix("submgr")
104         viper.AllowEmptyEnv(true)
105 }
106
107 func NewControl() *Control {
108
109         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
110         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
111
112         registry := new(Registry)
113         registry.Initialize()
114         registry.rtmgrClient = &rtmgrClient
115
116         tracker := new(Tracker)
117         tracker.Init()
118
119         c := &Control{e2ap: new(E2ap),
120                 registry:    registry,
121                 tracker:     tracker,
122                 e2SubsDb:    CreateSdl(),
123                 restSubsDb:  CreateRESTSdl(),
124                 Counters:    xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
125                 LoggerLevel: 3,
126         }
127         c.ReadConfigParameters("")
128
129         // Register REST handler for testing support
130         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
131         xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
132         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
133
134         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
135
136         if readSubsFromDb == "false" {
137                 return c
138         }
139
140         restDuplicateCtrl.Init()
141
142         // Read subscriptions from db
143         c.ReadE2Subscriptions()
144         c.ReadRESTSubscriptions()
145         return c
146 }
147
148 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
149         subscriptions, _ := c.registry.QueryHandler()
150         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
151 }
152
153 //-------------------------------------------------------------------
154 //
155 //-------------------------------------------------------------------
156 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
157         xapp.Logger.Info("GetAllRestSubscriptions() called")
158         response := c.registry.GetAllRestSubscriptions()
159         w.Write(response)
160 }
161
162 //-------------------------------------------------------------------
163 //
164 //-------------------------------------------------------------------
165 func (c *Control) ReadE2Subscriptions() error {
166         var err error
167         var subIds []uint32
168         var register map[uint32]*Subscription
169         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
170                 xapp.Logger.Info("Reading E2 subscriptions from db")
171                 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
172                 if err != nil {
173                         xapp.Logger.Error("%v", err)
174                         <-time.After(1 * time.Second)
175                 } else {
176                         c.registry.subIds = subIds
177                         c.registry.register = register
178                         c.HandleUncompletedSubscriptions(register)
179                         return nil
180                 }
181         }
182         xapp.Logger.Info("Continuing without retring")
183         return err
184 }
185
186 //-------------------------------------------------------------------
187 //
188 //-------------------------------------------------------------------
189 func (c *Control) ReadRESTSubscriptions() error {
190         var err error
191         var restSubscriptions map[string]*RESTSubscription
192         for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
193                 xapp.Logger.Info("Reading REST subscriptions from db")
194                 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
195                 if err != nil {
196                         xapp.Logger.Error("%v", err)
197                         <-time.After(1 * time.Second)
198                 } else {
199                         c.registry.restSubscriptions = restSubscriptions
200                         return nil
201                 }
202         }
203         xapp.Logger.Info("Continuing without retring")
204         return err
205 }
206
207 //-------------------------------------------------------------------
208 //
209 //-------------------------------------------------------------------
210 func (c *Control) ReadConfigParameters(f string) {
211
212         // viper.GetDuration returns nanoseconds
213         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
214         if e2tSubReqTimeout == 0 {
215                 e2tSubReqTimeout = 2000 * 1000000
216         }
217         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
218         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
219         if e2tSubDelReqTime == 0 {
220                 e2tSubDelReqTime = 2000 * 1000000
221         }
222         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
223         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
224         if e2tRecvMsgTimeout == 0 {
225                 e2tRecvMsgTimeout = 2000 * 1000000
226         }
227         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
228
229         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
230         if e2tMaxSubReqTryCount == 0 {
231                 e2tMaxSubReqTryCount = 1
232         }
233         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
234
235         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
236         if e2tMaxSubDelReqTryCount == 0 {
237                 e2tMaxSubDelReqTryCount = 1
238         }
239         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
240
241         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
242         if readSubsFromDb == "" {
243                 readSubsFromDb = "true"
244         }
245         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
246
247         dbTryCount = viper.GetInt("controls.dbTryCount")
248         if dbTryCount == 0 {
249                 dbTryCount = 200
250         }
251         xapp.Logger.Info("dbTryCount %v", dbTryCount)
252
253         dbRetryForever = viper.GetString("controls.dbRetryForever")
254         if dbRetryForever == "" {
255                 dbRetryForever = "true"
256         }
257         xapp.Logger.Info("dbRetryForever %v", dbRetryForever)
258
259         c.LoggerLevel = viper.GetUint32("logger.level")
260         if c.LoggerLevel == 0 {
261                 c.LoggerLevel = 3
262         }
263         xapp.Logger.Info("LoggerLevel %v", c.LoggerLevel)
264
265         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
266         // value 100ms used currently only in unittests.
267         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
268         if waitRouteCleanup_ms == 0 {
269                 waitRouteCleanup_ms = 5000 * 1000000
270         }
271         xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
272 }
273
274 //-------------------------------------------------------------------
275 //
276 //-------------------------------------------------------------------
277 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
278
279         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
280         for subId, subs := range register {
281                 if subs.SubRespRcvd == false {
282                         subs.NoRespToXapp = true
283                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
284                         c.SendSubscriptionDeleteReq(subs)
285                 }
286         }
287 }
288
289 func (c *Control) ReadyCB(data interface{}) {
290         if c.RMRClient == nil {
291                 c.RMRClient = xapp.Rmr
292         }
293 }
294
295 func (c *Control) Run() {
296         xapp.SetReadyCB(c.ReadyCB, nil)
297         xapp.AddConfigChangeListener(c.ReadConfigParameters)
298         xapp.Run(c)
299 }
300
301 //-------------------------------------------------------------------
302 //
303 //-------------------------------------------------------------------
304 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
305
306         var restSubId string
307         var restSubscription *RESTSubscription
308         var err error
309
310         prevRestSubsId, exists := restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
311         if p.SubscriptionID == "" {
312                 if exists {
313                         restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
314                         if restSubscription != nil {
315                                 restSubId = prevRestSubsId
316                                 if err == nil {
317                                         xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
318                                 } else {
319                                         xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
320                                 }
321                         } else {
322                                 xapp.Logger.Info("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
323                                 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
324                         }
325                 }
326
327                 if restSubscription == nil {
328                         restSubId = ksuid.New().String()
329                         restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
330                         if err != nil {
331                                 xapp.Logger.Error("%s", err.Error())
332                                 c.UpdateCounter(cRestSubFailToXapp)
333                                 return nil, "", err
334                         }
335                 }
336         } else {
337                 restSubId = p.SubscriptionID
338
339                 xapp.Logger.Info("RestSubscription ID %s provided via REST request", restSubId)
340
341                 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
342                 if err != nil {
343                         xapp.Logger.Error("%s", err.Error())
344                         c.UpdateCounter(cRestSubFailToXapp)
345                         return nil, "", err
346                 }
347
348                 if !exists {
349                         xapp.Logger.Info("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
350                 } else {
351                         xapp.Logger.Info("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
352                 }
353         }
354
355         return restSubscription, restSubId, nil
356 }
357
358 //-------------------------------------------------------------------
359 //
360 //-------------------------------------------------------------------
361 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
362
363         c.CntRecvMsg++
364         c.UpdateCounter(cRestSubReqFromXapp)
365
366         subResp := models.SubscriptionResponse{}
367         p := params.(*models.SubscriptionParams)
368
369         if c.LoggerLevel > 2 {
370                 c.PrintRESTSubscriptionRequest(p)
371         }
372
373         if p.ClientEndpoint == nil {
374                 xapp.Logger.Error("ClientEndpoint == nil")
375                 c.UpdateCounter(cRestSubFailToXapp)
376                 return nil, fmt.Errorf("")
377         }
378
379         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
380         if err != nil {
381                 xapp.Logger.Error("%s", err.Error())
382                 c.UpdateCounter(cRestSubFailToXapp)
383                 return nil, err
384         }
385
386         md5sum, err := CalculateRequestMd5sum(params)
387         if err != nil {
388                 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
389         }
390
391         restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
392         if err != nil {
393                 xapp.Logger.Error("Failed to get/allocate REST subscription")
394                 return nil, err
395         }
396
397         subResp.SubscriptionID = &restSubId
398         subReqList := e2ap.SubscriptionRequestList{}
399         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
400         if err != nil {
401                 xapp.Logger.Error("%s", err.Error())
402                 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
403                 c.registry.DeleteRESTSubscription(&restSubId)
404                 c.UpdateCounter(cRestSubFailToXapp)
405                 return nil, err
406         }
407
408         duplicate := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
409         if duplicate {
410                 xapp.Logger.Info("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
411                 c.UpdateCounter(cRestSubRespToXapp)
412                 return &subResp, nil
413         }
414
415         c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
416
417         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum)
418
419         c.UpdateCounter(cRestSubRespToXapp)
420         return &subResp, nil
421 }
422
423 //-------------------------------------------------------------------
424 //
425 //-------------------------------------------------------------------
426
427 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
428         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string) {
429
430         xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
431
432         var xAppEventInstanceID int64
433         var e2EventInstanceID int64
434
435         defer restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
436
437         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
438                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
439                 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
440
441                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
442                 if trans == nil {
443                         // Send notification to xApp that prosessing of a Subscription Request has failed.
444                         err := fmt.Errorf("Tracking failure")
445                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
446                         continue
447                 }
448
449                 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
450
451                 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId)
452
453                 xapp.Logger.Info("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
454
455                 if err != nil {
456                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
457                 } else {
458                         e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
459                         restSubscription.AddMd5Sum(md5sum)
460                         xapp.Logger.Info("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
461                                 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
462                         c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
463                 }
464                 trans.Release()
465         }
466 }
467
468 //-------------------------------------------------------------------
469 //
470 //------------------------------------------------------------------
471 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
472         restSubId string) (*e2ap.E2APSubscriptionResponse, error) {
473
474         err := c.tracker.Track(trans)
475         if err != nil {
476                 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
477                 err = fmt.Errorf("Tracking failure")
478                 return nil, err
479         }
480
481         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
482         if err != nil {
483                 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
484                 return nil, err
485         }
486
487         //
488         // Wake subs request
489         //
490         go c.handleSubscriptionCreate(subs, trans)
491         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
492
493         err = nil
494         if event != nil {
495                 switch themsg := event.(type) {
496                 case *e2ap.E2APSubscriptionResponse:
497                         trans.Release()
498                         return themsg, nil
499                 case *e2ap.E2APSubscriptionFailure:
500                         err = fmt.Errorf("E2 SubscriptionFailure received")
501                         return nil, err
502                 default:
503                         err = fmt.Errorf("unexpected E2 subscription response received")
504                         break
505                 }
506         } else {
507                 err = fmt.Errorf("E2 subscription response timeout")
508         }
509
510         xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
511         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
512         return nil, err
513 }
514
515 //-------------------------------------------------------------------
516 //
517 //-------------------------------------------------------------------
518 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
519         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
520
521         // Send notification to xApp that prosessing of a Subscription Request has failed.
522         e2EventInstanceID := (int64)(0)
523         errorCause := err.Error()
524         resp := &models.SubscriptionResponse{
525                 SubscriptionID: restSubId,
526                 SubscriptionInstances: []*models.SubscriptionInstance{
527                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
528                                 ErrorCause:          &errorCause,
529                                 XappEventInstanceID: &xAppEventInstanceID},
530                 },
531         }
532         // Mark REST subscription request processed.
533         restSubscription.SetProcessed(err)
534         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
535         if trans != nil {
536                 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
537                         errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
538         } else {
539                 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
540                         errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
541         }
542
543         c.UpdateCounter(cRestSubFailNotifToXapp)
544         xapp.Subscription.Notify(resp, *clientEndpoint)
545 }
546
547 //-------------------------------------------------------------------
548 //
549 //-------------------------------------------------------------------
550 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
551         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
552
553         // Store successfully processed InstanceId for deletion
554         restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
555         restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
556
557         // Send notification to xApp that a Subscription Request has been processed.
558         resp := &models.SubscriptionResponse{
559                 SubscriptionID: restSubId,
560                 SubscriptionInstances: []*models.SubscriptionInstance{
561                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
562                                 ErrorCause:          nil,
563                                 XappEventInstanceID: &xAppEventInstanceID},
564                 },
565         }
566         // Mark REST subscription request processesd.
567         restSubscription.SetProcessed(nil)
568         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
569         xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
570                 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
571
572         c.UpdateCounter(cRestSubNotifToXapp)
573         xapp.Subscription.Notify(resp, *clientEndpoint)
574 }
575
576 //-------------------------------------------------------------------
577 //
578 //-------------------------------------------------------------------
579 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
580
581         c.CntRecvMsg++
582         c.UpdateCounter(cRestSubDelReqFromXapp)
583
584         xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
585
586         restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
587         if err != nil {
588                 xapp.Logger.Error("%s", err.Error())
589                 if restSubscription == nil {
590                         // Subscription was not found
591                         return nil
592                 } else {
593                         if restSubscription.SubReqOngoing == true {
594                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
595                                 xapp.Logger.Error("%s", err.Error())
596                                 return err
597                         } else if restSubscription.SubDelReqOngoing == true {
598                                 // Previous request for same restSubId still ongoing
599                                 return nil
600                         }
601                 }
602         }
603
604         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
605         go func() {
606                 xapp.Logger.Info("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
607                 for _, instanceId := range restSubscription.InstanceIds {
608                         xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
609
610                         if err != nil {
611                                 xapp.Logger.Error("%s", err.Error())
612                                 //return err
613                         }
614                         xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
615                         restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
616                         restSubscription.DeleteE2InstanceId(instanceId)
617                 }
618                 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
619                 c.registry.DeleteRESTSubscription(&restSubId)
620                 c.RemoveRESTSubscriptionFromDb(restSubId)
621         }()
622
623         c.UpdateCounter(cRestSubDelRespToXapp)
624
625         return nil
626 }
627
628 //-------------------------------------------------------------------
629 //
630 //-------------------------------------------------------------------
631 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
632
633         var xAppEventInstanceID int64
634         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
635         if err != nil {
636                 xapp.Logger.Info("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
637                         restSubId, instanceId, idstring(err, nil))
638                 return xAppEventInstanceID, nil
639         }
640
641         xAppEventInstanceID = int64(subs.ReqId.Id)
642         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
643         if trans == nil {
644                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
645                 xapp.Logger.Error("%s", err.Error())
646         }
647         defer trans.Release()
648
649         err = c.tracker.Track(trans)
650         if err != nil {
651                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
652                 xapp.Logger.Error("%s", err.Error())
653                 return xAppEventInstanceID, &time.ParseError{}
654         }
655         //
656         // Wake subs delete
657         //
658         go c.handleSubscriptionDelete(subs, trans)
659         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
660
661         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
662
663         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
664
665         return xAppEventInstanceID, nil
666 }
667
668 //-------------------------------------------------------------------
669 //
670 //-------------------------------------------------------------------
671 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
672         xapp.Logger.Info("QueryHandler() called")
673
674         c.CntRecvMsg++
675
676         return c.registry.QueryHandler()
677 }
678
679 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
680         xapp.Logger.Info("TestRestHandler() called")
681
682         pathParams := mux.Vars(r)
683         s := pathParams["testId"]
684
685         // This can be used to delete single subscription from db
686         if contains := strings.Contains(s, "deletesubid="); contains == true {
687                 var splits = strings.Split(s, "=")
688                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
689                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
690                         c.RemoveSubscriptionFromSdl(uint32(subId))
691                         return
692                 }
693         }
694
695         // This can be used to remove all subscriptions db from
696         if s == "emptydb" {
697                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
698                 c.RemoveAllSubscriptionsFromSdl()
699                 c.RemoveAllRESTSubscriptionsFromSdl()
700                 return
701         }
702
703         // This is meant to cause submgr's restart in testing
704         if s == "restart" {
705                 xapp.Logger.Info("os.Exit(1) called")
706                 os.Exit(1)
707         }
708
709         xapp.Logger.Info("Unsupported rest command received %s", s)
710 }
711
712 //-------------------------------------------------------------------
713 //
714 //-------------------------------------------------------------------
715
716 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
717         params := &xapp.RMRParams{}
718         params.Mtype = trans.GetMtype()
719         params.SubId = int(subs.GetReqId().InstanceId)
720         params.Xid = ""
721         params.Meid = subs.GetMeid()
722         params.Src = ""
723         params.PayloadLen = len(trans.Payload.Buf)
724         params.Payload = trans.Payload.Buf
725         params.Mbuf = nil
726         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
727         err = c.SendWithRetry(params, false, 5)
728         if err != nil {
729                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
730         }
731         return err
732 }
733
734 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
735
736         params := &xapp.RMRParams{}
737         params.Mtype = trans.GetMtype()
738         params.SubId = int(subs.GetReqId().InstanceId)
739         params.Xid = trans.GetXid()
740         params.Meid = trans.GetMeid()
741         params.Src = ""
742         params.PayloadLen = len(trans.Payload.Buf)
743         params.Payload = trans.Payload.Buf
744         params.Mbuf = nil
745         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
746         err = c.SendWithRetry(params, false, 5)
747         if err != nil {
748                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
749         }
750         return err
751 }
752
753 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
754         if c.RMRClient == nil {
755                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
756                 xapp.Logger.Error("%s", err.Error())
757                 return
758         }
759         c.CntRecvMsg++
760
761         defer c.RMRClient.Free(msg.Mbuf)
762
763         // xapp-frame might use direct access to c buffer and
764         // when msg.Mbuf is freed, someone might take it into use
765         // and payload data might be invalid inside message handle function
766         //
767         // subscriptions won't load system a lot so there is no
768         // real performance hit by cloning buffer into new go byte slice
769         cPay := append(msg.Payload[:0:0], msg.Payload...)
770         msg.Payload = cPay
771         msg.PayloadLen = len(cPay)
772
773         switch msg.Mtype {
774         case xapp.RIC_SUB_REQ:
775                 go c.handleXAPPSubscriptionRequest(msg)
776         case xapp.RIC_SUB_RESP:
777                 go c.handleE2TSubscriptionResponse(msg)
778         case xapp.RIC_SUB_FAILURE:
779                 go c.handleE2TSubscriptionFailure(msg)
780         case xapp.RIC_SUB_DEL_REQ:
781                 go c.handleXAPPSubscriptionDeleteRequest(msg)
782         case xapp.RIC_SUB_DEL_RESP:
783                 go c.handleE2TSubscriptionDeleteResponse(msg)
784         case xapp.RIC_SUB_DEL_FAILURE:
785                 go c.handleE2TSubscriptionDeleteFailure(msg)
786         default:
787                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
788         }
789         return
790 }
791
792 //-------------------------------------------------------------------
793 // handle from XAPP Subscription Request
794 //------------------------------------------------------------------
795 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
796         xapp.Logger.Info("MSG from XAPP: %s", params.String())
797         c.UpdateCounter(cSubReqFromXapp)
798
799         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
800         if err != nil {
801                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
802                 return
803         }
804
805         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
806         if trans == nil {
807                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
808                 return
809         }
810         defer trans.Release()
811
812         if err = c.tracker.Track(trans); err != nil {
813                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
814                 return
815         }
816
817         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
818         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
819         if err != nil {
820                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
821                 return
822         }
823
824         c.wakeSubscriptionRequest(subs, trans)
825 }
826
827 //-------------------------------------------------------------------
828 // Wake Subscription Request to E2node
829 //------------------------------------------------------------------
830 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
831
832         go c.handleSubscriptionCreate(subs, trans)
833         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
834         var err error
835         if event != nil {
836                 switch themsg := event.(type) {
837                 case *e2ap.E2APSubscriptionResponse:
838                         themsg.RequestId.Id = trans.RequestId.Id
839                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
840                         if err == nil {
841                                 trans.Release()
842                                 c.UpdateCounter(cSubRespToXapp)
843                                 c.rmrSendToXapp("", subs, trans)
844                                 return
845                         }
846                 case *e2ap.E2APSubscriptionFailure:
847                         themsg.RequestId.Id = trans.RequestId.Id
848                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
849                         if err == nil {
850                                 c.UpdateCounter(cSubFailToXapp)
851                                 c.rmrSendToXapp("", subs, trans)
852                         }
853                 default:
854                         break
855                 }
856         }
857         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
858         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
859 }
860
861 //-------------------------------------------------------------------
862 // handle from XAPP Subscription Delete Request
863 //------------------------------------------------------------------
864 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
865         xapp.Logger.Info("MSG from XAPP: %s", params.String())
866         c.UpdateCounter(cSubDelReqFromXapp)
867
868         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
869         if err != nil {
870                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
871                 return
872         }
873
874         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
875         if trans == nil {
876                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
877                 return
878         }
879         defer trans.Release()
880
881         err = c.tracker.Track(trans)
882         if err != nil {
883                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
884                 return
885         }
886
887         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
888         if err != nil {
889                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
890                 return
891         }
892
893         //
894         // Wake subs delete
895         //
896         go c.handleSubscriptionDelete(subs, trans)
897         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
898
899         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
900
901         if subs.NoRespToXapp == true {
902                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
903                 return
904         }
905
906         // Whatever is received success, fail or timeout, send successful delete response
907         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
908         subDelRespMsg.RequestId.Id = trans.RequestId.Id
909         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
910         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
911         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
912         if err == nil {
913                 c.UpdateCounter(cSubDelRespToXapp)
914                 c.rmrSendToXapp("", subs, trans)
915         }
916
917         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
918         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
919 }
920
921 //-------------------------------------------------------------------
922 // SUBS CREATE Handling
923 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the subscription create procedure for
// subs on behalf of parentTrans. wakeSubscriptionRequest starts it in a
// goroutine and then blocks on parentTrans until SendEvent is called at the
// end of this function.
//
// A new subscription transaction is created and the function waits for its
// turn on the subscription (WaitTransactionTurn); turn and transaction are
// released by the deferred calls on every return path.
//
// If the subscription has no cached response and is still valid, a
// subscription request is sent to E2T and the resulting event decides what
// is cached. Otherwise the cached response is reused as is.
//
// NOTE: in the SubmgrRestartTestEvent case the function returns early, so
// parentTrans.SendEvent is never called in that path.
func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {

	// Set when the E2T exchange did not succeed; passed to UpdateSubscriptionInDB below.
	var removeSubscriptionFromDb bool = false
	trans := c.tracker.NewSubsTransaction(subs)
	subs.WaitTransactionTurn(trans)
	defer subs.ReleaseTransactionTurn(trans)
	defer trans.Release()

	xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))

	subRfMsg, valid := subs.GetCachedResponse()
	if subRfMsg == nil && valid == true {
		// Nothing cached yet: run the request towards E2T and cache the outcome.
		event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
		switch event.(type) {
		case *e2ap.E2APSubscriptionResponse:
			subRfMsg, valid = subs.SetCachedResponse(event, true)
			subs.SubRespRcvd = true
		case *e2ap.E2APSubscriptionFailure:
			// Failure from E2T: cache it and send a delete request towards E2T.
			removeSubscriptionFromDb = true
			subRfMsg, valid = subs.SetCachedResponse(event, false)
			xapp.Logger.Info("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
			c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
		case *SubmgrRestartTestEvent:
			// This simulates that no response has been received and after restart subscriptions are restored from db
			xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
			return
		default:
			// Any other event (including nil): cache "no response" and send a delete request towards E2T.
			xapp.Logger.Info("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
			removeSubscriptionFromDb = true
			subRfMsg, valid = subs.SetCachedResponse(nil, false)
			c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
		}
		xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
	} else {
		xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
	}

	//Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
	if valid == false {
		c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
	}

	c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
	// Hand the (possibly nil) response to the waiting parent transaction.
	parentTrans.SendEvent(subRfMsg, 0)
}
969
970 //-------------------------------------------------------------------
971 // SUBS DELETE Handling
972 //-------------------------------------------------------------------
973
974 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
975
976         trans := c.tracker.NewSubsTransaction(subs)
977         subs.WaitTransactionTurn(trans)
978         defer subs.ReleaseTransactionTurn(trans)
979         defer trans.Release()
980
981         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
982
983         subs.mutex.Lock()
984
985         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
986                 subs.valid = false
987                 subs.mutex.Unlock()
988                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
989         } else {
990                 subs.mutex.Unlock()
991         }
992         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
993         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
994         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
995         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
996         c.registry.UpdateSubscriptionToDb(subs, c)
997         parentTrans.SendEvent(nil, 0)
998 }
999
1000 //-------------------------------------------------------------------
1001 // send to E2T Subscription Request
1002 //-------------------------------------------------------------------
1003 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1004         var err error
1005         var event interface{} = nil
1006         var timedOut bool = false
1007         const ricRequestorId = 123
1008
1009         subReqMsg := subs.SubReqMsg
1010         subReqMsg.RequestId = subs.GetReqId().RequestId
1011         subReqMsg.RequestId.Id = ricRequestorId
1012         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1013         if err != nil {
1014                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
1015                 return event
1016         }
1017
1018         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1019         c.WriteSubscriptionToDb(subs)
1020
1021         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
1022                 desc := fmt.Sprintf("(retry %d)", retries)
1023                 if retries == 0 {
1024                         c.UpdateCounter(cSubReqToE2)
1025                 } else {
1026                         c.UpdateCounter(cSubReReqToE2)
1027                 }
1028                 c.rmrSendToE2T(desc, subs, trans)
1029                 if subs.DoNotWaitSubResp == false {
1030                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
1031                         if timedOut {
1032                                 c.UpdateCounter(cSubReqTimerExpiry)
1033                                 continue
1034                         }
1035                 } else {
1036                         // Simulating case where subscrition request has been sent but response has not been received before restart
1037                         event = &SubmgrRestartTestEvent{}
1038                 }
1039                 break
1040         }
1041         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1042         return event
1043 }
1044
1045 //-------------------------------------------------------------------
1046 // send to E2T Subscription Delete Request
1047 //-------------------------------------------------------------------
1048
1049 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1050         var err error
1051         var event interface{}
1052         var timedOut bool
1053         const ricRequestorId = 123
1054
1055         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1056         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1057         subDelReqMsg.RequestId.Id = ricRequestorId
1058         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1059         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1060         if err != nil {
1061                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1062                 return event
1063         }
1064
1065         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1066                 desc := fmt.Sprintf("(retry %d)", retries)
1067                 if retries == 0 {
1068                         c.UpdateCounter(cSubDelReqToE2)
1069                 } else {
1070                         c.UpdateCounter(cSubDelReReqToE2)
1071                 }
1072                 c.rmrSendToE2T(desc, subs, trans)
1073                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1074                 if timedOut {
1075                         c.UpdateCounter(cSubDelReqTimerExpiry)
1076                         continue
1077                 }
1078                 break
1079         }
1080         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1081         return event
1082 }
1083
1084 //-------------------------------------------------------------------
1085 // handle from E2T Subscription Response
1086 //-------------------------------------------------------------------
1087 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1088         xapp.Logger.Info("MSG from E2T: %s", params.String())
1089         c.UpdateCounter(cSubRespFromE2)
1090
1091         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1092         if err != nil {
1093                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1094                 return
1095         }
1096         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1097         if err != nil {
1098                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1099                 return
1100         }
1101         trans := subs.GetTransaction()
1102         if trans == nil {
1103                 err = fmt.Errorf("Ongoing transaction not found")
1104                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1105                 return
1106         }
1107         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1108         if sendOk == false {
1109                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1110                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1111         }
1112         return
1113 }
1114
1115 //-------------------------------------------------------------------
1116 // handle from E2T Subscription Failure
1117 //-------------------------------------------------------------------
1118 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1119         xapp.Logger.Info("MSG from E2T: %s", params.String())
1120         c.UpdateCounter(cSubFailFromE2)
1121         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1122         if err != nil {
1123                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1124                 return
1125         }
1126         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1127         if err != nil {
1128                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1129                 return
1130         }
1131         trans := subs.GetTransaction()
1132         if trans == nil {
1133                 err = fmt.Errorf("Ongoing transaction not found")
1134                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1135                 return
1136         }
1137         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1138         if sendOk == false {
1139                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1140                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1141         }
1142         return
1143 }
1144
1145 //-------------------------------------------------------------------
1146 // handle from E2T Subscription Delete Response
1147 //-------------------------------------------------------------------
1148 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1149         xapp.Logger.Info("MSG from E2T: %s", params.String())
1150         c.UpdateCounter(cSubDelRespFromE2)
1151         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1152         if err != nil {
1153                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1154                 return
1155         }
1156         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1157         if err != nil {
1158                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1159                 return
1160         }
1161         trans := subs.GetTransaction()
1162         if trans == nil {
1163                 err = fmt.Errorf("Ongoing transaction not found")
1164                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1165                 return
1166         }
1167         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1168         if sendOk == false {
1169                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1170                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1171         }
1172         return
1173 }
1174
1175 //-------------------------------------------------------------------
1176 // handle from E2T Subscription Delete Failure
1177 //-------------------------------------------------------------------
1178 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1179         xapp.Logger.Info("MSG from E2T: %s", params.String())
1180         c.UpdateCounter(cSubDelFailFromE2)
1181         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1182         if err != nil {
1183                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1184                 return
1185         }
1186         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1187         if err != nil {
1188                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1189                 return
1190         }
1191         trans := subs.GetTransaction()
1192         if trans == nil {
1193                 err = fmt.Errorf("Ongoing transaction not found")
1194                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1195                 return
1196         }
1197         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1198         if sendOk == false {
1199                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1200                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1201         }
1202         return
1203 }
1204
1205 //-------------------------------------------------------------------
1206 //
1207 //-------------------------------------------------------------------
1208 func typeofSubsMessage(v interface{}) string {
1209         if v == nil {
1210                 return "NIL"
1211         }
1212         switch v.(type) {
1213         //case *e2ap.E2APSubscriptionRequest:
1214         //      return "SubReq"
1215         case *e2ap.E2APSubscriptionResponse:
1216                 return "SubResp"
1217         case *e2ap.E2APSubscriptionFailure:
1218                 return "SubFail"
1219         //case *e2ap.E2APSubscriptionDeleteRequest:
1220         //      return "SubDelReq"
1221         case *e2ap.E2APSubscriptionDeleteResponse:
1222                 return "SubDelResp"
1223         case *e2ap.E2APSubscriptionDeleteFailure:
1224                 return "SubDelFail"
1225         default:
1226                 return "Unknown"
1227         }
1228 }
1229
1230 //-------------------------------------------------------------------
1231 //
1232 //-------------------------------------------------------------------
1233 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1234         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1235         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1236         if err != nil {
1237                 xapp.Logger.Error("%v", err)
1238         }
1239 }
1240
1241 //-------------------------------------------------------------------
1242 //
1243 //-------------------------------------------------------------------
1244 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1245
1246         if removeSubscriptionFromDb == true {
1247                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1248                 c.RemoveSubscriptionFromDb(subs)
1249         } else {
1250                 // Update is needed for successful response and merge case here
1251                 if subs.RetryFromXapp == false {
1252                         c.WriteSubscriptionToDb(subs)
1253                 }
1254         }
1255         subs.RetryFromXapp = false
1256 }
1257
1258 //-------------------------------------------------------------------
1259 //
1260 //-------------------------------------------------------------------
1261 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1262         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1263         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1264         if err != nil {
1265                 xapp.Logger.Error("%v", err)
1266         }
1267 }
1268
1269 //-------------------------------------------------------------------
1270 //
1271 //-------------------------------------------------------------------
1272 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1273         xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1274         err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1275         if err != nil {
1276                 xapp.Logger.Error("%v", err)
1277         }
1278 }
1279
1280 //-------------------------------------------------------------------
1281 //
1282 //-------------------------------------------------------------------
1283 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1284
1285         if removeRestSubscriptionFromDb == true {
1286                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1287                 c.RemoveRESTSubscriptionFromDb(restSubId)
1288         } else {
1289                 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1290         }
1291 }
1292
1293 //-------------------------------------------------------------------
1294 //
1295 //-------------------------------------------------------------------
1296 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1297         xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1298         err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1299         if err != nil {
1300                 xapp.Logger.Error("%v", err)
1301         }
1302 }
1303
1304 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1305
1306         const ricRequestorId = 123
1307         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1308
1309         // Send delete for every endpoint in the subscription
1310         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1311         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1312         subDelReqMsg.RequestId.Id = ricRequestorId
1313         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1314         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1315         if err != nil {
1316                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1317                 return
1318         }
1319         for _, endPoint := range subs.EpList.Endpoints {
1320                 params := &xapp.RMRParams{}
1321                 params.Mtype = mType
1322                 params.SubId = int(subs.GetReqId().InstanceId)
1323                 params.Xid = ""
1324                 params.Meid = subs.Meid
1325                 params.Src = endPoint.String()
1326                 params.PayloadLen = len(payload.Buf)
1327                 params.Payload = payload.Buf
1328                 params.Mbuf = nil
1329                 subs.DeleteFromDb = true
1330                 c.handleXAPPSubscriptionDeleteRequest(params)
1331         }
1332 }
1333
1334 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1335
1336         fmt.Println("CRESTSubscriptionRequest")
1337
1338         if p == nil {
1339                 return
1340         }
1341
1342         if p.SubscriptionID != "" {
1343                 fmt.Println("  SubscriptionID = ", p.SubscriptionID)
1344         } else {
1345                 fmt.Println("  SubscriptionID = ''")
1346         }
1347
1348         fmt.Printf("  ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1349
1350         if p.ClientEndpoint.HTTPPort != nil {
1351                 fmt.Printf("  ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1352         } else {
1353                 fmt.Println("  ClientEndpoint.HTTPPort = nil")
1354         }
1355
1356         if p.ClientEndpoint.RMRPort != nil {
1357                 fmt.Printf("  ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1358         } else {
1359                 fmt.Println("  ClientEndpoint.RMRPort = nil")
1360         }
1361
1362         if p.Meid != nil {
1363                 fmt.Printf("  Meid = %s\n", *p.Meid)
1364         } else {
1365                 fmt.Println("  Meid = nil")
1366         }
1367
1368         for _, subscriptionDetail := range p.SubscriptionDetails {
1369                 if p.RANFunctionID != nil {
1370                         fmt.Printf("  RANFunctionID = %v\n", *p.RANFunctionID)
1371                 } else {
1372                         fmt.Println("  RANFunctionID = nil")
1373                 }
1374                 fmt.Printf("  SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1375                 fmt.Printf("  SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1376
1377                 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1378                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1379                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1380                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1381
1382                         if actionToBeSetup.SubsequentAction != nil {
1383                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1384                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1385                         } else {
1386                                 fmt.Println("  SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1387                         }
1388                 }
1389         }
1390 }