eff14eed912266a3a07e36c9ba9c55161699055a
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/spf13/viper"
38 )
39
40 //-----------------------------------------------------------------------------
41 //
42 //-----------------------------------------------------------------------------
43
// idstring builds a single-line identification string for log messages.
//
// The String() form of every entry is emitted in order, separated by single
// spaces. When err is non-nil, a trailing "err(<message>)" element is
// appended. With no entries and a nil err the result is the empty string.
// Entries must be non-nil: String() is invoked on each of them.
func idstring(err error, entries ...fmt.Stringer) string {
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	return strings.Join(parts, " ")
}
58
59 //-----------------------------------------------------------------------------
60 //
61 //-----------------------------------------------------------------------------
62
// Package-level settings, populated by ReadConfigParameters.
var (
	// Time to wait for an answer after a subscription request has been sent.
	e2tSubReqTimeout time.Duration
	// Time to wait for an answer after a subscription delete request has been sent.
	e2tSubDelReqTime time.Duration
	// Timeout used when handing a received E2T message over to its transaction.
	e2tRecvMsgTimeout time.Duration
	// Maximum number of subscription request sends (initial try + retries).
	e2tMaxSubReqTryCount uint64
	// Maximum number of subscription delete request sends (initial try + retries).
	e2tMaxSubDelReqTryCount uint64
	// "false" disables restoring subscriptions from the db in NewControl.
	readSubsFromDb string
)
69
70 type Control struct {
71         *xapp.RMRClient
72         e2ap     *E2ap
73         registry *Registry
74         tracker  *Tracker
75         db       Sdlnterface
76         //subscriber *xapp.Subscriber
77         CntRecvMsg    uint64
78         ResetTestFlag bool
79         Counters      map[string]xapp.Counter
80 }
81
// RMRMeid groups three string identifiers: PlmnID, EnbID and RanName.
type RMRMeid struct {
	PlmnID, EnbID, RanName string
}
87
// Marker event types without payload.
type (
	// SubmgrRestartTestEvent is produced by sendE2TSubscriptionRequest when
	// the subscription is flagged not to wait for a response (restart test).
	SubmgrRestartTestEvent struct{}

	// SubmgrRestartUpEvent is declared here; it is not referenced in the
	// visible part of this file.
	SubmgrRestartUpEvent struct{}
)
90
91 func init() {
92         xapp.Logger.Info("SUBMGR")
93         viper.AutomaticEnv()
94         viper.SetEnvPrefix("submgr")
95         viper.AllowEmptyEnv(true)
96 }
97
98 func NewControl() *Control {
99
100         ReadConfigParameters()
101         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
102         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
103
104         registry := new(Registry)
105         registry.Initialize()
106         registry.rtmgrClient = &rtmgrClient
107
108         tracker := new(Tracker)
109         tracker.Init()
110
111         //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
112
113         c := &Control{e2ap: new(E2ap),
114                 registry: registry,
115                 tracker:  tracker,
116                 db:       CreateSdl(),
117                 //subscriber: subscriber,
118                 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
119         }
120
121         // Register REST handler for testing support
122         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
123
124         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
125         //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
126
127         if readSubsFromDb == "false" {
128                 return c
129         }
130
131         // Read subscriptions from db
132         xapp.Logger.Info("Reading subscriptions from db")
133         subIds, register, err := c.ReadAllSubscriptionsFromSdl()
134         if err != nil {
135                 xapp.Logger.Error("%v", err)
136         } else {
137                 c.registry.subIds = subIds
138                 c.registry.register = register
139                 c.HandleUncompletedSubscriptions(register)
140         }
141         return c
142 }
143
144 //-------------------------------------------------------------------
145 //
146 //-------------------------------------------------------------------
147 func ReadConfigParameters() {
148
149         // viper.GetDuration returns nanoseconds
150         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
151         if e2tSubReqTimeout == 0 {
152                 e2tSubReqTimeout = 2000 * 1000000
153         }
154         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
155         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
156         if e2tSubDelReqTime == 0 {
157                 e2tSubDelReqTime = 2000 * 1000000
158         }
159         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
160         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
161         if e2tRecvMsgTimeout == 0 {
162                 e2tRecvMsgTimeout = 2000 * 1000000
163         }
164         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
165         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
166         if e2tMaxSubReqTryCount == 0 {
167                 e2tMaxSubReqTryCount = 1
168         }
169         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
170         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
171         if e2tMaxSubDelReqTryCount == 0 {
172                 e2tMaxSubDelReqTryCount = 1
173         }
174         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
175
176         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
177         if readSubsFromDb == "" {
178                 readSubsFromDb = "true"
179         }
180         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
181 }
182
183 //-------------------------------------------------------------------
184 //
185 //-------------------------------------------------------------------
186 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
187
188         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
189         for subId, subs := range register {
190                 if subs.SubRespRcvd == false {
191                         subs.NoRespToXapp = true
192                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
193                         c.SendSubscriptionDeleteReq(subs)
194                 }
195         }
196 }
197
198 func (c *Control) ReadyCB(data interface{}) {
199         if c.RMRClient == nil {
200                 c.RMRClient = xapp.Rmr
201         }
202 }
203
// Run registers ReadyCB as the xapp ready callback and then hands the
// Control over to xapp.Run.
func (c *Control) Run() {
	xapp.SetReadyCB(c.ReadyCB, nil)
	xapp.Run(c)
}
208
209 //-------------------------------------------------------------------
210 //
211 //-------------------------------------------------------------------
212 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
213         /*
214            switch p := params.(type) {
215            case *models.ReportParams:
216                trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
217                if trans == nil {
218                      xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
219                      return
220                }
221                defer trans.Release()
222            case *models.ControlParams:
223            case *models.PolicyParams:
224            }
225         */
226         return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
227 }
228
// SubscriptionDeleteHandler is the REST subscription delete callback passed
// to xapp.Subscription.Listen in NewControl. It currently does nothing: the
// argument is ignored and nil is always returned.
func (c *Control) SubscriptionDeleteHandler(s string) error {
	return nil
}
232
// QueryHandler is the REST query callback passed to
// xapp.Subscription.Listen in NewControl. It delegates to the registry's
// QueryHandler and returns its result unchanged.
func (c *Control) QueryHandler() (models.SubscriptionList, error) {
	return c.registry.QueryHandler()
}
236
237 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
238
239         xapp.Logger.Info("TestRestHandler() called")
240
241         pathParams := mux.Vars(r)
242         s := pathParams["testId"]
243
244         // This can be used to delete single subscription from db
245         if contains := strings.Contains(s, "deletesubid="); contains == true {
246                 var splits = strings.Split(s, "=")
247                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
248                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
249                         c.RemoveSubscriptionFromSdl(uint32(subId))
250                         return
251                 }
252         }
253
254         // This can be used to remove all subscriptions db from
255         if s == "emptydb" {
256                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
257                 c.RemoveAllSubscriptionsFromSdl()
258                 return
259         }
260
261         // This is meant to cause submgr's restart in testing
262         if s == "restart" {
263                 xapp.Logger.Info("os.Exit(1) called")
264                 os.Exit(1)
265         }
266
267         xapp.Logger.Info("Unsupported rest command received %s", s)
268 }
269
270 //-------------------------------------------------------------------
271 //
272 //-------------------------------------------------------------------
273
274 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
275         params := &xapp.RMRParams{}
276         params.Mtype = trans.GetMtype()
277         params.SubId = int(subs.GetReqId().InstanceId)
278         params.Xid = ""
279         params.Meid = subs.GetMeid()
280         params.Src = ""
281         params.PayloadLen = len(trans.Payload.Buf)
282         params.Payload = trans.Payload.Buf
283         params.Mbuf = nil
284         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
285         return c.SendWithRetry(params, false, 5)
286 }
287
288 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
289
290         params := &xapp.RMRParams{}
291         params.Mtype = trans.GetMtype()
292         params.SubId = int(subs.GetReqId().InstanceId)
293         params.Xid = trans.GetXid()
294         params.Meid = trans.GetMeid()
295         params.Src = ""
296         params.PayloadLen = len(trans.Payload.Buf)
297         params.Payload = trans.Payload.Buf
298         params.Mbuf = nil
299         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
300         return c.SendWithRetry(params, false, 5)
301 }
302
303 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
304         if c.RMRClient == nil {
305                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
306                 xapp.Logger.Error("%s", err.Error())
307                 return
308         }
309         c.CntRecvMsg++
310
311         defer c.RMRClient.Free(msg.Mbuf)
312
313         // xapp-frame might use direct access to c buffer and
314         // when msg.Mbuf is freed, someone might take it into use
315         // and payload data might be invalid inside message handle function
316         //
317         // subscriptions won't load system a lot so there is no
318         // real performance hit by cloning buffer into new go byte slice
319         cPay := append(msg.Payload[:0:0], msg.Payload...)
320         msg.Payload = cPay
321         msg.PayloadLen = len(cPay)
322
323         switch msg.Mtype {
324         case xapp.RIC_SUB_REQ:
325                 go c.handleXAPPSubscriptionRequest(msg)
326         case xapp.RIC_SUB_RESP:
327                 go c.handleE2TSubscriptionResponse(msg)
328         case xapp.RIC_SUB_FAILURE:
329                 go c.handleE2TSubscriptionFailure(msg)
330         case xapp.RIC_SUB_DEL_REQ:
331                 go c.handleXAPPSubscriptionDeleteRequest(msg)
332         case xapp.RIC_SUB_DEL_RESP:
333                 go c.handleE2TSubscriptionDeleteResponse(msg)
334         case xapp.RIC_SUB_DEL_FAILURE:
335                 go c.handleE2TSubscriptionDeleteFailure(msg)
336         default:
337                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
338         }
339         return
340 }
341
342 //-------------------------------------------------------------------
343 // handle from XAPP Subscription Request
344 //------------------------------------------------------------------
345 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
346         xapp.Logger.Info("MSG from XAPP: %s", params.String())
347         c.UpdateCounter(cSubReqFromXapp)
348
349         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
350         if err != nil {
351                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
352                 return
353         }
354
355         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.InstanceId, params.Meid)
356         if trans == nil {
357                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
358                 return
359         }
360         defer trans.Release()
361
362         err = c.tracker.Track(trans)
363         if err != nil {
364                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
365                 return
366         }
367
368         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
369         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
370         if err != nil {
371                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
372                 return
373         }
374
375         //
376         // Wake subs request
377         //
378         go c.handleSubscriptionCreate(subs, trans)
379         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
380         err = nil
381         if event != nil {
382                 switch themsg := event.(type) {
383                 case *e2ap.E2APSubscriptionResponse:
384                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
385                         if err == nil {
386                                 trans.Release()
387                                 c.UpdateCounter(cSubRespToXapp)
388                                 c.rmrSendToXapp("", subs, trans)
389                                 return
390                         }
391                 case *e2ap.E2APSubscriptionFailure:
392                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
393                         if err == nil {
394                                 c.UpdateCounter(cSubFailToXapp)
395                                 c.rmrSendToXapp("", subs, trans)
396                         }
397                 default:
398                         break
399                 }
400         }
401         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
402         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
403 }
404
405 //-------------------------------------------------------------------
406 // handle from XAPP Subscription Delete Request
407 //------------------------------------------------------------------
408 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
409         xapp.Logger.Info("MSG from XAPP: %s", params.String())
410         c.UpdateCounter(cSubDelReqFromXapp)
411
412         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
413         if err != nil {
414                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
415                 return
416         }
417
418         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.InstanceId, params.Meid)
419         if trans == nil {
420                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
421                 return
422         }
423         defer trans.Release()
424
425         err = c.tracker.Track(trans)
426         if err != nil {
427                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
428                 return
429         }
430
431         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
432         if err != nil {
433                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
434                 return
435         }
436
437         //
438         // Wake subs delete
439         //
440         go c.handleSubscriptionDelete(subs, trans)
441         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
442
443         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
444
445         if subs.NoRespToXapp == true {
446                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
447                 return
448         }
449
450         // Whatever is received success, fail or timeout, send successful delete response
451         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
452         subDelRespMsg.RequestId = subs.GetReqId().RequestId
453         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
454         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
455         if err == nil {
456                 c.UpdateCounter(cSubDelRespToXapp)
457                 c.rmrSendToXapp("", subs, trans)
458         }
459
460         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
461         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
462 }
463
464 //-------------------------------------------------------------------
465 // SUBS CREATE Handling
466 //-------------------------------------------------------------------
467 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
468
469         var removeSubscriptionFromDb bool = false
470         trans := c.tracker.NewSubsTransaction(subs)
471         subs.WaitTransactionTurn(trans)
472         defer subs.ReleaseTransactionTurn(trans)
473         defer trans.Release()
474
475         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
476
477         subRfMsg, valid := subs.GetCachedResponse()
478         if subRfMsg == nil && valid == true {
479                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
480                 switch event.(type) {
481                 case *e2ap.E2APSubscriptionResponse:
482                         subRfMsg, valid = subs.SetCachedResponse(event, true)
483                         subs.SubRespRcvd = true
484                 case *e2ap.E2APSubscriptionFailure:
485                         removeSubscriptionFromDb = true
486                         subRfMsg, valid = subs.SetCachedResponse(event, false)
487                         xapp.Logger.Info("SUBS-SubReq: internal delete  due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
488                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
489                 case *SubmgrRestartTestEvent:
490                         // This simulates that no response has been received and after restart subscriptions are restored from db
491                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
492                         return
493                 default:
494                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
495                         removeSubscriptionFromDb = true
496                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
497                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
498                 }
499                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
500         } else {
501                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
502         }
503
504         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
505         if valid == false {
506                 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
507         }
508
509         c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
510         parentTrans.SendEvent(subRfMsg, 0)
511 }
512
513 //-------------------------------------------------------------------
514 // SUBS DELETE Handling
515 //-------------------------------------------------------------------
516
517 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
518
519         trans := c.tracker.NewSubsTransaction(subs)
520         subs.WaitTransactionTurn(trans)
521         defer subs.ReleaseTransactionTurn(trans)
522         defer trans.Release()
523
524         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
525
526         subs.mutex.Lock()
527
528         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
529                 subs.valid = false
530                 subs.mutex.Unlock()
531                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
532         } else {
533                 subs.mutex.Unlock()
534         }
535         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
536         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
537         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
538         c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
539         c.registry.UpdateSubscriptionToDb(subs, c)
540         parentTrans.SendEvent(nil, 0)
541 }
542
543 //-------------------------------------------------------------------
544 // send to E2T Subscription Request
545 //-------------------------------------------------------------------
546 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
547         var err error
548         var event interface{} = nil
549         var timedOut bool = false
550
551         subReqMsg := subs.SubReqMsg
552         subReqMsg.RequestId = subs.GetReqId().RequestId
553         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
554         if err != nil {
555                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
556                 return event
557         }
558
559         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
560         c.WriteSubscriptionToDb(subs)
561         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
562                 desc := fmt.Sprintf("(retry %d)", retries)
563                 if retries == 0 {
564                         c.UpdateCounter(cSubReqToE2)
565                 } else {
566                         c.UpdateCounter(cSubReReqToE2)
567                 }
568                 c.rmrSendToE2T(desc, subs, trans)
569                 if subs.DoNotWaitSubResp == false {
570                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
571                         if timedOut {
572                                 c.UpdateCounter(cSubReqTimerExpiry)
573                                 continue
574                         }
575                 } else {
576                         // Simulating case where subscrition request has been sent but response has not been received before restart
577                         event = &SubmgrRestartTestEvent{}
578                 }
579                 break
580         }
581         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
582         return event
583 }
584
585 //-------------------------------------------------------------------
586 // send to E2T Subscription Delete Request
587 //-------------------------------------------------------------------
588
589 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
590         var err error
591         var event interface{}
592         var timedOut bool
593
594         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
595         subDelReqMsg.RequestId = subs.GetReqId().RequestId
596         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
597         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
598         if err != nil {
599                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
600                 return event
601         }
602
603         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
604                 desc := fmt.Sprintf("(retry %d)", retries)
605                 if retries == 0 {
606                         c.UpdateCounter(cSubDelReqToE2)
607                 } else {
608                         c.UpdateCounter(cSubDelReReqToE2)
609                 }
610                 c.rmrSendToE2T(desc, subs, trans)
611                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
612                 if timedOut {
613                         c.UpdateCounter(cSubDelReqTimerExpiry)
614                         continue
615                 }
616                 break
617         }
618         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
619         return event
620 }
621
622 //-------------------------------------------------------------------
623 // handle from E2T Subscription Response
624 //-------------------------------------------------------------------
625 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
626         xapp.Logger.Info("MSG from E2T: %s", params.String())
627         c.UpdateCounter(cSubRespFromE2)
628         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
629         if err != nil {
630                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
631                 return
632         }
633         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
634         if err != nil {
635                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
636                 return
637         }
638         trans := subs.GetTransaction()
639         if trans == nil {
640                 err = fmt.Errorf("Ongoing transaction not found")
641                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
642                 return
643         }
644         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
645         if sendOk == false {
646                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
647                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
648         }
649         return
650 }
651
652 //-------------------------------------------------------------------
653 // handle from E2T Subscription Failure
654 //-------------------------------------------------------------------
655 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
656         xapp.Logger.Info("MSG from E2T: %s", params.String())
657         c.UpdateCounter(cSubFailFromE2)
658         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
659         if err != nil {
660                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
661                 return
662         }
663         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
664         if err != nil {
665                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
666                 return
667         }
668         trans := subs.GetTransaction()
669         if trans == nil {
670                 err = fmt.Errorf("Ongoing transaction not found")
671                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
672                 return
673         }
674         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
675         if sendOk == false {
676                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
677                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
678         }
679         return
680 }
681
682 //-------------------------------------------------------------------
683 // handle from E2T Subscription Delete Response
684 //-------------------------------------------------------------------
685 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
686         xapp.Logger.Info("MSG from E2T: %s", params.String())
687         c.UpdateCounter(cSubDelRespFromE2)
688         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
689         if err != nil {
690                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
691                 return
692         }
693         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
694         if err != nil {
695                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
696                 return
697         }
698         trans := subs.GetTransaction()
699         if trans == nil {
700                 err = fmt.Errorf("Ongoing transaction not found")
701                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
702                 return
703         }
704         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
705         if sendOk == false {
706                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
707                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
708         }
709         return
710 }
711
712 //-------------------------------------------------------------------
713 // handle from E2T Subscription Delete Failure
714 //-------------------------------------------------------------------
715 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
716         xapp.Logger.Info("MSG from E2T: %s", params.String())
717         c.UpdateCounter(cSubDelFailFromE2)
718         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
719         if err != nil {
720                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
721                 return
722         }
723         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
724         if err != nil {
725                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
726                 return
727         }
728         trans := subs.GetTransaction()
729         if trans == nil {
730                 err = fmt.Errorf("Ongoing transaction not found")
731                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
732                 return
733         }
734         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
735         if sendOk == false {
736                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
737                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
738         }
739         return
740 }
741
742 //-------------------------------------------------------------------
743 //
744 //-------------------------------------------------------------------
745 func typeofSubsMessage(v interface{}) string {
746         if v == nil {
747                 return "NIL"
748         }
749         switch v.(type) {
750         case *e2ap.E2APSubscriptionRequest:
751                 return "SubReq"
752         case *e2ap.E2APSubscriptionResponse:
753                 return "SubResp"
754         case *e2ap.E2APSubscriptionFailure:
755                 return "SubFail"
756         case *e2ap.E2APSubscriptionDeleteRequest:
757                 return "SubDelReq"
758         case *e2ap.E2APSubscriptionDeleteResponse:
759                 return "SubDelResp"
760         case *e2ap.E2APSubscriptionDeleteFailure:
761                 return "SubDelFail"
762         default:
763                 return "Unknown"
764         }
765 }
766
767 //-------------------------------------------------------------------
768 //
769 //-------------------------------------------------------------------
770 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
771         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
772         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
773         if err != nil {
774                 xapp.Logger.Error("%v", err)
775         }
776 }
777
778 //-------------------------------------------------------------------
779 //
780 //-------------------------------------------------------------------
781 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
782
783         if removeSubscriptionFromDb == true {
784                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
785                 c.RemoveSubscriptionFromDb(subs)
786         } else {
787                 // Update is needed for successful response and merge case here
788                 if subs.RetryFromXapp == false {
789                         c.WriteSubscriptionToDb(subs)
790                 }
791         }
792         subs.RetryFromXapp = false
793 }
794
795 //-------------------------------------------------------------------
796 //
797 //-------------------------------------------------------------------
798 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
799         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
800         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
801         if err != nil {
802                 xapp.Logger.Error("%v", err)
803         }
804 }
805
806 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
807
808         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
809
810         // Send delete for every endpoint in the subscription
811         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
812         subDelReqMsg.RequestId = subs.GetReqId().RequestId
813         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
814         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
815         if err != nil {
816                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
817                 return
818         }
819         for _, endPoint := range subs.EpList.Endpoints {
820                 params := &xapp.RMRParams{}
821                 params.Mtype = mType
822                 params.SubId = int(subs.GetReqId().InstanceId)
823                 params.Xid = ""
824                 params.Meid = subs.Meid
825                 params.Src = endPoint.String()
826                 params.PayloadLen = len(payload.Buf)
827                 params.Payload = payload.Buf
828                 params.Mbuf = nil
829
830                 if params == nil {
831                         xapp.Logger.Error("SendSubscriptionDeleteReq() params == nil")
832                         return
833                 }
834
835                 subs.DeleteFromDb = true
836                 c.handleXAPPSubscriptionDeleteRequest(params)
837         }
838 }