Submgr restart improvement
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/spf13/viper"
38 )
39
40 //-----------------------------------------------------------------------------
41 //
42 //-----------------------------------------------------------------------------
43
// idstring builds a single, space-separated identification string for log
// messages. The String() form of every entry is emitted in argument order
// and, when err is non-nil, a trailing "err(<message>)" element is appended.
// With no entries and a nil err the result is the empty string.
func idstring(err error, entries ...fmt.Stringer) string {
	text := ""
	for idx, item := range entries {
		if idx > 0 {
			text += " "
		}
		text += item.String()
	}
	if err == nil {
		return text
	}
	// The error element is separated from the entries only when there are any.
	if len(entries) > 0 {
		text += " "
	}
	return text + "err(" + err.Error() + ")"
}
58
59 //-----------------------------------------------------------------------------
60 //
61 //-----------------------------------------------------------------------------
62
// Package-level tunables. All of them are assigned by ReadConfigParameters.
var (
	// e2tSubReqTimeout is the per-try wait for an E2T answer to a subscription request.
	e2tSubReqTimeout time.Duration
	// e2tSubDelReqTime is the per-try wait for an E2T answer to a subscription delete request.
	e2tSubDelReqTime time.Duration
	// e2tRecvMsgTimeout is the timeout used when handing a received E2T message to its transaction.
	e2tRecvMsgTimeout time.Duration
	// e2tMaxSubReqTryCount is the number of subscription request tries (initial try + retry).
	e2tMaxSubReqTryCount uint64
	// e2tMaxSubDelReqTryCount is the number of subscription delete request tries (initial try + retry).
	e2tMaxSubDelReqTryCount uint64
	// readSubsFromDb set to "false" makes NewControl skip restoring subscriptions from the db.
	readSubsFromDb string
)
69
// Control is the subscription manager's central object. It is the receiver
// of the RMR message entry point (Consume), the REST handlers and all
// subscription create/delete handling in this file.
type Control struct {
	// Embedded RMR client; nil until ReadyCB assigns xapp.Rmr to it.
	*xapp.RMRClient
	// E2AP message packing/unpacking helper.
	e2ap *E2ap
	// Subscription registry (lookup, assignment and removal of subscriptions).
	registry *Registry
	// Factory and tracker for xApp and subscription transactions.
	tracker *Tracker
	// Database handle obtained from CreateSdl() in NewControl.
	db Sdlnterface
	//subscriber *xapp.Subscriber
	// Incremented in Consume for each message received once the RMR client is set.
	CntRecvMsg uint64
	// Passed to registry.AssignToSubscription for every subscription request.
	// NOTE(review): looks like a restart-test switch — confirm against Registry.
	ResetTestFlag bool
}
80
// RMRMeid carries three string identifiers: PlmnID, EnbID and RanName.
// NOTE(review): not referenced elsewhere in the visible part of this file;
// presumably mirrors xapp.RMRMeid — confirm before relying on it.
type RMRMeid struct {
	PlmnID, EnbID, RanName string
}
86
// Marker event types used by the restart handling.
type (
	// SubmgrRestartTestEvent is produced by sendE2TSubscriptionRequest instead
	// of an E2T response when the subscription has DoNotWaitSubResp set;
	// handleSubscriptionCreate then drops the transaction without answering.
	SubmgrRestartTestEvent struct{}

	// SubmgrRestartUpEvent is declared alongside the test event.
	// NOTE(review): not used in the visible part of this file — confirm usage.
	SubmgrRestartUpEvent struct{}
)
89
90 func init() {
91         xapp.Logger.Info("SUBMGR")
92         viper.AutomaticEnv()
93         viper.SetEnvPrefix("submgr")
94         viper.AllowEmptyEnv(true)
95 }
96
97 func NewControl() *Control {
98
99         ReadConfigParameters()
100         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
101         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
102
103         registry := new(Registry)
104         registry.Initialize()
105         registry.rtmgrClient = &rtmgrClient
106
107         tracker := new(Tracker)
108         tracker.Init()
109
110         //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
111
112         c := &Control{e2ap: new(E2ap),
113                 registry: registry,
114                 tracker:  tracker,
115                 db:       CreateSdl(),
116                 //subscriber: subscriber,
117         }
118
119         // Register REST handler for testing support
120         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
121
122         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
123         //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
124
125         if readSubsFromDb == "false" {
126                 return c
127         }
128
129         // Read subscriptions from db
130         xapp.Logger.Info("Reading subscriptions from db")
131         subIds, register, err := c.ReadAllSubscriptionsFromSdl()
132         if err != nil {
133                 xapp.Logger.Error("%v", err)
134         } else {
135                 c.registry.subIds = subIds
136                 c.registry.register = register
137                 c.HandleUncompletedSubscriptions(register)
138         }
139         return c
140 }
141
142 //-------------------------------------------------------------------
143 //
144 //-------------------------------------------------------------------
145 func ReadConfigParameters() {
146
147         // viper.GetDuration returns nanoseconds
148         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
149         if e2tSubReqTimeout == 0 {
150                 e2tSubReqTimeout = 2000 * 1000000
151         }
152         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
153         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
154         if e2tSubDelReqTime == 0 {
155                 e2tSubDelReqTime = 2000 * 1000000
156         }
157         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
158         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
159         if e2tRecvMsgTimeout == 0 {
160                 e2tRecvMsgTimeout = 2000 * 1000000
161         }
162         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
163         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
164         if e2tMaxSubReqTryCount == 0 {
165                 e2tMaxSubReqTryCount = 1
166         }
167         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
168         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
169         if e2tMaxSubDelReqTryCount == 0 {
170                 e2tMaxSubDelReqTryCount = 1
171         }
172         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
173
174         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
175         if readSubsFromDb == "" {
176                 readSubsFromDb = "true"
177         }
178         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
179 }
180
181 //-------------------------------------------------------------------
182 //
183 //-------------------------------------------------------------------
184 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
185
186         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
187         for subId, subs := range register {
188                 if subs.SubRespRcvd == false {
189                         subs.NoRespToXapp = true
190                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
191                         c.SendSubscriptionDeleteReq(subs)
192                 }
193         }
194 }
195
196 func (c *Control) ReadyCB(data interface{}) {
197         if c.RMRClient == nil {
198                 c.RMRClient = xapp.Rmr
199         }
200 }
201
// Run registers ReadyCB as the xapp ready callback (with nil callback data)
// and then passes this Control to xapp.Run.
// NOTE(review): xapp.Run presumably blocks for the lifetime of the process
// and delivers RMR messages to Consume — confirm against xapp-frame.
func (c *Control) Run() {
	xapp.SetReadyCB(c.ReadyCB, nil)
	xapp.Run(c)
}
206
207 //-------------------------------------------------------------------
208 //
209 //-------------------------------------------------------------------
210 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
211         /*
212            switch p := params.(type) {
213            case *models.ReportParams:
214                trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
215                if trans == nil {
216                      xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
217                      return
218                }
219                defer trans.Release()
220            case *models.ControlParams:
221            case *models.PolicyParams:
222            }
223         */
224         return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
225 }
226
// SubscriptionDeleteHandler is the REST subscription delete callback handed
// to xapp.Subscription.Listen. It is a placeholder: the argument is ignored
// and nil is always returned.
func (c *Control) SubscriptionDeleteHandler(s string) error {
	return nil
}
230
// QueryHandler is the REST query callback handed to
// xapp.Subscription.Listen. It returns whatever the registry's QueryHandler
// returns.
func (c *Control) QueryHandler() (models.SubscriptionList, error) {
	return c.registry.QueryHandler()
}
234
235 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
236
237         xapp.Logger.Info("TestRestHandler() called")
238
239         pathParams := mux.Vars(r)
240         s := pathParams["testId"]
241
242         // This can be used to delete single subscription from db
243         if contains := strings.Contains(s, "deletesubid="); contains == true {
244                 var splits = strings.Split(s, "=")
245                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
246                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
247                         c.RemoveSubscriptionFromSdl(uint32(subId))
248                         return
249                 }
250         }
251
252         // This can be used to remove all subscriptions db from
253         if s == "emptydb" {
254                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
255                 c.RemoveAllSubscriptionsFromSdl()
256                 return
257         }
258
259         // This is meant to cause submgr's restart in testing
260         if s == "restart" {
261                 xapp.Logger.Info("os.Exit(1) called")
262                 os.Exit(1)
263         }
264
265         xapp.Logger.Info("Unsupported rest command received %s", s)
266 }
267
268 //-------------------------------------------------------------------
269 //
270 //-------------------------------------------------------------------
271
272 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
273         params := &xapp.RMRParams{}
274         params.Mtype = trans.GetMtype()
275         params.SubId = int(subs.GetReqId().InstanceId)
276         params.Xid = ""
277         params.Meid = subs.GetMeid()
278         params.Src = ""
279         params.PayloadLen = len(trans.Payload.Buf)
280         params.Payload = trans.Payload.Buf
281         params.Mbuf = nil
282         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
283         return c.SendWithRetry(params, false, 5)
284 }
285
286 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
287
288         params := &xapp.RMRParams{}
289         params.Mtype = trans.GetMtype()
290         params.SubId = int(subs.GetReqId().InstanceId)
291         params.Xid = trans.GetXid()
292         params.Meid = trans.GetMeid()
293         params.Src = ""
294         params.PayloadLen = len(trans.Payload.Buf)
295         params.Payload = trans.Payload.Buf
296         params.Mbuf = nil
297         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
298         return c.SendWithRetry(params, false, 5)
299 }
300
301 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
302         if c.RMRClient == nil {
303                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
304                 xapp.Logger.Error("%s", err.Error())
305                 return
306         }
307         c.CntRecvMsg++
308
309         defer c.RMRClient.Free(msg.Mbuf)
310
311         // xapp-frame might use direct access to c buffer and
312         // when msg.Mbuf is freed, someone might take it into use
313         // and payload data might be invalid inside message handle function
314         //
315         // subscriptions won't load system a lot so there is no
316         // real performance hit by cloning buffer into new go byte slice
317         cPay := append(msg.Payload[:0:0], msg.Payload...)
318         msg.Payload = cPay
319         msg.PayloadLen = len(cPay)
320
321         switch msg.Mtype {
322         case xapp.RIC_SUB_REQ:
323                 go c.handleXAPPSubscriptionRequest(msg)
324         case xapp.RIC_SUB_RESP:
325                 go c.handleE2TSubscriptionResponse(msg)
326         case xapp.RIC_SUB_FAILURE:
327                 go c.handleE2TSubscriptionFailure(msg)
328         case xapp.RIC_SUB_DEL_REQ:
329                 go c.handleXAPPSubscriptionDeleteRequest(msg)
330         case xapp.RIC_SUB_DEL_RESP:
331                 go c.handleE2TSubscriptionDeleteResponse(msg)
332         case xapp.RIC_SUB_DEL_FAILURE:
333                 go c.handleE2TSubscriptionDeleteFailure(msg)
334         default:
335                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
336         }
337         return
338 }
339
340 //-------------------------------------------------------------------
341 // handle from XAPP Subscription Request
342 //------------------------------------------------------------------
343 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
344         xapp.Logger.Info("MSG from XAPP: %s", params.String())
345
346         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
347         if err != nil {
348                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
349                 return
350         }
351
352         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.InstanceId, params.Meid)
353         if trans == nil {
354                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
355                 return
356         }
357         defer trans.Release()
358
359         err = c.tracker.Track(trans)
360         if err != nil {
361                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
362                 return
363         }
364
365         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
366         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag)
367         if err != nil {
368                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
369                 return
370         }
371
372         //
373         // Wake subs request
374         //
375         go c.handleSubscriptionCreate(subs, trans)
376         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
377         err = nil
378         if event != nil {
379                 switch themsg := event.(type) {
380                 case *e2ap.E2APSubscriptionResponse:
381                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
382                         if err == nil {
383                                 trans.Release()
384                                 c.rmrSendToXapp("", subs, trans)
385                                 return
386                         }
387                 case *e2ap.E2APSubscriptionFailure:
388                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
389                         if err == nil {
390                                 c.rmrSendToXapp("", subs, trans)
391                         }
392                 default:
393                         break
394                 }
395         }
396         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
397         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
398 }
399
400 //-------------------------------------------------------------------
401 // handle from XAPP Subscription Delete Request
402 //------------------------------------------------------------------
403 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
404         xapp.Logger.Info("MSG from XAPP: %s", params.String())
405
406         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
407         if err != nil {
408                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
409                 return
410         }
411
412         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.InstanceId, params.Meid)
413         if trans == nil {
414                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
415                 return
416         }
417         defer trans.Release()
418
419         err = c.tracker.Track(trans)
420         if err != nil {
421                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
422                 return
423         }
424
425         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
426         if err != nil {
427                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
428                 return
429         }
430
431         //
432         // Wake subs delete
433         //
434         go c.handleSubscriptionDelete(subs, trans)
435         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
436
437         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
438
439         if subs.NoRespToXapp == true {
440                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
441                 return
442         }
443
444         // Whatever is received success, fail or timeout, send successful delete response
445         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
446         subDelRespMsg.RequestId = subs.GetReqId().RequestId
447         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
448         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
449         if err == nil {
450                 c.rmrSendToXapp("", subs, trans)
451         }
452
453         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
454         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
455 }
456
457 //-------------------------------------------------------------------
458 // SUBS CREATE Handling
459 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the subscription-side part of a create
// request on behalf of the xApp transaction parentTrans.
//
// It creates a subscription transaction, waits for its turn on subs and then
// either reuses the response cached on the subscription or sends the request
// to E2T. The resulting message (response, failure or nil) is cached on the
// subscription, the db is updated and the message is delivered to
// parentTrans, which is blocked waiting for it.
//
// When the E2T send yields a SubmgrRestartTestEvent the function returns
// early: nothing is cached, the db is not updated and parentTrans receives
// no event.
func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {

	// Set on any non-success outcome; passed to UpdateSubscriptionInDB below.
	var removeSubscriptionFromDb bool = false
	trans := c.tracker.NewSubsTransaction(subs)
	subs.WaitTransactionTurn(trans)
	defer subs.ReleaseTransactionTurn(trans)
	defer trans.Release()

	xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))

	// No cached message on a still-valid subscription: E2T has to be asked.
	subRfMsg, valid := subs.GetCachedResponse()
	if subRfMsg == nil && valid == true {

		//
		// In case of failure
		// - make internal delete
		// - in case duplicate cause, retry (currently max 1 retry)
		//
		maxRetries := uint64(1)
		doRetry := true
		for retries := uint64(0); retries <= maxRetries && doRetry; retries++ {
			doRetry = false

			event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
			switch themsg := event.(type) {
			case *e2ap.E2APSubscriptionResponse:
				subRfMsg, valid = subs.SetCachedResponse(event, true)
				subs.SubRespRcvd = true
			case *e2ap.E2APSubscriptionFailure:
				removeSubscriptionFromDb = true
				subRfMsg, valid = subs.SetCachedResponse(event, false)
				// Retry only if every not-admitted action failed with a RIC
				// "duplicate action" or "duplicate event" cause.
				doRetry = true
				for _, item := range themsg.ActionNotAdmittedList.Items {
					if item.Cause.Content != e2ap.E2AP_CauseContent_Ric || (item.Cause.Value != e2ap.E2AP_CauseValue_Ric_duplicate_action && item.Cause.Value != e2ap.E2AP_CauseValue_Ric_duplicate_event) {
						doRetry = false
						break
					}
				}
				xapp.Logger.Info("SUBS-SubReq: internal delete and possible retry due event(%s) retry(%t,%d/%d) %s", typeofSubsMessage(event), doRetry, retries, maxRetries, idstring(nil, trans, subs, parentTrans))
				c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
			case *SubmgrRestartTestEvent:
				// This simulates that no response has been received and after restart subscriptions are restored from db
				xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
				return
			default:
				// Any other event, including none at all, is treated as a
				// failed create: cache nil and delete towards E2T.
				xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
				removeSubscriptionFromDb = true
				subRfMsg, valid = subs.SetCachedResponse(nil, false)
				c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
			}
		}
		xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
	} else {
		xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
	}

	//Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
	if valid == false {
		c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
	}

	c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
	// Hand the outcome (possibly nil) to the waiting xApp transaction.
	parentTrans.SendEvent(subRfMsg, 0)
}
524
525 //-------------------------------------------------------------------
526 // SUBS DELETE Handling
527 //-------------------------------------------------------------------
528
529 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
530
531         trans := c.tracker.NewSubsTransaction(subs)
532         subs.WaitTransactionTurn(trans)
533         defer subs.ReleaseTransactionTurn(trans)
534         defer trans.Release()
535
536         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
537
538         subs.mutex.Lock()
539         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
540                 subs.valid = false
541                 subs.mutex.Unlock()
542                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
543         } else {
544                 subs.mutex.Unlock()
545         }
546         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
547         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
548         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
549         c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
550         c.registry.UpdateSubscriptionToDb(subs, c)
551         parentTrans.SendEvent(nil, 0)
552 }
553
554 //-------------------------------------------------------------------
555 // send to E2T Subscription Request
556 //-------------------------------------------------------------------
557 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
558         var err error
559         var event interface{} = nil
560         var timedOut bool = false
561
562         subReqMsg := subs.SubReqMsg
563         subReqMsg.RequestId = subs.GetReqId().RequestId
564         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
565         if err != nil {
566                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
567                 return event
568         }
569
570         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
571         c.WriteSubscriptionToDb(subs)
572         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
573                 desc := fmt.Sprintf("(retry %d)", retries)
574                 c.rmrSendToE2T(desc, subs, trans)
575                 if subs.DoNotWaitSubResp == false {
576                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
577                         if timedOut {
578                                 continue
579                         }
580                 } else {
581                         // Simulating case where subscrition request has been sent but response has not been received before restart
582                         event = &SubmgrRestartTestEvent{}
583                 }
584                 break
585         }
586         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
587         return event
588 }
589
590 //-------------------------------------------------------------------
591 // send to E2T Subscription Delete Request
592 //-------------------------------------------------------------------
593
594 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
595         var err error
596         var event interface{}
597         var timedOut bool
598
599         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
600         subDelReqMsg.RequestId = subs.GetReqId().RequestId
601         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
602         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
603         if err != nil {
604                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
605                 return event
606         }
607
608         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
609                 desc := fmt.Sprintf("(retry %d)", retries)
610                 c.rmrSendToE2T(desc, subs, trans)
611                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
612                 if timedOut {
613                         continue
614                 }
615                 break
616         }
617         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
618         return event
619 }
620
621 //-------------------------------------------------------------------
622 // handle from E2T Subscription Response
623 //-------------------------------------------------------------------
624 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
625         xapp.Logger.Info("MSG from E2T: %s", params.String())
626         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
627         if err != nil {
628                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
629                 return
630         }
631         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
632         if err != nil {
633                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
634                 return
635         }
636         trans := subs.GetTransaction()
637         if trans == nil {
638                 err = fmt.Errorf("Ongoing transaction not found")
639                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
640                 return
641         }
642         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
643         if sendOk == false {
644                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
645                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
646         }
647         return
648 }
649
650 //-------------------------------------------------------------------
651 // handle from E2T Subscription Failure
652 //-------------------------------------------------------------------
653 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
654         xapp.Logger.Info("MSG from E2T: %s", params.String())
655         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
656         if err != nil {
657                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
658                 return
659         }
660         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
661         if err != nil {
662                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
663                 return
664         }
665         trans := subs.GetTransaction()
666         if trans == nil {
667                 err = fmt.Errorf("Ongoing transaction not found")
668                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
669                 return
670         }
671         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
672         if sendOk == false {
673                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
674                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
675         }
676         return
677 }
678
679 //-------------------------------------------------------------------
680 // handle from E2T Subscription Delete Response
681 //-------------------------------------------------------------------
682 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
683         xapp.Logger.Info("MSG from E2T: %s", params.String())
684         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
685         if err != nil {
686                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
687                 return
688         }
689         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
690         if err != nil {
691                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
692                 return
693         }
694         trans := subs.GetTransaction()
695         if trans == nil {
696                 err = fmt.Errorf("Ongoing transaction not found")
697                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
698                 return
699         }
700         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
701         if sendOk == false {
702                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
703                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
704         }
705         return
706 }
707
708 //-------------------------------------------------------------------
709 // handle from E2T Subscription Delete Failure
710 //-------------------------------------------------------------------
711 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
712         xapp.Logger.Info("MSG from E2T: %s", params.String())
713         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
714         if err != nil {
715                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
716                 return
717         }
718         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
719         if err != nil {
720                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
721                 return
722         }
723         trans := subs.GetTransaction()
724         if trans == nil {
725                 err = fmt.Errorf("Ongoing transaction not found")
726                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
727                 return
728         }
729         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
730         if sendOk == false {
731                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
732                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
733         }
734         return
735 }
736
737 //-------------------------------------------------------------------
738 //
739 //-------------------------------------------------------------------
740 func typeofSubsMessage(v interface{}) string {
741         if v == nil {
742                 return "NIL"
743         }
744         switch v.(type) {
745         case *e2ap.E2APSubscriptionRequest:
746                 return "SubReq"
747         case *e2ap.E2APSubscriptionResponse:
748                 return "SubResp"
749         case *e2ap.E2APSubscriptionFailure:
750                 return "SubFail"
751         case *e2ap.E2APSubscriptionDeleteRequest:
752                 return "SubDelReq"
753         case *e2ap.E2APSubscriptionDeleteResponse:
754                 return "SubDelResp"
755         case *e2ap.E2APSubscriptionDeleteFailure:
756                 return "SubDelFail"
757         default:
758                 return "Unknown"
759         }
760 }
761
762 //-------------------------------------------------------------------
763 //
764 //-------------------------------------------------------------------
765 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
766         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
767         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
768         if err != nil {
769                 xapp.Logger.Error("%v", err)
770         }
771 }
772
773 //-------------------------------------------------------------------
774 //
775 //-------------------------------------------------------------------
776 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
777
778         if removeSubscriptionFromDb == true {
779                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
780                 c.RemoveSubscriptionFromDb(subs)
781         } else {
782                 // Update is needed for successful response and merge case here
783                 if subs.RetryFromXapp == false {
784                         c.WriteSubscriptionToDb(subs)
785                 }
786         }
787         subs.RetryFromXapp = false
788 }
789
790 //-------------------------------------------------------------------
791 //
792 //-------------------------------------------------------------------
793 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
794         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
795         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
796         if err != nil {
797                 xapp.Logger.Error("%v", err)
798         }
799 }
800
801 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
802
803         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
804
805         // Send delete for every endpoint in the subscription
806         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
807         subDelReqMsg.RequestId = subs.GetReqId().RequestId
808         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
809         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
810         if err != nil {
811                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
812                 return
813         }
814         for _, endPoint := range subs.EpList.Endpoints {
815                 params := &xapp.RMRParams{}
816                 params.Mtype = mType
817                 params.SubId = int(subs.GetReqId().InstanceId)
818                 params.Xid = ""
819                 params.Meid = subs.Meid
820                 params.Src = endPoint.String()
821                 params.PayloadLen = len(payload.Buf)
822                 params.Payload = payload.Buf
823                 params.Mbuf = nil
824
825                 if params == nil {
826                         xapp.Logger.Error("SendSubscriptionDeleteReq() params == nil")
827                         return
828                 }
829
830                 subs.DeleteFromDb = true
831                 c.handleXAPPSubscriptionDeleteRequest(params)
832         }
833 }