3446f255cc90aea68717b158030e516b42558363
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/spf13/viper"
38 )
39
40 //-----------------------------------------------------------------------------
41 //
42 //-----------------------------------------------------------------------------
43
44 func idstring(err error, entries ...fmt.Stringer) string {
45         var retval string = ""
46         var filler string = ""
47         for _, entry := range entries {
48                 retval += filler + entry.String()
49                 filler = " "
50         }
51         if err != nil {
52                 retval += filler + "err(" + err.Error() + ")"
53                 filler = " "
54         }
55         return retval
56 }
57
58 //-----------------------------------------------------------------------------
59 //
60 //-----------------------------------------------------------------------------
61
62 var e2tSubReqTimeout time.Duration
63 var e2tSubDelReqTime time.Duration
64 var e2tRecvMsgTimeout time.Duration
65 var e2tMaxSubReqTryCount uint64    // Initial try + retry
66 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
67 var readSubsFromDb string
68
69 type Control struct {
70         *xapp.RMRClient
71         e2ap     *E2ap
72         registry *Registry
73         tracker  *Tracker
74         db       Sdlnterface
75         //subscriber *xapp.Subscriber
76         CntRecvMsg    uint64
77         ResetTestFlag bool
78         Counters      map[string]xapp.Counter
79 }
80
81 type RMRMeid struct {
82         PlmnID  string
83         EnbID   string
84         RanName string
85 }
86
87 type SubmgrRestartTestEvent struct{}
88 type SubmgrRestartUpEvent struct{}
89
90 func init() {
91         xapp.Logger.Info("SUBMGR")
92         viper.AutomaticEnv()
93         viper.SetEnvPrefix("submgr")
94         viper.AllowEmptyEnv(true)
95 }
96
97 func NewControl() *Control {
98
99         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
100         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
101
102         registry := new(Registry)
103         registry.Initialize()
104         registry.rtmgrClient = &rtmgrClient
105
106         tracker := new(Tracker)
107         tracker.Init()
108
109         c := &Control{e2ap: new(E2ap),
110                 registry: registry,
111                 tracker:  tracker,
112                 db:       CreateSdl(),
113                 //subscriber: subscriber,
114                 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
115         }
116         c.ReadConfigParameters("")
117
118         // Register REST handler for testing support
119         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
120         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
121
122         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
123
124         if readSubsFromDb == "false" {
125                 return c
126         }
127
128         // Read subscriptions from db
129         xapp.Logger.Info("Reading subscriptions from db")
130         subIds, register, err := c.ReadAllSubscriptionsFromSdl()
131         if err != nil {
132                 xapp.Logger.Error("%v", err)
133         } else {
134                 c.registry.subIds = subIds
135                 c.registry.register = register
136                 c.HandleUncompletedSubscriptions(register)
137         }
138         return c
139 }
140
141 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
142         subscriptions, _ := c.registry.QueryHandler()
143         xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
144 }
145
146 //-------------------------------------------------------------------
147 //
148 //-------------------------------------------------------------------
149 func (c *Control) ReadConfigParameters(f string) {
150
151         // viper.GetDuration returns nanoseconds
152         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
153         if e2tSubReqTimeout == 0 {
154                 e2tSubReqTimeout = 2000 * 1000000
155         }
156         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
157         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
158         if e2tSubDelReqTime == 0 {
159                 e2tSubDelReqTime = 2000 * 1000000
160         }
161         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
162         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
163         if e2tRecvMsgTimeout == 0 {
164                 e2tRecvMsgTimeout = 2000 * 1000000
165         }
166         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
167         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
168         if e2tMaxSubReqTryCount == 0 {
169                 e2tMaxSubReqTryCount = 1
170         }
171         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
172         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
173         if e2tMaxSubDelReqTryCount == 0 {
174                 e2tMaxSubDelReqTryCount = 1
175         }
176         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
177
178         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
179         if readSubsFromDb == "" {
180                 readSubsFromDb = "true"
181         }
182         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
183 }
184
185 //-------------------------------------------------------------------
186 //
187 //-------------------------------------------------------------------
188 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
189
190         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
191         for subId, subs := range register {
192                 if subs.SubRespRcvd == false {
193                         subs.NoRespToXapp = true
194                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
195                         c.SendSubscriptionDeleteReq(subs)
196                 }
197         }
198 }
199
200 func (c *Control) ReadyCB(data interface{}) {
201         if c.RMRClient == nil {
202                 c.RMRClient = xapp.Rmr
203         }
204 }
205
206 func (c *Control) Run() {
207         xapp.SetReadyCB(c.ReadyCB, nil)
208         xapp.AddConfigChangeListener(c.ReadConfigParameters)
209         xapp.Run(c)
210 }
211
212 //-------------------------------------------------------------------
213 //
214 //-------------------------------------------------------------------
215 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
216         /*
217            switch p := params.(type) {
218            case *models.ReportParams:
219                trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
220                if trans == nil {
221                      xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
222                      return
223                }
224                defer trans.Release()
225            case *models.ControlParams:
226            case *models.PolicyParams:
227            }
228         */
229         return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
230 }
231
232 func (c *Control) SubscriptionDeleteHandler(s string) error {
233         return nil
234 }
235
236 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
237         xapp.Logger.Info("QueryHandler() called")
238
239         return c.registry.QueryHandler()
240 }
241
242 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
243         xapp.Logger.Info("TestRestHandler() called")
244
245         pathParams := mux.Vars(r)
246         s := pathParams["testId"]
247
248         // This can be used to delete single subscription from db
249         if contains := strings.Contains(s, "deletesubid="); contains == true {
250                 var splits = strings.Split(s, "=")
251                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
252                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
253                         c.RemoveSubscriptionFromSdl(uint32(subId))
254                         return
255                 }
256         }
257
258         // This can be used to remove all subscriptions db from
259         if s == "emptydb" {
260                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
261                 c.RemoveAllSubscriptionsFromSdl()
262                 return
263         }
264
265         // This is meant to cause submgr's restart in testing
266         if s == "restart" {
267                 xapp.Logger.Info("os.Exit(1) called")
268                 os.Exit(1)
269         }
270
271         xapp.Logger.Info("Unsupported rest command received %s", s)
272 }
273
274 //-------------------------------------------------------------------
275 //
276 //-------------------------------------------------------------------
277
278 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
279         params := &xapp.RMRParams{}
280         params.Mtype = trans.GetMtype()
281         params.SubId = int(subs.GetReqId().InstanceId)
282         params.Xid = ""
283         params.Meid = subs.GetMeid()
284         params.Src = ""
285         params.PayloadLen = len(trans.Payload.Buf)
286         params.Payload = trans.Payload.Buf
287         params.Mbuf = nil
288         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
289         err = c.SendWithRetry(params, false, 5)
290         if err != nil {
291                 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
292         }
293         return err
294 }
295
296 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
297
298         params := &xapp.RMRParams{}
299         params.Mtype = trans.GetMtype()
300         params.SubId = int(subs.GetReqId().InstanceId)
301         params.Xid = trans.GetXid()
302         params.Meid = trans.GetMeid()
303         params.Src = ""
304         params.PayloadLen = len(trans.Payload.Buf)
305         params.Payload = trans.Payload.Buf
306         params.Mbuf = nil
307         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
308         err = c.SendWithRetry(params, false, 5)
309         if err != nil {
310                 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
311         }
312         return err
313 }
314
315 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
316         if c.RMRClient == nil {
317                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
318                 xapp.Logger.Error("%s", err.Error())
319                 return
320         }
321         c.CntRecvMsg++
322
323         defer c.RMRClient.Free(msg.Mbuf)
324
325         // xapp-frame might use direct access to c buffer and
326         // when msg.Mbuf is freed, someone might take it into use
327         // and payload data might be invalid inside message handle function
328         //
329         // subscriptions won't load system a lot so there is no
330         // real performance hit by cloning buffer into new go byte slice
331         cPay := append(msg.Payload[:0:0], msg.Payload...)
332         msg.Payload = cPay
333         msg.PayloadLen = len(cPay)
334
335         switch msg.Mtype {
336         case xapp.RIC_SUB_REQ:
337                 go c.handleXAPPSubscriptionRequest(msg)
338         case xapp.RIC_SUB_RESP:
339                 go c.handleE2TSubscriptionResponse(msg)
340         case xapp.RIC_SUB_FAILURE:
341                 go c.handleE2TSubscriptionFailure(msg)
342         case xapp.RIC_SUB_DEL_REQ:
343                 go c.handleXAPPSubscriptionDeleteRequest(msg)
344         case xapp.RIC_SUB_DEL_RESP:
345                 go c.handleE2TSubscriptionDeleteResponse(msg)
346         case xapp.RIC_SUB_DEL_FAILURE:
347                 go c.handleE2TSubscriptionDeleteFailure(msg)
348         default:
349                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
350         }
351         return
352 }
353
354 //-------------------------------------------------------------------
355 // handle from XAPP Subscription Request
356 //------------------------------------------------------------------
357 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
358         xapp.Logger.Info("MSG from XAPP: %s", params.String())
359         c.UpdateCounter(cSubReqFromXapp)
360
361         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
362         if err != nil {
363                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
364                 return
365         }
366
367         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
368         if trans == nil {
369                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
370                 return
371         }
372         defer trans.Release()
373
374         if err = c.tracker.Track(trans); err != nil {
375                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
376                 return
377         }
378
379         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
380         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
381         if err != nil {
382                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
383                 return
384         }
385
386         c.wakeSubscriptionRequest(subs, trans)
387 }
388
389 //-------------------------------------------------------------------
390 // Wake Subscription Request to E2node
391 //------------------------------------------------------------------
392 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
393
394         go c.handleSubscriptionCreate(subs, trans)
395         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
396         var err error
397         if event != nil {
398                 switch themsg := event.(type) {
399                 case *e2ap.E2APSubscriptionResponse:
400                         themsg.RequestId.Id = trans.RequestId.Id
401                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
402                         if err == nil {
403                                 trans.Release()
404                                 c.UpdateCounter(cSubRespToXapp)
405                                 c.rmrSendToXapp("", subs, trans)
406                                 return
407                         }
408                 case *e2ap.E2APSubscriptionFailure:
409                         themsg.RequestId.Id = trans.RequestId.Id
410                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
411                         if err == nil {
412                                 c.UpdateCounter(cSubFailToXapp)
413                                 c.rmrSendToXapp("", subs, trans)
414                         }
415                 default:
416                         break
417                 }
418         }
419         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
420         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
421 }
422
423 //-------------------------------------------------------------------
424 // handle from XAPP Subscription Delete Request
425 //------------------------------------------------------------------
426 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
427         xapp.Logger.Info("MSG from XAPP: %s", params.String())
428         c.UpdateCounter(cSubDelReqFromXapp)
429
430         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
431         if err != nil {
432                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
433                 return
434         }
435
436         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
437         if trans == nil {
438                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
439                 return
440         }
441         defer trans.Release()
442
443         err = c.tracker.Track(trans)
444         if err != nil {
445                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
446                 return
447         }
448
449         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
450         if err != nil {
451                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
452                 return
453         }
454
455         //
456         // Wake subs delete
457         //
458         go c.handleSubscriptionDelete(subs, trans)
459         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
460
461         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
462
463         if subs.NoRespToXapp == true {
464                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
465                 return
466         }
467
468         // Whatever is received success, fail or timeout, send successful delete response
469         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
470         subDelRespMsg.RequestId.Id = trans.RequestId.Id
471         subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
472         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
473         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
474         if err == nil {
475                 c.UpdateCounter(cSubDelRespToXapp)
476                 c.rmrSendToXapp("", subs, trans)
477         }
478
479         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
480         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
481 }
482
483 //-------------------------------------------------------------------
484 // SUBS CREATE Handling
485 //-------------------------------------------------------------------
486 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
487
488         var removeSubscriptionFromDb bool = false
489         trans := c.tracker.NewSubsTransaction(subs)
490         subs.WaitTransactionTurn(trans)
491         defer subs.ReleaseTransactionTurn(trans)
492         defer trans.Release()
493
494         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
495
496         subRfMsg, valid := subs.GetCachedResponse()
497         if subRfMsg == nil && valid == true {
498                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
499                 switch event.(type) {
500                 case *e2ap.E2APSubscriptionResponse:
501                         subRfMsg, valid = subs.SetCachedResponse(event, true)
502                         subs.SubRespRcvd = true
503                 case *e2ap.E2APSubscriptionFailure:
504                         removeSubscriptionFromDb = true
505                         subRfMsg, valid = subs.SetCachedResponse(event, false)
506                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
507                 case *SubmgrRestartTestEvent:
508                         // This simulates that no response has been received and after restart subscriptions are restored from db
509                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
510                         return
511                 default:
512                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
513                         removeSubscriptionFromDb = true
514                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
515                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
516                 }
517                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
518         } else {
519                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
520         }
521
522         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
523         if valid == false {
524                 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
525         }
526
527         c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
528         parentTrans.SendEvent(subRfMsg, 0)
529 }
530
531 //-------------------------------------------------------------------
532 // SUBS DELETE Handling
533 //-------------------------------------------------------------------
534
535 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
536
537         trans := c.tracker.NewSubsTransaction(subs)
538         subs.WaitTransactionTurn(trans)
539         defer subs.ReleaseTransactionTurn(trans)
540         defer trans.Release()
541
542         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
543
544         subs.mutex.Lock()
545
546         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
547                 subs.valid = false
548                 subs.mutex.Unlock()
549                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
550         } else {
551                 subs.mutex.Unlock()
552         }
553         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
554         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
555         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
556         c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
557         c.registry.UpdateSubscriptionToDb(subs, c)
558         parentTrans.SendEvent(nil, 0)
559 }
560
561 //-------------------------------------------------------------------
562 // send to E2T Subscription Request
563 //-------------------------------------------------------------------
564 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
565         var err error
566         var event interface{} = nil
567         var timedOut bool = false
568
569         subReqMsg := subs.SubReqMsg
570         subReqMsg.RequestId = subs.GetReqId().RequestId
571         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
572         if err != nil {
573                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
574                 return event
575         }
576
577         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
578         c.WriteSubscriptionToDb(subs)
579         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
580                 desc := fmt.Sprintf("(retry %d)", retries)
581                 if retries == 0 {
582                         c.UpdateCounter(cSubReqToE2)
583                 } else {
584                         c.UpdateCounter(cSubReReqToE2)
585                 }
586                 c.rmrSendToE2T(desc, subs, trans)
587                 if subs.DoNotWaitSubResp == false {
588                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
589                         if timedOut {
590                                 c.UpdateCounter(cSubReqTimerExpiry)
591                                 continue
592                         }
593                 } else {
594                         // Simulating case where subscrition request has been sent but response has not been received before restart
595                         event = &SubmgrRestartTestEvent{}
596                 }
597                 break
598         }
599         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
600         return event
601 }
602
603 //-------------------------------------------------------------------
604 // send to E2T Subscription Delete Request
605 //-------------------------------------------------------------------
606
607 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
608         var err error
609         var event interface{}
610         var timedOut bool
611
612         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
613         subDelReqMsg.RequestId = subs.GetReqId().RequestId
614         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
615         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
616         if err != nil {
617                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
618                 return event
619         }
620
621         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
622                 desc := fmt.Sprintf("(retry %d)", retries)
623                 if retries == 0 {
624                         c.UpdateCounter(cSubDelReqToE2)
625                 } else {
626                         c.UpdateCounter(cSubDelReReqToE2)
627                 }
628                 c.rmrSendToE2T(desc, subs, trans)
629                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
630                 if timedOut {
631                         c.UpdateCounter(cSubDelReqTimerExpiry)
632                         continue
633                 }
634                 break
635         }
636         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
637         return event
638 }
639
640 //-------------------------------------------------------------------
641 // handle from E2T Subscription Response
642 //-------------------------------------------------------------------
643 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
644         xapp.Logger.Info("MSG from E2T: %s", params.String())
645         c.UpdateCounter(cSubRespFromE2)
646         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
647         if err != nil {
648                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
649                 return
650         }
651         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
652         if err != nil {
653                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
654                 return
655         }
656         trans := subs.GetTransaction()
657         if trans == nil {
658                 err = fmt.Errorf("Ongoing transaction not found")
659                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
660                 return
661         }
662         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
663         if sendOk == false {
664                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
665                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
666         }
667         return
668 }
669
670 //-------------------------------------------------------------------
671 // handle from E2T Subscription Failure
672 //-------------------------------------------------------------------
673 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
674         xapp.Logger.Info("MSG from E2T: %s", params.String())
675         c.UpdateCounter(cSubFailFromE2)
676         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
677         if err != nil {
678                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
679                 return
680         }
681         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
682         if err != nil {
683                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
684                 return
685         }
686         trans := subs.GetTransaction()
687         if trans == nil {
688                 err = fmt.Errorf("Ongoing transaction not found")
689                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
690                 return
691         }
692         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
693         if sendOk == false {
694                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
695                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
696         }
697         return
698 }
699
700 //-------------------------------------------------------------------
701 // handle from E2T Subscription Delete Response
702 //-------------------------------------------------------------------
703 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
704         xapp.Logger.Info("MSG from E2T: %s", params.String())
705         c.UpdateCounter(cSubDelRespFromE2)
706         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
707         if err != nil {
708                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
709                 return
710         }
711         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
712         if err != nil {
713                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
714                 return
715         }
716         trans := subs.GetTransaction()
717         if trans == nil {
718                 err = fmt.Errorf("Ongoing transaction not found")
719                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
720                 return
721         }
722         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
723         if sendOk == false {
724                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
725                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
726         }
727         return
728 }
729
730 //-------------------------------------------------------------------
731 // handle from E2T Subscription Delete Failure
732 //-------------------------------------------------------------------
733 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
734         xapp.Logger.Info("MSG from E2T: %s", params.String())
735         c.UpdateCounter(cSubDelFailFromE2)
736         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
737         if err != nil {
738                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
739                 return
740         }
741         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
742         if err != nil {
743                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
744                 return
745         }
746         trans := subs.GetTransaction()
747         if trans == nil {
748                 err = fmt.Errorf("Ongoing transaction not found")
749                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
750                 return
751         }
752         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
753         if sendOk == false {
754                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
755                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
756         }
757         return
758 }
759
760 //-------------------------------------------------------------------
761 //
762 //-------------------------------------------------------------------
763 func typeofSubsMessage(v interface{}) string {
764         if v == nil {
765                 return "NIL"
766         }
767         switch v.(type) {
768         case *e2ap.E2APSubscriptionRequest:
769                 return "SubReq"
770         case *e2ap.E2APSubscriptionResponse:
771                 return "SubResp"
772         case *e2ap.E2APSubscriptionFailure:
773                 return "SubFail"
774         case *e2ap.E2APSubscriptionDeleteRequest:
775                 return "SubDelReq"
776         case *e2ap.E2APSubscriptionDeleteResponse:
777                 return "SubDelResp"
778         case *e2ap.E2APSubscriptionDeleteFailure:
779                 return "SubDelFail"
780         default:
781                 return "Unknown"
782         }
783 }
784
785 //-------------------------------------------------------------------
786 //
787 //-------------------------------------------------------------------
788 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
789         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
790         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
791         if err != nil {
792                 xapp.Logger.Error("%v", err)
793         }
794 }
795
796 //-------------------------------------------------------------------
797 //
798 //-------------------------------------------------------------------
799 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
800
801         if removeSubscriptionFromDb == true {
802                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
803                 c.RemoveSubscriptionFromDb(subs)
804         } else {
805                 // Update is needed for successful response and merge case here
806                 if subs.RetryFromXapp == false {
807                         c.WriteSubscriptionToDb(subs)
808                 }
809         }
810         subs.RetryFromXapp = false
811 }
812
813 //-------------------------------------------------------------------
814 //
815 //-------------------------------------------------------------------
816 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
817         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
818         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
819         if err != nil {
820                 xapp.Logger.Error("%v", err)
821         }
822 }
823
824 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
825
826         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
827
828         // Send delete for every endpoint in the subscription
829         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
830         subDelReqMsg.RequestId = subs.GetReqId().RequestId
831         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
832         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
833         if err != nil {
834                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
835                 return
836         }
837         for _, endPoint := range subs.EpList.Endpoints {
838                 params := &xapp.RMRParams{}
839                 params.Mtype = mType
840                 params.SubId = int(subs.GetReqId().InstanceId)
841                 params.Xid = ""
842                 params.Meid = subs.Meid
843                 params.Src = endPoint.String()
844                 params.PayloadLen = len(payload.Buf)
845                 params.Payload = payload.Buf
846                 params.Mbuf = nil
847
848                 if params == nil {
849                         xapp.Logger.Error("SendSubscriptionDeleteReq() params == nil")
850                         return
851                 }
852
853                 subs.DeleteFromDb = true
854                 c.handleXAPPSubscriptionDeleteRequest(params)
855         }
856 }