933df7f9986b23c6a9f36902f380c55e65d7fa45
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "net/http"
25         "os"
26         "strconv"
27         "strings"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         httptransport "github.com/go-openapi/runtime/client"
35         "github.com/go-openapi/strfmt"
36         "github.com/gorilla/mux"
37         "github.com/spf13/viper"
38 )
39
40 //-----------------------------------------------------------------------------
41 //
42 //-----------------------------------------------------------------------------
43
44 func idstring(err error, entries ...fmt.Stringer) string {
45         var retval string = ""
46         var filler string = ""
47         for _, entry := range entries {
48                 retval += filler + entry.String()
49                 filler = " "
50         }
51         if err != nil {
52                 retval += filler + "err(" + err.Error() + ")"
53                 filler = " "
54
55         }
56         return retval
57 }
58
59 //-----------------------------------------------------------------------------
60 //
61 //-----------------------------------------------------------------------------
62
63 var e2tSubReqTimeout time.Duration
64 var e2tSubDelReqTime time.Duration
65 var e2tRecvMsgTimeout time.Duration
66 var e2tMaxSubReqTryCount uint64    // Initial try + retry
67 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
68 var readSubsFromDb string
69
70 type Control struct {
71         *xapp.RMRClient
72         e2ap     *E2ap
73         registry *Registry
74         tracker  *Tracker
75         db       Sdlnterface
76         //subscriber *xapp.Subscriber
77         CntRecvMsg    uint64
78         ResetTestFlag bool
79 }
80
81 type RMRMeid struct {
82         PlmnID  string
83         EnbID   string
84         RanName string
85 }
86
87 type SubmgrRestartTestEvent struct{}
88 type SubmgrRestartUpEvent struct{}
89
90 func init() {
91         xapp.Logger.Info("SUBMGR")
92         viper.AutomaticEnv()
93         viper.SetEnvPrefix("submgr")
94         viper.AllowEmptyEnv(true)
95 }
96
97 func NewControl() *Control {
98
99         ReadConfigParameters()
100         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
101         rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
102
103         registry := new(Registry)
104         registry.Initialize()
105         registry.rtmgrClient = &rtmgrClient
106
107         tracker := new(Tracker)
108         tracker.Init()
109
110         //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
111
112         c := &Control{e2ap: new(E2ap),
113                 registry: registry,
114                 tracker:  tracker,
115                 db:       CreateSdl(),
116                 //subscriber: subscriber,
117         }
118
119         // Register REST handler for testing support
120         xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
121
122         go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
123         //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
124
125         if readSubsFromDb == "false" {
126                 return c
127         }
128
129         // Read subscriptions from db
130         xapp.Logger.Info("Reading subscriptions from db")
131         subIds, register, err := c.ReadAllSubscriptionsFromSdl()
132         if err != nil {
133                 xapp.Logger.Error("%v", err)
134         } else {
135                 c.registry.subIds = subIds
136                 c.registry.register = register
137                 c.HandleUncompletedSubscriptions(register)
138         }
139         return c
140 }
141
142 //-------------------------------------------------------------------
143 //
144 //-------------------------------------------------------------------
145 func ReadConfigParameters() {
146
147         // viper.GetDuration returns nanoseconds
148         e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
149         if e2tSubReqTimeout == 0 {
150                 e2tSubReqTimeout = 2000 * 1000000
151         }
152         xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
153         e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
154         if e2tSubDelReqTime == 0 {
155                 e2tSubDelReqTime = 2000 * 1000000
156         }
157         xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
158         e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
159         if e2tRecvMsgTimeout == 0 {
160                 e2tRecvMsgTimeout = 2000 * 1000000
161         }
162         xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
163         e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
164         if e2tMaxSubReqTryCount == 0 {
165                 e2tMaxSubReqTryCount = 1
166         }
167         xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
168         e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
169         if e2tMaxSubDelReqTryCount == 0 {
170                 e2tMaxSubDelReqTryCount = 1
171         }
172         xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
173
174         readSubsFromDb = viper.GetString("controls.readSubsFromDb")
175         if readSubsFromDb == "" {
176                 readSubsFromDb = "true"
177         }
178         xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
179 }
180
181 //-------------------------------------------------------------------
182 //
183 //-------------------------------------------------------------------
184 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
185
186         xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
187         for subId, subs := range register {
188                 if subs.SubRespRcvd == false {
189                         subs.NoRespToXapp = true
190                         xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
191                         c.SendSubscriptionDeleteReq(subs)
192                 }
193         }
194 }
195
196 func (c *Control) ReadyCB(data interface{}) {
197         if c.RMRClient == nil {
198                 c.RMRClient = xapp.Rmr
199         }
200 }
201
202 func (c *Control) Run() {
203         xapp.SetReadyCB(c.ReadyCB, nil)
204         xapp.Run(c)
205 }
206
207 //-------------------------------------------------------------------
208 //
209 //-------------------------------------------------------------------
210 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
211         /*
212            switch p := params.(type) {
213            case *models.ReportParams:
214                trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
215                if trans == nil {
216                      xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
217                      return
218                }
219                defer trans.Release()
220            case *models.ControlParams:
221            case *models.PolicyParams:
222            }
223         */
224         return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
225 }
226
227 func (c *Control) SubscriptionDeleteHandler(s string) error {
228         return nil
229 }
230
231 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
232         return c.registry.QueryHandler()
233 }
234
235 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
236
237         xapp.Logger.Info("TestRestHandler() called")
238
239         pathParams := mux.Vars(r)
240         s := pathParams["testId"]
241
242         // This can be used to delete single subscription from db
243         if contains := strings.Contains(s, "deletesubid="); contains == true {
244                 var splits = strings.Split(s, "=")
245                 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
246                         xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
247                         c.RemoveSubscriptionFromSdl(uint32(subId))
248                         return
249                 }
250         }
251
252         // This can be used to remove all subscriptions db from
253         if s == "emptydb" {
254                 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
255                 c.RemoveAllSubscriptionsFromSdl()
256                 return
257         }
258
259         // This is meant to cause submgr's restart in testing
260         if s == "restart" {
261                 xapp.Logger.Info("os.Exit(1) called")
262                 os.Exit(1)
263         }
264
265         xapp.Logger.Info("Unsupported rest command received %s", s)
266 }
267
268 //-------------------------------------------------------------------
269 //
270 //-------------------------------------------------------------------
271
272 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
273         params := &xapp.RMRParams{}
274         params.Mtype = trans.GetMtype()
275         params.SubId = int(subs.GetReqId().InstanceId)
276         params.Xid = ""
277         params.Meid = subs.GetMeid()
278         params.Src = ""
279         params.PayloadLen = len(trans.Payload.Buf)
280         params.Payload = trans.Payload.Buf
281         params.Mbuf = nil
282         xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
283         return c.SendWithRetry(params, false, 5)
284 }
285
286 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
287
288         params := &xapp.RMRParams{}
289         params.Mtype = trans.GetMtype()
290         params.SubId = int(subs.GetReqId().InstanceId)
291         params.Xid = trans.GetXid()
292         params.Meid = trans.GetMeid()
293         params.Src = ""
294         params.PayloadLen = len(trans.Payload.Buf)
295         params.Payload = trans.Payload.Buf
296         params.Mbuf = nil
297         xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
298         return c.SendWithRetry(params, false, 5)
299 }
300
301 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
302         if c.RMRClient == nil {
303                 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
304                 xapp.Logger.Error("%s", err.Error())
305                 return
306         }
307         c.CntRecvMsg++
308
309         defer c.RMRClient.Free(msg.Mbuf)
310
311         // xapp-frame might use direct access to c buffer and
312         // when msg.Mbuf is freed, someone might take it into use
313         // and payload data might be invalid inside message handle function
314         //
315         // subscriptions won't load system a lot so there is no
316         // real performance hit by cloning buffer into new go byte slice
317         cPay := append(msg.Payload[:0:0], msg.Payload...)
318         msg.Payload = cPay
319         msg.PayloadLen = len(cPay)
320
321         switch msg.Mtype {
322         case xapp.RIC_SUB_REQ:
323                 go c.handleXAPPSubscriptionRequest(msg)
324         case xapp.RIC_SUB_RESP:
325                 go c.handleE2TSubscriptionResponse(msg)
326         case xapp.RIC_SUB_FAILURE:
327                 go c.handleE2TSubscriptionFailure(msg)
328         case xapp.RIC_SUB_DEL_REQ:
329                 go c.handleXAPPSubscriptionDeleteRequest(msg)
330         case xapp.RIC_SUB_DEL_RESP:
331                 go c.handleE2TSubscriptionDeleteResponse(msg)
332         case xapp.RIC_SUB_DEL_FAILURE:
333                 go c.handleE2TSubscriptionDeleteFailure(msg)
334         default:
335                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
336         }
337         return
338 }
339
340 //-------------------------------------------------------------------
341 // handle from XAPP Subscription Request
342 //------------------------------------------------------------------
343 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
344         xapp.Logger.Info("MSG from XAPP: %s", params.String())
345
346         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
347         if err != nil {
348                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
349                 return
350         }
351
352         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.InstanceId, params.Meid)
353         if trans == nil {
354                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
355                 return
356         }
357         defer trans.Release()
358
359         err = c.tracker.Track(trans)
360         if err != nil {
361                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
362                 return
363         }
364
365         //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
366         subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag)
367         if err != nil {
368                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
369                 return
370         }
371
372         //
373         // Wake subs request
374         //
375         go c.handleSubscriptionCreate(subs, trans)
376         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
377         err = nil
378         if event != nil {
379                 switch themsg := event.(type) {
380                 case *e2ap.E2APSubscriptionResponse:
381                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
382                         if err == nil {
383                                 trans.Release()
384                                 c.rmrSendToXapp("", subs, trans)
385                                 return
386                         }
387                 case *e2ap.E2APSubscriptionFailure:
388                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
389                         if err == nil {
390                                 c.rmrSendToXapp("", subs, trans)
391                         }
392                 default:
393                         break
394                 }
395         }
396         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
397         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
398 }
399
400 //-------------------------------------------------------------------
401 // handle from XAPP Subscription Delete Request
402 //------------------------------------------------------------------
403 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
404         xapp.Logger.Info("MSG from XAPP: %s", params.String())
405
406         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
407         if err != nil {
408                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
409                 return
410         }
411
412         trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.InstanceId, params.Meid)
413         if trans == nil {
414                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
415                 return
416         }
417         defer trans.Release()
418
419         err = c.tracker.Track(trans)
420         if err != nil {
421                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
422                 return
423         }
424
425         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
426         if err != nil {
427                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
428                 return
429         }
430
431         //
432         // Wake subs delete
433         //
434         go c.handleSubscriptionDelete(subs, trans)
435         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
436
437         xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
438
439         if subs.NoRespToXapp == true {
440                 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
441                 return
442         }
443
444         // Whatever is received success, fail or timeout, send successful delete response
445         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
446         subDelRespMsg.RequestId = subs.GetReqId().RequestId
447         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
448         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
449         if err == nil {
450                 c.rmrSendToXapp("", subs, trans)
451         }
452
453         //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
454         //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
455 }
456
457 //-------------------------------------------------------------------
458 // SUBS CREATE Handling
459 //-------------------------------------------------------------------
460 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
461
462         var removeSubscriptionFromDb bool = false
463         trans := c.tracker.NewSubsTransaction(subs)
464         subs.WaitTransactionTurn(trans)
465         defer subs.ReleaseTransactionTurn(trans)
466         defer trans.Release()
467
468         xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
469
470         subRfMsg, valid := subs.GetCachedResponse()
471         if subRfMsg == nil && valid == true {
472                 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
473                 switch event.(type) {
474                 case *e2ap.E2APSubscriptionResponse:
475                         subRfMsg, valid = subs.SetCachedResponse(event, true)
476                         subs.SubRespRcvd = true
477                 case *e2ap.E2APSubscriptionFailure:
478                         removeSubscriptionFromDb = true
479                         subRfMsg, valid = subs.SetCachedResponse(event, false)
480                         xapp.Logger.Info("SUBS-SubReq: internal delete  due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
481                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
482                 case *SubmgrRestartTestEvent:
483                         // This simulates that no response has been received and after restart subscriptions are restored from db
484                         xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
485                         return
486                 default:
487                         xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
488                         removeSubscriptionFromDb = true
489                         subRfMsg, valid = subs.SetCachedResponse(nil, false)
490                         c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
491                 }
492                 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
493         } else {
494                 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
495         }
496
497         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
498         if valid == false {
499                 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
500         }
501
502         c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
503         parentTrans.SendEvent(subRfMsg, 0)
504 }
505
506 //-------------------------------------------------------------------
507 // SUBS DELETE Handling
508 //-------------------------------------------------------------------
509
510 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
511
512         trans := c.tracker.NewSubsTransaction(subs)
513         subs.WaitTransactionTurn(trans)
514         defer subs.ReleaseTransactionTurn(trans)
515         defer trans.Release()
516
517         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
518
519         subs.mutex.Lock()
520         if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
521                 subs.valid = false
522                 subs.mutex.Unlock()
523                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
524         } else {
525                 subs.mutex.Unlock()
526         }
527         //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
528         //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
529         //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
530         c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
531         c.registry.UpdateSubscriptionToDb(subs, c)
532         parentTrans.SendEvent(nil, 0)
533 }
534
535 //-------------------------------------------------------------------
536 // send to E2T Subscription Request
537 //-------------------------------------------------------------------
538 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
539         var err error
540         var event interface{} = nil
541         var timedOut bool = false
542
543         subReqMsg := subs.SubReqMsg
544         subReqMsg.RequestId = subs.GetReqId().RequestId
545         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
546         if err != nil {
547                 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
548                 return event
549         }
550
551         // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
552         c.WriteSubscriptionToDb(subs)
553         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
554                 desc := fmt.Sprintf("(retry %d)", retries)
555                 c.rmrSendToE2T(desc, subs, trans)
556                 if subs.DoNotWaitSubResp == false {
557                         event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
558                         if timedOut {
559                                 continue
560                         }
561                 } else {
562                         // Simulating case where subscrition request has been sent but response has not been received before restart
563                         event = &SubmgrRestartTestEvent{}
564                 }
565                 break
566         }
567         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
568         return event
569 }
570
571 //-------------------------------------------------------------------
572 // send to E2T Subscription Delete Request
573 //-------------------------------------------------------------------
574
575 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
576         var err error
577         var event interface{}
578         var timedOut bool
579
580         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
581         subDelReqMsg.RequestId = subs.GetReqId().RequestId
582         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
583         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
584         if err != nil {
585                 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
586                 return event
587         }
588
589         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
590                 desc := fmt.Sprintf("(retry %d)", retries)
591                 c.rmrSendToE2T(desc, subs, trans)
592                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
593                 if timedOut {
594                         continue
595                 }
596                 break
597         }
598         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
599         return event
600 }
601
602 //-------------------------------------------------------------------
603 // handle from E2T Subscription Reponse
604 //-------------------------------------------------------------------
605 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
606         xapp.Logger.Info("MSG from E2T: %s", params.String())
607         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
608         if err != nil {
609                 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
610                 return
611         }
612         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
613         if err != nil {
614                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
615                 return
616         }
617         trans := subs.GetTransaction()
618         if trans == nil {
619                 err = fmt.Errorf("Ongoing transaction not found")
620                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
621                 return
622         }
623         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
624         if sendOk == false {
625                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
626                 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
627         }
628         return
629 }
630
631 //-------------------------------------------------------------------
632 // handle from E2T Subscription Failure
633 //-------------------------------------------------------------------
634 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
635         xapp.Logger.Info("MSG from E2T: %s", params.String())
636         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
637         if err != nil {
638                 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
639                 return
640         }
641         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
642         if err != nil {
643                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
644                 return
645         }
646         trans := subs.GetTransaction()
647         if trans == nil {
648                 err = fmt.Errorf("Ongoing transaction not found")
649                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
650                 return
651         }
652         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
653         if sendOk == false {
654                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
655                 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
656         }
657         return
658 }
659
660 //-------------------------------------------------------------------
661 // handle from E2T Subscription Delete Response
662 //-------------------------------------------------------------------
663 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
664         xapp.Logger.Info("MSG from E2T: %s", params.String())
665         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
666         if err != nil {
667                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
668                 return
669         }
670         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
671         if err != nil {
672                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
673                 return
674         }
675         trans := subs.GetTransaction()
676         if trans == nil {
677                 err = fmt.Errorf("Ongoing transaction not found")
678                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
679                 return
680         }
681         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
682         if sendOk == false {
683                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
684                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
685         }
686         return
687 }
688
689 //-------------------------------------------------------------------
690 // handle from E2T Subscription Delete Failure
691 //-------------------------------------------------------------------
692 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
693         xapp.Logger.Info("MSG from E2T: %s", params.String())
694         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
695         if err != nil {
696                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
697                 return
698         }
699         subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
700         if err != nil {
701                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
702                 return
703         }
704         trans := subs.GetTransaction()
705         if trans == nil {
706                 err = fmt.Errorf("Ongoing transaction not found")
707                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
708                 return
709         }
710         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
711         if sendOk == false {
712                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
713                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
714         }
715         return
716 }
717
718 //-------------------------------------------------------------------
719 //
720 //-------------------------------------------------------------------
721 func typeofSubsMessage(v interface{}) string {
722         if v == nil {
723                 return "NIL"
724         }
725         switch v.(type) {
726         case *e2ap.E2APSubscriptionRequest:
727                 return "SubReq"
728         case *e2ap.E2APSubscriptionResponse:
729                 return "SubResp"
730         case *e2ap.E2APSubscriptionFailure:
731                 return "SubFail"
732         case *e2ap.E2APSubscriptionDeleteRequest:
733                 return "SubDelReq"
734         case *e2ap.E2APSubscriptionDeleteResponse:
735                 return "SubDelResp"
736         case *e2ap.E2APSubscriptionDeleteFailure:
737                 return "SubDelFail"
738         default:
739                 return "Unknown"
740         }
741 }
742
743 //-------------------------------------------------------------------
744 //
745 //-------------------------------------------------------------------
746 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
747         xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
748         err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
749         if err != nil {
750                 xapp.Logger.Error("%v", err)
751         }
752 }
753
754 //-------------------------------------------------------------------
755 //
756 //-------------------------------------------------------------------
757 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
758
759         if removeSubscriptionFromDb == true {
760                 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
761                 c.RemoveSubscriptionFromDb(subs)
762         } else {
763                 // Update is needed for successful response and merge case here
764                 if subs.RetryFromXapp == false {
765                         c.WriteSubscriptionToDb(subs)
766                 }
767         }
768         subs.RetryFromXapp = false
769 }
770
771 //-------------------------------------------------------------------
772 //
773 //-------------------------------------------------------------------
774 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
775         xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
776         err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
777         if err != nil {
778                 xapp.Logger.Error("%v", err)
779         }
780 }
781
782 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
783
784         xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
785
786         // Send delete for every endpoint in the subscription
787         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
788         subDelReqMsg.RequestId = subs.GetReqId().RequestId
789         subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
790         mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
791         if err != nil {
792                 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
793                 return
794         }
795         for _, endPoint := range subs.EpList.Endpoints {
796                 params := &xapp.RMRParams{}
797                 params.Mtype = mType
798                 params.SubId = int(subs.GetReqId().InstanceId)
799                 params.Xid = ""
800                 params.Meid = subs.Meid
801                 params.Src = endPoint.String()
802                 params.PayloadLen = len(payload.Buf)
803                 params.Payload = payload.Buf
804                 params.Mbuf = nil
805
806                 if params == nil {
807                         xapp.Logger.Error("SendSubscriptionDeleteReq() params == nil")
808                         return
809                 }
810
811                 subs.DeleteFromDb = true
812                 c.handleXAPPSubscriptionDeleteRequest(params)
813         }
814 }