RICPLT-3014 Subscription multiple endpoints
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26         rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
27         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28         httptransport "github.com/go-openapi/runtime/client"
29         "github.com/go-openapi/strfmt"
30         "github.com/spf13/viper"
31         "sync"
32         "time"
33 )
34
35 //-----------------------------------------------------------------------------
36 //
37 //-----------------------------------------------------------------------------
38
// Timing and retry settings used when talking to E2T.
var (
	// e2tSubReqTimeout is how long one subscription request attempt waits
	// for an event before it is treated as timed out.
	e2tSubReqTimeout time.Duration = 5 * time.Second
	// e2tSubDelReqTime is how long one subscription delete request attempt
	// waits for an event before it is treated as timed out.
	e2tSubDelReqTime time.Duration = 5 * time.Second
	// e2tMaxSubReqTryCount bounds the subscription request send loop:
	// initial try + retry.
	e2tMaxSubReqTryCount uint64 = 2
	// e2tMaxSubDelReqTryCount bounds the subscription delete request send
	// loop: initial try + retry.
	e2tMaxSubDelReqTryCount uint64 = 2

	// e2tRecvMsgTimeout is the timeout given to SendEvent when a message
	// received from E2T is handed over to the waiting transaction.
	e2tRecvMsgTimeout time.Duration = 5 * time.Second
)
45
// Control is the subscription manager's message handler. It receives RMR
// messages through Consume and drives the subscription create/delete
// exchanges between xApps and E2T.
type Control struct {
	// e2ap packs and unpacks the E2AP subscription messages.
	e2ap *E2ap
	// registry looks up and assigns subscriptions.
	registry *Registry
	// tracker creates and tracks transactions.
	tracker *Tracker
	// timerMap is initialised in NewControl.
	// NOTE(review): not referenced by the other code in this file.
	timerMap *TimerMap
	// rmrSendMutex is held around every xapp.Rmr.Send call.
	rmrSendMutex sync.Mutex
	// msgCounter is incremented once per message passed to Consume.
	msgCounter uint64
}
54
// RMRMeid groups three string identifiers: a PLMN id, an eNB id and a RAN
// name.
// NOTE(review): this type is not referenced by the other code in this file.
type RMRMeid struct {
	PlmnID, EnbID, RanName string
}
60
61 const (
62         CREATE Action = 0
63         MERGE  Action = 1
64         NONE   Action = 2
65         DELETE Action = 3
66 )
67
68 func init() {
69         xapp.Logger.Info("SUBMGR")
70         viper.AutomaticEnv()
71         viper.SetEnvPrefix("submgr")
72         viper.AllowEmptyEnv(true)
73 }
74
75 func NewControl() *Control {
76
77         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
78         client := rtmgrclient.New(transport, strfmt.Default)
79         handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
80         deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
81         rtmgrClient := RtmgrClient{client, handle, deleteHandle}
82
83         registry := new(Registry)
84         registry.Initialize()
85         registry.rtmgrClient = &rtmgrClient
86
87         tracker := new(Tracker)
88         tracker.Init()
89
90         timerMap := new(TimerMap)
91         timerMap.Init()
92
93         return &Control{e2ap: new(E2ap),
94                 registry:   registry,
95                 tracker:    tracker,
96                 timerMap:   timerMap,
97                 msgCounter: 0,
98         }
99 }
100
// Run hands this Control to xapp.Run.
// NOTE(review): presumably xapp.Run blocks and delivers received RMR
// messages to Consume — confirm against the xapp-frame documentation.
func (c *Control) Run() {
	xapp.Run(c)
}
104
105 func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) {
106
107         xapp.Logger.Info("%s: %s", desc, params.String())
108         status := false
109         i := 1
110         for ; i <= 10 && status == false; i++ {
111                 c.rmrSendMutex.Lock()
112                 status = xapp.Rmr.Send(params.RMRParams, false)
113                 c.rmrSendMutex.Unlock()
114                 if status == false {
115                         xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String())
116                         time.Sleep(500 * time.Millisecond)
117                 }
118         }
119         if status == false {
120                 err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String())
121                 xapp.Logger.Error("%s: %s", desc, err.Error())
122                 xapp.Rmr.Free(params.Mbuf)
123         }
124         return
125 }
126
127 func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction) (err error) {
128         params := &RMRParams{&xapp.RMRParams{}}
129         params.Mtype = trans.GetMtype()
130         params.SubId = int(subs.GetSubId())
131         params.Xid = ""
132         params.Meid = subs.GetMeid()
133         params.Src = ""
134         params.PayloadLen = len(trans.Payload.Buf)
135         params.Payload = trans.Payload.Buf
136         params.Mbuf = nil
137
138         return c.rmrSendRaw(desc, params)
139 }
140
141 func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction) (err error) {
142         params := &RMRParams{&xapp.RMRParams{}}
143         params.Mtype = trans.GetMtype()
144         params.SubId = int(subs.GetSubId())
145         params.Xid = trans.GetXid()
146         params.Meid = trans.GetMeid()
147         params.Src = ""
148         params.PayloadLen = len(trans.Payload.Buf)
149         params.Payload = trans.Payload.Buf
150         params.Mbuf = nil
151
152         return c.rmrSendRaw(desc, params)
153 }
154
155 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
156         xapp.Rmr.Free(params.Mbuf)
157         params.Mbuf = nil
158         msg := &RMRParams{params}
159         c.msgCounter++
160         switch msg.Mtype {
161         case xapp.RICMessageTypes["RIC_SUB_REQ"]:
162                 go c.handleXAPPSubscriptionRequest(msg)
163         case xapp.RICMessageTypes["RIC_SUB_RESP"]:
164                 go c.handleE2TSubscriptionResponse(msg)
165         case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
166                 go c.handleE2TSubscriptionFailure(msg)
167         case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
168                 go c.handleXAPPSubscriptionDeleteRequest(msg)
169         case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
170                 go c.handleE2TSubscriptionDeleteResponse(msg)
171         case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
172                 go c.handleE2TSubscriptionDeleteFailure(msg)
173         default:
174                 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
175         }
176
177         return nil
178 }
// idstring builds a log identification string from whichever of its
// arguments are non-nil, in the order trans, subs, err, separated by single
// spaces. The error is rendered as "err(<message>)". With all arguments nil
// the result is the empty string.
func idstring(trans fmt.Stringer, subs fmt.Stringer, err error) string {
	parts := make([]string, 0, 3)
	if trans != nil {
		parts = append(parts, trans.String())
	}
	if subs != nil {
		parts = append(parts, subs.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}

	joined := ""
	for idx, part := range parts {
		if idx > 0 {
			joined += " "
		}
		joined += part
	}
	return joined
}
197
198 //-------------------------------------------------------------------
199 // handle from XAPP Subscription Request
200 //------------------------------------------------------------------
201 func (c *Control) handleXAPPSubscriptionRequest(params *RMRParams) {
202         xapp.Logger.Info("XAPP-SubReq from xapp: %s", params.String())
203
204         subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
205         if err != nil {
206                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(params, nil, err))
207                 return
208         }
209
210         trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid)
211         if err != nil {
212                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(params, nil, err))
213                 return
214         }
215         defer trans.Release()
216
217         subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
218         if err != nil {
219                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(trans, nil, err))
220                 return
221         }
222
223         if subs.IsTransactionReserved() {
224                 err := fmt.Errorf("Currently parallel or queued transactions are not allowed")
225                 xapp.Logger.Error("XAPP-SubReq: %s", idstring(trans, subs, err))
226                 return
227         }
228
229         //
230         // Wake subs request
231         //
232         go c.handleSubscriptionCreate(subs, trans)
233         event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
234
235         err = nil
236         if event != nil {
237                 switch themsg := event.(type) {
238                 case *e2ap.E2APSubscriptionResponse:
239                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
240                         if err == nil {
241                                 c.rmrReplyToSender("XAPP-SubReq: SubResp to xapp", subs, trans)
242                                 return
243                         }
244                 case *e2ap.E2APSubscriptionFailure:
245                         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
246                         if err == nil {
247                                 c.rmrReplyToSender("XAPP-SubReq: SubFail to xapp", subs, trans)
248                         }
249                         return
250                 default:
251                         break
252                 }
253         }
254         xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(trans, subs, err))
255 }
256
257 //-------------------------------------------------------------------
258 // handle from XAPP Subscription Delete Request
259 //------------------------------------------------------------------
260 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *RMRParams) {
261         xapp.Logger.Info("XAPP-SubDelReq from xapp: %s", params.String())
262
263         subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
264         if err != nil {
265                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(params, nil, err))
266                 return
267         }
268
269         trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid)
270         if err != nil {
271                 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(params, nil, err))
272                 return
273         }
274         defer trans.Release()
275
276         subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelReqMsg.RequestId.Seq), uint16(params.SubId)})
277         if err != nil {
278                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(trans, nil, err))
279                 return
280         }
281
282         if subs.IsTransactionReserved() {
283                 err := fmt.Errorf("Currently parallel or queued transactions are not allowed")
284                 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(trans, subs, err))
285                 return
286         }
287
288         //
289         // Wake subs delete
290         //
291         go c.handleSubscriptionDelete(subs, trans)
292         trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
293
294         // Whatever is received send ok delete response
295         subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
296         subDelRespMsg.RequestId.Id = subs.SubReqMsg.RequestId.Id
297         subDelRespMsg.RequestId.Seq = uint32(subs.GetSubId())
298         subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
299         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
300         if err == nil {
301                 c.rmrReplyToSender("XAPP-SubDelReq: SubDelResp to xapp", subs, trans)
302         }
303 }
304
305 //-------------------------------------------------------------------
306 // SUBS CREATE Handling
307 //-------------------------------------------------------------------
308 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Transaction) {
309
310         trans := c.tracker.NewTransaction(subs.GetMeid())
311         subs.WaitTransactionTurn(trans)
312         defer subs.ReleaseTransactionTurn(trans)
313         defer trans.Release()
314
315         xapp.Logger.Debug("SUBS-SubReq: Handling %s parent %s", idstring(trans, subs, nil), parentTrans.String())
316
317         if subs.SubRespMsg != nil {
318                 xapp.Logger.Debug("SUBS-SubReq: Handling (immediate response) %s parent %s", idstring(nil, subs, nil), parentTrans.String())
319                 parentTrans.SendEvent(subs.SubRespMsg, 0)
320                 return
321         }
322
323         event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
324         switch themsg := event.(type) {
325         case *e2ap.E2APSubscriptionResponse:
326                 subs.SubRespMsg = themsg
327                 parentTrans.SendEvent(event, 0)
328                 return
329         case *e2ap.E2APSubscriptionFailure:
330                 //TODO: Possible delete and one retry for subs req
331                 parentTrans.SendEvent(event, 0)
332         default:
333                 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(trans, subs, nil))
334                 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
335                 parentTrans.SendEvent(nil, 0)
336         }
337
338         subs.DelEndpoint(parentTrans.GetEndpoint())
339 }
340
341 //-------------------------------------------------------------------
342 // SUBS DELETE Handling
343 //-------------------------------------------------------------------
344 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *Transaction) {
345
346         trans := c.tracker.NewTransaction(subs.GetMeid())
347         subs.WaitTransactionTurn(trans)
348         defer subs.ReleaseTransactionTurn(trans)
349         defer trans.Release()
350
351         xapp.Logger.Debug("SUBS-SubDelReq: Handling %s parent %s", idstring(trans, subs, nil), parentTrans.String())
352
353         event := c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
354
355         parentTrans.SendEvent(event, 0)
356         subs.DelEndpoint(parentTrans.GetEndpoint())
357 }
358
359 //-------------------------------------------------------------------
360 // send to E2T Subscription Request
361 //-------------------------------------------------------------------
362 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *Transaction, parentTrans *Transaction) interface{} {
363         var err error
364         var event interface{} = nil
365         var timedOut bool = false
366
367         subReqMsg := subs.SubReqMsg
368         subReqMsg.RequestId.Id = 123
369         subReqMsg.RequestId.Seq = uint32(subs.GetSubId())
370         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
371         if err != nil {
372                 xapp.Logger.Error("SUBS-SubReq: %s parent %s", idstring(trans, subs, err), parentTrans.String())
373                 return event
374         }
375
376         for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
377                 desc := fmt.Sprintf("SUBS-SubReq: SubReq to E2T (retry %d)", retries)
378                 c.rmrSend(desc, subs, trans)
379                 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
380                 if timedOut {
381                         continue
382                 }
383                 break
384         }
385         xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s parent %s", typeofSubsMessage(event), idstring(trans, subs, nil), parentTrans.String())
386         return event
387 }
388
389 //-------------------------------------------------------------------
390 // send to E2T Subscription Delete Request
391 //-------------------------------------------------------------------
392
393 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *Transaction, parentTrans *Transaction) interface{} {
394         var err error
395         var event interface{}
396         var timedOut bool
397
398         subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
399         subDelReqMsg.RequestId.Id = 123
400         subDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
401         subDelReqMsg.FunctionId = 0
402         trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
403         if err != nil {
404                 xapp.Logger.Error("SUBS-SubDelReq: %s parent %s", idstring(trans, subs, err), parentTrans.String())
405                 return event
406         }
407
408         for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
409                 desc := fmt.Sprintf("SUBS-SubDelReq: SubDelReq to E2T (retry %d)", retries)
410                 c.rmrSend(desc, subs, trans)
411                 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
412                 if timedOut {
413                         continue
414                 }
415                 break
416         }
417         xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s parent %s", typeofSubsMessage(event), idstring(trans, subs, nil), parentTrans.String())
418         return event
419 }
420
421 //-------------------------------------------------------------------
// handle from E2T Subscription Response
423 //-------------------------------------------------------------------
424 func (c *Control) handleE2TSubscriptionResponse(params *RMRParams) {
425         xapp.Logger.Info("MSG-SubResp from E2T: %s", params.String())
426         subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
427         if err != nil {
428                 xapp.Logger.Error("MSG-SubResp %s", idstring(params, nil, err))
429                 return
430         }
431         subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subRespMsg.RequestId.Seq), uint16(params.SubId)})
432         if err != nil {
433                 xapp.Logger.Error("MSG-SubResp: %s", idstring(params, nil, err))
434                 return
435         }
436         trans := subs.GetTransaction()
437         if trans == nil {
438                 err = fmt.Errorf("Ongoing transaction not found")
439                 xapp.Logger.Error("MSG-SubResp: %s", idstring(params, subs, err))
440                 return
441         }
442         sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
443         if sendOk == false {
444                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
445                 xapp.Logger.Error("MSG-SubResp: %s", idstring(trans, subs, err))
446         }
447         return
448 }
449
450 //-------------------------------------------------------------------
451 // handle from E2T Subscription Failure
452 //-------------------------------------------------------------------
453 func (c *Control) handleE2TSubscriptionFailure(params *RMRParams) {
454         xapp.Logger.Info("MSG-SubFail from E2T: %s", params.String())
455         subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
456         if err != nil {
457                 xapp.Logger.Error("MSG-SubFail %s", idstring(params, nil, err))
458                 return
459         }
460         subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subFailMsg.RequestId.Seq), uint16(params.SubId)})
461         if err != nil {
462                 xapp.Logger.Error("MSG-SubFail: %s", idstring(params, nil, err))
463                 return
464         }
465         trans := subs.GetTransaction()
466         if trans == nil {
467                 err = fmt.Errorf("Ongoing transaction not found")
468                 xapp.Logger.Error("MSG-SubFail: %s", idstring(params, subs, err))
469                 return
470         }
471         sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
472         if sendOk == false {
473                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
474                 xapp.Logger.Error("MSG-SubFail: %s", idstring(trans, subs, err))
475         }
476         return
477 }
478
479 //-------------------------------------------------------------------
480 // handle from E2T Subscription Delete Response
481 //-------------------------------------------------------------------
482 func (c *Control) handleE2TSubscriptionDeleteResponse(params *RMRParams) (err error) {
483         xapp.Logger.Info("SUBS-SubDelResp from E2T:%s", params.String())
484         subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
485         if err != nil {
486                 xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, nil, err))
487                 return
488         }
489         subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelRespMsg.RequestId.Seq), uint16(params.SubId)})
490         if err != nil {
491                 xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, nil, err))
492                 return
493         }
494         trans := subs.GetTransaction()
495         if trans == nil {
496                 err = fmt.Errorf("Ongoing transaction not found")
497                 xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, subs, err))
498                 return
499         }
500         sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
501         if sendOk == false {
502                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
503                 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(trans, subs, err))
504         }
505         return
506 }
507
508 //-------------------------------------------------------------------
509 // handle from E2T Subscription Delete Failure
510 //-------------------------------------------------------------------
511 func (c *Control) handleE2TSubscriptionDeleteFailure(params *RMRParams) {
512         xapp.Logger.Info("MSG-SubDelFail from E2T:%s", params.String())
513         subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
514         if err != nil {
515                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, nil, err))
516                 return
517         }
518         subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelFailMsg.RequestId.Seq), uint16(params.SubId)})
519         if err != nil {
520                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, nil, err))
521                 return
522         }
523         trans := subs.GetTransaction()
524         if trans == nil {
525                 err = fmt.Errorf("Ongoing transaction not found")
526                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, subs, err))
527                 return
528         }
529         sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
530         if sendOk == false {
531                 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
532                 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(trans, subs, err))
533         }
534         return
535 }
536
537 //-------------------------------------------------------------------
538 //
539 //-------------------------------------------------------------------
540 func typeofSubsMessage(v interface{}) string {
541         if v == nil {
542                 return "NIL"
543         }
544         switch v.(type) {
545         case *e2ap.E2APSubscriptionRequest:
546                 return "SubReq"
547         case *e2ap.E2APSubscriptionResponse:
548                 return "SubResp"
549         case *e2ap.E2APSubscriptionFailure:
550                 return "SubFail"
551         case *e2ap.E2APSubscriptionDeleteRequest:
552                 return "SubDelReq"
553         case *e2ap.E2APSubscriptionDeleteResponse:
554                 return "SubDelResp"
555         case *e2ap.E2APSubscriptionDeleteFailure:
556                 return "SubDelFail"
557         default:
558                 return "Unknown"
559         }
560 }