Added duplicate detection changes
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/segmentio/ksuid"
38         "github.com/spf13/viper"
39 )
40
41 //-----------------------------------------------------------------------------
42 //
43 //-----------------------------------------------------------------------------
44
45 func idstring(err error, entries ...fmt.Stringer) string {
46         var retval string = ""
47         var filler string = ""
48         for _, entry := range entries {
49                 if entry != nil {
50                         retval += filler + entry.String()
51                         filler = " "
52                 } else {
53                         retval += filler + "(NIL)"
54                 }
55         }
56         if err != nil {
57                 retval += filler + "err(" + err.Error() + ")"
58                 filler = " "
59         }
60         return retval
61 }
62
63 //-----------------------------------------------------------------------------
64 //
65 //-----------------------------------------------------------------------------
66
67 var e2tSubReqTimeout time.Duration
68 var e2tSubDelReqTime time.Duration
69 var e2tRecvMsgTimeout time.Duration
70 var waitRouteCleanup_ms time.Duration
71 var e2tMaxSubReqTryCount uint64    // Initial try + retry
72 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
73 var readSubsFromDb string
74 var restDuplicateCtrl duplicateCtrl
75
76 type Control struct {
77         *xapp.RMRClient
78         e2ap          *E2ap
79         registry      *Registry
80         tracker       *Tracker
81         e2SubsDb      Sdlnterface
82         restSubsDb    Sdlnterface
83         CntRecvMsg    uint64
84         ResetTestFlag bool
85         Counters      map[string]xapp.Counter
86         LoggerLevel   uint32
87 }
88
89 type RMRMeid struct {
90         PlmnID  string
91         EnbID   string
92         RanName string
93 }
94
95 type SubmgrRestartTestEvent struct{}
96 type SubmgrRestartUpEvent struct{}
97
98 func init() {
99         xapp.Logger.Info("SUBMGR")
100         viper.AutomaticEnv()
101         viper.SetEnvPrefix("submgr")
102         viper.AllowEmptyEnv(true)
103 }
104
105 func NewControl() *Control {
106
107         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
108         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
109
110         registry := new(Registry)
111         registry.Initialize()
112         registry.rtmgrClient = &rtmgrClient
113
114         tracker := new(Tracker)
115         tracker.Init()
116
117         c := &Control{e2ap: new(E2ap),
118                 registry:    registry,
119                 tracker:     tracker,
120                 e2SubsDb:    CreateSdl(),
121                 restSubsDb:  CreateRESTSdl(),
122                 Counters:    xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
123                 LoggerLevel: 3,
124         }
125         c.ReadConfigParameters("")
126
127         // Register REST handler for testing support
128         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
129         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
130
131         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
132
133         if readSubsFromDb == "false" {
134                 return c
135         }
136
137         restDuplicateCtrl.Init()
138
139         // Read subscriptions from db
140         xapp.Logger.Info("Reading subscriptions from db")
141         subIds, register, err := c.ReadAllSubscriptionsFromSdl()
142         if err != nil {
143                 xapp.Logger.Error("%v", err)
144         } else {
145                 c.registry.subIds = subIds
146                 c.registry.register = register
147                 c.HandleUncompletedSubscriptions(register)
148         }
149
150         restSubscriptions, err := c.ReadAllRESTSubscriptionsFromSdl()
151         if err != nil {
152                 xapp.Logger.Error("%v", err)
153         } else {
154                 c.registry.restSubscriptions = restSubscriptions
155         }
156         return c
157 }
158
159 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
160         subscriptions, _ := c.registry.QueryHandler()
161         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
162 }
163
164 //-------------------------------------------------------------------
165 //
166 //-------------------------------------------------------------------
167 func (c *Control) ReadConfigParameters(f string) {
168
169         // viper.GetDuration returns nanoseconds
170         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
171         if e2tSubReqTimeout == 0 {
172                 e2tSubReqTimeout = 2000 * 1000000
173         }
174         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
175         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
176         if e2tSubDelReqTime == 0 {
177                 e2tSubDelReqTime = 2000 * 1000000
178         }
179         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
180         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
181         if e2tRecvMsgTimeout == 0 {
182                 e2tRecvMsgTimeout = 2000 * 1000000
183         }
184         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
185
186         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
187         // value 100ms used currently only in unittests.
188         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
189         if waitRouteCleanup_ms == 0 {
190                 waitRouteCleanup_ms = 5000 * 1000000
191         }
192         xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
193
194         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
195         if e2tMaxSubReqTryCount == 0 {
196                 e2tMaxSubReqTryCount = 1
197         }
198         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
199
200         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
201         if e2tMaxSubDelReqTryCount == 0 {
202                 e2tMaxSubDelReqTryCount = 1
203         }
204         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
205
206         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
207         if readSubsFromDb == "" {
208                 readSubsFromDb = "true"
209         }
210         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
211         c.LoggerLevel = viper.GetUint32("logger.level")
212         if c.LoggerLevel == 0 {
213                 c.LoggerLevel = 3
214         }
215 }
216
217 //-------------------------------------------------------------------
218 //
219 //-------------------------------------------------------------------
220 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
221
222         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
223         for subId, subs := range register {
224                 if subs.SubRespRcvd == false {
225                         subs.NoRespToXapp = true
226                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
227                         c.SendSubscriptionDeleteReq(subs)
228                 }
229         }
230 }
231
232 func (c *Control) ReadyCB(data interface{}) {
233         if c.RMRClient == nil {
234                 c.RMRClient = xapp.Rmr
235         }
236 }
237
238 func (c *Control) Run() {
239         xapp.SetReadyCB(c.ReadyCB, nil)
240         xapp.AddConfigChangeListener(c.ReadConfigParameters)
241         xapp.Run(c)
242 }
243
244 //-------------------------------------------------------------------
245 //
246 //-------------------------------------------------------------------
247 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
248
249         var restSubId string
250         var restSubscription *RESTSubscription
251         var err error
252
253         prevRestSubsId, exists := restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
254         if p.SubscriptionID == "" {
255                 if exists {
256                         restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
257                         if restSubscription != nil {
258                                 restSubId = prevRestSubsId
259                                 if err == nil {
260                                         xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
261                                 } else {
262                                         xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
263                                 }
264                         } else {
265                                 xapp.Logger.Info("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
266                                 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
267                         }
268                 }
269
270                 if restSubscription == nil {
271                         restSubId = ksuid.New().String()
272                         restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
273                         if err != nil {
274                                 xapp.Logger.Error("%s", err.Error())
275                                 c.UpdateCounter(cRestSubFailToXapp)
276                                 return nil, "", err
277                         }
278                 }
279         } else {
280                 restSubId = p.SubscriptionID
281
282                 xapp.Logger.Info("RestSubscription ID %s provided via REST request", restSubId)
283
284                 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
285                 if err != nil {
286                         xapp.Logger.Error("%s", err.Error())
287                         c.UpdateCounter(cRestSubFailToXapp)
288                         return nil, "", err
289                 }
290
291                 if !exists {
292                         xapp.Logger.Info("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
293                 } else {
294                         xapp.Logger.Info("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
295                 }
296         }
297
298         return restSubscription, restSubId, nil
299 }
300
301 //-------------------------------------------------------------------
302 //
303 //-------------------------------------------------------------------
304 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
305
306         c.CntRecvMsg++
307         c.UpdateCounter(cRestSubReqFromXapp)
308
309         subResp := models.SubscriptionResponse{}
310         p := params.(*models.SubscriptionParams)
311
312         if c.LoggerLevel > 2 {
313                 c.PrintRESTSubscriptionRequest(p)
314         }
315
316         if p.ClientEndpoint == nil {
317                 xapp.Logger.Error("ClientEndpoint == nil")
318                 c.UpdateCounter(cRestSubFailToXapp)
319                 return nil, fmt.Errorf("")
320         }
321
322         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
323         if err != nil {
324                 xapp.Logger.Error("%s", err.Error())
325                 c.UpdateCounter(cRestSubFailToXapp)
326                 return nil, err
327         }
328
329         md5sum, err := CalculateRequestMd5sum(params)
330         if err != nil {
331                 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
332         }
333
334         restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
335         if err != nil {
336                 xapp.Logger.Error("Failed to get/allocate REST subscription")
337                 return nil, err
338         }
339
340         subResp.SubscriptionID = &restSubId
341         subReqList := e2ap.SubscriptionRequestList{}
342         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
343         if err != nil {
344                 xapp.Logger.Error("%s", err.Error())
345                 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
346                 c.registry.DeleteRESTSubscription(&restSubId)
347                 c.UpdateCounter(cRestSubFailToXapp)
348                 return nil, err
349         }
350
351         duplicate := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
352         if duplicate {
353                 xapp.Logger.Info("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
354                 c.UpdateCounter(cRestSubRespToXapp)
355                 return &subResp, nil
356         }
357
358         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum)
359
360         c.UpdateCounter(cRestSubRespToXapp)
361         return &subResp, nil
362 }
363
364 //-------------------------------------------------------------------
365 //
366 //-------------------------------------------------------------------
367
368 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
369         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string) {
370
371         xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
372
373         var xAppEventInstanceID int64
374         var e2EventInstanceID int64
375
376         defer restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
377
378         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
379                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
380                 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
381
382                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
383                 if trans == nil {
384                         // Send notification to xApp that prosessing of a Subscription Request has failed.
385                         err := fmt.Errorf("Tracking failure")
386                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
387                         continue
388                 }
389
390                 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
391
392                 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId)
393
394                 xapp.Logger.Info("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
395
396                 if err != nil {
397                         c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
398                 } else {
399                         e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
400                         restSubscription.AddMd5Sum(md5sum)
401                         xapp.Logger.Info("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
402                                 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
403                         c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
404                 }
405                 trans.Release()
406         }
407 }
408
409 //-------------------------------------------------------------------
410 //
411 //------------------------------------------------------------------
412 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
413         restSubId string) (*e2ap.E2APSubscriptionResponse, error) {
414
415         err := c.tracker.Track(trans)
416         if err != nil {
417                 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
418                 err = fmt.Errorf("Tracking failure")
419                 return nil, err
420         }
421
422         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
423         if err != nil {
424                 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
425                 return nil, err
426         }
427
428         //
429         // Wake subs request
430         //
431         go c.handleSubscriptionCreate(subs, trans)
432         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
433
434         err = nil
435         if event != nil {
436                 switch themsg := event.(type) {
437                 case *e2ap.E2APSubscriptionResponse:
438                         trans.Release()
439                         return themsg, nil
440                 case *e2ap.E2APSubscriptionFailure:
441                         err = fmt.Errorf("E2 SubscriptionFailure received")
442                         return nil, err
443                 default:
444                         err = fmt.Errorf("unexpected E2 subscription response received")
445                         break
446                 }
447         } else {
448                 err = fmt.Errorf("E2 subscription response timeout")
449         }
450
451         xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
452         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
453         return nil, err
454 }
455
456 //-------------------------------------------------------------------
457 //
458 //-------------------------------------------------------------------
459 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
460         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
461
462         // Send notification to xApp that prosessing of a Subscription Request has failed.
463         e2EventInstanceID := (int64)(0)
464         errorCause := err.Error()
465         resp := &models.SubscriptionResponse{
466                 SubscriptionID: restSubId,
467                 SubscriptionInstances: []*models.SubscriptionInstance{
468                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
469                                 ErrorCause:          &errorCause,
470                                 XappEventInstanceID: &xAppEventInstanceID},
471                 },
472         }
473         // Mark REST subscription request processed.
474         restSubscription.SetProcessed(err)
475         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
476         if trans != nil {
477                 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
478                         errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
479         } else {
480                 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
481                         errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
482         }
483
484         c.UpdateCounter(cRestSubFailNotifToXapp)
485         xapp.Subscription.Notify(resp, *clientEndpoint)
486 }
487
488 //-------------------------------------------------------------------
489 //
490 //-------------------------------------------------------------------
491 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
492         clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
493
494         // Store successfully processed InstanceId for deletion
495         restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
496         restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
497
498         // Send notification to xApp that a Subscription Request has been processed.
499         resp := &models.SubscriptionResponse{
500                 SubscriptionID: restSubId,
501                 SubscriptionInstances: []*models.SubscriptionInstance{
502                         &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
503                                 ErrorCause:          nil,
504                                 XappEventInstanceID: &xAppEventInstanceID},
505                 },
506         }
507         // Mark REST subscription request processesd.
508         restSubscription.SetProcessed(nil)
509         c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
510         xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
511                 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
512
513         c.UpdateCounter(cRestSubNotifToXapp)
514         xapp.Subscription.Notify(resp, *clientEndpoint)
515 }
516
517 //-------------------------------------------------------------------
518 //
519 //-------------------------------------------------------------------
520 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
521
522         c.CntRecvMsg++
523         c.UpdateCounter(cRestSubDelReqFromXapp)
524
525         xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
526
527         restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
528         if err != nil {
529                 xapp.Logger.Error("%s", err.Error())
530                 if restSubscription == nil {
531                         // Subscription was not found
532                         return nil
533                 } else {
534                         if restSubscription.SubReqOngoing == true {
535                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
536                                 xapp.Logger.Error("%s", err.Error())
537                                 return err
538                         } else if restSubscription.SubDelReqOngoing == true {
539                                 // Previous request for same restSubId still ongoing
540                                 return nil
541                         }
542                 }
543         }
544
545         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
546         go func() {
547                 xapp.Logger.Info("Deleteting instances = %v", restSubscription.InstanceIds)
548                 for _, instanceId := range restSubscription.InstanceIds {
549                         xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
550
551                         if err != nil {
552                                 xapp.Logger.Error("%s", err.Error())
553                                 //return err
554                         }
555                         xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
556                         restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
557                         restSubscription.DeleteE2InstanceId(instanceId)
558                 }
559                 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
560                 c.registry.DeleteRESTSubscription(&restSubId)
561                 c.RemoveRESTSubscriptionFromDb(restSubId)
562         }()
563
564         c.UpdateCounter(cRestSubDelRespToXapp)
565
566         return nil
567 }
568
569 //-------------------------------------------------------------------
570 //
571 //-------------------------------------------------------------------
572 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
573
574         var xAppEventInstanceID int64
575         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
576         if err != nil {
577                 xapp.Logger.Info("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
578                         restSubId, instanceId, idstring(err, nil))
579                 return xAppEventInstanceID, nil
580         }
581
582         xAppEventInstanceID = int64(subs.ReqId.Id)
583         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
584         if trans == nil {
585                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
586                 xapp.Logger.Error("%s", err.Error())
587         }
588         defer trans.Release()
589
590         err = c.tracker.Track(trans)
591         if err != nil {
592                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
593                 xapp.Logger.Error("%s", err.Error())
594                 return xAppEventInstanceID, &time.ParseError{}
595         }
596         //
597         // Wake subs delete
598         //
599         go c.handleSubscriptionDelete(subs, trans)
600         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
601
602         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
603
604         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
605
606         return xAppEventInstanceID, nil
607 }
608
609 //-------------------------------------------------------------------
610 //
611 //-------------------------------------------------------------------
612 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
613         xapp.Logger.Info("QueryHandler() called")
614
615         c.CntRecvMsg++
616
617         return c.registry.QueryHandler()
618 }
619
620 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
621         xapp.Logger.Info("TestRestHandler() called")
622
623         pathParams := mux.Vars(r)
624         s := pathParams["testId"]
625
626         // This can be used to delete single subscription from db
627         if contains := strings.Contains(s, "deletesubid="); contains == true {
628                 var splits = strings.Split(s, "=")
629                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
630                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
631                         c.RemoveSubscriptionFromSdl(uint32(subId))
632                         return
633                 }
634         }
635
636         // This can be used to remove all subscriptions db from
637         if s == "emptydb" {
638                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
639                 c.RemoveAllSubscriptionsFromSdl()
640                 c.RemoveAllRESTSubscriptionsFromSdl()
641                 return
642         }
643
644         // This is meant to cause submgr's restart in testing
645         if s == "restart" {
646                 xapp.Logger.Info("os.Exit(1) called")
647                 os.Exit(1)
648         }
649
650         xapp.Logger.Info("Unsupported rest command received %s", s)
651 }
652
653 //-------------------------------------------------------------------
654 //
655 //-------------------------------------------------------------------
656
657 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
658         params := &xapp.RMRParams{}
659         params.Mtype = trans.GetMtype()
660         params.SubId = int(subs.GetReqId().InstanceId)
661         params.Xid = ""
662         params.Meid = subs.GetMeid()
663         params.Src = ""
664         params.PayloadLen = len(trans.Payload.Buf)
665         params.Payload = trans.Payload.Buf
666         params.Mbuf = nil
667         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
668         err = c.SendWithRetry(params, false, 5)
669         if err != nil {
670                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
671         }
672         return err
673 }
674
675 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
676
677         params := &xapp.RMRParams{}
678         params.Mtype = trans.GetMtype()
679         params.SubId = int(subs.GetReqId().InstanceId)
680         params.Xid = trans.GetXid()
681         params.Meid = trans.GetMeid()
682         params.Src = ""
683         params.PayloadLen = len(trans.Payload.Buf)
684         params.Payload = trans.Payload.Buf
685         params.Mbuf = nil
686         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
687         err = c.SendWithRetry(params, false, 5)
688         if err != nil {
689                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
690         }
691         return err
692 }
693
694 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
695         if c.RMRClient == nil {
696                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
697                 xapp.Logger.Error("%s", err.Error())
698                 return
699         }
700         c.CntRecvMsg++
701
702         defer c.RMRClient.Free(msg.Mbuf)
703
704         // xapp-frame might use direct access to c buffer and
705         // when msg.Mbuf is freed, someone might take it into use
706         // and payload data might be invalid inside message handle function
707         //
708         // subscriptions won't load system a lot so there is no
709         // real performance hit by cloning buffer into new go byte slice
710         cPay := append(msg.Payload[:0:0], msg.Payload...)
711         msg.Payload = cPay
712         msg.PayloadLen = len(cPay)
713
714         switch msg.Mtype {
715         case xapp.RIC_SUB_REQ:
716                 go c.handleXAPPSubscriptionRequest(msg)
717         case xapp.RIC_SUB_RESP:
718                 go c.handleE2TSubscriptionResponse(msg)
719         case xapp.RIC_SUB_FAILURE:
720                 go c.handleE2TSubscriptionFailure(msg)
721         case xapp.RIC_SUB_DEL_REQ:
722                 go c.handleXAPPSubscriptionDeleteRequest(msg)
723         case xapp.RIC_SUB_DEL_RESP:
724                 go c.handleE2TSubscriptionDeleteResponse(msg)
725         case xapp.RIC_SUB_DEL_FAILURE:
726                 go c.handleE2TSubscriptionDeleteFailure(msg)
727         default:
728                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
729         }
730         return
731 }
732
733 //-------------------------------------------------------------------
734 // handle from XAPP Subscription Request
735 //------------------------------------------------------------------
736 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
737         xapp.Logger.Info("MSG from XAPP: %s", params.String())
738         c.UpdateCounter(cSubReqFromXapp)
739
740         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
741         if err != nil {
742                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
743                 return
744         }
745
746         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
747         if trans == nil {
748                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
749                 return
750         }
751         defer trans.Release()
752
753         if err = c.tracker.Track(trans); err != nil {
754                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
755                 return
756         }
757
758         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
759         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
760         if err != nil {
761                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
762                 return
763         }
764
765         c.wakeSubscriptionRequest(subs, trans)
766 }
767
768 //-------------------------------------------------------------------
769 // Wake Subscription Request to E2node
770 //------------------------------------------------------------------
771 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
772
773         go c.handleSubscriptionCreate(subs, trans)
774         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
775         var err error
776         if event != nil {
777                 switch themsg := event.(type) {
778                 case *e2ap.E2APSubscriptionResponse:
779                         themsg.RequestId.Id = trans.RequestId.Id
780                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
781                         if err == nil {
782                                 trans.Release()
783                                 c.UpdateCounter(cSubRespToXapp)
784                                 c.rmrSendToXapp("", subs, trans)
785                                 return
786                         }
787                 case *e2ap.E2APSubscriptionFailure:
788                         themsg.RequestId.Id = trans.RequestId.Id
789                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
790                         if err == nil {
791                                 c.UpdateCounter(cSubFailToXapp)
792                                 c.rmrSendToXapp("", subs, trans)
793                         }
794                 default:
795                         break
796                 }
797         }
798         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
799         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
800 }
801
802 //-------------------------------------------------------------------
803 // handle from XAPP Subscription Delete Request
804 //------------------------------------------------------------------
805 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
806         xapp.Logger.Info("MSG from XAPP: %s", params.String())
807         c.UpdateCounter(cSubDelReqFromXapp)
808
809         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
810         if err != nil {
811                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
812                 return
813         }
814
815         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
816         if trans == nil {
817                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
818                 return
819         }
820         defer trans.Release()
821
822         err = c.tracker.Track(trans)
823         if err != nil {
824                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
825                 return
826         }
827
828         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
829         if err != nil {
830                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
831                 return
832         }
833
834         //
835         // Wake subs delete
836         //
837         go c.handleSubscriptionDelete(subs, trans)
838         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
839
840         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
841
842         if subs.NoRespToXapp == true {
843                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
844                 return
845         }
846
847         // Whatever is received success, fail or timeout, send successful delete response
848         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
849         subDelRespMsg.RequestId.Id = trans.RequestId.Id
850         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
851         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
852         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
853         if err == nil {
854                 c.UpdateCounter(cSubDelRespToXapp)
855                 c.rmrSendToXapp("", subs, trans)
856         }
857
858         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
859         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
860 }
861
862 //-------------------------------------------------------------------
863 // SUBS CREATE Handling
864 //-------------------------------------------------------------------
865 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
866
867         var removeSubscriptionFromDb bool = false
868         trans := c.tracker.NewSubsTransaction(subs)
869         subs.WaitTransactionTurn(trans)
870         defer subs.ReleaseTransactionTurn(trans)
871         defer trans.Release()
872
873         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
874
875         subRfMsg, valid := subs.GetCachedResponse()
876         if subRfMsg == nil && valid == true {
877                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
878                 switch event.(type) {
879                 case *e2ap.E2APSubscriptionResponse:
880                         subRfMsg, valid = subs.SetCachedResponse(event, true)
881                         subs.SubRespRcvd = true
882                 case *e2ap.E2APSubscriptionFailure:
883                         removeSubscriptionFromDb = true
884                         subRfMsg, valid = subs.SetCachedResponse(event, false)
885                         xapp.Logger.Info("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
886                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
887                 case *SubmgrRestartTestEvent:
888                         // This simulates that no response has been received and after restart subscriptions are restored from db
889                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
890                         return
891                 default:
892                         xapp.Logger.Info("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
893                         removeSubscriptionFromDb = true
894                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
895                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
896                 }
897                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
898         } else {
899                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
900         }
901
902         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
903         if valid == false {
904                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
905         }
906
907         c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
908         parentTrans.SendEvent(subRfMsg, 0)
909 }
910
911 //-------------------------------------------------------------------
912 // SUBS DELETE Handling
913 //-------------------------------------------------------------------
914
915 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
916
917         trans := c.tracker.NewSubsTransaction(subs)
918         subs.WaitTransactionTurn(trans)
919         defer subs.ReleaseTransactionTurn(trans)
920         defer trans.Release()
921
922         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
923
924         subs.mutex.Lock()
925
926         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
927                 subs.valid = false
928                 subs.mutex.Unlock()
929                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
930         } else {
931                 subs.mutex.Unlock()
932         }
933         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
934         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
935         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
936         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
937         c.registry.UpdateSubscriptionToDb(subs, c)
938         parentTrans.SendEvent(nil, 0)
939 }
940
941 //-------------------------------------------------------------------
942 // send to E2T Subscription Request
943 //-------------------------------------------------------------------
944 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
945         var err error
946         var event interface{} = nil
947         var timedOut bool = false
948         const ricRequestorId = 123
949
950         subReqMsg := subs.SubReqMsg
951         subReqMsg.RequestId = subs.GetReqId().RequestId
952         subReqMsg.RequestId.Id = ricRequestorId
953         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
954         if err != nil {
955                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
956                 return event
957         }
958
959         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
960         c.WriteSubscriptionToDb(subs)
961
962         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
963                 desc := fmt.Sprintf("(retry %d)", retries)
964                 if retries == 0 {
965                         c.UpdateCounter(cSubReqToE2)
966                 } else {
967                         c.UpdateCounter(cSubReReqToE2)
968                 }
969                 c.rmrSendToE2T(desc, subs, trans)
970                 if subs.DoNotWaitSubResp == false {
971                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
972                         if timedOut {
973                                 c.UpdateCounter(cSubReqTimerExpiry)
974                                 continue
975                         }
976                 } else {
977                         // Simulating case where subscrition request has been sent but response has not been received before restart
978                         event = &SubmgrRestartTestEvent{}
979                 }
980                 break
981         }
982         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
983         return event
984 }
985
986 //-------------------------------------------------------------------
987 // send to E2T Subscription Delete Request
988 //-------------------------------------------------------------------
989
990 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
991         var err error
992         var event interface{}
993         var timedOut bool
994         const ricRequestorId = 123
995
996         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
997         subDelReqMsg.RequestId = subs.GetReqId().RequestId
998         subDelReqMsg.RequestId.Id = ricRequestorId
999         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1000         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1001         if err != nil {
1002                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1003                 return event
1004         }
1005
1006         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1007                 desc := fmt.Sprintf("(retry %d)", retries)
1008                 if retries == 0 {
1009                         c.UpdateCounter(cSubDelReqToE2)
1010                 } else {
1011                         c.UpdateCounter(cSubDelReReqToE2)
1012                 }
1013                 c.rmrSendToE2T(desc, subs, trans)
1014                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1015                 if timedOut {
1016                         c.UpdateCounter(cSubDelReqTimerExpiry)
1017                         continue
1018                 }
1019                 break
1020         }
1021         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1022         return event
1023 }
1024
1025 //-------------------------------------------------------------------
1026 // handle from E2T Subscription Response
1027 //-------------------------------------------------------------------
1028 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1029         xapp.Logger.Info("MSG from E2T: %s", params.String())
1030         c.UpdateCounter(cSubRespFromE2)
1031
1032         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1033         if err != nil {
1034                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1035                 return
1036         }
1037         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1038         if err != nil {
1039                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1040                 return
1041         }
1042         trans := subs.GetTransaction()
1043         if trans == nil {
1044                 err = fmt.Errorf("Ongoing transaction not found")
1045                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1046                 return
1047         }
1048         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1049         if sendOk == false {
1050                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1051                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1052         }
1053         return
1054 }
1055
1056 //-------------------------------------------------------------------
1057 // handle from E2T Subscription Failure
1058 //-------------------------------------------------------------------
1059 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1060         xapp.Logger.Info("MSG from E2T: %s", params.String())
1061         c.UpdateCounter(cSubFailFromE2)
1062         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1063         if err != nil {
1064                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1065                 return
1066         }
1067         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1068         if err != nil {
1069                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1070                 return
1071         }
1072         trans := subs.GetTransaction()
1073         if trans == nil {
1074                 err = fmt.Errorf("Ongoing transaction not found")
1075                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1076                 return
1077         }
1078         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1079         if sendOk == false {
1080                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1081                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1082         }
1083         return
1084 }
1085
1086 //-------------------------------------------------------------------
1087 // handle from E2T Subscription Delete Response
1088 //-------------------------------------------------------------------
1089 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1090         xapp.Logger.Info("MSG from E2T: %s", params.String())
1091         c.UpdateCounter(cSubDelRespFromE2)
1092         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1093         if err != nil {
1094                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1095                 return
1096         }
1097         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1098         if err != nil {
1099                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1100                 return
1101         }
1102         trans := subs.GetTransaction()
1103         if trans == nil {
1104                 err = fmt.Errorf("Ongoing transaction not found")
1105                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1106                 return
1107         }
1108         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1109         if sendOk == false {
1110                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1111                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1112         }
1113         return
1114 }
1115
1116 //-------------------------------------------------------------------
1117 // handle from E2T Subscription Delete Failure
1118 //-------------------------------------------------------------------
1119 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1120         xapp.Logger.Info("MSG from E2T: %s", params.String())
1121         c.UpdateCounter(cSubDelFailFromE2)
1122         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1123         if err != nil {
1124                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1125                 return
1126         }
1127         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1128         if err != nil {
1129                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1130                 return
1131         }
1132         trans := subs.GetTransaction()
1133         if trans == nil {
1134                 err = fmt.Errorf("Ongoing transaction not found")
1135                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1136                 return
1137         }
1138         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1139         if sendOk == false {
1140                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1141                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1142         }
1143         return
1144 }
1145
1146 //-------------------------------------------------------------------
1147 //
1148 //-------------------------------------------------------------------
1149 func typeofSubsMessage(v interface{}) string {
1150         if v == nil {
1151                 return "NIL"
1152         }
1153         switch v.(type) {
1154         //case *e2ap.E2APSubscriptionRequest:
1155         //      return "SubReq"
1156         case *e2ap.E2APSubscriptionResponse:
1157                 return "SubResp"
1158         case *e2ap.E2APSubscriptionFailure:
1159                 return "SubFail"
1160         //case *e2ap.E2APSubscriptionDeleteRequest:
1161         //      return "SubDelReq"
1162         case *e2ap.E2APSubscriptionDeleteResponse:
1163                 return "SubDelResp"
1164         case *e2ap.E2APSubscriptionDeleteFailure:
1165                 return "SubDelFail"
1166         default:
1167                 return "Unknown"
1168         }
1169 }
1170
1171 //-------------------------------------------------------------------
1172 //
1173 //-------------------------------------------------------------------
1174 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1175         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1176         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1177         if err != nil {
1178                 xapp.Logger.Error("%v", err)
1179         }
1180 }
1181
1182 //-------------------------------------------------------------------
1183 //
1184 //-------------------------------------------------------------------
1185 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1186
1187         if removeSubscriptionFromDb == true {
1188                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1189                 c.RemoveSubscriptionFromDb(subs)
1190         } else {
1191                 // Update is needed for successful response and merge case here
1192                 if subs.RetryFromXapp == false {
1193                         c.WriteSubscriptionToDb(subs)
1194                 }
1195         }
1196         subs.RetryFromXapp = false
1197 }
1198
1199 //-------------------------------------------------------------------
1200 //
1201 //-------------------------------------------------------------------
1202 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1203         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1204         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1205         if err != nil {
1206                 xapp.Logger.Error("%v", err)
1207         }
1208 }
1209
1210 //-------------------------------------------------------------------
1211 //
1212 //-------------------------------------------------------------------
1213 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1214         xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1215         err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1216         if err != nil {
1217                 xapp.Logger.Error("%v", err)
1218         }
1219 }
1220
1221 //-------------------------------------------------------------------
1222 //
1223 //-------------------------------------------------------------------
1224 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1225
1226         if removeRestSubscriptionFromDb == true {
1227                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1228                 c.RemoveRESTSubscriptionFromDb(restSubId)
1229         } else {
1230                 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1231         }
1232 }
1233
1234 //-------------------------------------------------------------------
1235 //
1236 //-------------------------------------------------------------------
1237 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1238         xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1239         err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1240         if err != nil {
1241                 xapp.Logger.Error("%v", err)
1242         }
1243 }
1244
1245 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1246
1247         const ricRequestorId = 123
1248         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1249
1250         // Send delete for every endpoint in the subscription
1251         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1252         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1253         subDelReqMsg.RequestId.Id = ricRequestorId
1254         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1255         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1256         if err != nil {
1257                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1258                 return
1259         }
1260         for _, endPoint := range subs.EpList.Endpoints {
1261                 params := &xapp.RMRParams{}
1262                 params.Mtype = mType
1263                 params.SubId = int(subs.GetReqId().InstanceId)
1264                 params.Xid = ""
1265                 params.Meid = subs.Meid
1266                 params.Src = endPoint.String()
1267                 params.PayloadLen = len(payload.Buf)
1268                 params.Payload = payload.Buf
1269                 params.Mbuf = nil
1270                 subs.DeleteFromDb = true
1271                 c.handleXAPPSubscriptionDeleteRequest(params)
1272         }
1273 }
1274
1275 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1276
1277         fmt.Println("CRESTSubscriptionRequest")
1278
1279         if p.SubscriptionID != "" {
1280                 fmt.Println("  SubscriptionID = ", p.SubscriptionID)
1281         } else {
1282                 fmt.Println("  SubscriptionID = ''")
1283         }
1284
1285         fmt.Printf("  ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1286
1287         if p.ClientEndpoint.HTTPPort != nil {
1288                 fmt.Printf("  ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1289         } else {
1290                 fmt.Println("  ClientEndpoint.HTTPPort = nil")
1291         }
1292
1293         if p.ClientEndpoint.RMRPort != nil {
1294                 fmt.Printf("  ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1295         } else {
1296                 fmt.Println("  ClientEndpoint.RMRPort = nil")
1297         }
1298
1299         if p.Meid != nil {
1300                 fmt.Printf("  Meid = %s\n", *p.Meid)
1301         } else {
1302                 fmt.Println("  Meid = nil")
1303         }
1304
1305         for _, subscriptionDetail := range p.SubscriptionDetails {
1306                 if p.RANFunctionID != nil {
1307                         fmt.Printf("  RANFunctionID = %v\n", *p.RANFunctionID)
1308                 } else {
1309                         fmt.Println("  RANFunctionID = nil")
1310                 }
1311                 fmt.Printf("  SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1312                 fmt.Printf("  SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1313
1314                 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1315                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1316                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1317                         fmt.Printf("  SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1318
1319                         if actionToBeSetup.SubsequentAction != nil {
1320                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1321                                 fmt.Printf("  SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1322                         } else {
1323                                 fmt.Println("  SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1324                         }
1325                 }
1326         }
1327 }