ccb4b3d083648ee7f4158ebd9d8534862a68c37b
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/segmentio/ksuid"
38         "github.com/spf13/viper"
39 )
40
41 //-----------------------------------------------------------------------------
42 //
43 //-----------------------------------------------------------------------------
44
// idstring builds a single log-friendly identification string.
//
// The String() form of every entry is joined with single spaces, in the order
// given. When err is non-nil, "err(<err.Error()>)" is appended as the last
// space-separated token. With no entries and a nil err the result is "".
func idstring(err error, entries ...fmt.Stringer) string {
	// One slot per entry plus one for the optional error token.
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	return strings.Join(parts, " ")
}
58
59 //-----------------------------------------------------------------------------
60 //
61 //-----------------------------------------------------------------------------
62
// Package-level configuration. Every value is (re)loaded by
// (*Control).ReadConfigParameters from the viper "controls.*" keys.
var (
	// Timers. The configured "_ms" values are multiplied by 1000000 when
	// loaded, so each variable holds a time.Duration (nanosecond count).
	e2tSubReqTimeout    time.Duration
	e2tSubDelReqTime    time.Duration
	e2tRecvMsgTimeout   time.Duration
	waitRouteCleanup_ms time.Duration // despite the suffix this is a time.Duration, not a millisecond count

	// Try counts: initial try + retry.
	e2tMaxSubReqTryCount    uint64
	e2tMaxSubDelReqTryCount uint64

	// readSubsFromDb is kept as a string; NewControl skips restoring
	// subscriptions from the db only when it equals "false".
	readSubsFromDb string
)
70
// Control is the subscription manager's central object. Its methods serve the
// REST subscription/query/delete callbacks, the REST test and symptom-data
// routes, and the RMR messages delivered through Consume.
type Control struct {
        // Embedded RMR client. It is nil until ReadyCB assigns xapp.Rmr; Consume
        // refuses to handle messages while it is still nil.
        *xapp.RMRClient
        // E2AP helper used to unpack requests and pack responses.
        e2ap     *E2ap
        // Registry of REST subscriptions and E2 subscriptions.
        registry *Registry
        // Creates and tracks xApp-side and subscription-side transactions.
        tracker  *Tracker
        // Database handle returned by CreateSdl() in NewControl.
        db       Sdlnterface
        //subscriber *xapp.Subscriber
        // Incremented for every REST request/query and every consumed RMR message.
        // NOTE(review): the increments are plain `++` with no synchronization visible here.
        CntRecvMsg    uint64
        // Passed through to registry.AssignToSubscription; test support flag.
        ResetTestFlag bool
        // Counter group registered under "SUBMGR" in NewControl.
        Counters      map[string]xapp.Counter
}
82
// RMRMeid groups three string identifiers: a PLMN id, an eNB id and a RAN name.
// NOTE(review): the code visible in this file uses xapp.RMRMeid rather than
// this type - confirm whether this declaration is still referenced elsewhere.
type RMRMeid struct {
        PlmnID  string
        EnbID   string
        RanName string
}
88
// SubmgrRestartTestEvent is a marker event. handleSubscriptionCreate matches it
// in its event type switch and drops the transaction, simulating a missing
// response so that the restart case can be tested.
type SubmgrRestartTestEvent struct{}

