Xapp-frame, v0.8.1 Rest Subscription Creation /Query /Deletion
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/segmentio/ksuid"
38         "github.com/spf13/viper"
39 )
40
41 //-----------------------------------------------------------------------------
42 //
43 //-----------------------------------------------------------------------------
44
// idstring renders the given entries, followed by the error when it is
// non-nil, as one space-separated string intended for log messages.
//
// Each entry contributes its String() value; a non-nil err is appended last
// in the form "err(<message>)". With no entries and a nil err the result is
// the empty string.
func idstring(err error, entries ...fmt.Stringer) string {
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	return strings.Join(parts, " ")
}
58
59 //-----------------------------------------------------------------------------
60 //
61 //-----------------------------------------------------------------------------
62
// Package-level settings. All of them are (re)loaded by
// Control.ReadConfigParameters.
var (
	e2tSubReqTimeout  time.Duration
	e2tSubDelReqTime  time.Duration
	e2tRecvMsgTimeout time.Duration

	e2tMaxSubReqTryCount    uint64 // Initial try + retry
	e2tMaxSubDelReqTryCount uint64 // Initial try + retry

	// readSubsFromDb is compared against the string "false" in NewControl.
	readSubsFromDb string
)
69
// Control is the receiver for the REST and RMR subscription handlers in this
// file. It is created by NewControl.
70 type Control struct {
        // Embedded RMR client; left nil by NewControl and set from xapp.Rmr in ReadyCB.
71         *xapp.RMRClient
        // Used to pack, unpack and fill E2AP subscription messages.
72         e2ap     *E2ap
        // Holds REST subscriptions and subscriptions; restored from the db in NewControl.
73         registry *Registry
        // Creates and tracks xApp and subscription transactions.
74         tracker  *Tracker
        // Storage handle returned by CreateSdl().
75         db       Sdlnterface
76         //subscriber *xapp.Subscriber
        // Incremented for every REST request and RMR message handled here.
        // NOTE(review): incremented without synchronization — confirm whether handlers can run concurrently.
77         CntRecvMsg    uint64
        // Passed through to Registry.AssignToSubscription; see SubmgrRestartTestEvent handling.
78         ResetTestFlag bool
        // Counter group registered under the name "SUBMGR".
79         Counters      map[string]xapp.Counter
80 }
81
// RMRMeid carries three string identifiers: PlmnID, EnbID and RanName.
type RMRMeid struct {
	PlmnID, EnbID, RanName string
}
87
// Marker event types that carry no data.
type (
	// SubmgrRestartTestEvent is matched in handleSubscriptionCreate to drop a
	// transaction when testing the restart case.
	SubmgrRestartTestEvent struct{}

	// SubmgrRestartUpEvent is a second marker event type.
	SubmgrRestartUpEvent struct{}
)
90
91 func init() {
92         xapp.Logger.Info("SUBMGR")
93         viper.AutomaticEnv()
94         viper.SetEnvPrefix("submgr")
95         viper.AllowEmptyEnv(true)
96 }
97
98 func NewControl() *Control {
99
100         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
101         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
102
103         registry := new(Registry)
104         registry.Initialize()
105         registry.rtmgrClient = &rtmgrClient
106
107         tracker := new(Tracker)
108         tracker.Init()
109
110         c := &Control{e2ap: new(E2ap),
111                 registry: registry,
112                 tracker:  tracker,
113                 db:       CreateSdl(),
114                 //subscriber: subscriber,
115                 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
116         }
117         c.ReadConfigParameters("")
118
119         // Register REST handler for testing support
120         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
121         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
122
123         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
124
125         if readSubsFromDb == "false" {
126                 return c
127         }
128
129         // Read subscriptions from db
130         xapp.Logger.Info("Reading subscriptions from db")
131         subIds, register, err := c.ReadAllSubscriptionsFromSdl()
132         if err != nil {
133                 xapp.Logger.Error("%v", err)
134         } else {
135                 c.registry.subIds = subIds
136                 c.registry.register = register
137                 c.HandleUncompletedSubscriptions(register)
138         }
139         return c
140 }
141
142 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
143         subscriptions, _ := c.registry.QueryHandler()
144         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
145 }
146
147 //-------------------------------------------------------------------
148 //
149 //-------------------------------------------------------------------
150 func (c *Control) ReadConfigParameters(f string) {
151
152         // viper.GetDuration returns nanoseconds
153         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
154         if e2tSubReqTimeout == 0 {
155                 e2tSubReqTimeout = 2000 * 1000000
156         }
157         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
158         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
159         if e2tSubDelReqTime == 0 {
160                 e2tSubDelReqTime = 2000 * 1000000
161         }
162         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
163         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
164         if e2tRecvMsgTimeout == 0 {
165                 e2tRecvMsgTimeout = 2000 * 1000000
166         }
167         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
168         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
169         if e2tMaxSubReqTryCount == 0 {
170                 e2tMaxSubReqTryCount = 1
171         }
172         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
173         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
174         if e2tMaxSubDelReqTryCount == 0 {
175                 e2tMaxSubDelReqTryCount = 1
176         }
177         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
178
179         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
180         if readSubsFromDb == "" {
181                 readSubsFromDb = "true"
182         }
183         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
184 }
185
186 //-------------------------------------------------------------------
187 //
188 //-------------------------------------------------------------------
189 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
190
191         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
192         for subId, subs := range register {
193                 if subs.SubRespRcvd == false {
194                         subs.NoRespToXapp = true
195                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
196                         c.SendSubscriptionDeleteReq(subs)
197                 }
198         }
199 }
200
201 func (c *Control) ReadyCB(data interface{}) {
202         if c.RMRClient == nil {
203                 c.RMRClient = xapp.Rmr
204         }
205 }
206
// Run registers the ready callback and the configuration-change listener and
// then hands control to the xapp framework with c as the message consumer.
207 func (c *Control) Run() {
        // ReadyCB installs the RMR client on c.
208         xapp.SetReadyCB(c.ReadyCB, nil)
        // Reload the controls.* settings whenever the framework reports a config change.
209         xapp.AddConfigChangeListener(c.ReadConfigParameters)
210         xapp.Run(c)
211 }
212
213 //-------------------------------------------------------------------
214 //
215 //-------------------------------------------------------------------
216 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
217
218         restSubId := ksuid.New().String()
219         subResp := models.SubscriptionResponse{}
220         subResp.SubscriptionID = &restSubId
221         p := params.(*models.SubscriptionParams)
222
223         c.CntRecvMsg++
224
225         c.UpdateCounter(cSubReqFromXapp)
226
227         if p.ClientEndpoint == nil {
228                 xapp.Logger.Error("ClientEndpoint == nil")
229                 return nil, fmt.Errorf("")
230         }
231
232         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
233         if err != nil {
234                 xapp.Logger.Error("%s", err.Error())
235                 return nil, err
236         }
237
238         restSubscription, err := c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
239         if err != nil {
240                 xapp.Logger.Error("%s", err.Error())
241                 return nil, err
242         }
243
244         subReqList := e2ap.SubscriptionRequestList{}
245         err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
246         if err != nil {
247                 xapp.Logger.Error("%s", err.Error())
248                 c.registry.DeleteRESTSubscription(&restSubId)
249                 return nil, err
250         }
251
252         go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId)
253
254         return &subResp, nil
255
256 }
257
258 //-------------------------------------------------------------------
259 //
260 //-------------------------------------------------------------------
261
262 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
263         clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string) {
264
265         xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
266
267         _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*clientEndpoint)
268         if err != nil {
269                 xapp.Logger.Error("%s", err.Error())
270                 return
271         }
272
273         var requestorID int64
274         var instanceId int64
275         for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
276                 subReqMsg := subReqList.E2APSubscriptionRequests[index]
277
278                 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
279                 if trans == nil {
280                         c.registry.DeleteRESTSubscription(restSubId)
281                         xapp.Logger.Error("XAPP-SubReq transaction not created. RESTSubId=%s, EndPoint=%s, Meid=%s", *restSubId, xAppRmrEndpoint, *meid)
282                         return
283                 }
284
285                 defer trans.Release()
286                 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
287                 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
288                 if err != nil {
289                         // Send notification to xApp that prosessing of a Subscription Request has failed. Currently it is not possible
290                         // to indicate error. Such possibility should be added. As a workaround requestorID and instanceId are set to zero value
291                         requestorID = (int64)(0)
292                         instanceId = (int64)(0)
293                         resp := &models.SubscriptionResponse{
294                                 SubscriptionID: restSubId,
295                                 SubscriptionInstances: []*models.SubscriptionInstance{
296                                         &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
297                                 },
298                         }
299                         // Mark REST subscription request processed.
300                         restSubscription.SetProcessed()
301                         xapp.Logger.Info("Sending unsuccessful REST notification to endpoint=%v:%v, InstanceId=%v, %s", clientEndpoint.Host, clientEndpoint.HTTPPort, instanceId, idstring(nil, trans))
302                         xapp.Subscription.Notify(resp, *clientEndpoint)
303                 } else {
304                         xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v, InstanceId=%v, %s", index, *clientEndpoint, instanceId, idstring(nil, trans))
305
306                         // Store successfully processed InstanceId for deletion
307                         restSubscription.AddInstanceId(subRespMsg.RequestId.InstanceId)
308
309                         // Send notification to xApp that a Subscription Request has been processed.
310                         requestorID = (int64)(subRespMsg.RequestId.Id)
311                         instanceId = (int64)(subRespMsg.RequestId.InstanceId)
312                         resp := &models.SubscriptionResponse{
313                                 SubscriptionID: restSubId,
314                                 SubscriptionInstances: []*models.SubscriptionInstance{
315                                         &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
316                                 },
317                         }
318                         // Mark REST subscription request processesd.
319                         restSubscription.SetProcessed()
320                         xapp.Logger.Info("Sending successful REST notification to endpoint=%v, InstanceId=%v, %s", *clientEndpoint, instanceId, idstring(nil, trans))
321                         xapp.Subscription.Notify(resp, *clientEndpoint)
322                 }
323                 c.UpdateCounter(cSubRespToXapp)
324         }
325 }
326
327 //-------------------------------------------------------------------
328 //
329 //------------------------------------------------------------------
330 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
331         restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
332
333         err := c.tracker.Track(trans)
334         if err != nil {
335                 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
336                 xapp.Logger.Error("%s", err.Error())
337                 return nil, err
338         }
339
340         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
341         if err != nil {
342                 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
343                 xapp.Logger.Error("%s", err.Error())
344                 return nil, err
345         }
346
347         //
348         // Wake subs request
349         //
350         go c.handleSubscriptionCreate(subs, trans)
351         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
352
353         err = nil
354         if event != nil {
355                 switch themsg := event.(type) {
356                 case *e2ap.E2APSubscriptionResponse:
357                         trans.Release()
358                         return themsg, nil
359                 case *e2ap.E2APSubscriptionFailure:
360                         err = fmt.Errorf("SubscriptionFailure received")
361                         return nil, err
362                 default:
363                         break
364                 }
365         }
366         err = fmt.Errorf("XAPP-SubReq: failed %s", idstring(err, trans, subs))
367         xapp.Logger.Error("%s", err.Error())
368         c.registry.RemoveFromSubscription(subs, trans, 5*time.Second, c)
369         return nil, err
370 }
371
372 //-------------------------------------------------------------------
373 //
374 //-------------------------------------------------------------------
375 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
376
377         c.CntRecvMsg++
378         c.UpdateCounter(cSubDelReqFromXapp)
379
380         xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
381
382         restSubscription, err := c.registry.GetRESTSubscription(restSubId)
383         if err != nil {
384                 xapp.Logger.Error("%s", err.Error())
385                 if restSubscription == nil {
386                         // Subscription was not found
387                         return nil
388                 } else {
389                         if restSubscription.SubReqOngoing == true {
390                                 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
391                                 xapp.Logger.Error("%s", err.Error())
392                                 return err
393                         } else if restSubscription.SubDelReqOngoing == true {
394                                 // Previous request for same restSubId still ongoing
395                                 return nil
396                         }
397                 }
398         }
399
400         xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
401         go func() {
402                 for _, instanceId := range restSubscription.InstanceIds {
403                         err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
404                         if err != nil {
405                                 xapp.Logger.Error("%s", err.Error())
406                                 //return err
407                         }
408                         xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
409                         restSubscription.DeleteInstanceId(instanceId)
410                 }
411                 c.registry.DeleteRESTSubscription(&restSubId)
412         }()
413
414         c.UpdateCounter(cSubDelRespToXapp)
415
416         return nil
417 }
418
419 //-------------------------------------------------------------------
420 //
421 //-------------------------------------------------------------------
422 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) error {
423
424         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{0, 0}, &xapp.RMRMeid{RanName: *meid})
425         if trans == nil {
426                 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
427                 xapp.Logger.Error("%s", err.Error())
428         }
429         defer trans.Release()
430
431         err := c.tracker.Track(trans)
432         if err != nil {
433                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
434                 xapp.Logger.Error("%s", err.Error())
435                 return &time.ParseError{}
436         }
437
438         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
439         if err != nil {
440                 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
441                 xapp.Logger.Error("%s", err.Error())
442                 return err
443         }
444         //
445         // Wake subs delete
446         //
447         go c.handleSubscriptionDelete(subs, trans)
448         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
449
450         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
451
452         c.registry.RemoveFromSubscription(subs, trans, 5*time.Second, c)
453
454         return nil
455 }
456
457 //-------------------------------------------------------------------
458 //
459 //-------------------------------------------------------------------
460 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
461         xapp.Logger.Info("QueryHandler() called")
462
463         c.CntRecvMsg++
464
465         return c.registry.QueryHandler()
466 }
467
468 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
469         xapp.Logger.Info("TestRestHandler() called")
470
471         pathParams := mux.Vars(r)
472         s := pathParams["testId"]
473
474         // This can be used to delete single subscription from db
475         if contains := strings.Contains(s, "deletesubid="); contains == true {
476                 var splits = strings.Split(s, "=")
477                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
478                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
479                         c.RemoveSubscriptionFromSdl(uint32(subId))
480                         return
481                 }
482         }
483
484         // This can be used to remove all subscriptions db from
485         if s == "emptydb" {
486                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
487                 c.RemoveAllSubscriptionsFromSdl()
488                 return
489         }
490
491         // This is meant to cause submgr's restart in testing
492         if s == "restart" {
493                 xapp.Logger.Info("os.Exit(1) called")
494                 os.Exit(1)
495         }
496
497         xapp.Logger.Info("Unsupported rest command received %s", s)
498 }
499
500 //-------------------------------------------------------------------
501 //
502 //-------------------------------------------------------------------
503
504 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
505         params := &xapp.RMRParams{}
506         params.Mtype = trans.GetMtype()
507         params.SubId = int(subs.GetReqId().InstanceId)
508         params.Xid = ""
509         params.Meid = subs.GetMeid()
510         params.Src = ""
511         params.PayloadLen = len(trans.Payload.Buf)
512         params.Payload = trans.Payload.Buf
513         params.Mbuf = nil
514         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
515         err = c.SendWithRetry(params, false, 5)
516         if err != nil {
517                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
518         }
519         return err
520 }
521
522 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
523
524         params := &xapp.RMRParams{}
525         params.Mtype = trans.GetMtype()
526         params.SubId = int(subs.GetReqId().InstanceId)
527         params.Xid = trans.GetXid()
528         params.Meid = trans.GetMeid()
529         params.Src = ""
530         params.PayloadLen = len(trans.Payload.Buf)
531         params.Payload = trans.Payload.Buf
532         params.Mbuf = nil
533         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
534         err = c.SendWithRetry(params, false, 5)
535         if err != nil {
536                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
537         }
538         return err
539 }
540
541 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
542         if c.RMRClient == nil {
543                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
544                 xapp.Logger.Error("%s", err.Error())
545                 return
546         }
547         c.CntRecvMsg++
548
549         defer c.RMRClient.Free(msg.Mbuf)
550
551         // xapp-frame might use direct access to c buffer and
552         // when msg.Mbuf is freed, someone might take it into use
553         // and payload data might be invalid inside message handle function
554         //
555         // subscriptions won't load system a lot so there is no
556         // real performance hit by cloning buffer into new go byte slice
557         cPay := append(msg.Payload[:0:0], msg.Payload...)
558         msg.Payload = cPay
559         msg.PayloadLen = len(cPay)
560
561         switch msg.Mtype {
562         case xapp.RIC_SUB_REQ:
563                 go c.handleXAPPSubscriptionRequest(msg)
564         case xapp.RIC_SUB_RESP:
565                 go c.handleE2TSubscriptionResponse(msg)
566         case xapp.RIC_SUB_FAILURE:
567                 go c.handleE2TSubscriptionFailure(msg)
568         case xapp.RIC_SUB_DEL_REQ:
569                 go c.handleXAPPSubscriptionDeleteRequest(msg)
570         case xapp.RIC_SUB_DEL_RESP:
571                 go c.handleE2TSubscriptionDeleteResponse(msg)
572         case xapp.RIC_SUB_DEL_FAILURE:
573                 go c.handleE2TSubscriptionDeleteFailure(msg)
574         default:
575                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
576         }
577         return
578 }
579
580 //-------------------------------------------------------------------
581 // handle from XAPP Subscription Request
582 //------------------------------------------------------------------
583 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
584         xapp.Logger.Info("MSG from XAPP: %s", params.String())
585         c.UpdateCounter(cSubReqFromXapp)
586
587         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
588         if err != nil {
589                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
590                 return
591         }
592
593         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
594         if trans == nil {
595                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
596                 return
597         }
598         defer trans.Release()
599
600         if err = c.tracker.Track(trans); err != nil {
601                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
602                 return
603         }
604
605         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
606         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
607         if err != nil {
608                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
609                 return
610         }
611
612         c.wakeSubscriptionRequest(subs, trans)
613 }
614
615 //-------------------------------------------------------------------
616 // Wake Subscription Request to E2node
617 //------------------------------------------------------------------
618 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
619
620         go c.handleSubscriptionCreate(subs, trans)
621         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
622         var err error
623         if event != nil {
624                 switch themsg := event.(type) {
625                 case *e2ap.E2APSubscriptionResponse:
626                         themsg.RequestId.Id = trans.RequestId.Id
627                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
628                         if err == nil {
629                                 trans.Release()
630                                 c.UpdateCounter(cSubRespToXapp)
631                                 c.rmrSendToXapp("", subs, trans)
632                                 return
633                         }
634                 case *e2ap.E2APSubscriptionFailure:
635                         themsg.RequestId.Id = trans.RequestId.Id
636                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
637                         if err == nil {
638                                 c.UpdateCounter(cSubFailToXapp)
639                                 c.rmrSendToXapp("", subs, trans)
640                         }
641                 default:
642                         break
643                 }
644         }
645         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
646         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
647 }
648
649 //-------------------------------------------------------------------
650 // handle from XAPP Subscription Delete Request
651 //------------------------------------------------------------------
652 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
653         xapp.Logger.Info("MSG from XAPP: %s", params.String())
654         c.UpdateCounter(cSubDelReqFromXapp)
655
656         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
657         if err != nil {
658                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
659                 return
660         }
661
662         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
663         if trans == nil {
664                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
665                 return
666         }
667         defer trans.Release()
668
669         err = c.tracker.Track(trans)
670         if err != nil {
671                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
672                 return
673         }
674
675         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
676         if err != nil {
677                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
678                 return
679         }
680
681         //
682         // Wake subs delete
683         //
684         go c.handleSubscriptionDelete(subs, trans)
685         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
686
687         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
688
689         if subs.NoRespToXapp == true {
690                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
691                 return
692         }
693
694         // Whatever is received success, fail or timeout, send successful delete response
695         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
696         subDelRespMsg.RequestId.Id = trans.RequestId.Id
697         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
698         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
699         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
700         if err == nil {
701                 c.UpdateCounter(cSubDelRespToXapp)
702                 c.rmrSendToXapp("", subs, trans)
703         }
704
705         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
706         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
707 }
708
709 //-------------------------------------------------------------------
710 // SUBS CREATE Handling
711 //-------------------------------------------------------------------
712 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
713
714         var removeSubscriptionFromDb bool = false
715         trans := c.tracker.NewSubsTransaction(subs)
716         subs.WaitTransactionTurn(trans)
717         defer subs.ReleaseTransactionTurn(trans)
718         defer trans.Release()
719
720         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
721
722         subRfMsg, valid := subs.GetCachedResponse()
723         if subRfMsg == nil && valid == true {
724                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
725                 switch event.(type) {
726                 case *e2ap.E2APSubscriptionResponse:
727                         subRfMsg, valid = subs.SetCachedResponse(event, true)
728                         subs.SubRespRcvd = true
729                 case *e2ap.E2APSubscriptionFailure:
730                         removeSubscriptionFromDb = true
731                         subRfMsg, valid = subs.SetCachedResponse(event, false)
732                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
733                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
734                 case *SubmgrRestartTestEvent:
735                         // This simulates that no response has been received and after restart subscriptions are restored from db
736                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
737                         return
738                 default:
739                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
740                         removeSubscriptionFromDb = true
741                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
742                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
743                 }
744                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
745         } else {
746                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
747         }
748
749         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
750         if valid == false {
751                 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
752         }
753
754         c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
755         parentTrans.SendEvent(subRfMsg, 0)
756 }
757
758 //-------------------------------------------------------------------
759 // SUBS DELETE Handling
760 //-------------------------------------------------------------------
761
762 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
763
764         trans := c.tracker.NewSubsTransaction(subs)
765         subs.WaitTransactionTurn(trans)
766         defer subs.ReleaseTransactionTurn(trans)
767         defer trans.Release()
768
769         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
770
771         subs.mutex.Lock()
772
773         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
774                 subs.valid = false
775                 subs.mutex.Unlock()
776                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
777         } else {
778                 subs.mutex.Unlock()
779         }
780         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
781         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
782         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
783         c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
784         c.registry.UpdateSubscriptionToDb(subs, c)
785         parentTrans.SendEvent(nil, 0)
786 }
787
788 //-------------------------------------------------------------------
789 // send to E2T Subscription Request
790 //-------------------------------------------------------------------
791 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
792         var err error
793         var event interface{} = nil
794         var timedOut bool = false
795
796         subReqMsg := subs.SubReqMsg
797         subReqMsg.RequestId = subs.GetReqId().RequestId
798         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
799         if err != nil {
800                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
801                 return event
802         }
803
804         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
805         c.WriteSubscriptionToDb(subs)
806         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
807                 desc := fmt.Sprintf("(retry %d)", retries)
808                 if retries == 0 {
809                         c.UpdateCounter(cSubReqToE2)
810                 } else {
811                         c.UpdateCounter(cSubReReqToE2)
812                 }
813                 c.rmrSendToE2T(desc, subs, trans)
814                 if subs.DoNotWaitSubResp == false {
815                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
816                         if timedOut {
817                                 c.UpdateCounter(cSubReqTimerExpiry)
818                                 continue
819                         }
820                 } else {
821                         // Simulating case where subscrition request has been sent but response has not been received before restart
822                         event = &SubmgrRestartTestEvent{}
823                 }
824                 break
825         }
826         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
827         return event
828 }
829
830 //-------------------------------------------------------------------
831 // send to E2T Subscription Delete Request
832 //-------------------------------------------------------------------
833
834 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
835         var err error
836         var event interface{}
837         var timedOut bool
838
839         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
840         subDelReqMsg.RequestId = subs.GetReqId().RequestId
841         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
842         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
843         if err != nil {
844                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
845                 return event
846         }
847
848         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
849                 desc := fmt.Sprintf("(retry %d)", retries)
850                 if retries == 0 {
851                         c.UpdateCounter(cSubDelReqToE2)
852                 } else {
853                         c.UpdateCounter(cSubDelReReqToE2)
854                 }
855                 c.rmrSendToE2T(desc, subs, trans)
856                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
857                 if timedOut {
858                         c.UpdateCounter(cSubDelReqTimerExpiry)
859                         continue
860                 }
861                 break
862         }
863         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
864         return event
865 }
866
867 //-------------------------------------------------------------------
868 // handle from E2T Subscription Response
869 //-------------------------------------------------------------------
870 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
871         xapp.Logger.Info("MSG from E2T: %s", params.String())
872         c.UpdateCounter(cSubRespFromE2)
873         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
874         if err != nil {
875                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
876                 return
877         }
878         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
879         if err != nil {
880                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
881                 return
882         }
883         trans := subs.GetTransaction()
884         if trans == nil {
885                 err = fmt.Errorf("Ongoing transaction not found")
886                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
887                 return
888         }
889         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
890         if sendOk == false {
891                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
892                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
893         }
894         return
895 }
896
897 //-------------------------------------------------------------------
898 // handle from E2T Subscription Failure
899 //-------------------------------------------------------------------
900 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
901         xapp.Logger.Info("MSG from E2T: %s", params.String())
902         c.UpdateCounter(cSubFailFromE2)
903         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
904         if err != nil {
905                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
906                 return
907         }
908         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
909         if err != nil {
910                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
911                 return
912         }
913         trans := subs.GetTransaction()
914         if trans == nil {
915                 err = fmt.Errorf("Ongoing transaction not found")
916                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
917                 return
918         }
919         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
920         if sendOk == false {
921                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
922                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
923         }
924         return
925 }
926
927 //-------------------------------------------------------------------
928 // handle from E2T Subscription Delete Response
929 //-------------------------------------------------------------------
930 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
931         xapp.Logger.Info("MSG from E2T: %s", params.String())
932         c.UpdateCounter(cSubDelRespFromE2)
933         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
934         if err != nil {
935                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
936                 return
937         }
938         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
939         if err != nil {
940                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
941                 return
942         }
943         trans := subs.GetTransaction()
944         if trans == nil {
945                 err = fmt.Errorf("Ongoing transaction not found")
946                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
947                 return
948         }
949         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
950         if sendOk == false {
951                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
952                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
953         }
954         return
955 }
956
957 //-------------------------------------------------------------------
958 // handle from E2T Subscription Delete Failure
959 //-------------------------------------------------------------------
960 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
961         xapp.Logger.Info("MSG from E2T: %s", params.String())
962         c.UpdateCounter(cSubDelFailFromE2)
963         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
964         if err != nil {
965                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
966                 return
967         }
968         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
969         if err != nil {
970                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
971                 return
972         }
973         trans := subs.GetTransaction()
974         if trans == nil {
975                 err = fmt.Errorf("Ongoing transaction not found")
976                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
977                 return
978         }
979         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
980         if sendOk == false {
981                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
982                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
983         }
984         return
985 }
986
987 //-------------------------------------------------------------------
988 //
989 //-------------------------------------------------------------------
990 func typeofSubsMessage(v interface{}) string {
991         if v == nil {
992                 return "NIL"
993         }
994         switch v.(type) {
995         case *e2ap.E2APSubscriptionRequest:
996                 return "SubReq"
997         case *e2ap.E2APSubscriptionResponse:
998                 return "SubResp"
999         case *e2ap.E2APSubscriptionFailure:
1000                 return "SubFail"
1001         case *e2ap.E2APSubscriptionDeleteRequest:
1002                 return "SubDelReq"
1003         case *e2ap.E2APSubscriptionDeleteResponse:
1004                 return "SubDelResp"
1005         case *e2ap.E2APSubscriptionDeleteFailure:
1006                 return "SubDelFail"
1007         default:
1008                 return "Unknown"
1009         }
1010 }
1011
1012 //-------------------------------------------------------------------
1013 //
1014 //-------------------------------------------------------------------
1015 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1016         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1017         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1018         if err != nil {
1019                 xapp.Logger.Error("%v", err)
1020         }
1021 }
1022
1023 //-------------------------------------------------------------------
1024 //
1025 //-------------------------------------------------------------------
1026 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1027
1028         if removeSubscriptionFromDb == true {
1029                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1030                 c.RemoveSubscriptionFromDb(subs)
1031         } else {
1032                 // Update is needed for successful response and merge case here
1033                 if subs.RetryFromXapp == false {
1034                         c.WriteSubscriptionToDb(subs)
1035                 }
1036         }
1037         subs.RetryFromXapp = false
1038 }
1039
1040 //-------------------------------------------------------------------
1041 //
1042 //-------------------------------------------------------------------
1043 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1044         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1045         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1046         if err != nil {
1047                 xapp.Logger.Error("%v", err)
1048         }
1049 }
1050
1051 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1052
1053         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1054
1055         // Send delete for every endpoint in the subscription
1056         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1057         subDelReqMsg.RequestId = subs.GetReqId().RequestId
1058         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1059         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1060         if err != nil {
1061                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1062                 return
1063         }
1064         for _, endPoint := range subs.EpList.Endpoints {
1065                 params := &xapp.RMRParams{}
1066                 params.Mtype = mType
1067                 params.SubId = int(subs.GetReqId().InstanceId)
1068                 params.Xid = ""
1069                 params.Meid = subs.Meid
1070                 params.Src = endPoint.String()
1071                 params.PayloadLen = len(payload.Buf)
1072                 params.Payload = payload.Buf
1073                 params.Mbuf = nil
1074                 subs.DeleteFromDb = true
1075                 c.handleXAPPSubscriptionDeleteRequest(params)
1076         }
1077 }