Fix for RequestorId returned to xApp
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/spf13/viper"
38 )
39
40 //-----------------------------------------------------------------------------
41 //
42 //-----------------------------------------------------------------------------
43
44 func idstring(err error, entries ...fmt.Stringer) string {
45         var retval string = ""
46         var filler string = ""
47         for _, entry := range entries {
48                 retval += filler + entry.String()
49                 filler = " "
50         }
51         if err != nil {
52                 retval += filler + "err(" + err.Error() + ")"
53                 filler = " "
54
55         }
56         return retval
57 }
58
59 //-----------------------------------------------------------------------------
60 //
61 //-----------------------------------------------------------------------------
62
63 var e2tSubReqTimeout time.Duration
64 var e2tSubDelReqTime time.Duration
65 var e2tRecvMsgTimeout time.Duration
66 var e2tMaxSubReqTryCount uint64    // Initial try + retry
67 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
68 var readSubsFromDb string
69
70 type Control struct {
71         *xapp.RMRClient
72         e2ap     *E2ap
73         registry *Registry
74         tracker  *Tracker
75         db       Sdlnterface
76         //subscriber *xapp.Subscriber
77         CntRecvMsg    uint64
78         ResetTestFlag bool
79         Counters      map[string]xapp.Counter
80 }
81
82 type RMRMeid struct {
83         PlmnID  string
84         EnbID   string
85         RanName string
86 }
87
88 type SubmgrRestartTestEvent struct{}
89 type SubmgrRestartUpEvent struct{}
90
91 func init() {
92         xapp.Logger.Info("SUBMGR")
93         viper.AutomaticEnv()
94         viper.SetEnvPrefix("submgr")
95         viper.AllowEmptyEnv(true)
96 }
97
98 func NewControl() *Control {
99
100         ReadConfigParameters()
101         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
102         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
103
104         registry := new(Registry)
105         registry.Initialize()
106         registry.rtmgrClient = &rtmgrClient
107
108         tracker := new(Tracker)
109         tracker.Init()
110
111         //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
112
113         c := &Control{e2ap: new(E2ap),
114                 registry: registry,
115                 tracker:  tracker,
116                 db:       CreateSdl(),
117                 //subscriber: subscriber,
118                 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
119         }
120
121         // Register REST handler for testing support
122         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
123
124         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
125         //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
126
127         if readSubsFromDb == "false" {
128                 return c
129         }
130
131         // Read subscriptions from db
132         xapp.Logger.Info("Reading subscriptions from db")
133         subIds, register, err := c.ReadAllSubscriptionsFromSdl()
134         if err != nil {
135                 xapp.Logger.Error("%v", err)
136         } else {
137                 c.registry.subIds = subIds
138                 c.registry.register = register
139                 c.HandleUncompletedSubscriptions(register)
140         }
141         return c
142 }
143
144 //-------------------------------------------------------------------
145 //
146 //-------------------------------------------------------------------
147 func ReadConfigParameters() {
148
149         // viper.GetDuration returns nanoseconds
150         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
151         if e2tSubReqTimeout == 0 {
152                 e2tSubReqTimeout = 2000 * 1000000
153         }
154         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
155         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
156         if e2tSubDelReqTime == 0 {
157                 e2tSubDelReqTime = 2000 * 1000000
158         }
159         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
160         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
161         if e2tRecvMsgTimeout == 0 {
162                 e2tRecvMsgTimeout = 2000 * 1000000
163         }
164         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
165         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
166         if e2tMaxSubReqTryCount == 0 {
167                 e2tMaxSubReqTryCount = 1
168         }
169         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
170         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
171         if e2tMaxSubDelReqTryCount == 0 {
172                 e2tMaxSubDelReqTryCount = 1
173         }
174         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
175
176         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
177         if readSubsFromDb == "" {
178                 readSubsFromDb = "true"
179         }
180         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
181 }
182
183 //-------------------------------------------------------------------
184 //
185 //-------------------------------------------------------------------
186 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
187
188         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
189         for subId, subs := range register {
190                 if subs.SubRespRcvd == false {
191                         subs.NoRespToXapp = true
192                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
193                         c.SendSubscriptionDeleteReq(subs)
194                 }
195         }
196 }
197
198 func (c *Control) ReadyCB(data interface{}) {
199         if c.RMRClient == nil {
200                 c.RMRClient = xapp.Rmr
201         }
202 }
203
204 func (c *Control) Run() {
205         xapp.SetReadyCB(c.ReadyCB, nil)
206         xapp.Run(c)
207 }
208
209 //-------------------------------------------------------------------
210 //
211 //-------------------------------------------------------------------
212 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
213         /*
214            switch p := params.(type) {
215            case *models.ReportParams:
216                trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
217                if trans == nil {
218                      xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
219                      return
220                }
221                defer trans.Release()
222            case *models.ControlParams:
223            case *models.PolicyParams:
224            }
225         */
226         return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
227 }
228
229 func (c *Control) SubscriptionDeleteHandler(s string) error {
230         return nil
231 }
232
233 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
234         xapp.Logger.Info("QueryHandler() called")
235
236         return c.registry.QueryHandler()
237 }
238
239 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
240         xapp.Logger.Info("TestRestHandler() called")
241
242         pathParams := mux.Vars(r)
243         s := pathParams["testId"]
244
245         // This can be used to delete single subscription from db
246         if contains := strings.Contains(s, "deletesubid="); contains == true {
247                 var splits = strings.Split(s, "=")
248                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
249                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
250                         c.RemoveSubscriptionFromSdl(uint32(subId))
251                         return
252                 }
253         }
254
255         // This can be used to remove all subscriptions db from
256         if s == "emptydb" {
257                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
258                 c.RemoveAllSubscriptionsFromSdl()
259                 return
260         }
261
262         // This is meant to cause submgr's restart in testing
263         if s == "restart" {
264                 xapp.Logger.Info("os.Exit(1) called")
265                 os.Exit(1)
266         }
267
268         xapp.Logger.Info("Unsupported rest command received %s", s)
269 }
270
271 //-------------------------------------------------------------------
272 //
273 //-------------------------------------------------------------------
274
275 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
276         params := &xapp.RMRParams{}
277         params.Mtype = trans.GetMtype()
278         params.SubId = int(subs.GetReqId().InstanceId)
279         params.Xid = ""
280         params.Meid = subs.GetMeid()
281         params.Src = ""
282         params.PayloadLen = len(trans.Payload.Buf)
283         params.Payload = trans.Payload.Buf
284         params.Mbuf = nil
285         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
286         return c.SendWithRetry(params, false, 5)
287 }
288
289 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
290
291         params := &xapp.RMRParams{}
292         params.Mtype = trans.GetMtype()
293         params.SubId = int(subs.GetReqId().InstanceId)
294         params.Xid = trans.GetXid()
295         params.Meid = trans.GetMeid()
296         params.Src = ""
297         params.PayloadLen = len(trans.Payload.Buf)
298         params.Payload = trans.Payload.Buf
299         params.Mbuf = nil
300         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
301         return c.SendWithRetry(params, false, 5)
302 }
303
304 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
305         if c.RMRClient == nil {
306                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
307                 xapp.Logger.Error("%s", err.Error())
308                 return
309         }
310         c.CntRecvMsg++
311
312         defer c.RMRClient.Free(msg.Mbuf)
313
314         // xapp-frame might use direct access to c buffer and
315         // when msg.Mbuf is freed, someone might take it into use
316         // and payload data might be invalid inside message handle function
317         //
318         // subscriptions won't load system a lot so there is no
319         // real performance hit by cloning buffer into new go byte slice
320         cPay := append(msg.Payload[:0:0], msg.Payload...)
321         msg.Payload = cPay
322         msg.PayloadLen = len(cPay)
323
324         switch msg.Mtype {
325         case xapp.RIC_SUB_REQ:
326                 go c.handleXAPPSubscriptionRequest(msg)
327         case xapp.RIC_SUB_RESP:
328                 go c.handleE2TSubscriptionResponse(msg)
329         case xapp.RIC_SUB_FAILURE:
330                 go c.handleE2TSubscriptionFailure(msg)
331         case xapp.RIC_SUB_DEL_REQ:
332                 go c.handleXAPPSubscriptionDeleteRequest(msg)
333         case xapp.RIC_SUB_DEL_RESP:
334                 go c.handleE2TSubscriptionDeleteResponse(msg)
335         case xapp.RIC_SUB_DEL_FAILURE:
336                 go c.handleE2TSubscriptionDeleteFailure(msg)
337         default:
338                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
339         }
340         return
341 }
342
343 //-------------------------------------------------------------------
344 // handle from XAPP Subscription Request
345 //------------------------------------------------------------------
346 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
347         xapp.Logger.Info("MSG from XAPP: %s", params.String())
348         c.UpdateCounter(cSubReqFromXapp)
349
350         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
351         if err != nil {
352                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
353                 return
354         }
355
356         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
357         if trans == nil {
358                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
359                 return
360         }
361         defer trans.Release()
362
363         err = c.tracker.Track(trans)
364         if err != nil {
365                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
366                 return
367         }
368
369         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
370         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
371         if err != nil {
372                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
373                 return
374         }
375
376         //
377         // Wake subs request
378         //
379         go c.handleSubscriptionCreate(subs, trans)
380         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
381         err = nil
382         if event != nil {
383                 switch themsg := event.(type) {
384                 case *e2ap.E2APSubscriptionResponse:
385                         themsg.RequestId.Id = trans.RequestId.Id
386                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
387                         if err == nil {
388                                 trans.Release()
389                                 c.UpdateCounter(cSubRespToXapp)
390                                 c.rmrSendToXapp("", subs, trans)
391                                 return
392                         }
393                 case *e2ap.E2APSubscriptionFailure:
394                         themsg.RequestId.Id = trans.RequestId.Id
395                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
396                         if err == nil {
397                                 c.UpdateCounter(cSubFailToXapp)
398                                 c.rmrSendToXapp("", subs, trans)
399                         }
400                 default:
401                         break
402                 }
403         }
404         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
405         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
406 }
407
408 //-------------------------------------------------------------------
409 // handle from XAPP Subscription Delete Request
410 //------------------------------------------------------------------
411 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
412         xapp.Logger.Info("MSG from XAPP: %s", params.String())
413         c.UpdateCounter(cSubDelReqFromXapp)
414
415         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
416         if err != nil {
417                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
418                 return
419         }
420
421         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
422         if trans == nil {
423                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
424                 return
425         }
426         defer trans.Release()
427
428         err = c.tracker.Track(trans)
429         if err != nil {
430                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
431                 return
432         }
433
434         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
435         if err != nil {
436                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
437                 return
438         }
439
440         //
441         // Wake subs delete
442         //
443         go c.handleSubscriptionDelete(subs, trans)
444         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
445
446         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
447
448         if subs.NoRespToXapp == true {
449                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
450                 return
451         }
452
453         // Whatever is received success, fail or timeout, send successful delete response
454         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
455         subDelRespMsg.RequestId.Id = trans.RequestId.Id
456         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
457         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
458         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
459         if err == nil {
460                 c.UpdateCounter(cSubDelRespToXapp)
461                 c.rmrSendToXapp("", subs, trans)
462         }
463
464         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
465         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
466 }
467
468 //-------------------------------------------------------------------
469 // SUBS CREATE Handling
470 //-------------------------------------------------------------------
471 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
472
473         var removeSubscriptionFromDb bool = false
474         trans := c.tracker.NewSubsTransaction(subs)
475         subs.WaitTransactionTurn(trans)
476         defer subs.ReleaseTransactionTurn(trans)
477         defer trans.Release()
478
479         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
480
481         subRfMsg, valid := subs.GetCachedResponse()
482         if subRfMsg == nil && valid == true {
483                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
484                 switch event.(type) {
485                 case *e2ap.E2APSubscriptionResponse:
486                         subRfMsg, valid = subs.SetCachedResponse(event, true)
487                         subs.SubRespRcvd = true
488                 case *e2ap.E2APSubscriptionFailure:
489                         removeSubscriptionFromDb = true
490                         subRfMsg, valid = subs.SetCachedResponse(event, false)
491                         xapp.Logger.Info("SUBS-SubReq: internal delete  due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
492                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
493                 case *SubmgrRestartTestEvent:
494                         // This simulates that no response has been received and after restart subscriptions are restored from db
495                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
496                         return
497                 default:
498                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
499                         removeSubscriptionFromDb = true
500                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
501                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
502                 }
503                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
504         } else {
505                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
506         }
507
508         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
509         if valid == false {
510                 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
511         }
512
513         c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
514         parentTrans.SendEvent(subRfMsg, 0)
515 }
516
517 //-------------------------------------------------------------------
518 // SUBS DELETE Handling
519 //-------------------------------------------------------------------
520
521 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
522
523         trans := c.tracker.NewSubsTransaction(subs)
524         subs.WaitTransactionTurn(trans)
525         defer subs.ReleaseTransactionTurn(trans)
526         defer trans.Release()
527
528         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
529
530         subs.mutex.Lock()
531
532         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
533                 subs.valid = false
534                 subs.mutex.Unlock()
535                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
536         } else {
537                 subs.mutex.Unlock()
538         }
539         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
540         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
541         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
542         c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
543         c.registry.UpdateSubscriptionToDb(subs, c)
544         parentTrans.SendEvent(nil, 0)
545 }
546
547 //-------------------------------------------------------------------
548 // send to E2T Subscription Request
549 //-------------------------------------------------------------------
550 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
551         var err error
552         var event interface{} = nil
553         var timedOut bool = false
554
555         subReqMsg := subs.SubReqMsg
556         subReqMsg.RequestId = subs.GetReqId().RequestId
557         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
558         if err != nil {
559                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
560                 return event
561         }
562
563         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
564         c.WriteSubscriptionToDb(subs)
565         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
566                 desc := fmt.Sprintf("(retry %d)", retries)
567                 if retries == 0 {
568                         c.UpdateCounter(cSubReqToE2)
569                 } else {
570                         c.UpdateCounter(cSubReReqToE2)
571                 }
572                 c.rmrSendToE2T(desc, subs, trans)
573                 if subs.DoNotWaitSubResp == false {
574                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
575                         if timedOut {
576                                 c.UpdateCounter(cSubReqTimerExpiry)
577                                 continue
578                         }
579                 } else {
580                         // Simulating case where subscrition request has been sent but response has not been received before restart
581                         event = &SubmgrRestartTestEvent{}
582                 }
583                 break
584         }
585         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
586         return event
587 }
588
589 //-------------------------------------------------------------------
590 // send to E2T Subscription Delete Request
591 //-------------------------------------------------------------------
592
593 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
594         var err error
595         var event interface{}
596         var timedOut bool
597
598         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
599         subDelReqMsg.RequestId = subs.GetReqId().RequestId
600         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
601         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
602         if err != nil {
603                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
604                 return event
605         }
606
607         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
608                 desc := fmt.Sprintf("(retry %d)", retries)
609                 if retries == 0 {
610                         c.UpdateCounter(cSubDelReqToE2)
611                 } else {
612                         c.UpdateCounter(cSubDelReReqToE2)
613                 }
614                 c.rmrSendToE2T(desc, subs, trans)
615                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
616                 if timedOut {
617                         c.UpdateCounter(cSubDelReqTimerExpiry)
618                         continue
619                 }
620                 break
621         }
622         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
623         return event
624 }
625
626 //-------------------------------------------------------------------
627 // handle from E2T Subscription Response
628 //-------------------------------------------------------------------
629 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
630         xapp.Logger.Info("MSG from E2T: %s", params.String())
631         c.UpdateCounter(cSubRespFromE2)
632         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
633         if err != nil {
634                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
635                 return
636         }
637         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
638         if err != nil {
639                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
640                 return
641         }
642         trans := subs.GetTransaction()
643         if trans == nil {
644                 err = fmt.Errorf("Ongoing transaction not found")
645                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
646                 return
647         }
648         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
649         if sendOk == false {
650                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
651                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
652         }
653         return
654 }
655
656 //-------------------------------------------------------------------
657 // handle from E2T Subscription Failure
658 //-------------------------------------------------------------------
659 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
660         xapp.Logger.Info("MSG from E2T: %s", params.String())
661         c.UpdateCounter(cSubFailFromE2)
662         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
663         if err != nil {
664                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
665                 return
666         }
667         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
668         if err != nil {
669                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
670                 return
671         }
672         trans := subs.GetTransaction()
673         if trans == nil {
674                 err = fmt.Errorf("Ongoing transaction not found")
675                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
676                 return
677         }
678         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
679         if sendOk == false {
680                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
681                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
682         }
683         return
684 }
685
686 //-------------------------------------------------------------------
687 // handle from E2T Subscription Delete Response
688 //-------------------------------------------------------------------
689 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
690         xapp.Logger.Info("MSG from E2T: %s", params.String())
691         c.UpdateCounter(cSubDelRespFromE2)
692         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
693         if err != nil {
694                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
695                 return
696         }
697         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
698         if err != nil {
699                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
700                 return
701         }
702         trans := subs.GetTransaction()
703         if trans == nil {
704                 err = fmt.Errorf("Ongoing transaction not found")
705                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
706                 return
707         }
708         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
709         if sendOk == false {
710                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
711                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
712         }
713         return
714 }
715
716 //-------------------------------------------------------------------
717 // handle from E2T Subscription Delete Failure
718 //-------------------------------------------------------------------
719 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
720         xapp.Logger.Info("MSG from E2T: %s", params.String())
721         c.UpdateCounter(cSubDelFailFromE2)
722         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
723         if err != nil {
724                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
725                 return
726         }
727         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
728         if err != nil {
729                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
730                 return
731         }
732         trans := subs.GetTransaction()
733         if trans == nil {
734                 err = fmt.Errorf("Ongoing transaction not found")
735                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
736                 return
737         }
738         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
739         if sendOk == false {
740                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
741                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
742         }
743         return
744 }
745
746 //-------------------------------------------------------------------
747 //
748 //-------------------------------------------------------------------
749 func typeofSubsMessage(v interface{}) string {
750         if v == nil {
751                 return "NIL"
752         }
753         switch v.(type) {
754         case *e2ap.E2APSubscriptionRequest:
755                 return "SubReq"
756         case *e2ap.E2APSubscriptionResponse:
757                 return "SubResp"
758         case *e2ap.E2APSubscriptionFailure:
759                 return "SubFail"
760         case *e2ap.E2APSubscriptionDeleteRequest:
761                 return "SubDelReq"
762         case *e2ap.E2APSubscriptionDeleteResponse:
763                 return "SubDelResp"
764         case *e2ap.E2APSubscriptionDeleteFailure:
765                 return "SubDelFail"
766         default:
767                 return "Unknown"
768         }
769 }
770
771 //-------------------------------------------------------------------
772 //
773 //-------------------------------------------------------------------
774 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
775         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
776         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
777         if err != nil {
778                 xapp.Logger.Error("%v", err)
779         }
780 }
781
782 //-------------------------------------------------------------------
783 //
784 //-------------------------------------------------------------------
785 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
786
787         if removeSubscriptionFromDb == true {
788                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
789                 c.RemoveSubscriptionFromDb(subs)
790         } else {
791                 // Update is needed for successful response and merge case here
792                 if subs.RetryFromXapp == false {
793                         c.WriteSubscriptionToDb(subs)
794                 }
795         }
796         subs.RetryFromXapp = false
797 }
798
799 //-------------------------------------------------------------------
800 //
801 //-------------------------------------------------------------------
802 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
803         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
804         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
805         if err != nil {
806                 xapp.Logger.Error("%v", err)
807         }
808 }
809
810 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
811
812         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
813
814         // Send delete for every endpoint in the subscription
815         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
816         subDelReqMsg.RequestId = subs.GetReqId().RequestId
817         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
818         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
819         if err != nil {
820                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
821                 return
822         }
823         for _, endPoint := range subs.EpList.Endpoints {
824                 params := &xapp.RMRParams{}
825                 params.Mtype = mType
826                 params.SubId = int(subs.GetReqId().InstanceId)
827                 params.Xid = ""
828                 params.Meid = subs.Meid
829                 params.Src = endPoint.String()
830                 params.PayloadLen = len(payload.Buf)
831                 params.Payload = payload.Buf
832                 params.Mbuf = nil
833
834                 if params == nil {
835                         xapp.Logger.Error("SendSubscriptionDeleteReq() params == nil")
836                         return
837                 }
838
839                 subs.DeleteFromDb = true
840                 c.handleXAPPSubscriptionDeleteRequest(params)
841         }
842 }