ac142eb4688521d95ad57cdd50dc190806005ee1
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/spf13/viper"
38 )
39
40 //-----------------------------------------------------------------------------
41 //
42 //-----------------------------------------------------------------------------
43
// idstring builds a single, space-separated identification string for log
// messages.
//
// The String() output of every entry is included in argument order. When err
// is non-nil, "err(<message>)" is appended as the last element. Nil interface
// entries are skipped so that a missing object can never make a log statement
// panic.
func idstring(err error, entries ...fmt.Stringer) string {
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		if entry == nil {
			continue
		}
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	return strings.Join(parts, " ")
}
58
59 //-----------------------------------------------------------------------------
60 //
61 //-----------------------------------------------------------------------------
62
// Package-level settings. They are assigned by ReadConfigParameters.
var (
	// e2tSubReqTimeout is how long a subscription request waits for an event
	// before the attempt is counted as timed out.
	e2tSubReqTimeout time.Duration
	// e2tSubDelReqTime is the corresponding wait time for a subscription
	// delete request.
	e2tSubDelReqTime time.Duration
	// e2tRecvMsgTimeout bounds passing a received E2T message to the
	// transaction that is waiting for it.
	e2tRecvMsgTimeout time.Duration
	// e2tMaxSubReqTryCount is the number of subscription request attempts
	// (initial try + retries).
	e2tMaxSubReqTryCount uint64
	// e2tMaxSubDelReqTryCount is the number of subscription delete request
	// attempts (initial try + retries).
	e2tMaxSubDelReqTryCount uint64
	// readSubsFromDb holds the string "false" when stored subscriptions must
	// not be read from the db at start-up.
	readSubsFromDb string
)
69
70 type Control struct {
71         *xapp.RMRClient
72         e2ap     *E2ap
73         registry *Registry
74         tracker  *Tracker
75         db       Sdlnterface
76         //subscriber *xapp.Subscriber
77         CntRecvMsg    uint64
78         ResetTestFlag bool
79         Counters      map[string]xapp.Counter
80 }
81
// RMRMeid carries three string identifiers: PLMN id, eNB id and RAN name.
// NOTE(review): not referenced in the visible part of this file; presumably
// mirrors xapp.RMRMeid - confirm before relying on it.
type RMRMeid struct {
	PlmnID, EnbID, RanName string
}
87
// Marker event types related to submgr restart handling.
type (
	// SubmgrRestartTestEvent is produced instead of a real E2T response when
	// a subscription is flagged not to wait for the subscription response.
	SubmgrRestartTestEvent struct{}

	// SubmgrRestartUpEvent is a marker event type.
	// NOTE(review): not used in the visible part of this file.
	SubmgrRestartUpEvent struct{}
)
90
91 func init() {
92         xapp.Logger.Info("SUBMGR")
93         viper.AutomaticEnv()
94         viper.SetEnvPrefix("submgr")
95         viper.AllowEmptyEnv(true)
96 }
97
98 func NewControl() *Control {
99
100         ReadConfigParameters()
101         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
102         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
103
104         registry := new(Registry)
105         registry.Initialize()
106         registry.rtmgrClient = &rtmgrClient
107
108         tracker := new(Tracker)
109         tracker.Init()
110
111         //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
112
113         c := &Control{e2ap: new(E2ap),
114                 registry: registry,
115                 tracker:  tracker,
116                 db:       CreateSdl(),
117                 //subscriber: subscriber,
118                 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
119         }
120
121         // Register REST handler for testing support
122         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
123
124         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
125         //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
126
127         if readSubsFromDb == "false" {
128                 return c
129         }
130
131         // Read subscriptions from db
132         xapp.Logger.Info("Reading subscriptions from db")
133         subIds, register, err := c.ReadAllSubscriptionsFromSdl()
134         if err != nil {
135                 xapp.Logger.Error("%v", err)
136         } else {
137                 c.registry.subIds = subIds
138                 c.registry.register = register
139                 c.HandleUncompletedSubscriptions(register)
140         }
141         return c
142 }
143
144 //-------------------------------------------------------------------
145 //
146 //-------------------------------------------------------------------
147 func ReadConfigParameters() {
148
149         // viper.GetDuration returns nanoseconds
150         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
151         if e2tSubReqTimeout == 0 {
152                 e2tSubReqTimeout = 2000 * 1000000
153         }
154         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
155         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
156         if e2tSubDelReqTime == 0 {
157                 e2tSubDelReqTime = 2000 * 1000000
158         }
159         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
160         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
161         if e2tRecvMsgTimeout == 0 {
162                 e2tRecvMsgTimeout = 2000 * 1000000
163         }
164         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
165         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
166         if e2tMaxSubReqTryCount == 0 {
167                 e2tMaxSubReqTryCount = 1
168         }
169         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
170         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
171         if e2tMaxSubDelReqTryCount == 0 {
172                 e2tMaxSubDelReqTryCount = 1
173         }
174         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
175
176         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
177         if readSubsFromDb == "" {
178                 readSubsFromDb = "true"
179         }
180         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
181 }
182
183 //-------------------------------------------------------------------
184 //
185 //-------------------------------------------------------------------
186 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
187
188         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
189         for subId, subs := range register {
190                 if subs.SubRespRcvd == false {
191                         subs.NoRespToXapp = true
192                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
193                         c.SendSubscriptionDeleteReq(subs)
194                 }
195         }
196 }
197
198 func (c *Control) ReadyCB(data interface{}) {
199         if c.RMRClient == nil {
200                 c.RMRClient = xapp.Rmr
201         }
202 }
203
204 func (c *Control) Run() {
205         xapp.SetReadyCB(c.ReadyCB, nil)
206         xapp.Run(c)
207 }
208
209 //-------------------------------------------------------------------
210 //
211 //-------------------------------------------------------------------
212 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
213         /*
214            switch p := params.(type) {
215            case *models.ReportParams:
216                trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
217                if trans == nil {
218                      xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
219                      return
220                }
221                defer trans.Release()
222            case *models.ControlParams:
223            case *models.PolicyParams:
224            }
225         */
226         return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
227 }
228
229 func (c *Control) SubscriptionDeleteHandler(s string) error {
230         return nil
231 }
232
233 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
234         xapp.Logger.Info("QueryHandler() called")
235
236         return c.registry.QueryHandler()
237 }
238
239 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
240         xapp.Logger.Info("TestRestHandler() called")
241
242         pathParams := mux.Vars(r)
243         s := pathParams["testId"]
244
245         // This can be used to delete single subscription from db
246         if contains := strings.Contains(s, "deletesubid="); contains == true {
247                 var splits = strings.Split(s, "=")
248                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
249                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
250                         c.RemoveSubscriptionFromSdl(uint32(subId))
251                         return
252                 }
253         }
254
255         // This can be used to remove all subscriptions db from
256         if s == "emptydb" {
257                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
258                 c.RemoveAllSubscriptionsFromSdl()
259                 return
260         }
261
262         // This is meant to cause submgr's restart in testing
263         if s == "restart" {
264                 xapp.Logger.Info("os.Exit(1) called")
265                 os.Exit(1)
266         }
267
268         xapp.Logger.Info("Unsupported rest command received %s", s)
269 }
270
271 //-------------------------------------------------------------------
272 //
273 //-------------------------------------------------------------------
274
275 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
276         params := &xapp.RMRParams{}
277         params.Mtype = trans.GetMtype()
278         params.SubId = int(subs.GetReqId().InstanceId)
279         params.Xid = ""
280         params.Meid = subs.GetMeid()
281         params.Src = ""
282         params.PayloadLen = len(trans.Payload.Buf)
283         params.Payload = trans.Payload.Buf
284         params.Mbuf = nil
285         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
286         return c.SendWithRetry(params, false, 5)
287 }
288
289 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
290
291         params := &xapp.RMRParams{}
292         params.Mtype = trans.GetMtype()
293         params.SubId = int(subs.GetReqId().InstanceId)
294         params.Xid = trans.GetXid()
295         params.Meid = trans.GetMeid()
296         params.Src = ""
297         params.PayloadLen = len(trans.Payload.Buf)
298         params.Payload = trans.Payload.Buf
299         params.Mbuf = nil
300         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
301         return c.SendWithRetry(params, false, 5)
302 }
303
304 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
305         if c.RMRClient == nil {
306                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
307                 xapp.Logger.Error("%s", err.Error())
308                 return
309         }
310         c.CntRecvMsg++
311
312         defer c.RMRClient.Free(msg.Mbuf)
313
314         // xapp-frame might use direct access to c buffer and
315         // when msg.Mbuf is freed, someone might take it into use
316         // and payload data might be invalid inside message handle function
317         //
318         // subscriptions won't load system a lot so there is no
319         // real performance hit by cloning buffer into new go byte slice
320         cPay := append(msg.Payload[:0:0], msg.Payload...)
321         msg.Payload = cPay
322         msg.PayloadLen = len(cPay)
323
324         switch msg.Mtype {
325         case xapp.RIC_SUB_REQ:
326                 go c.handleXAPPSubscriptionRequest(msg)
327         case xapp.RIC_SUB_RESP:
328                 go c.handleE2TSubscriptionResponse(msg)
329         case xapp.RIC_SUB_FAILURE:
330                 go c.handleE2TSubscriptionFailure(msg)
331         case xapp.RIC_SUB_DEL_REQ:
332                 go c.handleXAPPSubscriptionDeleteRequest(msg)
333         case xapp.RIC_SUB_DEL_RESP:
334                 go c.handleE2TSubscriptionDeleteResponse(msg)
335         case xapp.RIC_SUB_DEL_FAILURE:
336                 go c.handleE2TSubscriptionDeleteFailure(msg)
337         default:
338                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
339         }
340         return
341 }
342
343 //-------------------------------------------------------------------
344 // handle from XAPP Subscription Request
345 //------------------------------------------------------------------
346 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
347         xapp.Logger.Info("MSG from XAPP: %s", params.String())
348         c.UpdateCounter(cSubReqFromXapp)
349
350         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
351         if err != nil {
352                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
353                 return
354         }
355
356         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
357         if trans == nil {
358                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
359                 return
360         }
361         defer trans.Release()
362
363         if err = c.tracker.Track(trans); err != nil {
364                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
365                 return
366         }
367
368         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
369         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
370         if err != nil {
371                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
372                 return
373         }
374
375         c.wakeSubscriptionRequest(subs, trans)
376 }
377
378 //-------------------------------------------------------------------
379 // Wake Subscription Request to E2node
380 //------------------------------------------------------------------
381 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
382
383         go c.handleSubscriptionCreate(subs, trans)
384         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
385         var err error
386         if event != nil {
387                 switch themsg := event.(type) {
388                 case *e2ap.E2APSubscriptionResponse:
389                         themsg.RequestId.Id = trans.RequestId.Id
390                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
391                         if err == nil {
392                                 trans.Release()
393                                 c.UpdateCounter(cSubRespToXapp)
394                                 c.rmrSendToXapp("", subs, trans)
395                                 return
396                         }
397                 case *e2ap.E2APSubscriptionFailure:
398                         themsg.RequestId.Id = trans.RequestId.Id
399                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
400                         if err == nil {
401                                 c.UpdateCounter(cSubFailToXapp)
402                                 c.rmrSendToXapp("", subs, trans)
403                         }
404                 default:
405                         break
406                 }
407         }
408         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
409         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
410 }
411
412 //-------------------------------------------------------------------
413 // handle from XAPP Subscription Delete Request
414 //------------------------------------------------------------------
415 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
416         xapp.Logger.Info("MSG from XAPP: %s", params.String())
417         c.UpdateCounter(cSubDelReqFromXapp)
418
419         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
420         if err != nil {
421                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
422                 return
423         }
424
425         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
426         if trans == nil {
427                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
428                 return
429         }
430         defer trans.Release()
431
432         err = c.tracker.Track(trans)
433         if err != nil {
434                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
435                 return
436         }
437
438         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
439         if err != nil {
440                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
441                 return
442         }
443
444         //
445         // Wake subs delete
446         //
447         go c.handleSubscriptionDelete(subs, trans)
448         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
449
450         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
451
452         if subs.NoRespToXapp == true {
453                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
454                 return
455         }
456
457         // Whatever is received success, fail or timeout, send successful delete response
458         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
459         subDelRespMsg.RequestId.Id = trans.RequestId.Id
460         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
461         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
462         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
463         if err == nil {
464                 c.UpdateCounter(cSubDelRespToXapp)
465                 c.rmrSendToXapp("", subs, trans)
466         }
467
468         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
469         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
470 }
471
472 //-------------------------------------------------------------------
473 // SUBS CREATE Handling
474 //-------------------------------------------------------------------
475 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
476
477         var removeSubscriptionFromDb bool = false
478         trans := c.tracker.NewSubsTransaction(subs)
479         subs.WaitTransactionTurn(trans)
480         defer subs.ReleaseTransactionTurn(trans)
481         defer trans.Release()
482
483         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
484
485         subRfMsg, valid := subs.GetCachedResponse()
486         if subRfMsg == nil && valid == true {
487                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
488                 switch event.(type) {
489                 case *e2ap.E2APSubscriptionResponse:
490                         subRfMsg, valid = subs.SetCachedResponse(event, true)
491                         subs.SubRespRcvd = true
492                 case *e2ap.E2APSubscriptionFailure:
493                         removeSubscriptionFromDb = true
494                         subRfMsg, valid = subs.SetCachedResponse(event, false)
495                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
496                 case *SubmgrRestartTestEvent:
497                         // This simulates that no response has been received and after restart subscriptions are restored from db
498                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
499                         return
500                 default:
501                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
502                         removeSubscriptionFromDb = true
503                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
504                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
505                 }
506                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
507         } else {
508                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
509         }
510
511         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
512         if valid == false {
513                 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
514         }
515
516         c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
517         parentTrans.SendEvent(subRfMsg, 0)
518 }
519
520 //-------------------------------------------------------------------
521 // SUBS DELETE Handling
522 //-------------------------------------------------------------------
523
524 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
525
526         trans := c.tracker.NewSubsTransaction(subs)
527         subs.WaitTransactionTurn(trans)
528         defer subs.ReleaseTransactionTurn(trans)
529         defer trans.Release()
530
531         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
532
533         subs.mutex.Lock()
534
535         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
536                 subs.valid = false
537                 subs.mutex.Unlock()
538                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
539         } else {
540                 subs.mutex.Unlock()
541         }
542         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
543         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
544         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
545         c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
546         c.registry.UpdateSubscriptionToDb(subs, c)
547         parentTrans.SendEvent(nil, 0)
548 }
549
550 //-------------------------------------------------------------------
551 // send to E2T Subscription Request
552 //-------------------------------------------------------------------
553 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
554         var err error
555         var event interface{} = nil
556         var timedOut bool = false
557
558         subReqMsg := subs.SubReqMsg
559         subReqMsg.RequestId = subs.GetReqId().RequestId
560         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
561         if err != nil {
562                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
563                 return event
564         }
565
566         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
567         c.WriteSubscriptionToDb(subs)
568         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
569                 desc := fmt.Sprintf("(retry %d)", retries)
570                 if retries == 0 {
571                         c.UpdateCounter(cSubReqToE2)
572                 } else {
573                         c.UpdateCounter(cSubReReqToE2)
574                 }
575                 c.rmrSendToE2T(desc, subs, trans)
576                 if subs.DoNotWaitSubResp == false {
577                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
578                         if timedOut {
579                                 c.UpdateCounter(cSubReqTimerExpiry)
580                                 continue
581                         }
582                 } else {
583                         // Simulating case where subscrition request has been sent but response has not been received before restart
584                         event = &SubmgrRestartTestEvent{}
585                 }
586                 break
587         }
588         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
589         return event
590 }
591
592 //-------------------------------------------------------------------
593 // send to E2T Subscription Delete Request
594 //-------------------------------------------------------------------
595
596 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
597         var err error
598         var event interface{}
599         var timedOut bool
600
601         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
602         subDelReqMsg.RequestId = subs.GetReqId().RequestId
603         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
604         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
605         if err != nil {
606                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
607                 return event
608         }
609
610         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
611                 desc := fmt.Sprintf("(retry %d)", retries)
612                 if retries == 0 {
613                         c.UpdateCounter(cSubDelReqToE2)
614                 } else {
615                         c.UpdateCounter(cSubDelReReqToE2)
616                 }
617                 c.rmrSendToE2T(desc, subs, trans)
618                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
619                 if timedOut {
620                         c.UpdateCounter(cSubDelReqTimerExpiry)
621                         continue
622                 }
623                 break
624         }
625         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
626         return event
627 }
628
629 //-------------------------------------------------------------------
630 // handle from E2T Subscription Response
631 //-------------------------------------------------------------------
632 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
633         xapp.Logger.Info("MSG from E2T: %s", params.String())
634         c.UpdateCounter(cSubRespFromE2)
635         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
636         if err != nil {
637                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
638                 return
639         }
640         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
641         if err != nil {
642                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
643                 return
644         }
645         trans := subs.GetTransaction()
646         if trans == nil {
647                 err = fmt.Errorf("Ongoing transaction not found")
648                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
649                 return
650         }
651         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
652         if sendOk == false {
653                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
654                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
655         }
656         return
657 }
658
659 //-------------------------------------------------------------------
660 // handle from E2T Subscription Failure
661 //-------------------------------------------------------------------
662 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
663         xapp.Logger.Info("MSG from E2T: %s", params.String())
664         c.UpdateCounter(cSubFailFromE2)
665         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
666         if err != nil {
667                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
668                 return
669         }
670         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
671         if err != nil {
672                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
673                 return
674         }
675         trans := subs.GetTransaction()
676         if trans == nil {
677                 err = fmt.Errorf("Ongoing transaction not found")
678                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
679                 return
680         }
681         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
682         if sendOk == false {
683                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
684                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
685         }
686         return
687 }
688
689 //-------------------------------------------------------------------
690 // handle from E2T Subscription Delete Response
691 //-------------------------------------------------------------------
692 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
693         xapp.Logger.Info("MSG from E2T: %s", params.String())
694         c.UpdateCounter(cSubDelRespFromE2)
695         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
696         if err != nil {
697                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
698                 return
699         }
700         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
701         if err != nil {
702                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
703                 return
704         }
705         trans := subs.GetTransaction()
706         if trans == nil {
707                 err = fmt.Errorf("Ongoing transaction not found")
708                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
709                 return
710         }
711         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
712         if sendOk == false {
713                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
714                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
715         }
716         return
717 }
718
719 //-------------------------------------------------------------------
720 // handle from E2T Subscription Delete Failure
721 //-------------------------------------------------------------------
722 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
723         xapp.Logger.Info("MSG from E2T: %s", params.String())
724         c.UpdateCounter(cSubDelFailFromE2)
725         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
726         if err != nil {
727                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
728                 return
729         }
730         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
731         if err != nil {
732                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
733                 return
734         }
735         trans := subs.GetTransaction()
736         if trans == nil {
737                 err = fmt.Errorf("Ongoing transaction not found")
738                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
739                 return
740         }
741         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
742         if sendOk == false {
743                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
744                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
745         }
746         return
747 }
748
749 //-------------------------------------------------------------------
750 //
751 //-------------------------------------------------------------------
752 func typeofSubsMessage(v interface{}) string {
753         if v == nil {
754                 return "NIL"
755         }
756         switch v.(type) {
757         case *e2ap.E2APSubscriptionRequest:
758                 return "SubReq"
759         case *e2ap.E2APSubscriptionResponse:
760                 return "SubResp"
761         case *e2ap.E2APSubscriptionFailure:
762                 return "SubFail"
763         case *e2ap.E2APSubscriptionDeleteRequest:
764                 return "SubDelReq"
765         case *e2ap.E2APSubscriptionDeleteResponse:
766                 return "SubDelResp"
767         case *e2ap.E2APSubscriptionDeleteFailure:
768                 return "SubDelFail"
769         default:
770                 return "Unknown"
771         }
772 }
773
774 //-------------------------------------------------------------------
775 //
776 //-------------------------------------------------------------------
777 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
778         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
779         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
780         if err != nil {
781                 xapp.Logger.Error("%v", err)
782         }
783 }
784
785 //-------------------------------------------------------------------
786 //
787 //-------------------------------------------------------------------
788 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
789
790         if removeSubscriptionFromDb == true {
791                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
792                 c.RemoveSubscriptionFromDb(subs)
793         } else {
794                 // Update is needed for successful response and merge case here
795                 if subs.RetryFromXapp == false {
796                         c.WriteSubscriptionToDb(subs)
797                 }
798         }
799         subs.RetryFromXapp = false
800 }
801
802 //-------------------------------------------------------------------
803 //
804 //-------------------------------------------------------------------
805 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
806         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
807         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
808         if err != nil {
809                 xapp.Logger.Error("%v", err)
810         }
811 }
812
813 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
814
815         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
816
817         // Send delete for every endpoint in the subscription
818         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
819         subDelReqMsg.RequestId = subs.GetReqId().RequestId
820         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
821         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
822         if err != nil {
823                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
824                 return
825         }
826         for _, endPoint := range subs.EpList.Endpoints {
827                 params := &xapp.RMRParams{}
828                 params.Mtype = mType
829                 params.SubId = int(subs.GetReqId().InstanceId)
830                 params.Xid = ""
831                 params.Meid = subs.Meid
832                 params.Src = endPoint.String()
833                 params.PayloadLen = len(payload.Buf)
834                 params.Payload = payload.Buf
835                 params.Mbuf = nil
836
837                 if params == nil {
838                         xapp.Logger.Error("SendSubscriptionDeleteReq() params == nil")
839                         return
840                 }
841
842                 subs.DeleteFromDb = true
843                 c.handleXAPPSubscriptionDeleteRequest(params)
844         }
845 }