Metrics related code refactoring
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/spf13/viper"
38 )
39
40 //-----------------------------------------------------------------------------
41 //
42 //-----------------------------------------------------------------------------
43
44 func idstring(err error, entries ...fmt.Stringer) string {
45         var retval string = ""
46         var filler string = ""
47         for _, entry := range entries {
48                 retval += filler + entry.String()
49                 filler = " "
50         }
51         if err != nil {
52                 retval += filler + "err(" + err.Error() + ")"
53                 filler = " "
54
55         }
56         return retval
57 }
58
59 //-----------------------------------------------------------------------------
60 //
61 //-----------------------------------------------------------------------------
62
63 var e2tSubReqTimeout time.Duration
64 var e2tSubDelReqTime time.Duration
65 var e2tRecvMsgTimeout time.Duration
66 var e2tMaxSubReqTryCount uint64    // Initial try + retry
67 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
68 var readSubsFromDb string
69
70 type Control struct {
71         *xapp.RMRClient
72         e2ap     *E2ap
73         registry *Registry
74         tracker  *Tracker
75         db       Sdlnterface
76         //subscriber *xapp.Subscriber
77         CntRecvMsg    uint64
78         ResetTestFlag bool
79         Counters      map[string]xapp.Counter
80 }
81
82 type RMRMeid struct {
83         PlmnID  string
84         EnbID   string
85         RanName string
86 }
87
88 type SubmgrRestartTestEvent struct{}
89 type SubmgrRestartUpEvent struct{}
90
91 func init() {
92         xapp.Logger.Info("SUBMGR")
93         viper.AutomaticEnv()
94         viper.SetEnvPrefix("submgr")
95         viper.AllowEmptyEnv(true)
96 }
97
98 func NewControl() *Control {
99
100         ReadConfigParameters()
101         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
102         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
103
104         registry := new(Registry)
105         registry.Initialize()
106         registry.rtmgrClient = &rtmgrClient
107
108         tracker := new(Tracker)
109         tracker.Init()
110
111         //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
112
113         c := &Control{e2ap: new(E2ap),
114                 registry: registry,
115                 tracker:  tracker,
116                 db:       CreateSdl(),
117                 //subscriber: subscriber,
118                 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
119         }
120
121         // Register REST handler for testing support
122         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
123
124         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
125         //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
126
127         if readSubsFromDb == "false" {
128                 return c
129         }
130
131         // Read subscriptions from db
132         xapp.Logger.Info("Reading subscriptions from db")
133         subIds, register, err := c.ReadAllSubscriptionsFromSdl()
134         if err != nil {
135                 xapp.Logger.Error("%v", err)
136         } else {
137                 c.registry.subIds = subIds
138                 c.registry.register = register
139                 c.HandleUncompletedSubscriptions(register)
140         }
141         return c
142 }
143
144 //-------------------------------------------------------------------
145 //
146 //-------------------------------------------------------------------
147 func ReadConfigParameters() {
148
149         // viper.GetDuration returns nanoseconds
150         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
151         if e2tSubReqTimeout == 0 {
152                 e2tSubReqTimeout = 2000 * 1000000
153         }
154         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
155         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
156         if e2tSubDelReqTime == 0 {
157                 e2tSubDelReqTime = 2000 * 1000000
158         }
159         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
160         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
161         if e2tRecvMsgTimeout == 0 {
162                 e2tRecvMsgTimeout = 2000 * 1000000
163         }
164         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
165         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
166         if e2tMaxSubReqTryCount == 0 {
167                 e2tMaxSubReqTryCount = 1
168         }
169         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
170         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
171         if e2tMaxSubDelReqTryCount == 0 {
172                 e2tMaxSubDelReqTryCount = 1
173         }
174         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
175
176         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
177         if readSubsFromDb == "" {
178                 readSubsFromDb = "true"
179         }
180         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
181 }
182
183 //-------------------------------------------------------------------
184 //
185 //-------------------------------------------------------------------
186 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
187
188         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
189         for subId, subs := range register {
190                 if subs.SubRespRcvd == false {
191                         subs.NoRespToXapp = true
192                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
193                         c.SendSubscriptionDeleteReq(subs)
194                 }
195         }
196 }
197
198 func (c *Control) ReadyCB(data interface{}) {
199         if c.RMRClient == nil {
200                 c.RMRClient = xapp.Rmr
201         }
202 }
203
204 func (c *Control) Run() {
205         xapp.SetReadyCB(c.ReadyCB, nil)
206         xapp.Run(c)
207 }
208
209 //-------------------------------------------------------------------
210 //
211 //-------------------------------------------------------------------
212 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
213         /*
214            switch p := params.(type) {
215            case *models.ReportParams:
216                trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
217                if trans == nil {
218                      xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
219                      return
220                }
221                defer trans.Release()
222            case *models.ControlParams:
223            case *models.PolicyParams:
224            }
225         */
226         return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
227 }
228
229 func (c *Control) SubscriptionDeleteHandler(s string) error {
230         return nil
231 }
232
233 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
234         xapp.Logger.Info("QueryHandler() called")
235
236         return c.registry.QueryHandler()
237 }
238
239 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
240         xapp.Logger.Info("TestRestHandler() called")
241
242         pathParams := mux.Vars(r)
243         s := pathParams["testId"]
244
245         // This can be used to delete single subscription from db
246         if contains := strings.Contains(s, "deletesubid="); contains == true {
247                 var splits = strings.Split(s, "=")
248                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
249                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
250                         c.RemoveSubscriptionFromSdl(uint32(subId))
251                         return
252                 }
253         }
254
255         // This can be used to remove all subscriptions db from
256         if s == "emptydb" {
257                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
258                 c.RemoveAllSubscriptionsFromSdl()
259                 return
260         }
261
262         // This is meant to cause submgr's restart in testing
263         if s == "restart" {
264                 xapp.Logger.Info("os.Exit(1) called")
265                 os.Exit(1)
266         }
267
268         xapp.Logger.Info("Unsupported rest command received %s", s)
269 }
270
271 //-------------------------------------------------------------------
272 //
273 //-------------------------------------------------------------------
274
275 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
276         params := &xapp.RMRParams{}
277         params.Mtype = trans.GetMtype()
278         params.SubId = int(subs.GetReqId().InstanceId)
279         params.Xid = ""
280         params.Meid = subs.GetMeid()
281         params.Src = ""
282         params.PayloadLen = len(trans.Payload.Buf)
283         params.Payload = trans.Payload.Buf
284         params.Mbuf = nil
285         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
286         return c.SendWithRetry(params, false, 5)
287 }
288
289 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
290
291         params := &xapp.RMRParams{}
292         params.Mtype = trans.GetMtype()
293         params.SubId = int(subs.GetReqId().InstanceId)
294         params.Xid = trans.GetXid()
295         params.Meid = trans.GetMeid()
296         params.Src = ""
297         params.PayloadLen = len(trans.Payload.Buf)
298         params.Payload = trans.Payload.Buf
299         params.Mbuf = nil
300         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
301         return c.SendWithRetry(params, false, 5)
302 }
303
304 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
305         if c.RMRClient == nil {
306                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
307                 xapp.Logger.Error("%s", err.Error())
308                 return
309         }
310         c.CntRecvMsg++
311
312         defer c.RMRClient.Free(msg.Mbuf)
313
314         // xapp-frame might use direct access to c buffer and
315         // when msg.Mbuf is freed, someone might take it into use
316         // and payload data might be invalid inside message handle function
317         //
318         // subscriptions won't load system a lot so there is no
319         // real performance hit by cloning buffer into new go byte slice
320         cPay := append(msg.Payload[:0:0], msg.Payload...)
321         msg.Payload = cPay
322         msg.PayloadLen = len(cPay)
323
324         switch msg.Mtype {
325         case xapp.RIC_SUB_REQ:
326                 go c.handleXAPPSubscriptionRequest(msg)
327         case xapp.RIC_SUB_RESP:
328                 go c.handleE2TSubscriptionResponse(msg)
329         case xapp.RIC_SUB_FAILURE:
330                 go c.handleE2TSubscriptionFailure(msg)
331         case xapp.RIC_SUB_DEL_REQ:
332                 go c.handleXAPPSubscriptionDeleteRequest(msg)
333         case xapp.RIC_SUB_DEL_RESP:
334                 go c.handleE2TSubscriptionDeleteResponse(msg)
335         case xapp.RIC_SUB_DEL_FAILURE:
336                 go c.handleE2TSubscriptionDeleteFailure(msg)
337         default:
338                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
339         }
340         return
341 }
342
343 //-------------------------------------------------------------------
344 // handle from XAPP Subscription Request
345 //------------------------------------------------------------------
346 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
347         xapp.Logger.Info("MSG from XAPP: %s", params.String())
348         c.UpdateCounter(cSubReqFromXapp)
349
350         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
351         if err != nil {
352                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
353                 return
354         }
355
356         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
357         if trans == nil {
358                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
359                 return
360         }
361         defer trans.Release()
362
363         if err = c.tracker.Track(trans); err != nil {
364                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
365                 return
366         }
367
368         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
369         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
370         if err != nil {
371                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
372                 return
373         }
374
375         c.wakeSubscriptionRequest(subs, trans)
376 }
377
378 //-------------------------------------------------------------------
379 // Wake Subscription Request to E2node
380 //------------------------------------------------------------------
381 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
382
383         go c.handleSubscriptionCreate(subs, trans)
384         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
385         var err error
386         if event != nil {
387                 switch themsg := event.(type) {
388                 case *e2ap.E2APSubscriptionResponse:
389                         themsg.RequestId.Id = trans.RequestId.Id
390                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
391                         if err == nil {
392                                 trans.Release()
393                                 c.UpdateCounter(cSubRespToXapp)
394                                 c.rmrSendToXapp("", subs, trans)
395                                 return
396                         }
397                 case *e2ap.E2APSubscriptionFailure:
398                         themsg.RequestId.Id = trans.RequestId.Id
399                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
400                         if err == nil {
401                                 c.UpdateCounter(cSubFailToXapp)
402                                 c.rmrSendToXapp("", subs, trans)
403                         }
404                 default:
405                         break
406                 }
407         }
408         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
409         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
410 }
411
412 //-------------------------------------------------------------------
413 // handle from XAPP Subscription Delete Request
414 //------------------------------------------------------------------
415 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
416         xapp.Logger.Info("MSG from XAPP: %s", params.String())
417         c.UpdateCounter(cSubDelReqFromXapp)
418
419         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
420         if err != nil {
421                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
422                 return
423         }
424
425         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
426         if trans == nil {
427                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
428                 return
429         }
430         defer trans.Release()
431
432         err = c.tracker.Track(trans)
433         if err != nil {
434                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
435                 return
436         }
437
438         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
439         if err != nil {
440                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
441                 return
442         }
443
444         //
445         // Wake subs delete
446         //
447         go c.handleSubscriptionDelete(subs, trans)
448         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
449
450         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
451
452         if subs.NoRespToXapp == true {
453                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
454                 return
455         }
456
457         // Whatever is received success, fail or timeout, send successful delete response
458         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
459         subDelRespMsg.RequestId.Id = trans.RequestId.Id
460         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
461         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
462         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
463         if err == nil {
464                 c.UpdateCounter(cSubDelRespToXapp)
465                 c.rmrSendToXapp("", subs, trans)
466         }
467
468         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
469         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
470 }
471
472 //-------------------------------------------------------------------
473 // SUBS CREATE Handling
474 //-------------------------------------------------------------------
475 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
476
477         var removeSubscriptionFromDb bool = false
478         trans := c.tracker.NewSubsTransaction(subs)
479         subs.WaitTransactionTurn(trans)
480         defer subs.ReleaseTransactionTurn(trans)
481         defer trans.Release()
482
483         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
484
485         subRfMsg, valid := subs.GetCachedResponse()
486         if subRfMsg == nil && valid == true {
487                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
488                 switch event.(type) {
489                 case *e2ap.E2APSubscriptionResponse:
490                         subRfMsg, valid = subs.SetCachedResponse(event, true)
491                         subs.SubRespRcvd = true
492                 case *e2ap.E2APSubscriptionFailure:
493                         removeSubscriptionFromDb = true
494                         subRfMsg, valid = subs.SetCachedResponse(event, false)
495                         xapp.Logger.Info("SUBS-SubReq: internal delete  due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
496                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
497                 case *SubmgrRestartTestEvent:
498                         // This simulates that no response has been received and after restart subscriptions are restored from db
499                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
500                         return
501                 default:
502                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
503                         removeSubscriptionFromDb = true
504                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
505                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
506                 }
507                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
508         } else {
509                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
510         }
511
512         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
513         if valid == false {
514                 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
515         }
516
517         c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
518         parentTrans.SendEvent(subRfMsg, 0)
519 }
520
521 //-------------------------------------------------------------------
522 // SUBS DELETE Handling
523 //-------------------------------------------------------------------
524
525 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
526
527         trans := c.tracker.NewSubsTransaction(subs)
528         subs.WaitTransactionTurn(trans)
529         defer subs.ReleaseTransactionTurn(trans)
530         defer trans.Release()
531
532         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
533
534         subs.mutex.Lock()
535
536         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
537                 subs.valid = false
538                 subs.mutex.Unlock()
539                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
540         } else {
541                 subs.mutex.Unlock()
542         }
543         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
544         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
545         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
546         c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
547         c.registry.UpdateSubscriptionToDb(subs, c)
548         parentTrans.SendEvent(nil, 0)
549 }
550
551 //-------------------------------------------------------------------
552 // send to E2T Subscription Request
553 //-------------------------------------------------------------------
554 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
555         var err error
556         var event interface{} = nil
557         var timedOut bool = false
558
559         subReqMsg := subs.SubReqMsg
560         subReqMsg.RequestId = subs.GetReqId().RequestId
561         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
562         if err != nil {
563                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
564                 return event
565         }
566
567         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
568         c.WriteSubscriptionToDb(subs)
569         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
570                 desc := fmt.Sprintf("(retry %d)", retries)
571                 if retries == 0 {
572                         c.UpdateCounter(cSubReqToE2)
573                 } else {
574                         c.UpdateCounter(cSubReReqToE2)
575                 }
576                 c.rmrSendToE2T(desc, subs, trans)
577                 if subs.DoNotWaitSubResp == false {
578                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
579                         if timedOut {
580                                 c.UpdateCounter(cSubReqTimerExpiry)
581                                 continue
582                         }
583                 } else {
584                         // Simulating case where subscrition request has been sent but response has not been received before restart
585                         event = &SubmgrRestartTestEvent{}
586                 }
587                 break
588         }
589         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
590         return event
591 }
592
593 //-------------------------------------------------------------------
594 // send to E2T Subscription Delete Request
595 //-------------------------------------------------------------------
596
597 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
598         var err error
599         var event interface{}
600         var timedOut bool
601
602         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
603         subDelReqMsg.RequestId = subs.GetReqId().RequestId
604         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
605         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
606         if err != nil {
607                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
608                 return event
609         }
610
611         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
612                 desc := fmt.Sprintf("(retry %d)", retries)
613                 if retries == 0 {
614                         c.UpdateCounter(cSubDelReqToE2)
615                 } else {
616                         c.UpdateCounter(cSubDelReReqToE2)
617                 }
618                 c.rmrSendToE2T(desc, subs, trans)
619                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
620                 if timedOut {
621                         c.UpdateCounter(cSubDelReqTimerExpiry)
622                         continue
623                 }
624                 break
625         }
626         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
627         return event
628 }
629
630 //-------------------------------------------------------------------
631 // handle from E2T Subscription Response
632 //-------------------------------------------------------------------
633 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
634         xapp.Logger.Info("MSG from E2T: %s", params.String())
635         c.UpdateCounter(cSubRespFromE2)
636         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
637         if err != nil {
638                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
639                 return
640         }
641         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
642         if err != nil {
643                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
644                 return
645         }
646         trans := subs.GetTransaction()
647         if trans == nil {
648                 err = fmt.Errorf("Ongoing transaction not found")
649                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
650                 return
651         }
652         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
653         if sendOk == false {
654                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
655                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
656         }
657         return
658 }
659
660 //-------------------------------------------------------------------
661 // handle from E2T Subscription Failure
662 //-------------------------------------------------------------------
663 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
664         xapp.Logger.Info("MSG from E2T: %s", params.String())
665         c.UpdateCounter(cSubFailFromE2)
666         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
667         if err != nil {
668                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
669                 return
670         }
671         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
672         if err != nil {
673                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
674                 return
675         }
676         trans := subs.GetTransaction()
677         if trans == nil {
678                 err = fmt.Errorf("Ongoing transaction not found")
679                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
680                 return
681         }
682         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
683         if sendOk == false {
684                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
685                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
686         }
687         return
688 }
689
690 //-------------------------------------------------------------------
691 // handle from E2T Subscription Delete Response
692 //-------------------------------------------------------------------
693 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
694         xapp.Logger.Info("MSG from E2T: %s", params.String())
695         c.UpdateCounter(cSubDelRespFromE2)
696         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
697         if err != nil {
698                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
699                 return
700         }
701         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
702         if err != nil {
703                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
704                 return
705         }
706         trans := subs.GetTransaction()
707         if trans == nil {
708                 err = fmt.Errorf("Ongoing transaction not found")
709                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
710                 return
711         }
712         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
713         if sendOk == false {
714                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
715                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
716         }
717         return
718 }
719
720 //-------------------------------------------------------------------
721 // handle from E2T Subscription Delete Failure
722 //-------------------------------------------------------------------
723 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
724         xapp.Logger.Info("MSG from E2T: %s", params.String())
725         c.UpdateCounter(cSubDelFailFromE2)
726         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
727         if err != nil {
728                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
729                 return
730         }
731         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
732         if err != nil {
733                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
734                 return
735         }
736         trans := subs.GetTransaction()
737         if trans == nil {
738                 err = fmt.Errorf("Ongoing transaction not found")
739                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
740                 return
741         }
742         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
743         if sendOk == false {
744                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
745                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
746         }
747         return
748 }
749
750 //-------------------------------------------------------------------
751 //
752 //-------------------------------------------------------------------
753 func typeofSubsMessage(v interface{}) string {
754         if v == nil {
755                 return "NIL"
756         }
757         switch v.(type) {
758         case *e2ap.E2APSubscriptionRequest:
759                 return "SubReq"
760         case *e2ap.E2APSubscriptionResponse:
761                 return "SubResp"
762         case *e2ap.E2APSubscriptionFailure:
763                 return "SubFail"
764         case *e2ap.E2APSubscriptionDeleteRequest:
765                 return "SubDelReq"
766         case *e2ap.E2APSubscriptionDeleteResponse:
767                 return "SubDelResp"
768         case *e2ap.E2APSubscriptionDeleteFailure:
769                 return "SubDelFail"
770         default:
771                 return "Unknown"
772         }
773 }
774
775 //-------------------------------------------------------------------
776 //
777 //-------------------------------------------------------------------
778 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
779         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
780         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
781         if err != nil {
782                 xapp.Logger.Error("%v", err)
783         }
784 }
785
786 //-------------------------------------------------------------------
787 //
788 //-------------------------------------------------------------------
789 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
790
791         if removeSubscriptionFromDb == true {
792                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
793                 c.RemoveSubscriptionFromDb(subs)
794         } else {
795                 // Update is needed for successful response and merge case here
796                 if subs.RetryFromXapp == false {
797                         c.WriteSubscriptionToDb(subs)
798                 }
799         }
800         subs.RetryFromXapp = false
801 }
802
803 //-------------------------------------------------------------------
804 //
805 //-------------------------------------------------------------------
806 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
807         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
808         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
809         if err != nil {
810                 xapp.Logger.Error("%v", err)
811         }
812 }
813
814 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
815
816         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
817
818         // Send delete for every endpoint in the subscription
819         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
820         subDelReqMsg.RequestId = subs.GetReqId().RequestId
821         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
822         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
823         if err != nil {
824                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
825                 return
826         }
827         for _, endPoint := range subs.EpList.Endpoints {
828                 params := &xapp.RMRParams{}
829                 params.Mtype = mType
830                 params.SubId = int(subs.GetReqId().InstanceId)
831                 params.Xid = ""
832                 params.Meid = subs.Meid
833                 params.Src = endPoint.String()
834                 params.PayloadLen = len(payload.Buf)
835                 params.Payload = payload.Buf
836                 params.Mbuf = nil
837
838                 if params == nil {
839                         xapp.Logger.Error("SendSubscriptionDeleteReq() params == nil")
840                         return
841                 }
842
843                 subs.DeleteFromDb = true
844                 c.handleXAPPSubscriptionDeleteRequest(params)
845         }
846 }