RICPLT-2801, RICPLT-2802
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "errors"
24         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
25         rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
26         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
27         httptransport "github.com/go-openapi/runtime/client"
28         "github.com/go-openapi/strfmt"
29         "github.com/spf13/viper"
30         "math/rand"
31         "sync"
32         "time"
33 )
34
35 var subReqTime time.Duration = 5 * time.Second
36 var subDelReqTime time.Duration = 5 * time.Second
37 var maxSubReqTryCount uint64 = 2    // Initial try + retry
38 var maxSubDelReqTryCount uint64 = 2 // Initial try + retry
39
40 type Control struct {
41         e2ap         *E2ap
42         registry     *Registry
43         rtmgrClient  *RtmgrClient
44         tracker      *Tracker
45         timerMap     *TimerMap
46         rmrSendMutex sync.Mutex
47 }
48
49 type RMRMeid struct {
50         PlmnID  string
51         EnbID   string
52         RanName string
53 }
54
55 var seedSN uint16
56
57 const (
58         CREATE Action = 0
59         MERGE  Action = 1
60         NONE   Action = 2
61         DELETE Action = 3
62 )
63
64 func init() {
65         xapp.Logger.Info("SUBMGR")
66         viper.AutomaticEnv()
67         viper.SetEnvPrefix("submgr")
68         viper.AllowEmptyEnv(true)
69         seedSN = uint16(viper.GetInt("seed_sn"))
70         if seedSN == 0 {
71                 rand.Seed(time.Now().UnixNano())
72                 seedSN = uint16(rand.Intn(65535))
73         }
74         if seedSN > 65535 {
75                 seedSN = 0
76         }
77         xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", seedSN)
78 }
79
80 func NewControl() *Control {
81         registry := new(Registry)
82         registry.Initialize(seedSN)
83
84         tracker := new(Tracker)
85         tracker.Init()
86
87         timerMap := new(TimerMap)
88         timerMap.Init()
89
90         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
91         client := rtmgrclient.New(transport, strfmt.Default)
92         handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
93         deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
94         rtmgrClient := RtmgrClient{client, handle, deleteHandle}
95
96         return &Control{e2ap: new(E2ap),
97                 registry:    registry,
98                 rtmgrClient: &rtmgrClient,
99                 tracker:     tracker,
100                 timerMap:    timerMap,
101         }
102 }
103
104 func (c *Control) Run() {
105         xapp.Run(c)
106 }
107
108 func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
109         status := false
110         i := 1
111         for ; i <= 10 && status == false; i++ {
112                 c.rmrSendMutex.Lock()
113                 status = xapp.Rmr.Send(params, false)
114                 c.rmrSendMutex.Unlock()
115                 if status == false {
116                         xapp.Logger.Info("rmr.Send() failed. Retry count %v, Mtype: %v, SubId: %v, Xid %s", i, params.Mtype, params.SubId, params.Xid)
117                         time.Sleep(500 * time.Millisecond)
118                 }
119         }
120         if status == false {
121                 err = errors.New("rmr.Send() failed")
122                 xapp.Rmr.Free(params.Mbuf)
123         }
124         return
125 }
126
127 func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) {
128         c.rmrSend(params)
129         return
130 }
131
132 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
133         switch msg.Mtype {
134         case xapp.RICMessageTypes["RIC_SUB_REQ"]:
135                 go c.handleSubscriptionRequest(msg)
136         case xapp.RICMessageTypes["RIC_SUB_RESP"]:
137                 go c.handleSubscriptionResponse(msg)
138         case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
139                 go c.handleSubscriptionFailure(msg)
140         case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
141                 go c.handleSubscriptionDeleteRequest(msg)
142         case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
143                 go c.handleSubscriptionDeleteResponse(msg)
144         case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
145                 go c.handleSubscriptionDeleteFailure(msg)
146         default:
147                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
148         }
149         return nil
150 }
151
152 func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) {
153         xapp.Logger.Info("SubReq received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid)
154         xapp.Rmr.Free(params.Mbuf)
155         params.Mbuf = nil
156
157         /* Reserve a sequence number and set it in the payload */
158         newSubId, isIdValid := c.registry.ReserveSequenceNumber()
159         if isIdValid != true {
160                 xapp.Logger.Error("SubReq: Failed to reserve sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid)
161                 c.registry.releaseSequenceNumber(newSubId)
162                 return
163         }
164
165         params.SubId = int(newSubId)
166         err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId)
167         if err != nil {
168                 xapp.Logger.Error("SubReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
169                 c.registry.releaseSequenceNumber(newSubId)
170                 return
171         }
172
173         srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
174         if err != nil {
175                 xapp.Logger.Error("SubReq: Failed to update routing-manager. Dropping this msg. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
176                 c.registry.releaseSequenceNumber(newSubId)
177                 return
178         }
179
180         // Create transatcion record for every subscription request
181         var forwardRespToXapp bool = true
182         var responseReceived bool = false
183         transaction, err := c.tracker.TrackTransaction(newSubId, CREATE, *srcAddr, *srcPort, params, responseReceived, forwardRespToXapp)
184         if err != nil {
185                 xapp.Logger.Error("SubReq: Failed to create transaction record. Dropping this msg. Err: %v SubId: %v, Xid: %s", err, params.SubId, params.Xid)
186                 c.registry.releaseSequenceNumber(newSubId)
187                 return
188         }
189
190         // Update routing manager about the new subscription
191         subRouteAction := transaction.SubRouteInfo()
192         xapp.Logger.Info("SubReq: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
193
194         err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
195         if err != nil {
196                 xapp.Logger.Error("SubReq: Failed to update routing manager. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
197                 c.registry.releaseSequenceNumber(newSubId)
198                 return
199         }
200
201         // Setting new subscription ID in the RMR header
202         xapp.Logger.Info("SubReq: Forwarding SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", params.Mtype, params.SubId, params.Xid, params.Meid)
203         err = c.rmrSend(params)
204         if err != nil {
205                 xapp.Logger.Error("SubReq: Failed to send request to E2T %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
206         }
207         c.timerMap.StartTimer("RIC_SUB_REQ", int(newSubId), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
208         xapp.Logger.Debug("SubReq: Debugging transaction table = %v", c.tracker.transactionTable)
209         return
210 }
211
212 func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) {
213         xapp.Logger.Info("SubResp received from Src: %s, Mtype: %v, SubId: %v, Meid: %v", params.Src, params.Mtype, params.SubId, params.Meid)
214         xapp.Rmr.Free(params.Mbuf)
215         params.Mbuf = nil
216
217         payloadSeqNum, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
218         if err != nil {
219                 xapp.Logger.Error("SubResp: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
220                 return
221         }
222         xapp.Logger.Info("SubResp: Received payloadSeqNum: %v", payloadSeqNum)
223
224         if !c.registry.IsValidSequenceNumber(payloadSeqNum) {
225                 xapp.Logger.Error("SubResp: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
226                 return
227         }
228
229         c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum))
230
231         transaction, responseReceived, err := c.tracker.CheckResponseReceived(payloadSeqNum, CREATE)
232         if err != nil {
233                 xapp.Logger.Info("SubResp: Dropping this msg. Err: %v SubId: %v", err, payloadSeqNum)
234                 return
235         }
236
237         if responseReceived == true {
238                 // Subscription timer already received
239                 return
240         }
241         xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v.", payloadSeqNum, transaction.Xappkey.Addr, transaction.Xappkey.Port)
242
243         c.registry.setSubscriptionToConfirmed(payloadSeqNum)
244
245         params.SubId = int(payloadSeqNum)
246         params.Xid = transaction.OrigParams.Xid
247
248         xapp.Logger.Info("SubResp: Forwarding Subscription Response to xApp Mtype: %v, SubId: %v, Meid: %v", params.Mtype, params.SubId, params.Meid)
249         err = c.rmrReplyToSender(params)
250         if err != nil {
251                 xapp.Logger.Error("SubResp: Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
252         }
253
254         xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Deleting transaction record", payloadSeqNum, transaction.Xappkey.Addr, transaction.Xappkey.Port)
255         _, err = c.tracker.completeTransaction(payloadSeqNum, CREATE)
256         if err != nil {
257                 xapp.Logger.Error("SubResp: Failed to delete transaction record. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
258                 return
259         }
260         return
261 }
262
263 func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) {
264         xapp.Logger.Info("SubFail received from Src: %s, Mtype: %v, SubId: %v, Meid: %v", params.Src, params.Mtype, params.SubId, params.Meid)
265         xapp.Rmr.Free(params.Mbuf)
266         params.Mbuf = nil
267
268         payloadSeqNum, err := c.e2ap.GetSubscriptionFailureSequenceNumber(params.Payload)
269         if err != nil {
270                 xapp.Logger.Error("SubFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
271                 return
272         }
273         xapp.Logger.Info("SubFail: Received payloadSeqNum: %v", payloadSeqNum)
274
275         c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum))
276
277         transaction, responseReceived, err := c.tracker.CheckResponseReceived(payloadSeqNum, CREATE)
278         if err != nil {
279                 xapp.Logger.Info("SubFail: Dropping this msg. Err: %v SubId: %v", err, payloadSeqNum)
280                 return
281         }
282
283         if responseReceived == true {
284                 // Subscription timer already received
285                 return
286         }
287         xapp.Logger.Info("SubFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", payloadSeqNum, transaction.Xappkey.Addr, transaction.Xappkey.Port)
288
289         time.Sleep(3 * time.Second)
290
291         xapp.Logger.Info("SubFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
292         subRouteAction := transaction.SubRouteInfo()
293         err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
294         if err != nil {
295                 xapp.Logger.Error("SubFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
296         }
297
298         xapp.Logger.Info("SubFail: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid)
299         if c.registry.releaseSequenceNumber(payloadSeqNum) {
300                 _, err = c.tracker.completeTransaction(payloadSeqNum, CREATE)
301                 if err != nil {
302                         xapp.Logger.Error("SubFail: Failed to delete transaction record. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
303                         return
304                 }
305         } else {
306                 xapp.Logger.Error("SubFail: Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid)
307                 return
308         }
309         return
310 }
311
312 func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCount uint64) {
313         subId := uint16(nbrId)
314         xapp.Logger.Info("handleSubTimer: SubReq timer expired. subId: %v,  tryCount: %v", subId, tryCount)
315
316         transaction, responseReceived, err := c.tracker.CheckResponseReceived(subId, CREATE)
317         if err != nil {
318                 xapp.Logger.Info("handleSubTimer: Dropping this timer action. Err: %v SubId: %v", err, subId)
319                 return
320         }
321
322         if responseReceived == true {
323                 // Subscription Response or Failure already received
324                 return
325         }
326
327         if tryCount < maxSubReqTryCount {
328                 xapp.Logger.Info("handleSubTimer: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", transaction.OrigParams.Mtype, transaction.OrigParams.SubId, transaction.OrigParams.Xid, transaction.OrigParams.Meid)
329                 // Set possible to handle new response for the subId
330                 err = c.tracker.RetryTransaction(subId, CREATE)
331                 if err != nil {
332                         xapp.Logger.Error("handleSubDelTimer: Failed to retry transaction record. Dropping timer action. Err %v, SubId: %v", err, transaction.OrigParams.SubId)
333                         return
334                 }
335
336                 err = c.rmrSend(transaction.OrigParams)
337                 if err != nil {
338                         xapp.Logger.Error("handleSubTimer: Failed to send request to E2T %v, SubId: %v, Xid: %s", err, transaction.OrigParams.SubId, transaction.OrigParams.Xid)
339                 }
340
341                 tryCount++
342                 c.timerMap.StartTimer("RIC_SUB_REQ", int(subId), subReqTime, tryCount, c.handleSubscriptionRequestTimer)
343                 return
344         }
345
346         var subDelReqPayload []byte
347         subDelReqPayload, err = c.e2ap.PackSubscriptionDeleteRequest(transaction.OrigParams.Payload, subId)
348         if err != nil {
349                 xapp.Logger.Error("handleSubTimer: Packing SubDelReq failed. Err: %v", err)
350                 return
351         }
352
353         // Cancel failed subscription
354         var params xapp.RMRParams
355         params.Mtype = 12020 // RIC SUBSCRIPTION DELETE
356         params.SubId = int(subId)
357         params.Xid = transaction.OrigParams.Xid
358         params.Meid = transaction.OrigParams.Meid
359         params.Src = transaction.OrigParams.Src
360         params.PayloadLen = len(subDelReqPayload)
361         params.Payload = subDelReqPayload
362         params.Mbuf = nil
363
364         // Delete CREATE transaction
365         _, err = c.tracker.completeTransaction(subId, CREATE)
366         if err != nil {
367                 xapp.Logger.Error("handleSubTimer: Failed to delete create transaction record. Dropping this timer action. Err: %v, SubId: %v, Xid: %s", err, subId, params.Xid)
368                 return
369         }
370
371         // Create DELETE transaction
372         var forwardRespToXapp bool = false
373         _, err = c.trackDeleteTransaction(&params, subId, forwardRespToXapp)
374         if err != nil {
375                 xapp.Logger.Error("handleSubTimer: Failed to create delete transaction record. Dropping this timer action. Err: %v, SubId: %v, Xid: %s", err, subId, params.Xid)
376                 return
377         }
378
379         xapp.Logger.Info("handleSubTimer: Sending SubDelReq to E2T: Mtype: %v, SubId: %v, Meid: %v", params.Mtype, params.SubId, params.Meid)
380         c.rmrSend(&params)
381         if err != nil {
382                 xapp.Logger.Error("handleSubTimer: Failed to send request to E2T %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
383         }
384         c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subId), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
385         return
386 }
387
388 func (act Action) String() string {
389         actions := [...]string{
390                 "CREATE",
391                 "MERGE",
392                 "NONE",
393                 "DELETE",
394         }
395
396         if act < CREATE || act > DELETE {
397                 return "Unknown"
398         }
399         return actions[act]
400 }
401
402 func (act Action) valid() bool {
403         switch act {
404         case CREATE, MERGE, DELETE:
405                 return true
406         default:
407                 return false
408         }
409 }
410
411 func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) {
412         xapp.Logger.Info("SubDelReq received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid)
413         xapp.Rmr.Free(params.Mbuf)
414         params.Mbuf = nil
415
416         payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
417         if err != nil {
418                 xapp.Logger.Error("SubDelReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
419                 return
420         }
421         xapp.Logger.Info("SubDelReq: Received payloadSeqNum: %v", payloadSeqNum)
422
423         if c.registry.IsValidSequenceNumber(payloadSeqNum) {
424                 c.registry.deleteSubscription(payloadSeqNum)
425                 var forwardRespToXapp bool = true
426                 _, err = c.trackDeleteTransaction(params, payloadSeqNum, forwardRespToXapp)
427                 if err != nil {
428                         xapp.Logger.Error("SubDelReq: Failed to create transaction record. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
429                         return
430                 }
431         } else {
432                 xapp.Logger.Error("SubDelReq: Not valid sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid)
433                 return
434         }
435
436         xapp.Logger.Info("SubDelReq: Forwarding Request to E2T. Mtype: %v, SubId: %v, Xid: %s, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid)
437         c.rmrSend(params)
438         if err != nil {
439                 xapp.Logger.Error("SubDelReq: Failed to send request to E2T. Err %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
440         }
441         c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(payloadSeqNum), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
442         return
443 }
444
445 func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payloadSeqNum uint16, forwardRespToXapp bool) (transaction *Transaction, err error) {
446         srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
447         if err != nil {
448                 xapp.Logger.Error("Failed to split source address. Err: %s, SubId: %v, Xid: %s", err, payloadSeqNum, params.Xid)
449         }
450         var respReceived bool = false
451         transaction, err = c.tracker.TrackTransaction(payloadSeqNum, DELETE, *srcAddr, *srcPort, params, respReceived, forwardRespToXapp)
452         return
453 }
454
455 func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
456         xapp.Logger.Info("SubDelResp received from Src: %s, Mtype: %v, SubId: %v, Meid: %v", params.Src, params.Mtype, params.SubId, params.Meid)
457         xapp.Rmr.Free(params.Mbuf)
458         params.Mbuf = nil
459
460         payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
461         if err != nil {
462                 xapp.Logger.Error("SubDelResp: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
463                 return
464         }
465         xapp.Logger.Info("SubDelResp: Received payloadSeqNum: %v", payloadSeqNum)
466
467         c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(payloadSeqNum))
468
469         transaction, responseReceived, err := c.tracker.CheckResponseReceived(payloadSeqNum, DELETE)
470         if err != nil {
471                 xapp.Logger.Info("SubDelResp: Dropping this msg. Err: %v SubId: %v", err, payloadSeqNum)
472                 return
473         }
474
475         if responseReceived == true {
476                 // Subscription Delete timer already received
477                 return
478         }
479         xapp.Logger.Info("SubDelResp: SubId: %v, from address: %v:%v. Forwarding response to xApp", payloadSeqNum, transaction.Xappkey.Addr, transaction.Xappkey.Port)
480
481         if transaction.ForwardRespToXapp == true {
482                 params.SubId = int(payloadSeqNum)
483                 params.Xid = transaction.OrigParams.Xid
484                 xapp.Logger.Info("Forwarding SubDelResp to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid)
485                 err = c.rmrReplyToSender(params)
486                 if err != nil {
487                         xapp.Logger.Error("SubDelResp: Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
488                 }
489
490                 time.Sleep(3 * time.Second)
491         }
492
493         xapp.Logger.Info("SubDelResp: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
494         subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, payloadSeqNum}
495         err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
496         if err != nil {
497                 xapp.Logger.Error("SubDelResp: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
498                 return
499         }
500
501         xapp.Logger.Info("SubDelResp: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid)
502         if c.registry.releaseSequenceNumber(payloadSeqNum) {
503                 _, err = c.tracker.completeTransaction(payloadSeqNum, DELETE)
504                 if err != nil {
505                         xapp.Logger.Error("SubDelResp: Failed to delete transaction record. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
506                         return
507                 }
508         } else {
509                 xapp.Logger.Error("SubDelResp: Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid)
510                 return
511         }
512         return
513 }
514
515 func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) {
516         xapp.Logger.Info("SubDelFail received from Src: %s, Mtype: %v, SubId: %v, Meid: %v", params.Src, params.Mtype, params.SubId, params.Meid)
517         xapp.Rmr.Free(params.Mbuf)
518         params.Mbuf = nil
519
520         payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteFailureSequenceNumber(params.Payload)
521         if err != nil {
522                 xapp.Logger.Error("SubDelFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
523                 return
524         }
525         xapp.Logger.Info("SubDelFail: Received payloadSeqNum: %v", payloadSeqNum)
526
527         c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(payloadSeqNum))
528
529         transaction, responseReceived, err := c.tracker.CheckResponseReceived(payloadSeqNum, DELETE)
530         if err != nil {
531                 xapp.Logger.Info("SubDelFail: Dropping this msg. Err: %v SubId: %v", err, payloadSeqNum)
532                 return
533         }
534
535         if responseReceived == true {
536                 // Subscription Delete timer already received
537                 return
538         }
539         xapp.Logger.Info("SubDelFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", payloadSeqNum, transaction.Xappkey.Addr, transaction.Xappkey.Port)
540
541         if transaction.ForwardRespToXapp == true {
542                 var subDelRespPayload []byte
543                 subDelRespPayload, err = c.e2ap.PackSubscriptionDeleteResponse(transaction.OrigParams.Payload, payloadSeqNum)
544                 if err != nil {
545                         xapp.Logger.Error("SubDelFail:Packing SubDelResp failed. Err: %v", err)
546                         return
547                 }
548
549                 params.Mtype = 12021 // RIC SUBSCRIPTION DELETE RESPONSE
550                 params.SubId = int(payloadSeqNum)
551                 params.Xid = transaction.OrigParams.Xid
552                 params.Meid = transaction.OrigParams.Meid
553                 params.Src = transaction.OrigParams.Src
554                 params.PayloadLen = len(subDelRespPayload)
555                 params.Payload = subDelRespPayload
556                 params.Mbuf = nil
557                 xapp.Logger.Info("SubDelFail: Forwarding SubDelFail to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid)
558                 err = c.rmrReplyToSender(params)
559                 if err != nil {
560                         xapp.Logger.Error("SubDelFail: Failed to send SubDelFail to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
561                 }
562
563                 time.Sleep(3 * time.Second)
564         }
565
566         xapp.Logger.Info("SubDelFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
567         subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, payloadSeqNum}
568         err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
569         if err != nil {
570                 xapp.Logger.Error("SubDelFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
571                 return
572         }
573
574         xapp.Logger.Info("SubDelFail: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid)
575         if c.registry.releaseSequenceNumber(payloadSeqNum) {
576                 _, err = c.tracker.completeTransaction(payloadSeqNum, DELETE)
577                 if err != nil {
578                         xapp.Logger.Error("SubDelFail: Failed to delete transaction record. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
579                         return
580                 }
581         } else {
582                 xapp.Logger.Error("SubDelFail: Failed to release sequency number. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
583                 return
584         }
585         return
586 }
587
588 func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, tryCount uint64) {
589         subId := uint16(nbrId)
590         xapp.Logger.Info("handleSubDelTimer: SubDelReq timer expired. subId: %v, tryCount: %v", subId, tryCount)
591
592         transaction, responseReceived, err := c.tracker.CheckResponseReceived(subId, DELETE)
593         if err != nil {
594                 xapp.Logger.Info("handleSubTimer: Dropping this timer action. Err: %v SubId: %v", err, subId)
595                 return
596         }
597
598         if responseReceived == true {
599                 // Subscription Delete Response or Failure already received
600                 return
601         }
602
603         if tryCount < maxSubDelReqTryCount {
604                 xapp.Logger.Info("handleSubDelTimer: Resending SubDelReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", transaction.OrigParams.Mtype, transaction.OrigParams.SubId, transaction.OrigParams.Xid, transaction.OrigParams.Meid)
605                 // Set possible to handle new response for the subId
606                 err = c.tracker.RetryTransaction(subId, DELETE)
607                 if err != nil {
608                         xapp.Logger.Error("handleSubDelTimer: Failed to retry transaction record. Dropping  timer action. Err %v, SubId: %v", err, transaction.OrigParams.SubId)
609                         return
610                 }
611
612                 err = c.rmrSend(transaction.OrigParams)
613                 if err != nil {
614                         xapp.Logger.Error("handleSubDelTimer: Failed to send request to E2T %v, SubId: %v, Xid: %s", err, transaction.OrigParams.SubId, transaction.OrigParams.Xid)
615                 }
616
617                 tryCount++
618                 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subId), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer)
619                 return
620         }
621
622         var params xapp.RMRParams
623         if transaction.ForwardRespToXapp == true {
624                 var subDelRespPayload []byte
625                 subDelRespPayload, err = c.e2ap.PackSubscriptionDeleteResponse(transaction.OrigParams.Payload, subId)
626                 if err != nil {
627                         xapp.Logger.Error("handleSubDelTimer: Unable to pack payload. Dropping this timer action. Err: %v, SubId: %v, Xid: %s, Payload %x", err, subId, transaction.OrigParams.Xid, transaction.OrigParams.Payload)
628                         return
629                 }
630
631                 params.Mtype = 12021 // RIC SUBSCRIPTION DELETE RESPONSE
632                 params.SubId = int(subId)
633                 params.Meid = transaction.OrigParams.Meid
634                 params.Xid = transaction.OrigParams.Xid
635                 params.Src = transaction.OrigParams.Src
636                 params.PayloadLen = len(subDelRespPayload)
637                 params.Payload = subDelRespPayload
638                 params.Mbuf = nil
639
640                 xapp.Logger.Info("handleSubDelTimer: Sending SubDelResp to xApp: Mtype: %v, SubId: %v, Xid: %s, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid)
641                 err = c.rmrReplyToSender(&params)
642                 if err != nil {
643                         xapp.Logger.Error("handleSubDelTimer: Failed to send response to xApp: Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
644                 }
645
646                 time.Sleep(3 * time.Second)
647         }
648
649         xapp.Logger.Info("handleSubDelTimer: Starting routing manager update. SubId: %v, Xid: %s", subId, params.Xid)
650         subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, subId}
651         err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
652         if err != nil {
653                 xapp.Logger.Error("handleSubDelTimer: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, subId, params.Xid)
654                 return
655         }
656
657         xapp.Logger.Info("handleSubDelTimer: Deleting transaction record. SubId: %v, Xid: %s", subId, params.Xid)
658         if c.registry.releaseSequenceNumber(subId) {
659                 _, err = c.tracker.completeTransaction(subId, DELETE)
660                 if err != nil {
661                         xapp.Logger.Error("handleSubDelTimer: Failed to delete transaction record. Err: %v, SubId: %v, Xid: %s", err, subId, params.Xid)
662                         return
663                 }
664         } else {
665                 xapp.Logger.Error("handleSubDelTimer: Failed to release sequency number. SubId: %v, Xid: %s", subId, params.Xid)
666         }
667         return
668 }