// SubmgrRestartUpEvent is a marker event.
// NOTE(review): no use of it is visible in this part of the file - confirm its
// consumer before relying on it.
type SubmgrRestartUpEvent struct{}
91
// init runs at package load. It logs a "SUBMGR" banner and configures the
// global viper instance that the configuration reads in this file go through.
func init() {
        xapp.Logger.Info("SUBMGR")
        // Let viper consult environment variables when a key is looked up ...
        viper.AutomaticEnv()
        // ... using "submgr" as the environment variable prefix.
        viper.SetEnvPrefix("submgr")
        // Environment variables that are set but empty count as set values.
        viper.AllowEmptyEnv(true)
}
98
99 func NewControl() *Control {
100
101         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
102         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
103
104         registry := new(Registry)
105         registry.Initialize()
106         registry.rtmgrClient = &rtmgrClient
107
108         tracker := new(Tracker)
109         tracker.Init()
110
111         c := &Control{e2ap: new(E2ap),
112                 registry: registry,
113                 tracker:  tracker,
114                 db:       CreateSdl(),
115                 //subscriber: subscriber,
116                 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
117         }
118         c.ReadConfigParameters("")
119
120         // Register REST handler for testing support
121         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
122         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
123
124         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
125
126         if readSubsFromDb == "false" {
127                 return c
128         }
129
130         // Read subscriptions from db
131         xapp.Logger.Info("Reading subscriptions from db")
132         subIds, register, err := c.ReadAllSubscriptionsFromSdl()
133         if err != nil {
134                 xapp.Logger.Error("%v", err)
135         } else {
136                 c.registry.subIds = subIds
137                 c.registry.register = register
138                 c.HandleUncompletedSubscriptions(register)
139         }
140         return c
141 }
142
143 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
144         subscriptions, _ := c.registry.QueryHandler()
145         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
146 }
147
148 //-------------------------------------------------------------------
149 //
150 //-------------------------------------------------------------------
151 func (c *Control) ReadConfigParameters(f string) {
152
153         // viper.GetDuration returns nanoseconds
154         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
155         if e2tSubReqTimeout == 0 {
156                 e2tSubReqTimeout = 2000 * 1000000
157         }
158         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
159         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
160         if e2tSubDelReqTime == 0 {
161                 e2tSubDelReqTime = 2000 * 1000000
162         }
163         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
164         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
165         if e2tRecvMsgTimeout == 0 {
166                 e2tRecvMsgTimeout = 2000 * 1000000
167         }
168         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
169
170         // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
171         // value 100ms used currently only in unittests.
172         waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
173         if waitRouteCleanup_ms == 0 {
174                 waitRouteCleanup_ms = 5000 * 1000000
175         }
176         xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
177
178         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
179         if e2tMaxSubReqTryCount == 0 {
180                 e2tMaxSubReqTryCount = 1
181         }
182         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
183
184         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
185         if e2tMaxSubDelReqTryCount == 0 {
186                 e2tMaxSubDelReqTryCount = 1
187         }
188         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
189
190         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
191         if readSubsFromDb == "" {
192                 readSubsFromDb = "true"
193         }
194         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
195 }
196
197 //-------------------------------------------------------------------
198 //
199 //-------------------------------------------------------------------
200 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
201
202         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
203         for subId, subs := range register {
204                 if subs.SubRespRcvd == false {
205                         subs.NoRespToXapp = true
206                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
207                         c.SendSubscriptionDeleteReq(subs)
208                 }
209         }
210 }
211
212 func (c *Control) ReadyCB(data interface{}) {
213         if c.RMRClient == nil {
214                 c.RMRClient = xapp.Rmr
215         }
216 }
217
// Run registers ReadyCB as the xapp ready callback, subscribes
// ReadConfigParameters to configuration changes and then passes this Control
// to xapp.Run.
func (c *Control) Run() {
        xapp.SetReadyCB(c.ReadyCB, nil)
        xapp.AddConfigChangeListener(c.ReadConfigParameters)
        xapp.Run(c)
}
223
224 //-------------------------------------------------------------------
225 //
226 //-------------------------------------------------------------------
227 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
228
229         restSubId := ksuid.New().String()
230         subResp := models.SubscriptionResponse{}
231         subResp.SubscriptionID = &restSubId
232         p := params.(*models.SubscriptionParams)
233
234         c.CntRecvMsg++
235
236         c.UpdateCounter(cRestSubReqFromXapp)
237
238         if p.ClientEndpoint == nil {
239                 xapp.Logger.Error("ClientEndpoint == nil")
240                 return nil, fmt.Errorf("")
241         }
242
243         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
244         if err != nil {
245                 xapp.Logger.Error("%s", err.Error())
246                 return nil, err
247         }
248
249         restSubscription, err := c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
250         if err != nil {
251                 xapp.Logger.Error("%s", err.Error())
252                 return nil, err
253         }
254
255         subReqList := e2ap.SubscriptionRequestList{}
256         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
257         if err != nil {
258                 xapp.Logger.Error("%s", err.Error())
259                 c.registry.DeleteRESTSubscription(&restSubId)
260                 return nil, err
261         }
262
263         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId)
264
265         return &subResp, nil
266
267 }
268
269 //-------------------------------------------------------------------
270 //
271 //-------------------------------------------------------------------
272
273 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
274         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string) {
275
276         xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
277
278         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*clientEndpoint)
279         if err != nil {
280                 xapp.Logger.Error("%s", err.Error())
281                 return
282         }
283
284         var requestorID int64
285         var instanceId int64
286         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
287                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
288
289                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
290                 if trans == nil {
291                         c.registry.DeleteRESTSubscription(restSubId)
292                         xapp.Logger.Error("XAPP-SubReq transaction not created. RESTSubId=%s, EndPoint=%s, Meid=%s", *restSubId, xAppRmrEndpoint, *meid)
293                         return
294                 }
295
296                 defer trans.Release()
297                 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
298                 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
299                 if err != nil {
300                         // Send notification to xApp that prosessing of a Subscription Request has failed. Currently it is not possible
301                         // to indicate error. Such possibility should be added. As a workaround requestorID and instanceId are set to zero value
302                         requestorID = (int64)(0)
303                         instanceId = (int64)(0)
304                         resp := &models.SubscriptionResponse{
305                                 SubscriptionID: restSubId,
306                                 SubscriptionInstances: []*models.SubscriptionInstance{
307                                         &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
308                                 },
309                         }
310                         // Mark REST subscription request processed.
311                         restSubscription.SetProcessed()
312                         xapp.Logger.Info("Sending unsuccessful REST notification to endpoint=%v:%v, InstanceId=%v, %s", clientEndpoint.Host, clientEndpoint.HTTPPort, instanceId, idstring(nil, trans))
313                         xapp.Subscription.Notify(resp, *clientEndpoint)
314                         c.UpdateCounter(cRestSubFailToXapp)
315                 } else {
316                         xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v, InstanceId=%v, %s", index, *clientEndpoint, instanceId, idstring(nil, trans))
317
318                         // Store successfully processed InstanceId for deletion
319                         restSubscription.AddInstanceId(subRespMsg.RequestId.InstanceId)
320
321                         // Send notification to xApp that a Subscription Request has been processed.
322                         requestorID = (int64)(subRespMsg.RequestId.Id)
323                         instanceId = (int64)(subRespMsg.RequestId.InstanceId)
324                         resp := &models.SubscriptionResponse{
325                                 SubscriptionID: restSubId,
326                                 SubscriptionInstances: []*models.SubscriptionInstance{
327                                         &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
328                                 },
329                         }
330                         // Mark REST subscription request processesd.
331                         restSubscription.SetProcessed()
332                         xapp.Logger.Info("Sending successful REST notification to endpoint=%v, InstanceId=%v, %s", *clientEndpoint, instanceId, idstring(nil, trans))
333                         xapp.Subscription.Notify(resp, *clientEndpoint)
334                         c.UpdateCounter(cRestSubRespToXapp)
335
336                 }
337         }
338 }
339
340 //-------------------------------------------------------------------
341 //
342 //------------------------------------------------------------------
343 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
344         restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
345
346         err := c.tracker.Track(trans)
347         if err != nil {
348                 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
349                 xapp.Logger.Error("%s", err.Error())
350                 return nil, err
351         }
352
353         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
354         if err != nil {
355                 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
356                 xapp.Logger.Error("%s", err.Error())
357                 return nil, err
358         }
359
360         //
361         // Wake subs request
362         //
363         go c.handleSubscriptionCreate(subs, trans)
364         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
365
366         err = nil
367         if event != nil {
368                 switch themsg := event.(type) {
369                 case *e2ap.E2APSubscriptionResponse:
370                         trans.Release()
371                         return themsg, nil
372                 case *e2ap.E2APSubscriptionFailure:
373                         err = fmt.Errorf("SubscriptionFailure received")
374                         return nil, err
375                 default:
376                         break
377                 }
378         }
379         err = fmt.Errorf("XAPP-SubReq: failed %s", idstring(err, trans, subs))
380         xapp.Logger.Error("%s", err.Error())
381         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
382         return nil, err
383 }
384
385 //-------------------------------------------------------------------
386 //
387 //-------------------------------------------------------------------
388 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
389
390         c.CntRecvMsg++
391         c.UpdateCounter(cRestSubDelReqFromXapp)
392
393         xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
394
395         restSubscription, err := c.registry.GetRESTSubscription(restSubId)
396         if err != nil {
397                 xapp.Logger.Error("%s", err.Error())
398                 if restSubscription == nil {
399                         // Subscription was not found
400                         return nil
401                 } else {
402                         if restSubscription.SubReqOngoing == true {
403                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
404                                 xapp.Logger.Error("%s", err.Error())
405                                 return err
406                         } else if restSubscription.SubDelReqOngoing == true {
407                                 // Previous request for same restSubId still ongoing
408                                 return nil
409                         }
410                 }
411         }
412
413         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
414         go func() {
415                 for _, instanceId := range restSubscription.InstanceIds {
416                         err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
417                         if err != nil {
418                                 xapp.Logger.Error("%s", err.Error())
419                                 //return err
420                         }
421                         xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
422                         restSubscription.DeleteInstanceId(instanceId)
423                 }
424                 c.registry.DeleteRESTSubscription(&restSubId)
425         }()
426
427         c.UpdateCounter(cRestSubDelRespToXapp)
428
429         return nil
430 }
431
432 //-------------------------------------------------------------------
433 //
434 //-------------------------------------------------------------------
435 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) error {
436
437         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{0, 0}, &xapp.RMRMeid{RanName: *meid})
438         if trans == nil {
439                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
440                 xapp.Logger.Error("%s", err.Error())
441         }
442         defer trans.Release()
443
444         err := c.tracker.Track(trans)
445         if err != nil {
446                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
447                 xapp.Logger.Error("%s", err.Error())
448                 return &time.ParseError{}
449         }
450
451         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
452         if err != nil {
453                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
454                 xapp.Logger.Error("%s", err.Error())
455                 return err
456         }
457         //
458         // Wake subs delete
459         //
460         go c.handleSubscriptionDelete(subs, trans)
461         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
462
463         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
464
465         c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
466
467         return nil
468 }
469
470 //-------------------------------------------------------------------
471 //
472 //-------------------------------------------------------------------
// QueryHandler is the REST callback for subscription queries. It counts the
// request in CntRecvMsg and returns whatever the registry's QueryHandler
// reports.
func (c *Control) QueryHandler() (models.SubscriptionList, error) {
        xapp.Logger.Info("QueryHandler() called")

        c.CntRecvMsg++

        return c.registry.QueryHandler()
}
480
481 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
482         xapp.Logger.Info("TestRestHandler() called")
483
484         pathParams := mux.Vars(r)
485         s := pathParams["testId"]
486
487         // This can be used to delete single subscription from db
488         if contains := strings.Contains(s, "deletesubid="); contains == true {
489                 var splits = strings.Split(s, "=")
490                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
491                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
492                         c.RemoveSubscriptionFromSdl(uint32(subId))
493                         return
494                 }
495         }
496
497         // This can be used to remove all subscriptions db from
498         if s == "emptydb" {
499                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
500                 c.RemoveAllSubscriptionsFromSdl()
501                 return
502         }
503
504         // This is meant to cause submgr's restart in testing
505         if s == "restart" {
506                 xapp.Logger.Info("os.Exit(1) called")
507                 os.Exit(1)
508         }
509
510         xapp.Logger.Info("Unsupported rest command received %s", s)
511 }
512
513 //-------------------------------------------------------------------
514 //
515 //-------------------------------------------------------------------
516
517 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
518         params := &xapp.RMRParams{}
519         params.Mtype = trans.GetMtype()
520         params.SubId = int(subs.GetReqId().InstanceId)
521         params.Xid = ""
522         params.Meid = subs.GetMeid()
523         params.Src = ""
524         params.PayloadLen = len(trans.Payload.Buf)
525         params.Payload = trans.Payload.Buf
526         params.Mbuf = nil
527         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
528         err = c.SendWithRetry(params, false, 5)
529         if err != nil {
530                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
531         }
532         return err
533 }
534
535 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
536
537         params := &xapp.RMRParams{}
538         params.Mtype = trans.GetMtype()
539         params.SubId = int(subs.GetReqId().InstanceId)
540         params.Xid = trans.GetXid()
541         params.Meid = trans.GetMeid()
542         params.Src = ""
543         params.PayloadLen = len(trans.Payload.Buf)
544         params.Payload = trans.Payload.Buf
545         params.Mbuf = nil
546         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
547         err = c.SendWithRetry(params, false, 5)
548         if err != nil {
549                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
550         }
551         return err
552 }
553
554 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
555         if c.RMRClient == nil {
556                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
557                 xapp.Logger.Error("%s", err.Error())
558                 return
559         }
560         c.CntRecvMsg++
561
562         defer c.RMRClient.Free(msg.Mbuf)
563
564         // xapp-frame might use direct access to c buffer and
565         // when msg.Mbuf is freed, someone might take it into use
566         // and payload data might be invalid inside message handle function
567         //
568         // subscriptions won't load system a lot so there is no
569         // real performance hit by cloning buffer into new go byte slice
570         cPay := append(msg.Payload[:0:0], msg.Payload...)
571         msg.Payload = cPay
572         msg.PayloadLen = len(cPay)
573
574         switch msg.Mtype {
575         case xapp.RIC_SUB_REQ:
576                 go c.handleXAPPSubscriptionRequest(msg)
577         case xapp.RIC_SUB_RESP:
578                 go c.handleE2TSubscriptionResponse(msg)
579         case xapp.RIC_SUB_FAILURE:
580                 go c.handleE2TSubscriptionFailure(msg)
581         case xapp.RIC_SUB_DEL_REQ:
582                 go c.handleXAPPSubscriptionDeleteRequest(msg)
583         case xapp.RIC_SUB_DEL_RESP:
584                 go c.handleE2TSubscriptionDeleteResponse(msg)
585         case xapp.RIC_SUB_DEL_FAILURE:
586                 go c.handleE2TSubscriptionDeleteFailure(msg)
587         default:
588                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
589         }
590         return
591 }
592
593 //-------------------------------------------------------------------
594 // handle from XAPP Subscription Request
595 //------------------------------------------------------------------
596 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
597         xapp.Logger.Info("MSG from XAPP: %s", params.String())
598         c.UpdateCounter(cSubReqFromXapp)
599
600         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
601         if err != nil {
602                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
603                 return
604         }
605
606         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
607         if trans == nil {
608                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
609                 return
610         }
611         defer trans.Release()
612
613         if err = c.tracker.Track(trans); err != nil {
614                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
615                 return
616         }
617
618         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
619         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
620         if err != nil {
621                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
622                 return
623         }
624
625         c.wakeSubscriptionRequest(subs, trans)
626 }
627
628 //-------------------------------------------------------------------
629 // Wake Subscription Request to E2node
630 //------------------------------------------------------------------
631 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
632
633         go c.handleSubscriptionCreate(subs, trans)
634         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
635         var err error
636         if event != nil {
637                 switch themsg := event.(type) {
638                 case *e2ap.E2APSubscriptionResponse:
639                         themsg.RequestId.Id = trans.RequestId.Id
640                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
641                         if err == nil {
642                                 trans.Release()
643                                 c.UpdateCounter(cSubRespToXapp)
644                                 c.rmrSendToXapp("", subs, trans)
645                                 return
646                         }
647                 case *e2ap.E2APSubscriptionFailure:
648                         themsg.RequestId.Id = trans.RequestId.Id
649                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
650                         if err == nil {
651                                 c.UpdateCounter(cSubFailToXapp)
652                                 c.rmrSendToXapp("", subs, trans)
653                         }
654                 default:
655                         break
656                 }
657         }
658         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
659         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
660 }
661
662 //-------------------------------------------------------------------
663 // handle from XAPP Subscription Delete Request
664 //------------------------------------------------------------------
665 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
666         xapp.Logger.Info("MSG from XAPP: %s", params.String())
667         c.UpdateCounter(cSubDelReqFromXapp)
668
669         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
670         if err != nil {
671                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
672                 return
673         }
674
675         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
676         if trans == nil {
677                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
678                 return
679         }
680         defer trans.Release()
681
682         err = c.tracker.Track(trans)
683         if err != nil {
684                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
685                 return
686         }
687
688         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
689         if err != nil {
690                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
691                 return
692         }
693
694         //
695         // Wake subs delete
696         //
697         go c.handleSubscriptionDelete(subs, trans)
698         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
699
700         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
701
702         if subs.NoRespToXapp == true {
703                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
704                 return
705         }
706
707         // Whatever is received success, fail or timeout, send successful delete response
708         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
709         subDelRespMsg.RequestId.Id = trans.RequestId.Id
710         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
711         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
712         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
713         if err == nil {
714                 c.UpdateCounter(cSubDelRespToXapp)
715                 c.rmrSendToXapp("", subs, trans)
716         }
717
718         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
719         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
720 }
721
722 //-------------------------------------------------------------------
723 // SUBS CREATE Handling
724 //-------------------------------------------------------------------
725 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
726
727         var removeSubscriptionFromDb bool = false
728         trans := c.tracker.NewSubsTransaction(subs)
729         subs.WaitTransactionTurn(trans)
730         defer subs.ReleaseTransactionTurn(trans)
731         defer trans.Release()
732
733         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
734
735         subRfMsg, valid := subs.GetCachedResponse()
736         if subRfMsg == nil && valid == true {
737                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
738                 switch event.(type) {
739                 case *e2ap.E2APSubscriptionResponse:
740                         subRfMsg, valid = subs.SetCachedResponse(event, true)
741                         subs.SubRespRcvd = true
742                 case *e2ap.E2APSubscriptionFailure:
743                         removeSubscriptionFromDb = true
744                         subRfMsg, valid = subs.SetCachedResponse(event, false)
745                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
746                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
747                 case *SubmgrRestartTestEvent:
748                         // This simulates that no response has been received and after restart subscriptions are restored from db
749                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
750                         return
751                 default:
752                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
753                         removeSubscriptionFromDb = true
754                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
755                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
756                 }
757                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
758         } else {
759                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
760         }
761
762         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
763         if valid == false {
764                 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
765         }
766
767         c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
768         parentTrans.SendEvent(subRfMsg, 0)
769 }
770
771 //-------------------------------------------------------------------
772 // SUBS DELETE Handling
773 //-------------------------------------------------------------------
774
775 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
776
777         trans := c.tracker.NewSubsTransaction(subs)
778         subs.WaitTransactionTurn(trans)
779         defer subs.ReleaseTransactionTurn(trans)
780         defer trans.Release()
781
782         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
783
784         subs.mutex.Lock()
785
786         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
787                 subs.valid = false
788                 subs.mutex.Unlock()
789                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
790         } else {
791                 subs.mutex.Unlock()
792         }
793         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
794         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
795         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
796         c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
797         c.registry.UpdateSubscriptionToDb(subs, c)
798         parentTrans.SendEvent(nil, 0)
799 }
800
801 //-------------------------------------------------------------------
802 // send to E2T Subscription Request
803 //-------------------------------------------------------------------
804 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
805         var err error
806         var event interface{} = nil
807         var timedOut bool = false
808
809         subReqMsg := subs.SubReqMsg
810         subReqMsg.RequestId = subs.GetReqId().RequestId
811         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
812         if err != nil {
813                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
814                 return event
815         }
816
817         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
818         c.WriteSubscriptionToDb(subs)
819
820         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
821                 desc := fmt.Sprintf("(retry %d)", retries)
822                 if retries == 0 {
823                         c.UpdateCounter(cSubReqToE2)
824                 } else {
825                         c.UpdateCounter(cSubReReqToE2)
826                 }
827                 c.rmrSendToE2T(desc, subs, trans)
828                 if subs.DoNotWaitSubResp == false {
829                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
830                         if timedOut {
831                                 c.UpdateCounter(cSubReqTimerExpiry)
832                                 continue
833                         }
834                 } else {
835                         // Simulating case where subscrition request has been sent but response has not been received before restart
836                         event = &SubmgrRestartTestEvent{}
837                 }
838                 break
839         }
840         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
841         return event
842 }
843
844 //-------------------------------------------------------------------
845 // send to E2T Subscription Delete Request
846 //-------------------------------------------------------------------
847
848 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
849         var err error
850         var event interface{}
851         var timedOut bool
852
853         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
854         subDelReqMsg.RequestId = subs.GetReqId().RequestId
855         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
856         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
857         if err != nil {
858                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
859                 return event
860         }
861
862         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
863                 desc := fmt.Sprintf("(retry %d)", retries)
864                 if retries == 0 {
865                         c.UpdateCounter(cSubDelReqToE2)
866                 } else {
867                         c.UpdateCounter(cSubDelReReqToE2)
868                 }
869                 c.rmrSendToE2T(desc, subs, trans)
870                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
871                 if timedOut {
872                         c.UpdateCounter(cSubDelReqTimerExpiry)
873                         continue
874                 }
875                 break
876         }
877         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
878         return event
879 }
880
881 //-------------------------------------------------------------------
882 // handle from E2T Subscription Response
883 //-------------------------------------------------------------------
884 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
885         xapp.Logger.Info("MSG from E2T: %s", params.String())
886         c.UpdateCounter(cSubRespFromE2)
887
888         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
889         if err != nil {
890                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
891                 return
892         }
893         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
894         if err != nil {
895                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
896                 return
897         }
898         trans := subs.GetTransaction()
899         if trans == nil {
900                 err = fmt.Errorf("Ongoing transaction not found")
901                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
902                 return
903         }
904         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
905         if sendOk == false {
906                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
907                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
908         }
909         return
910 }
911
912 //-------------------------------------------------------------------
913 // handle from E2T Subscription Failure
914 //-------------------------------------------------------------------
915 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
916         xapp.Logger.Info("MSG from E2T: %s", params.String())
917         c.UpdateCounter(cSubFailFromE2)
918         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
919         if err != nil {
920                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
921                 return
922         }
923         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
924         if err != nil {
925                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
926                 return
927         }
928         trans := subs.GetTransaction()
929         if trans == nil {
930                 err = fmt.Errorf("Ongoing transaction not found")
931                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
932                 return
933         }
934         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
935         if sendOk == false {
936                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
937                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
938         }
939         return
940 }
941
942 //-------------------------------------------------------------------
943 // handle from E2T Subscription Delete Response
944 //-------------------------------------------------------------------
945 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
946         xapp.Logger.Info("MSG from E2T: %s", params.String())
947         c.UpdateCounter(cSubDelRespFromE2)
948         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
949         if err != nil {
950                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
951                 return
952         }
953         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
954         if err != nil {
955                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
956                 return
957         }
958         trans := subs.GetTransaction()
959         if trans == nil {
960                 err = fmt.Errorf("Ongoing transaction not found")
961                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
962                 return
963         }
964         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
965         if sendOk == false {
966                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
967                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
968         }
969         return
970 }
971
972 //-------------------------------------------------------------------
973 // handle from E2T Subscription Delete Failure
974 //-------------------------------------------------------------------
975 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
976         xapp.Logger.Info("MSG from E2T: %s", params.String())
977         c.UpdateCounter(cSubDelFailFromE2)
978         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
979         if err != nil {
980                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
981                 return
982         }
983         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
984         if err != nil {
985                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
986                 return
987         }
988         trans := subs.GetTransaction()
989         if trans == nil {
990                 err = fmt.Errorf("Ongoing transaction not found")
991                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
992                 return
993         }
994         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
995         if sendOk == false {
996                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
997                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
998         }
999         return
1000 }
1001
1002 //-------------------------------------------------------------------
1003 //
1004 //-------------------------------------------------------------------
1005 func typeofSubsMessage(v interface{}) string {
1006         if v == nil {
1007                 return "NIL"
1008         }
1009         switch v.(type) {
1010         //case *e2ap.E2APSubscriptionRequest:
1011         //      return "SubReq"
1012         case *e2ap.E2APSubscriptionResponse:
1013                 return "SubResp"
1014         case *e2ap.E2APSubscriptionFailure:
1015                 return "SubFail"
1016         //case *e2ap.E2APSubscriptionDeleteRequest:
1017         //      return "SubDelReq"
1018         case *e2ap.E2APSubscriptionDeleteResponse:
1019                 return "SubDelResp"
1020         case *e2ap.E2APSubscriptionDeleteFailure:
1021                 return "SubDelFail"
1022         default:
1023                 return "Unknown"
1024         }
1025 }
1026
1027 //-------------------------------------------------------------------
1028 //
1029 //-------------------------------------------------------------------
1030 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1031         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1032         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1033         if err != nil {
1034                 xapp.Logger.Error("%v", err)
1035         }
1036 }
1037
1038 //-------------------------------------------------------------------
1039 //
1040 //-------------------------------------------------------------------
1041 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1042
1043         if removeSubscriptionFromDb == true {
1044                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1045                 c.RemoveSubscriptionFromDb(subs)
1046         } else {
1047                 // Update is needed for successful response and merge case here
1048                 if subs.RetryFromXapp == false {
1049                         c.WriteSubscriptionToDb(subs)
1050                 }
1051         }
1052         subs.RetryFromXapp = false
1053 }
1054
1055 //-------------------------------------------------------------------
1056 //
1057 //-------------------------------------------------------------------
1058 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1059         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1060         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1061         if err != nil {
1062                 xapp.Logger.Error("%v", err)
1063         }
1064 }
1065
1066 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1067
1068         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1069
1070         // Send delete for every endpoint in the subscription
1071         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1072         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1073         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1074         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1075         if err != nil {
1076                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1077                 return
1078         }
1079         for _, endPoint := range subs.EpList.Endpoints {
1080                 params := &xapp.RMRParams{}
1081                 params.Mtype = mType
1082                 params.SubId = int(subs.GetReqId().InstanceId)
1083                 params.Xid = ""
1084                 params.Meid = subs.Meid
1085                 params.Src = endPoint.String()
1086                 params.PayloadLen = len(payload.Buf)
1087                 params.Payload = payload.Buf
1088                 params.Mbuf = nil
1089                 subs.DeleteFromDb = true
1090                 c.handleXAPPSubscriptionDeleteRequest(params)
1091         }
1092 }