ConfigMap parameter handling change
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/spf13/viper"
38 )
39
40 //-----------------------------------------------------------------------------
41 //
42 //-----------------------------------------------------------------------------
43
// idstring builds the identification part of a log line.
//
// The String() form of every entry is joined with single spaces and, when err
// is non-nil, "err(<message>)" is appended as the last element. With no
// entries and a nil err the result is the empty string.
//
// A nil interface entry is skipped rather than dereferenced, so a missing
// object in a log call does not cause a nil-pointer panic.
func idstring(err error, entries ...fmt.Stringer) string {
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		if entry == nil {
			continue
		}
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	return strings.Join(parts, " ")
}
57
58 //-----------------------------------------------------------------------------
59 //
60 //-----------------------------------------------------------------------------
61
// Package-level tuning parameters. ReadConfigParameters assigns them and the
// request/response handlers in this file read them.
var (
	// How long to wait for an answer to a subscription request, to a
	// subscription delete request, and how long a received message may wait
	// to be handed over to its transaction.
	e2tSubReqTimeout  time.Duration
	e2tSubDelReqTime  time.Duration
	e2tRecvMsgTimeout time.Duration

	// Maximum number of send attempts: initial try + retries.
	e2tMaxSubReqTryCount    uint64
	e2tMaxSubDelReqTryCount uint64

	// When "false", NewControl does not restore subscriptions from the db.
	readSubsFromDb string
)
68
// Control is the subscription manager's controller object. It receives RMR
// messages (see Consume), keeps subscription state in the registry and the
// database, and sends messages towards E2 termination and xApps.
type Control struct {
	// Embedded RMR client; nil until ReadyCB assigns xapp.Rmr.
	*xapp.RMRClient
	// Packs and unpacks the E2AP subscription messages.
	e2ap     *E2ap
	// Subscription bookkeeping.
	registry *Registry
	// Creates and tracks xApp and subscription transactions.
	tracker  *Tracker
	// Database handle created with CreateSdl().
	db       Sdlnterface
	//subscriber *xapp.Subscriber
	// Number of messages accepted by Consume.
	CntRecvMsg    uint64
	// Passed on to registry.AssignToSubscription; used by restart testing.
	ResetTestFlag bool
	// Counter group registered under the name "SUBMGR".
	Counters      map[string]xapp.Counter
}
80
// RMRMeid groups three string identifiers of a managed entity.
// NOTE(review): not referenced in the visible part of this file; the handlers
// here use the Meid carried in xapp.RMRParams instead — confirm it is still needed.
type RMRMeid struct {
	PlmnID  string
	EnbID   string
	RanName string
}
86
type (
	// SubmgrRestartTestEvent is the event sendE2TSubscriptionRequest returns
	// when the subscription is flagged not to wait for a response; it makes
	// handleSubscriptionCreate drop the transaction to test the restart case.
	SubmgrRestartTestEvent struct{}

	// SubmgrRestartUpEvent is an empty marker event type.
	// NOTE(review): not used in the visible part of this file.
	SubmgrRestartUpEvent struct{}
)
89
90 func init() {
91         xapp.Logger.Info("SUBMGR")
92         viper.AutomaticEnv()
93         viper.SetEnvPrefix("submgr")
94         viper.AllowEmptyEnv(true)
95 }
96
97 func NewControl() *Control {
98
99         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
100         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
101
102         registry := new(Registry)
103         registry.Initialize()
104         registry.rtmgrClient = &rtmgrClient
105
106         tracker := new(Tracker)
107         tracker.Init()
108
109         c := &Control{e2ap: new(E2ap),
110                 registry: registry,
111                 tracker:  tracker,
112                 db:       CreateSdl(),
113                 //subscriber: subscriber,
114                 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
115         }
116         c.ReadConfigParameters("")
117
118         // Register REST handler for testing support
119         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
120
121         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
122
123         if readSubsFromDb == "false" {
124                 return c
125         }
126
127         // Read subscriptions from db
128         xapp.Logger.Info("Reading subscriptions from db")
129         subIds, register, err := c.ReadAllSubscriptionsFromSdl()
130         if err != nil {
131                 xapp.Logger.Error("%v", err)
132         } else {
133                 c.registry.subIds = subIds
134                 c.registry.register = register
135                 c.HandleUncompletedSubscriptions(register)
136         }
137         return c
138 }
139
140 //-------------------------------------------------------------------
141 //
142 //-------------------------------------------------------------------
143 func (c *Control) ReadConfigParameters(f string) {
144
145         // viper.GetDuration returns nanoseconds
146         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
147         if e2tSubReqTimeout == 0 {
148                 e2tSubReqTimeout = 2000 * 1000000
149         }
150         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
151         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
152         if e2tSubDelReqTime == 0 {
153                 e2tSubDelReqTime = 2000 * 1000000
154         }
155         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
156         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
157         if e2tRecvMsgTimeout == 0 {
158                 e2tRecvMsgTimeout = 2000 * 1000000
159         }
160         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
161         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
162         if e2tMaxSubReqTryCount == 0 {
163                 e2tMaxSubReqTryCount = 1
164         }
165         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
166         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
167         if e2tMaxSubDelReqTryCount == 0 {
168                 e2tMaxSubDelReqTryCount = 1
169         }
170         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
171
172         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
173         if readSubsFromDb == "" {
174                 readSubsFromDb = "true"
175         }
176         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
177 }
178
179 //-------------------------------------------------------------------
180 //
181 //-------------------------------------------------------------------
182 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
183
184         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
185         for subId, subs := range register {
186                 if subs.SubRespRcvd == false {
187                         subs.NoRespToXapp = true
188                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
189                         c.SendSubscriptionDeleteReq(subs)
190                 }
191         }
192 }
193
194 func (c *Control) ReadyCB(data interface{}) {
195         if c.RMRClient == nil {
196                 c.RMRClient = xapp.Rmr
197         }
198 }
199
// Run registers the ready callback and the configuration change listener and
// then hands the controller over to the xapp framework.
func (c *Control) Run() {
	xapp.SetReadyCB(c.ReadyCB, nil)
	// Re-read the tuning parameters whenever the framework reports a config change.
	xapp.AddConfigChangeListener(c.ReadConfigParameters)
	xapp.Run(c)
}
205
206 //-------------------------------------------------------------------
207 //
208 //-------------------------------------------------------------------
// SubscriptionHandler is the REST subscription callback passed to
// xapp.Subscription.Listen in NewControl. The REST interface is not
// implemented: it always returns an empty response together with an error.
// stype and params are currently unused.
func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
	// Disabled sketch of the intended per-parameter-type handling:
	/*
	   switch p := params.(type) {
	   case *models.ReportParams:
	       trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
	       if trans == nil {
	             xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
	             return
	       }
	       defer trans.Release()
	   case *models.ControlParams:
	   case *models.PolicyParams:
	   }
	*/
	return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
}
225
// SubscriptionDeleteHandler is the REST subscription delete callback passed
// to xapp.Subscription.Listen in NewControl. It performs no action and always
// reports success; s is unused.
func (c *Control) SubscriptionDeleteHandler(s string) error {
	return nil
}
229
// QueryHandler is the REST query callback passed to xapp.Subscription.Listen
// in NewControl. It logs the call and returns whatever the registry's
// QueryHandler returns.
func (c *Control) QueryHandler() (models.SubscriptionList, error) {
	xapp.Logger.Info("QueryHandler() called")

	return c.registry.QueryHandler()
}
235
236 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
237         xapp.Logger.Info("TestRestHandler() called")
238
239         pathParams := mux.Vars(r)
240         s := pathParams["testId"]
241
242         // This can be used to delete single subscription from db
243         if contains := strings.Contains(s, "deletesubid="); contains == true {
244                 var splits = strings.Split(s, "=")
245                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
246                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
247                         c.RemoveSubscriptionFromSdl(uint32(subId))
248                         return
249                 }
250         }
251
252         // This can be used to remove all subscriptions db from
253         if s == "emptydb" {
254                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
255                 c.RemoveAllSubscriptionsFromSdl()
256                 return
257         }
258
259         // This is meant to cause submgr's restart in testing
260         if s == "restart" {
261                 xapp.Logger.Info("os.Exit(1) called")
262                 os.Exit(1)
263         }
264
265         xapp.Logger.Info("Unsupported rest command received %s", s)
266 }
267
268 //-------------------------------------------------------------------
269 //
270 //-------------------------------------------------------------------
271
272 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
273         params := &xapp.RMRParams{}
274         params.Mtype = trans.GetMtype()
275         params.SubId = int(subs.GetReqId().InstanceId)
276         params.Xid = ""
277         params.Meid = subs.GetMeid()
278         params.Src = ""
279         params.PayloadLen = len(trans.Payload.Buf)
280         params.Payload = trans.Payload.Buf
281         params.Mbuf = nil
282         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
283         return c.SendWithRetry(params, false, 5)
284 }
285
286 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
287
288         params := &xapp.RMRParams{}
289         params.Mtype = trans.GetMtype()
290         params.SubId = int(subs.GetReqId().InstanceId)
291         params.Xid = trans.GetXid()
292         params.Meid = trans.GetMeid()
293         params.Src = ""
294         params.PayloadLen = len(trans.Payload.Buf)
295         params.Payload = trans.Payload.Buf
296         params.Mbuf = nil
297         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
298         return c.SendWithRetry(params, false, 5)
299 }
300
301 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
302         if c.RMRClient == nil {
303                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
304                 xapp.Logger.Error("%s", err.Error())
305                 return
306         }
307         c.CntRecvMsg++
308
309         defer c.RMRClient.Free(msg.Mbuf)
310
311         // xapp-frame might use direct access to c buffer and
312         // when msg.Mbuf is freed, someone might take it into use
313         // and payload data might be invalid inside message handle function
314         //
315         // subscriptions won't load system a lot so there is no
316         // real performance hit by cloning buffer into new go byte slice
317         cPay := append(msg.Payload[:0:0], msg.Payload...)
318         msg.Payload = cPay
319         msg.PayloadLen = len(cPay)
320
321         switch msg.Mtype {
322         case xapp.RIC_SUB_REQ:
323                 go c.handleXAPPSubscriptionRequest(msg)
324         case xapp.RIC_SUB_RESP:
325                 go c.handleE2TSubscriptionResponse(msg)
326         case xapp.RIC_SUB_FAILURE:
327                 go c.handleE2TSubscriptionFailure(msg)
328         case xapp.RIC_SUB_DEL_REQ:
329                 go c.handleXAPPSubscriptionDeleteRequest(msg)
330         case xapp.RIC_SUB_DEL_RESP:
331                 go c.handleE2TSubscriptionDeleteResponse(msg)
332         case xapp.RIC_SUB_DEL_FAILURE:
333                 go c.handleE2TSubscriptionDeleteFailure(msg)
334         default:
335                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
336         }
337         return
338 }
339
340 //-------------------------------------------------------------------
341 // handle from XAPP Subscription Request
342 //------------------------------------------------------------------
343 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
344         xapp.Logger.Info("MSG from XAPP: %s", params.String())
345         c.UpdateCounter(cSubReqFromXapp)
346
347         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
348         if err != nil {
349                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
350                 return
351         }
352
353         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
354         if trans == nil {
355                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
356                 return
357         }
358         defer trans.Release()
359
360         if err = c.tracker.Track(trans); err != nil {
361                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
362                 return
363         }
364
365         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
366         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
367         if err != nil {
368                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
369                 return
370         }
371
372         c.wakeSubscriptionRequest(subs, trans)
373 }
374
375 //-------------------------------------------------------------------
376 // Wake Subscription Request to E2node
377 //------------------------------------------------------------------
// wakeSubscriptionRequest starts the subscription creation in a separate
// goroutine, blocks until that goroutine delivers an event to trans and then
// answers the xApp:
//   - on a subscription response the message is packed, the transaction is
//     released, the response is sent and the function returns;
//   - on a subscription failure the message is packed and sent, after which
//     the "failed" log line below is still written;
//   - on any other event, no event, or a packing error only the "failed" log
//     line is written.
func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {

	go c.handleSubscriptionCreate(subs, trans)
	event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
	var err error
	if event != nil {
		switch themsg := event.(type) {
		case *e2ap.E2APSubscriptionResponse:
			// Answer with the request id the xApp used in its request.
			themsg.RequestId.Id = trans.RequestId.Id
			trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
			if err == nil {
				trans.Release()
				c.UpdateCounter(cSubRespToXapp)
				c.rmrSendToXapp("", subs, trans)
				return
			}
		case *e2ap.E2APSubscriptionFailure:
			// Answer with the request id the xApp used in its request.
			themsg.RequestId.Id = trans.RequestId.Id
			trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
			if err == nil {
				c.UpdateCounter(cSubFailToXapp)
				c.rmrSendToXapp("", subs, trans)
			}
		default:
			break
		}
	}
	xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
	//c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
}
408
409 //-------------------------------------------------------------------
410 // handle from XAPP Subscription Delete Request
411 //------------------------------------------------------------------
412 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
413         xapp.Logger.Info("MSG from XAPP: %s", params.String())
414         c.UpdateCounter(cSubDelReqFromXapp)
415
416         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
417         if err != nil {
418                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
419                 return
420         }
421
422         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
423         if trans == nil {
424                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
425                 return
426         }
427         defer trans.Release()
428
429         err = c.tracker.Track(trans)
430         if err != nil {
431                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
432                 return
433         }
434
435         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
436         if err != nil {
437                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
438                 return
439         }
440
441         //
442         // Wake subs delete
443         //
444         go c.handleSubscriptionDelete(subs, trans)
445         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
446
447         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
448
449         if subs.NoRespToXapp == true {
450                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
451                 return
452         }
453
454         // Whatever is received success, fail or timeout, send successful delete response
455         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
456         subDelRespMsg.RequestId.Id = trans.RequestId.Id
457         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
458         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
459         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
460         if err == nil {
461                 c.UpdateCounter(cSubDelRespToXapp)
462                 c.rmrSendToXapp("", subs, trans)
463         }
464
465         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
466         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
467 }
468
469 //-------------------------------------------------------------------
470 // SUBS CREATE Handling
471 //-------------------------------------------------------------------
// handleSubscriptionCreate runs the subscription side of a subscription
// request. It waits for its turn on the subscription and, when no response is
// cached yet and the subscription is valid, sends the request to E2
// termination and caches the outcome. When a response is already cached, or
// the subscription is not valid, the cached value is used instead. The result
// (possibly nil) is finally delivered to parentTrans, which unblocks
// wakeSubscriptionRequest.
func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {

	var removeSubscriptionFromDb bool = false
	trans := c.tracker.NewSubsTransaction(subs)
	subs.WaitTransactionTurn(trans)
	defer subs.ReleaseTransactionTurn(trans)
	defer trans.Release()

	xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))

	subRfMsg, valid := subs.GetCachedResponse()
	if subRfMsg == nil && valid == true {
		event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
		switch event.(type) {
		case *e2ap.E2APSubscriptionResponse:
			subRfMsg, valid = subs.SetCachedResponse(event, true)
			subs.SubRespRcvd = true
		case *e2ap.E2APSubscriptionFailure:
			removeSubscriptionFromDb = true
			subRfMsg, valid = subs.SetCachedResponse(event, false)
			xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
		case *SubmgrRestartTestEvent:
			// This simulates that no response has been received and after restart subscriptions are restored from db
			// Note: returning here means no event is sent to parentTrans.
			xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
			return
		default:
			// Any other outcome (including no event at all): cache an empty
			// result and ask E2 termination to delete the subscription.
			xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
			removeSubscriptionFromDb = true
			subRfMsg, valid = subs.SetCachedResponse(nil, false)
			c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
		}
		xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
	} else {
		xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
	}

	//Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
	if valid == false {
		c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
	}

	c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
	parentTrans.SendEvent(subRfMsg, 0)
}
516
517 //-------------------------------------------------------------------
518 // SUBS DELETE Handling
519 //-------------------------------------------------------------------
520
521 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
522
523         trans := c.tracker.NewSubsTransaction(subs)
524         subs.WaitTransactionTurn(trans)
525         defer subs.ReleaseTransactionTurn(trans)
526         defer trans.Release()
527
528         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
529
530         subs.mutex.Lock()
531
532         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
533                 subs.valid = false
534                 subs.mutex.Unlock()
535                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
536         } else {
537                 subs.mutex.Unlock()
538         }
539         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
540         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
541         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
542         c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
543         c.registry.UpdateSubscriptionToDb(subs, c)
544         parentTrans.SendEvent(nil, 0)
545 }
546
547 //-------------------------------------------------------------------
548 // send to E2T Subscription Request
549 //-------------------------------------------------------------------
550 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
551         var err error
552         var event interface{} = nil
553         var timedOut bool = false
554
555         subReqMsg := subs.SubReqMsg
556         subReqMsg.RequestId = subs.GetReqId().RequestId
557         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
558         if err != nil {
559                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
560                 return event
561         }
562
563         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
564         c.WriteSubscriptionToDb(subs)
565         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
566                 desc := fmt.Sprintf("(retry %d)", retries)
567                 if retries == 0 {
568                         c.UpdateCounter(cSubReqToE2)
569                 } else {
570                         c.UpdateCounter(cSubReReqToE2)
571                 }
572                 c.rmrSendToE2T(desc, subs, trans)
573                 if subs.DoNotWaitSubResp == false {
574                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
575                         if timedOut {
576                                 c.UpdateCounter(cSubReqTimerExpiry)
577                                 continue
578                         }
579                 } else {
580                         // Simulating case where subscrition request has been sent but response has not been received before restart
581                         event = &SubmgrRestartTestEvent{}
582                 }
583                 break
584         }
585         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
586         return event
587 }
588
589 //-------------------------------------------------------------------
590 // send to E2T Subscription Delete Request
591 //-------------------------------------------------------------------
592
593 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
594         var err error
595         var event interface{}
596         var timedOut bool
597
598         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
599         subDelReqMsg.RequestId = subs.GetReqId().RequestId
600         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
601         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
602         if err != nil {
603                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
604                 return event
605         }
606
607         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
608                 desc := fmt.Sprintf("(retry %d)", retries)
609                 if retries == 0 {
610                         c.UpdateCounter(cSubDelReqToE2)
611                 } else {
612                         c.UpdateCounter(cSubDelReReqToE2)
613                 }
614                 c.rmrSendToE2T(desc, subs, trans)
615                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
616                 if timedOut {
617                         c.UpdateCounter(cSubDelReqTimerExpiry)
618                         continue
619                 }
620                 break
621         }
622         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
623         return event
624 }
625
626 //-------------------------------------------------------------------
627 // handle from E2T Subscription Response
628 //-------------------------------------------------------------------
629 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
630         xapp.Logger.Info("MSG from E2T: %s", params.String())
631         c.UpdateCounter(cSubRespFromE2)
632         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
633         if err != nil {
634                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
635                 return
636         }
637         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
638         if err != nil {
639                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
640                 return
641         }
642         trans := subs.GetTransaction()
643         if trans == nil {
644                 err = fmt.Errorf("Ongoing transaction not found")
645                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
646                 return
647         }
648         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
649         if sendOk == false {
650                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
651                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
652         }
653         return
654 }
655
656 //-------------------------------------------------------------------
657 // handle from E2T Subscription Failure
658 //-------------------------------------------------------------------
659 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
660         xapp.Logger.Info("MSG from E2T: %s", params.String())
661         c.UpdateCounter(cSubFailFromE2)
662         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
663         if err != nil {
664                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
665                 return
666         }
667         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
668         if err != nil {
669                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
670                 return
671         }
672         trans := subs.GetTransaction()
673         if trans == nil {
674                 err = fmt.Errorf("Ongoing transaction not found")
675                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
676                 return
677         }
678         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
679         if sendOk == false {
680                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
681                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
682         }
683         return
684 }
685
686 //-------------------------------------------------------------------
687 // handle from E2T Subscription Delete Response
688 //-------------------------------------------------------------------
689 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
690         xapp.Logger.Info("MSG from E2T: %s", params.String())
691         c.UpdateCounter(cSubDelRespFromE2)
692         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
693         if err != nil {
694                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
695                 return
696         }
697         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
698         if err != nil {
699                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
700                 return
701         }
702         trans := subs.GetTransaction()
703         if trans == nil {
704                 err = fmt.Errorf("Ongoing transaction not found")
705                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
706                 return
707         }
708         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
709         if sendOk == false {
710                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
711                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
712         }
713         return
714 }
715
716 //-------------------------------------------------------------------
717 // handle from E2T Subscription Delete Failure
718 //-------------------------------------------------------------------
719 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
720         xapp.Logger.Info("MSG from E2T: %s", params.String())
721         c.UpdateCounter(cSubDelFailFromE2)
722         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
723         if err != nil {
724                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
725                 return
726         }
727         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
728         if err != nil {
729                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
730                 return
731         }
732         trans := subs.GetTransaction()
733         if trans == nil {
734                 err = fmt.Errorf("Ongoing transaction not found")
735                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
736                 return
737         }
738         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
739         if sendOk == false {
740                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
741                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
742         }
743         return
744 }
745
746 //-------------------------------------------------------------------
747 //
748 //-------------------------------------------------------------------
749 func typeofSubsMessage(v interface{}) string {
750         if v == nil {
751                 return "NIL"
752         }
753         switch v.(type) {
754         case *e2ap.E2APSubscriptionRequest:
755                 return "SubReq"
756         case *e2ap.E2APSubscriptionResponse:
757                 return "SubResp"
758         case *e2ap.E2APSubscriptionFailure:
759                 return "SubFail"
760         case *e2ap.E2APSubscriptionDeleteRequest:
761                 return "SubDelReq"
762         case *e2ap.E2APSubscriptionDeleteResponse:
763                 return "SubDelResp"
764         case *e2ap.E2APSubscriptionDeleteFailure:
765                 return "SubDelFail"
766         default:
767                 return "Unknown"
768         }
769 }
770
771 //-------------------------------------------------------------------
772 //
773 //-------------------------------------------------------------------
774 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
775         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
776         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
777         if err != nil {
778                 xapp.Logger.Error("%v", err)
779         }
780 }
781
782 //-------------------------------------------------------------------
783 //
784 //-------------------------------------------------------------------
785 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
786
787         if removeSubscriptionFromDb == true {
788                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
789                 c.RemoveSubscriptionFromDb(subs)
790         } else {
791                 // Update is needed for successful response and merge case here
792                 if subs.RetryFromXapp == false {
793                         c.WriteSubscriptionToDb(subs)
794                 }
795         }
796         subs.RetryFromXapp = false
797 }
798
799 //-------------------------------------------------------------------
800 //
801 //-------------------------------------------------------------------
802 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
803         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
804         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
805         if err != nil {
806                 xapp.Logger.Error("%v", err)
807         }
808 }
809
810 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
811
812         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
813
814         // Send delete for every endpoint in the subscription
815         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
816         subDelReqMsg.RequestId = subs.GetReqId().RequestId
817         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
818         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
819         if err != nil {
820                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
821                 return
822         }
823         for _, endPoint := range subs.EpList.Endpoints {
824                 params := &xapp.RMRParams{}
825                 params.Mtype = mType
826                 params.SubId = int(subs.GetReqId().InstanceId)
827                 params.Xid = ""
828                 params.Meid = subs.Meid
829                 params.Src = endPoint.String()
830                 params.PayloadLen = len(payload.Buf)
831                 params.Payload = payload.Buf
832                 params.Mbuf = nil
833
834                 if params == nil {
835                         xapp.Logger.Error("SendSubscriptionDeleteReq() params == nil")
836                         return
837                 }
838
839                 subs.DeleteFromDb = true
840                 c.handleXAPPSubscriptionDeleteRequest(params)
841         }
842 }