RICPLT-3014 Subs multiple rmr endpoints
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26         rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
27         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28         httptransport "github.com/go-openapi/runtime/client"
29         "github.com/go-openapi/strfmt"
30         "github.com/spf13/viper"
31         "sync"
32         "time"
33 )
34
35 //-----------------------------------------------------------------------------
36 //
37 //-----------------------------------------------------------------------------
38
// Timing and retry settings for the subscription procedures handled in this
// file. They are declared as package variables, not constants.
var (
	// subReqTime is the timer duration armed after sending a subscription request.
	subReqTime time.Duration = 5 * time.Second
	// subDelReqTime is the timer duration armed after sending a subscription delete request.
	subDelReqTime time.Duration = 5 * time.Second
	// maxSubReqTryCount bounds subscription request attempts (initial try + retry).
	maxSubReqTryCount uint64 = 2
	// maxSubDelReqTryCount bounds subscription delete request attempts (initial try + retry).
	maxSubDelReqTryCount uint64 = 2
)
43
// Control is the receiver for all message and timer handlers in this file.
// It groups the helpers those handlers rely on.
type Control struct {
	e2ap         *E2ap      // packs and unpacks E2AP subscription messages
	registry     *Registry  // looks up and reserves subscriptions by id
	tracker      *Tracker   // creates the transactions tracked per request
	timerMap     *TimerMap  // starts and stops the request/delete-request timers
	rmrSendMutex sync.Mutex // held around each xapp.Rmr.Send call in rmrSendRaw
	msgCounter   uint64     // incremented (without synchronization) for every message given to Consume
}
52
// RMRMeid groups three string identifiers. It is not referenced elsewhere in
// this file.
// NOTE(review): presumably mirrors the RMR managed-entity id — confirm against users.
type RMRMeid struct {
	PlmnID  string
	EnbID   string
	RanName string
}
58
59 const (
60         CREATE Action = 0
61         MERGE  Action = 1
62         NONE   Action = 2
63         DELETE Action = 3
64 )
65
66 func init() {
67         xapp.Logger.Info("SUBMGR")
68         viper.AutomaticEnv()
69         viper.SetEnvPrefix("submgr")
70         viper.AllowEmptyEnv(true)
71 }
72
73 func NewControl() *Control {
74
75         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
76         client := rtmgrclient.New(transport, strfmt.Default)
77         handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
78         deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
79         rtmgrClient := RtmgrClient{client, handle, deleteHandle}
80
81         registry := new(Registry)
82         registry.Initialize()
83         registry.rtmgrClient = &rtmgrClient
84
85         tracker := new(Tracker)
86         tracker.Init()
87
88         timerMap := new(TimerMap)
89         timerMap.Init()
90
91         return &Control{e2ap: new(E2ap),
92                 registry:   registry,
93                 tracker:    tracker,
94                 timerMap:   timerMap,
95                 msgCounter: 0,
96         }
97 }
98
// Run hands this Control to xapp.Run.
// NOTE(review): presumably blocks while the xapp framework is running — confirm.
func (c *Control) Run() {
	xapp.Run(c)
}
102
103 func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) {
104
105         xapp.Logger.Info("%s: %s", desc, params.String())
106         status := false
107         i := 1
108         for ; i <= 10 && status == false; i++ {
109                 c.rmrSendMutex.Lock()
110                 status = xapp.Rmr.Send(params.RMRParams, false)
111                 c.rmrSendMutex.Unlock()
112                 if status == false {
113                         xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String())
114                         time.Sleep(500 * time.Millisecond)
115                 }
116         }
117         if status == false {
118                 err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String())
119                 xapp.Logger.Error("%s: %s", desc, err.Error())
120                 xapp.Rmr.Free(params.Mbuf)
121         }
122         return
123 }
124
125 func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction) (err error) {
126         params := &RMRParams{&xapp.RMRParams{}}
127         params.Mtype = trans.GetMtype()
128         params.SubId = int(subs.GetSubId())
129         params.Xid = ""
130         params.Meid = subs.GetMeid()
131         params.Src = ""
132         params.PayloadLen = len(trans.Payload.Buf)
133         params.Payload = trans.Payload.Buf
134         params.Mbuf = nil
135
136         return c.rmrSendRaw(desc, params)
137 }
138
139 func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction) (err error) {
140         params := &RMRParams{&xapp.RMRParams{}}
141         params.Mtype = trans.GetMtype()
142         params.SubId = int(subs.GetSubId())
143         params.Xid = trans.GetXid()
144         params.Meid = trans.GetMeid()
145         params.Src = ""
146         params.PayloadLen = len(trans.Payload.Buf)
147         params.Payload = trans.Payload.Buf
148         params.Mbuf = nil
149
150         return c.rmrSendRaw(desc, params)
151 }
152
153 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
154         xapp.Rmr.Free(params.Mbuf)
155         params.Mbuf = nil
156         msg := &RMRParams{params}
157         c.msgCounter++
158         switch msg.Mtype {
159         case xapp.RICMessageTypes["RIC_SUB_REQ"]:
160                 go c.handleSubscriptionRequest(msg)
161         case xapp.RICMessageTypes["RIC_SUB_RESP"]:
162                 go c.handleSubscriptionResponse(msg)
163         case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
164                 go c.handleSubscriptionFailure(msg)
165         case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
166                 go c.handleSubscriptionDeleteRequest(msg)
167         case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
168                 go c.handleSubscriptionDeleteResponse(msg)
169         case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
170                 go c.handleSubscriptionDeleteFailure(msg)
171         default:
172                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
173         }
174
175         return nil
176 }
// idstring renders the non-nil arguments for log output, in the order
// trans, subs, err, separated by single spaces. The error is rendered as
// "err(<message>)". With all arguments nil the result is the empty string.
func idstring(trans fmt.Stringer, subs fmt.Stringer, err error) string {
	out := ""
	sep := ""
	// add appends one part; every part after the first is preceded by a space.
	add := func(part string) {
		out += sep + part
		sep = " "
	}

	if trans != nil {
		add(trans.String())
	}
	if subs != nil {
		add(subs.String())
	}
	if err != nil {
		add("err(" + err.Error() + ")")
	}
	return out
}
194
195 func (c *Control) findSubs(ids []int) (*Subscription, error) {
196         var subs *Subscription = nil
197         for _, id := range ids {
198                 if id >= 0 {
199                         subs = c.registry.GetSubscription(uint16(id))
200                 }
201                 if subs != nil {
202                         break
203                 }
204         }
205         if subs == nil {
206                 return nil, fmt.Errorf("No valid subscription found with ids %v", ids)
207         }
208         return subs, nil
209 }
210
211 func (c *Control) findSubsAndTrans(ids []int) (*Subscription, *Transaction, error) {
212         subs, err := c.findSubs(ids)
213         if err != nil {
214                 return nil, nil, err
215         }
216         trans := subs.GetTransaction()
217         if trans == nil {
218                 return subs, nil, fmt.Errorf("No ongoing transaction found from %s", idstring(nil, subs, nil))
219         }
220         return subs, trans, nil
221 }
222
223 func (c *Control) handleSubscriptionRequest(params *RMRParams) {
224         xapp.Logger.Info("SubReq from xapp: %s", params.String())
225
226         SubReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
227         if err != nil {
228                 xapp.Logger.Error("SubReq Drop: %s", idstring(params, nil, err))
229                 return
230         }
231
232         trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid, false, true)
233         if err != nil {
234                 xapp.Logger.Error("SubReq Drop: %s", idstring(params, nil, err))
235                 return
236         }
237         trans.SubReqMsg = SubReqMsg
238
239         subs, err := c.registry.ReserveSubscription(trans.Meid)
240         if err != nil {
241                 xapp.Logger.Error("SubReq Drop: %s", idstring(trans, nil, err))
242                 trans.Release()
243                 return
244         }
245
246         err = subs.SetTransaction(trans)
247         if err != nil {
248                 xapp.Logger.Error("SubReq Drop: %s", idstring(trans, subs, err))
249                 subs.Release()
250                 trans.Release()
251                 return
252         }
253         trans.SubReqMsg.RequestId.Seq = uint32(subs.GetSubId())
254
255         xapp.Logger.Debug("SubReq: Handling %s", idstring(trans, subs, nil))
256
257         //
258         // TODO: subscription create is in fact owned by subscription and not transaction.
259         //       Transaction is toward xapp while Subscription is toward ran.
260         //       In merge several xapps may wake transactions, while only one subscription
261         //       toward ran occurs -> subscription owns subscription creation toward ran
262         //
263         //       This is intermediate solution while improving message handling
264         //
265         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(trans.SubReqMsg)
266         if err != nil {
267                 xapp.Logger.Error("SubResp Drop: %s", idstring(trans, subs, err))
268                 subs.Release()
269                 trans.Release()
270                 return
271         }
272
273         c.rmrSend("SubReq: SubReq to E2T", subs, trans)
274         c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
275         return
276 }
277
278 func (c *Control) handleSubscriptionResponse(params *RMRParams) {
279         xapp.Logger.Info("SubResp from E2T: %s", params.String())
280
281         SubRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
282         if err != nil {
283                 xapp.Logger.Error("SubResp Drop %s", idstring(params, nil, err))
284                 return
285         }
286
287         subs, trans, err := c.findSubsAndTrans([]int{int(SubRespMsg.RequestId.Seq), params.SubId})
288         if err != nil {
289                 xapp.Logger.Error("SubResp: %s", idstring(params, nil, err))
290                 return
291         }
292         trans.SubRespMsg = SubRespMsg
293         xapp.Logger.Debug("SubResp: Handling %s", idstring(trans, subs, nil))
294
295         c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId()))
296
297         responseReceived := trans.CheckResponseReceived()
298         if responseReceived == true {
299                 // Subscription timer already received
300                 return
301         }
302
303         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(trans.SubRespMsg)
304         if err != nil {
305                 xapp.Logger.Error("SubResp: %s", idstring(trans, subs, err))
306                 trans.Release()
307                 return
308         }
309
310         subs.Confirmed()
311         trans.Release()
312         c.rmrReplyToSender("SubResp: SubResp to xapp", subs, trans)
313         return
314 }
315
316 func (c *Control) handleSubscriptionFailure(params *RMRParams) {
317         xapp.Logger.Info("SubFail from E2T: %s", params.String())
318
319         SubFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
320         if err != nil {
321                 xapp.Logger.Error("SubFail Drop %s", idstring(params, nil, err))
322                 return
323         }
324
325         subs, trans, err := c.findSubsAndTrans([]int{int(SubFailMsg.RequestId.Seq), params.SubId})
326         if err != nil {
327                 xapp.Logger.Error("SubFail: %s", idstring(params, nil, err))
328                 return
329         }
330         trans.SubFailMsg = SubFailMsg
331         xapp.Logger.Debug("SubFail: Handling %s", idstring(trans, subs, nil))
332
333         c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId()))
334         responseReceived := trans.CheckResponseReceived()
335         if responseReceived == true {
336                 // Subscription timer already received
337                 return
338         }
339
340         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(trans.SubFailMsg)
341         if err == nil {
342                 c.rmrReplyToSender("SubFail: SubFail to xapp", subs, trans)
343                 time.Sleep(3 * time.Second)
344         } else {
345                 //TODO error handling improvement
346                 xapp.Logger.Error("SubFail: (continue cleaning) %s", idstring(trans, subs, err))
347         }
348
349         trans.Release()
350         subs.Release()
351         return
352 }
353
354 func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCount uint64) {
355         xapp.Logger.Info("SubReq timeout: subId: %v,  tryCount: %v", nbrId, tryCount)
356
357         subs, trans, err := c.findSubsAndTrans(([]int{nbrId}))
358         if err != nil {
359                 xapp.Logger.Error("SubReq timeout: %s", idstring(nil, nil, err))
360                 return
361         }
362         xapp.Logger.Debug("SubReq timeout: Handling %s", idstring(trans, subs, nil))
363
364         responseReceived := trans.CheckResponseReceived()
365         if responseReceived == true {
366                 // Subscription Response or Failure already received
367                 return
368         }
369
370         if tryCount < maxSubReqTryCount {
371                 xapp.Logger.Info("SubReq timeout: %s", idstring(trans, subs, nil))
372
373                 trans.RetryTransaction()
374
375                 c.rmrSend("SubReq timeout: SubReq to E2T", subs, trans)
376
377                 tryCount++
378                 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer)
379                 return
380         }
381
382         // Release CREATE transaction
383         trans.Release()
384
385         // Create DELETE transaction (internal and no messages toward xapp)
386         deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint, trans.GetXid(), trans.GetMeid(), false, false)
387         if err != nil {
388                 xapp.Logger.Error("SubReq timeout: %s", idstring(trans, subs, err))
389                 //TODO improve error handling. Important at least in merge
390                 subs.Release()
391                 return
392         }
393
394         deltrans.SubDelReqMsg = &e2ap.E2APSubscriptionDeleteRequest{}
395         deltrans.SubDelReqMsg.RequestId.Id = trans.SubReqMsg.RequestId.Id
396         deltrans.SubDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
397         deltrans.SubDelReqMsg.FunctionId = trans.SubReqMsg.FunctionId
398         deltrans.Mtype, deltrans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg)
399         if err != nil {
400                 xapp.Logger.Error("SubReq timeout: %s", idstring(trans, subs, err))
401                 //TODO improve error handling. Important at least in merge
402                 deltrans.Release()
403                 subs.Release()
404                 return
405         }
406
407         err = subs.SetTransaction(deltrans)
408         if err != nil {
409                 xapp.Logger.Error("SubReq timeout: %s", idstring(trans, subs, err))
410                 //TODO improve error handling. Important at least in merge
411                 deltrans.Release()
412                 return
413         }
414
415         c.rmrSend("SubReq timer: SubDelReq to E2T", subs, deltrans)
416         c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
417         return
418 }
419
420 func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) {
421         xapp.Logger.Info("SubDelReq from xapp: %s", params.String())
422
423         SubDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
424         if err != nil {
425                 xapp.Logger.Error("SubDelReq Drop %s", idstring(params, nil, err))
426                 return
427         }
428
429         trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid, false, true)
430         if err != nil {
431                 xapp.Logger.Error("SubDelReq Drop %s", idstring(params, nil, err))
432                 return
433         }
434         trans.SubDelReqMsg = SubDelReqMsg
435
436         subs, err := c.findSubs([]int{int(trans.SubDelReqMsg.RequestId.Seq), params.SubId})
437         if err != nil {
438                 xapp.Logger.Error("SubDelReq: %s", idstring(params, nil, err))
439                 trans.Release()
440                 return
441         }
442
443         err = subs.SetTransaction(trans)
444         if err != nil {
445                 xapp.Logger.Error("SubDelReq: %s", idstring(trans, subs, err))
446                 trans.Release()
447                 return
448         }
449
450         xapp.Logger.Debug("SubDelReq: Handling %s", idstring(trans, subs, nil))
451
452         //
453         // TODO: subscription delete is in fact owned by subscription and not transaction.
454         //       Transaction is toward xapp while Subscription is toward ran.
455         //       In merge several xapps may wake transactions, while only one subscription
456         //       toward ran occurs -> subscription owns subscription creation toward ran
457         //
458         //       This is intermediate solution while improving message handling
459         //
460         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(trans.SubDelReqMsg)
461         if err != nil {
462                 xapp.Logger.Error("SubDelReq: %s", idstring(trans, subs, err))
463                 trans.Release()
464                 return
465         }
466
467         subs.UnConfirmed()
468
469         c.rmrSend("SubDelReq: SubDelReq to E2T", subs, trans)
470
471         c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
472         return
473 }
474
475 func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error) {
476         xapp.Logger.Info("SubDelResp from E2T:%s", params.String())
477
478         SubDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
479         if err != nil {
480                 xapp.Logger.Error("SubDelResp: Dropping this msg. %s", idstring(params, nil, err))
481                 return
482         }
483
484         subs, trans, err := c.findSubsAndTrans([]int{int(SubDelRespMsg.RequestId.Seq), params.SubId})
485         if err != nil {
486                 xapp.Logger.Error("SubDelResp: %s", idstring(params, nil, err))
487                 return
488         }
489         trans.SubDelRespMsg = SubDelRespMsg
490         xapp.Logger.Debug("SubDelResp: Handling %s", idstring(trans, subs, nil))
491
492         c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
493
494         responseReceived := trans.CheckResponseReceived()
495         if responseReceived == true {
496                 // Subscription Delete timer already received
497                 return
498         }
499
500         c.sendSubscriptionDeleteResponse("SubDelResp", trans, subs)
501         return
502 }
503
504 func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) {
505         xapp.Logger.Info("SubDelFail from E2T:%s", params.String())
506
507         SubDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
508         if err != nil {
509                 xapp.Logger.Error("SubDelFail: Dropping this msg. %s", idstring(params, nil, err))
510                 return
511         }
512
513         subs, trans, err := c.findSubsAndTrans([]int{int(SubDelFailMsg.RequestId.Seq), params.SubId})
514         if err != nil {
515                 xapp.Logger.Error("SubDelFail: %s", idstring(params, nil, err))
516                 return
517         }
518         trans.SubDelFailMsg = SubDelFailMsg
519         xapp.Logger.Debug("SubDelFail: Handling %s", idstring(trans, subs, nil))
520
521         c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
522
523         responseReceived := trans.CheckResponseReceived()
524         if responseReceived == true {
525                 // Subscription Delete timer already received
526                 return
527         }
528
529         c.sendSubscriptionDeleteResponse("SubDelFail", trans, subs)
530         return
531 }
532
533 func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, tryCount uint64) {
534         xapp.Logger.Info("SubDelReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
535
536         subs, trans, err := c.findSubsAndTrans([]int{nbrId})
537         if err != nil {
538                 xapp.Logger.Error("SubDelReq timeout: %s", idstring(nil, nil, err))
539                 return
540         }
541         xapp.Logger.Debug("SubDelReq timeout: Handling %s", idstring(trans, subs, nil))
542
543         responseReceived := trans.CheckResponseReceived()
544         if responseReceived == true {
545                 // Subscription Delete Response or Failure already received
546                 return
547         }
548
549         if tryCount < maxSubDelReqTryCount {
550                 // Set possible to handle new response for the subId
551                 trans.RetryTransaction()
552                 c.rmrSend("SubDelReq timeout: SubDelReq to E2T", subs, trans)
553                 tryCount++
554                 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer)
555                 return
556         }
557
558         c.sendSubscriptionDeleteResponse("SubDelReq(timer)", trans, subs)
559         return
560 }
561
562 func (c *Control) sendSubscriptionDeleteResponse(desc string, trans *Transaction, subs *Subscription) {
563
564         if trans.ForwardRespToXapp == true {
565                 //Always generate SubDelResp
566                 trans.SubDelRespMsg = &e2ap.E2APSubscriptionDeleteResponse{}
567                 trans.SubDelRespMsg.RequestId.Id = trans.SubDelReqMsg.RequestId.Id
568                 trans.SubDelRespMsg.RequestId.Seq = uint32(subs.GetSubId())
569                 trans.SubDelRespMsg.FunctionId = trans.SubDelReqMsg.FunctionId
570
571                 var err error
572                 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(trans.SubDelRespMsg)
573                 if err == nil {
574                         c.rmrReplyToSender(desc+": SubDelResp to xapp", subs, trans)
575                         time.Sleep(3 * time.Second)
576                 } else {
577                         //TODO error handling improvement
578                         xapp.Logger.Error("%s: (continue cleaning) %s", desc, idstring(trans, subs, err))
579                 }
580         }
581
582         trans.Release()
583         subs.Release()
584 }