Counter improvements
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/segmentio/ksuid"
38         "github.com/spf13/viper"
39 )
40
41 //-----------------------------------------------------------------------------
42 //
43 //-----------------------------------------------------------------------------
44
// idstring renders the given entries, followed by the error when err is
// non-nil, as a single space-separated string for log and error messages.
//
// Every entry contributes entry.String(); a non-nil err contributes
// "err(<message>)" as the last element. With no entries and a nil err the
// result is the empty string.
func idstring(err error, entries ...fmt.Stringer) string {
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	return strings.Join(parts, " ")
}
58
59 //-----------------------------------------------------------------------------
60 //
61 //-----------------------------------------------------------------------------
62
// Package-level settings. They are (re)loaded by
// (*Control).ReadConfigParameters, which also applies a default whenever a
// value is missing or zero.
var (
	// Timeouts and wait times.
	e2tSubReqTimeout    time.Duration
	e2tSubDelReqTime    time.Duration
	e2tRecvMsgTimeout   time.Duration
	waitRouteCleanup_ms time.Duration

	// Maximum number of attempts: initial try + retry.
	e2tMaxSubReqTryCount    uint64
	e2tMaxSubDelReqTryCount uint64

	// "false" makes NewControl skip restoring subscriptions from the db.
	readSubsFromDb string
)
70
71 type Control struct {
72         *xapp.RMRClient
73         e2ap     *E2ap
74         registry *Registry
75         tracker  *Tracker
76         db       Sdlnterface
77         //subscriber *xapp.Subscriber
78         CntRecvMsg    uint64
79         ResetTestFlag bool
80         Counters      map[string]xapp.Counter
81 }
82
// RMRMeid carries the identifiers of a managed entity: PLMN id, eNB id and
// RAN name, all as plain strings.
type RMRMeid struct {
	PlmnID  string
	EnbID   string
	RanName string
}
88
// Marker event types without payload.
// NOTE(review): presumably used by restart-related test support; the
// consumers are not visible in this part of the file.
type (
	SubmgrRestartTestEvent struct{}
	SubmgrRestartUpEvent   struct{}
)
91
92 func init() {
93         xapp.Logger.Info("SUBMGR")
94         viper.AutomaticEnv()
95         viper.SetEnvPrefix("submgr")
96         viper.AllowEmptyEnv(true)
97 }
98
99 func NewControl() *Control {
100
101         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
102         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
103
104         registry := new(Registry)
105         registry.Initialize()
106         registry.rtmgrClient = &rtmgrClient
107
108         tracker := new(Tracker)
109         tracker.Init()
110
111         c := &Control{e2ap: new(E2ap),
112                 registry: registry,
113                 tracker:  tracker,
114                 db:       CreateSdl(),
115                 //subscriber: subscriber,
116                 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
117         }
118         c.ReadConfigParameters("")
119
120         // Register REST handler for testing support
121         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
122         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
123
124         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
125
126         if readSubsFromDb == "false" {
127                 return c
128         }
129
130         // Read subscriptions from db
131         xapp.Logger.Info("Reading subscriptions from db")
132         subIds, register, err := c.ReadAllSubscriptionsFromSdl()
133         if err != nil {
134                 xapp.Logger.Error("%v", err)
135         } else {
136                 c.registry.subIds = subIds
137                 c.registry.register = register
138                 c.HandleUncompletedSubscriptions(register)
139         }
140         return c
141 }
142
143 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
144         subscriptions, _ := c.registry.QueryHandler()
145         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
146 }
147
148 //-------------------------------------------------------------------
149 //
150 //-------------------------------------------------------------------
151 func (c *Control) ReadConfigParameters(f string) {
152
153         // viper.GetDuration returns nanoseconds
154         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
155         if e2tSubReqTimeout == 0 {
156                 e2tSubReqTimeout = 2000 * 1000000
157         }
158         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
159         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
160         if e2tSubDelReqTime == 0 {
161                 e2tSubDelReqTime = 2000 * 1000000
162         }
163         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
164         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
165         if e2tRecvMsgTimeout == 0 {
166                 e2tRecvMsgTimeout = 2000 * 1000000
167         }
168         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
169
170         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
171         // value 100ms used currently only in unittests.
172         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
173         if waitRouteCleanup_ms == 0 {
174                 waitRouteCleanup_ms = 5000 * 1000000
175         }
176         xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
177
178         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
179         if e2tMaxSubReqTryCount == 0 {
180                 e2tMaxSubReqTryCount = 1
181         }
182         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
183
184         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
185         if e2tMaxSubDelReqTryCount == 0 {
186                 e2tMaxSubDelReqTryCount = 1
187         }
188         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
189
190         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
191         if readSubsFromDb == "" {
192                 readSubsFromDb = "true"
193         }
194         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
195 }
196
197 //-------------------------------------------------------------------
198 //
199 //-------------------------------------------------------------------
200 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
201
202         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
203         for subId, subs := range register {
204                 if subs.SubRespRcvd == false {
205                         subs.NoRespToXapp = true
206                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
207                         c.SendSubscriptionDeleteReq(subs)
208                 }
209         }
210 }
211
212 func (c *Control) ReadyCB(data interface{}) {
213         if c.RMRClient == nil {
214                 c.RMRClient = xapp.Rmr
215         }
216 }
217
218 func (c *Control) Run() {
219         xapp.SetReadyCB(c.ReadyCB, nil)
220         xapp.AddConfigChangeListener(c.ReadConfigParameters)
221         xapp.Run(c)
222 }
223
224 //-------------------------------------------------------------------
225 //
226 //-------------------------------------------------------------------
227 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
228
229         c.CntRecvMsg++
230         c.UpdateCounter(cRestSubReqFromXapp)
231
232         restSubId := ksuid.New().String()
233         subResp := models.SubscriptionResponse{}
234         subResp.SubscriptionID = &restSubId
235         p := params.(*models.SubscriptionParams)
236
237         if p.ClientEndpoint == nil {
238                 xapp.Logger.Error("ClientEndpoint == nil")
239                 c.UpdateCounter(cRestSubFailToXapp)
240                 return nil, fmt.Errorf("")
241         }
242
243         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
244         if err != nil {
245                 xapp.Logger.Error("%s", err.Error())
246                 c.UpdateCounter(cRestSubFailToXapp)
247                 return nil, err
248         }
249
250         restSubscription, err := c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
251         if err != nil {
252                 xapp.Logger.Error("%s", err.Error())
253                 c.UpdateCounter(cRestSubFailToXapp)
254                 return nil, err
255         }
256
257         subReqList := e2ap.SubscriptionRequestList{}
258         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
259         if err != nil {
260                 xapp.Logger.Error("%s", err.Error())
261                 c.registry.DeleteRESTSubscription(&restSubId)
262                 c.UpdateCounter(cRestSubFailToXapp)
263                 return nil, err
264         }
265
266         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId)
267
268         c.UpdateCounter(cRestSubRespToXapp)
269         return &subResp, nil
270 }
271
272 //-------------------------------------------------------------------
273 //
274 //-------------------------------------------------------------------
275
276 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
277         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string) {
278
279         xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
280
281         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*clientEndpoint)
282         if err != nil {
283                 xapp.Logger.Error("%s", err.Error())
284                 return
285         }
286
287         var requestorID int64
288         var instanceId int64
289         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
290                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
291
292                 xid := *restSubId + "_" + strconv.FormatUint(uint64(subReqMsg.RequestId.InstanceId), 10)
293                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), xid, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
294                 //trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
295                 if trans == nil {
296                         c.registry.DeleteRESTSubscription(restSubId)
297                         xapp.Logger.Error("XAPP-SubReq transaction not created. RESTSubId=%s, EndPoint=%s, Meid=%s", *restSubId, xAppRmrEndpoint, *meid)
298                         return
299                 }
300
301                 defer trans.Release()
302                 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
303                 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
304                 if err != nil {
305                         // Send notification to xApp that prosessing of a Subscription Request has failed. Currently it is not possible
306                         // to indicate error. Such possibility should be added. As a workaround requestorID and instanceId are set to zero value
307                         requestorID = (int64)(0)
308                         instanceId = (int64)(0)
309                         resp := &models.SubscriptionResponse{
310                                 SubscriptionID: restSubId,
311                                 SubscriptionInstances: []*models.SubscriptionInstance{
312                                         &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
313                                 },
314                         }
315                         // Mark REST subscription request processed.
316                         restSubscription.SetProcessed()
317                         xapp.Logger.Info("Sending unsuccessful REST notification to endpoint=%v:%v, InstanceId=%v, %s", clientEndpoint.Host, clientEndpoint.HTTPPort, instanceId, idstring(nil, trans))
318                         xapp.Subscription.Notify(resp, *clientEndpoint)
319                         c.UpdateCounter(cRestSubFailNotifToXapp)
320                 } else {
321                         xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v, InstanceId=%v, %s", index, *clientEndpoint, instanceId, idstring(nil, trans))
322
323                         // Store successfully processed InstanceId for deletion
324                         restSubscription.AddInstanceId(subRespMsg.RequestId.InstanceId)
325
326                         // Send notification to xApp that a Subscription Request has been processed.
327                         requestorID = (int64)(subRespMsg.RequestId.Id)
328                         instanceId = (int64)(subRespMsg.RequestId.InstanceId)
329                         resp := &models.SubscriptionResponse{
330                                 SubscriptionID: restSubId,
331                                 SubscriptionInstances: []*models.SubscriptionInstance{
332                                         &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
333                                 },
334                         }
335                         // Mark REST subscription request processesd.
336                         restSubscription.SetProcessed()
337                         xapp.Logger.Info("Sending successful REST notification to endpoint=%v, InstanceId=%v, %s", *clientEndpoint, instanceId, idstring(nil, trans))
338                         xapp.Subscription.Notify(resp, *clientEndpoint)
339                         c.UpdateCounter(cRestSubNotifToXapp)
340
341                 }
342         }
343 }
344
345 //-------------------------------------------------------------------
346 //
347 //------------------------------------------------------------------
348 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
349         restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
350
351         err := c.tracker.Track(trans)
352         if err != nil {
353                 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
354                 xapp.Logger.Error("%s", err.Error())
355                 return nil, err
356         }
357
358         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
359         if err != nil {
360                 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
361                 xapp.Logger.Error("%s", err.Error())
362                 return nil, err
363         }
364
365         //
366         // Wake subs request
367         //
368         go c.handleSubscriptionCreate(subs, trans)
369         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
370
371         err = nil
372         if event != nil {
373                 switch themsg := event.(type) {
374                 case *e2ap.E2APSubscriptionResponse:
375                         trans.Release()
376                         return themsg, nil
377                 case *e2ap.E2APSubscriptionFailure:
378                         err = fmt.Errorf("SubscriptionFailure received")
379                         return nil, err
380                 default:
381                         break
382                 }
383         }
384         err = fmt.Errorf("XAPP-SubReq: failed %s", idstring(err, trans, subs))
385         xapp.Logger.Error("%s", err.Error())
386         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
387         return nil, err
388 }
389
390 //-------------------------------------------------------------------
391 //
392 //-------------------------------------------------------------------
393 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
394
395         c.CntRecvMsg++
396         c.UpdateCounter(cRestSubDelReqFromXapp)
397
398         xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
399
400         restSubscription, err := c.registry.GetRESTSubscription(restSubId)
401         if err != nil {
402                 xapp.Logger.Error("%s", err.Error())
403                 if restSubscription == nil {
404                         // Subscription was not found
405                         return nil
406                 } else {
407                         if restSubscription.SubReqOngoing == true {
408                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
409                                 xapp.Logger.Error("%s", err.Error())
410                                 return err
411                         } else if restSubscription.SubDelReqOngoing == true {
412                                 // Previous request for same restSubId still ongoing
413                                 return nil
414                         }
415                 }
416         }
417
418         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
419         go func() {
420                 for _, instanceId := range restSubscription.InstanceIds {
421                         err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
422                         if err != nil {
423                                 xapp.Logger.Error("%s", err.Error())
424                                 //return err
425                         }
426                         xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
427                         restSubscription.DeleteInstanceId(instanceId)
428                 }
429                 c.registry.DeleteRESTSubscription(&restSubId)
430         }()
431
432         c.UpdateCounter(cRestSubDelRespToXapp)
433
434         return nil
435 }
436
437 //-------------------------------------------------------------------
438 //
439 //-------------------------------------------------------------------
440 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) error {
441
442         xid := *restSubId + "_" + strconv.FormatUint(uint64(instanceId), 10)
443         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), xid, e2ap.RequestId{0, 0}, &xapp.RMRMeid{RanName: *meid})
444         //trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{0, 0}, &xapp.RMRMeid{RanName: *meid})
445         if trans == nil {
446                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
447                 xapp.Logger.Error("%s", err.Error())
448         }
449         defer trans.Release()
450
451         err := c.tracker.Track(trans)
452         if err != nil {
453                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
454                 xapp.Logger.Error("%s", err.Error())
455                 return &time.ParseError{}
456         }
457
458         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
459         if err != nil {
460                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
461                 xapp.Logger.Error("%s", err.Error())
462                 return err
463         }
464         //
465         // Wake subs delete
466         //
467         go c.handleSubscriptionDelete(subs, trans)
468         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
469
470         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
471
472         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
473
474         return nil
475 }
476
477 //-------------------------------------------------------------------
478 //
479 //-------------------------------------------------------------------
480 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
481         xapp.Logger.Info("QueryHandler() called")
482
483         c.CntRecvMsg++
484
485         return c.registry.QueryHandler()
486 }
487
488 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
489         xapp.Logger.Info("TestRestHandler() called")
490
491         pathParams := mux.Vars(r)
492         s := pathParams["testId"]
493
494         // This can be used to delete single subscription from db
495         if contains := strings.Contains(s, "deletesubid="); contains == true {
496                 var splits = strings.Split(s, "=")
497                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
498                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
499                         c.RemoveSubscriptionFromSdl(uint32(subId))
500                         return
501                 }
502         }
503
504         // This can be used to remove all subscriptions db from
505         if s == "emptydb" {
506                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
507                 c.RemoveAllSubscriptionsFromSdl()
508                 return
509         }
510
511         // This is meant to cause submgr's restart in testing
512         if s == "restart" {
513                 xapp.Logger.Info("os.Exit(1) called")
514                 os.Exit(1)
515         }
516
517         xapp.Logger.Info("Unsupported rest command received %s", s)
518 }
519
520 //-------------------------------------------------------------------
521 //
522 //-------------------------------------------------------------------
523
524 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
525         params := &xapp.RMRParams{}
526         params.Mtype = trans.GetMtype()
527         params.SubId = int(subs.GetReqId().InstanceId)
528         params.Xid = ""
529         params.Meid = subs.GetMeid()
530         params.Src = ""
531         params.PayloadLen = len(trans.Payload.Buf)
532         params.Payload = trans.Payload.Buf
533         params.Mbuf = nil
534         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
535         err = c.SendWithRetry(params, false, 5)
536         if err != nil {
537                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
538         }
539         return err
540 }
541
542 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
543
544         params := &xapp.RMRParams{}
545         params.Mtype = trans.GetMtype()
546         params.SubId = int(subs.GetReqId().InstanceId)
547         params.Xid = trans.GetXid()
548         params.Meid = trans.GetMeid()
549         params.Src = ""
550         params.PayloadLen = len(trans.Payload.Buf)
551         params.Payload = trans.Payload.Buf
552         params.Mbuf = nil
553         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
554         err = c.SendWithRetry(params, false, 5)
555         if err != nil {
556                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
557         }
558         return err
559 }
560
561 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
562         if c.RMRClient == nil {
563                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
564                 xapp.Logger.Error("%s", err.Error())
565                 return
566         }
567         c.CntRecvMsg++
568
569         defer c.RMRClient.Free(msg.Mbuf)
570
571         // xapp-frame might use direct access to c buffer and
572         // when msg.Mbuf is freed, someone might take it into use
573         // and payload data might be invalid inside message handle function
574         //
575         // subscriptions won't load system a lot so there is no
576         // real performance hit by cloning buffer into new go byte slice
577         cPay := append(msg.Payload[:0:0], msg.Payload...)
578         msg.Payload = cPay
579         msg.PayloadLen = len(cPay)
580
581         switch msg.Mtype {
582         case xapp.RIC_SUB_REQ:
583                 go c.handleXAPPSubscriptionRequest(msg)
584         case xapp.RIC_SUB_RESP:
585                 go c.handleE2TSubscriptionResponse(msg)
586         case xapp.RIC_SUB_FAILURE:
587                 go c.handleE2TSubscriptionFailure(msg)
588         case xapp.RIC_SUB_DEL_REQ:
589                 go c.handleXAPPSubscriptionDeleteRequest(msg)
590         case xapp.RIC_SUB_DEL_RESP:
591                 go c.handleE2TSubscriptionDeleteResponse(msg)
592         case xapp.RIC_SUB_DEL_FAILURE:
593                 go c.handleE2TSubscriptionDeleteFailure(msg)
594         default:
595                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
596         }
597         return
598 }
599
600 //-------------------------------------------------------------------
601 // handle from XAPP Subscription Request
602 //------------------------------------------------------------------
603 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
604         xapp.Logger.Info("MSG from XAPP: %s", params.String())
605         c.UpdateCounter(cSubReqFromXapp)
606
607         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
608         if err != nil {
609                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
610                 return
611         }
612
613         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
614         if trans == nil {
615                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
616                 return
617         }
618         defer trans.Release()
619
620         if err = c.tracker.Track(trans); err != nil {
621                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
622                 return
623         }
624
625         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
626         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
627         if err != nil {
628                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
629                 return
630         }
631
632         c.wakeSubscriptionRequest(subs, trans)
633 }
634
635 //-------------------------------------------------------------------
636 // Wake Subscription Request to E2node
637 //------------------------------------------------------------------
638 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
639
640         go c.handleSubscriptionCreate(subs, trans)
641         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
642         var err error
643         if event != nil {
644                 switch themsg := event.(type) {
645                 case *e2ap.E2APSubscriptionResponse:
646                         themsg.RequestId.Id = trans.RequestId.Id
647                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
648                         if err == nil {
649                                 trans.Release()
650                                 c.UpdateCounter(cSubRespToXapp)
651                                 c.rmrSendToXapp("", subs, trans)
652                                 return
653                         }
654                 case *e2ap.E2APSubscriptionFailure:
655                         themsg.RequestId.Id = trans.RequestId.Id
656                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
657                         if err == nil {
658                                 c.UpdateCounter(cSubFailToXapp)
659                                 c.rmrSendToXapp("", subs, trans)
660                         }
661                 default:
662                         break
663                 }
664         }
665         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
666         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
667 }
668
669 //-------------------------------------------------------------------
670 // handle from XAPP Subscription Delete Request
671 //------------------------------------------------------------------
672 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
673         xapp.Logger.Info("MSG from XAPP: %s", params.String())
674         c.UpdateCounter(cSubDelReqFromXapp)
675
676         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
677         if err != nil {
678                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
679                 return
680         }
681
682         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
683         if trans == nil {
684                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
685                 return
686         }
687         defer trans.Release()
688
689         err = c.tracker.Track(trans)
690         if err != nil {
691                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
692                 return
693         }
694
695         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
696         if err != nil {
697                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
698                 return
699         }
700
701         //
702         // Wake subs delete
703         //
704         go c.handleSubscriptionDelete(subs, trans)
705         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
706
707         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
708
709         if subs.NoRespToXapp == true {
710                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
711                 return
712         }
713
714         // Whatever is received success, fail or timeout, send successful delete response
715         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
716         subDelRespMsg.RequestId.Id = trans.RequestId.Id
717         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
718         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
719         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
720         if err == nil {
721                 c.UpdateCounter(cSubDelRespToXapp)
722                 c.rmrSendToXapp("", subs, trans)
723         }
724
725         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
726         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
727 }
728
729 //-------------------------------------------------------------------
730 // SUBS CREATE Handling
731 //-------------------------------------------------------------------
732 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
733
734         var removeSubscriptionFromDb bool = false
735         trans := c.tracker.NewSubsTransaction(subs)
736         subs.WaitTransactionTurn(trans)
737         defer subs.ReleaseTransactionTurn(trans)
738         defer trans.Release()
739
740         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
741
742         subRfMsg, valid := subs.GetCachedResponse()
743         if subRfMsg == nil && valid == true {
744                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
745                 switch event.(type) {
746                 case *e2ap.E2APSubscriptionResponse:
747                         subRfMsg, valid = subs.SetCachedResponse(event, true)
748                         subs.SubRespRcvd = true
749                 case *e2ap.E2APSubscriptionFailure:
750                         removeSubscriptionFromDb = true
751                         subRfMsg, valid = subs.SetCachedResponse(event, false)
752                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
753                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
754                 case *SubmgrRestartTestEvent:
755                         // This simulates that no response has been received and after restart subscriptions are restored from db
756                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
757                         return
758                 default:
759                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
760                         removeSubscriptionFromDb = true
761                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
762                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
763                 }
764                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
765         } else {
766                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
767         }
768
769         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
770         if valid == false {
771                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
772         }
773
774         c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
775         parentTrans.SendEvent(subRfMsg, 0)
776 }
777
778 //-------------------------------------------------------------------
779 // SUBS DELETE Handling
780 //-------------------------------------------------------------------
781
782 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
783
784         trans := c.tracker.NewSubsTransaction(subs)
785         subs.WaitTransactionTurn(trans)
786         defer subs.ReleaseTransactionTurn(trans)
787         defer trans.Release()
788
789         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
790
791         subs.mutex.Lock()
792
793         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
794                 subs.valid = false
795                 subs.mutex.Unlock()
796                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
797         } else {
798                 subs.mutex.Unlock()
799         }
800         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
801         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
802         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
803         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
804         c.registry.UpdateSubscriptionToDb(subs, c)
805         parentTrans.SendEvent(nil, 0)
806 }
807
808 //-------------------------------------------------------------------
809 // send to E2T Subscription Request
810 //-------------------------------------------------------------------
811 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
812         var err error
813         var event interface{} = nil
814         var timedOut bool = false
815
816         subReqMsg := subs.SubReqMsg
817         subReqMsg.RequestId = subs.GetReqId().RequestId
818         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
819         if err != nil {
820                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
821                 return event
822         }
823
824         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
825         c.WriteSubscriptionToDb(subs)
826
827         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
828                 desc := fmt.Sprintf("(retry %d)", retries)
829                 if retries == 0 {
830                         c.UpdateCounter(cSubReqToE2)
831                 } else {
832                         c.UpdateCounter(cSubReReqToE2)
833                 }
834                 c.rmrSendToE2T(desc, subs, trans)
835                 if subs.DoNotWaitSubResp == false {
836                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
837                         if timedOut {
838                                 c.UpdateCounter(cSubReqTimerExpiry)
839                                 continue
840                         }
841                 } else {
842                         // Simulating case where subscrition request has been sent but response has not been received before restart
843                         event = &SubmgrRestartTestEvent{}
844                 }
845                 break
846         }
847         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
848         return event
849 }
850
851 //-------------------------------------------------------------------
852 // send to E2T Subscription Delete Request
853 //-------------------------------------------------------------------
854
855 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
856         var err error
857         var event interface{}
858         var timedOut bool
859
860         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
861         subDelReqMsg.RequestId = subs.GetReqId().RequestId
862         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
863         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
864         if err != nil {
865                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
866                 return event
867         }
868
869         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
870                 desc := fmt.Sprintf("(retry %d)", retries)
871                 if retries == 0 {
872                         c.UpdateCounter(cSubDelReqToE2)
873                 } else {
874                         c.UpdateCounter(cSubDelReReqToE2)
875                 }
876                 c.rmrSendToE2T(desc, subs, trans)
877                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
878                 if timedOut {
879                         c.UpdateCounter(cSubDelReqTimerExpiry)
880                         continue
881                 }
882                 break
883         }
884         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
885         return event
886 }
887
888 //-------------------------------------------------------------------
889 // handle from E2T Subscription Response
890 //-------------------------------------------------------------------
891 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
892         xapp.Logger.Info("MSG from E2T: %s", params.String())
893         c.UpdateCounter(cSubRespFromE2)
894
895         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
896         if err != nil {
897                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
898                 return
899         }
900         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
901         if err != nil {
902                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
903                 return
904         }
905         trans := subs.GetTransaction()
906         if trans == nil {
907                 err = fmt.Errorf("Ongoing transaction not found")
908                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
909                 return
910         }
911         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
912         if sendOk == false {
913                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
914                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
915         }
916         return
917 }
918
919 //-------------------------------------------------------------------
920 // handle from E2T Subscription Failure
921 //-------------------------------------------------------------------
922 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
923         xapp.Logger.Info("MSG from E2T: %s", params.String())
924         c.UpdateCounter(cSubFailFromE2)
925         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
926         if err != nil {
927                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
928                 return
929         }
930         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
931         if err != nil {
932                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
933                 return
934         }
935         trans := subs.GetTransaction()
936         if trans == nil {
937                 err = fmt.Errorf("Ongoing transaction not found")
938                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
939                 return
940         }
941         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
942         if sendOk == false {
943                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
944                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
945         }
946         return
947 }
948
949 //-------------------------------------------------------------------
950 // handle from E2T Subscription Delete Response
951 //-------------------------------------------------------------------
952 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
953         xapp.Logger.Info("MSG from E2T: %s", params.String())
954         c.UpdateCounter(cSubDelRespFromE2)
955         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
956         if err != nil {
957                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
958                 return
959         }
960         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
961         if err != nil {
962                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
963                 return
964         }
965         trans := subs.GetTransaction()
966         if trans == nil {
967                 err = fmt.Errorf("Ongoing transaction not found")
968                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
969                 return
970         }
971         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
972         if sendOk == false {
973                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
974                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
975         }
976         return
977 }
978
979 //-------------------------------------------------------------------
980 // handle from E2T Subscription Delete Failure
981 //-------------------------------------------------------------------
982 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
983         xapp.Logger.Info("MSG from E2T: %s", params.String())
984         c.UpdateCounter(cSubDelFailFromE2)
985         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
986         if err != nil {
987                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
988                 return
989         }
990         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
991         if err != nil {
992                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
993                 return
994         }
995         trans := subs.GetTransaction()
996         if trans == nil {
997                 err = fmt.Errorf("Ongoing transaction not found")
998                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
999                 return
1000         }
1001         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1002         if sendOk == false {
1003                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1004                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1005         }
1006         return
1007 }
1008
1009 //-------------------------------------------------------------------
1010 //
1011 //-------------------------------------------------------------------
1012 func typeofSubsMessage(v interface{}) string {
1013         if v == nil {
1014                 return "NIL"
1015         }
1016         switch v.(type) {
1017         //case *e2ap.E2APSubscriptionRequest:
1018         //      return "SubReq"
1019         case *e2ap.E2APSubscriptionResponse:
1020                 return "SubResp"
1021         case *e2ap.E2APSubscriptionFailure:
1022                 return "SubFail"
1023         //case *e2ap.E2APSubscriptionDeleteRequest:
1024         //      return "SubDelReq"
1025         case *e2ap.E2APSubscriptionDeleteResponse:
1026                 return "SubDelResp"
1027         case *e2ap.E2APSubscriptionDeleteFailure:
1028                 return "SubDelFail"
1029         default:
1030                 return "Unknown"
1031         }
1032 }
1033
1034 //-------------------------------------------------------------------
1035 //
1036 //-------------------------------------------------------------------
1037 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1038         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1039         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1040         if err != nil {
1041                 xapp.Logger.Error("%v", err)
1042         }
1043 }
1044
1045 //-------------------------------------------------------------------
1046 //
1047 //-------------------------------------------------------------------
1048 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1049
1050         if removeSubscriptionFromDb == true {
1051                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1052                 c.RemoveSubscriptionFromDb(subs)
1053         } else {
1054                 // Update is needed for successful response and merge case here
1055                 if subs.RetryFromXapp == false {
1056                         c.WriteSubscriptionToDb(subs)
1057                 }
1058         }
1059         subs.RetryFromXapp = false
1060 }
1061
1062 //-------------------------------------------------------------------
1063 //
1064 //-------------------------------------------------------------------
1065 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1066         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1067         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1068         if err != nil {
1069                 xapp.Logger.Error("%v", err)
1070         }
1071 }
1072
1073 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1074
1075         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1076
1077         // Send delete for every endpoint in the subscription
1078         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1079         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1080         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1081         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1082         if err != nil {
1083                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1084                 return
1085         }
1086         for _, endPoint := range subs.EpList.Endpoints {
1087                 params := &xapp.RMRParams{}
1088                 params.Mtype = mType
1089                 params.SubId = int(subs.GetReqId().InstanceId)
1090                 params.Xid = ""
1091                 params.Meid = subs.Meid
1092                 params.Src = endPoint.String()
1093                 params.PayloadLen = len(payload.Buf)
1094                 params.Payload = payload.Buf
1095                 params.Mbuf = nil
1096                 subs.DeleteFromDb = true
1097                 c.handleXAPPSubscriptionDeleteRequest(params)
1098         }
1099 }