2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
28 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
29 httptransport "github.com/go-openapi/runtime/client"
30 "github.com/go-openapi/strfmt"
31 "github.com/spf13/viper"
35 //-----------------------------------------------------------------------------
37 //-----------------------------------------------------------------------------
// idstring builds a compact identifier string for log messages.
//
// The String() form of every entry is appended in argument order. When err is
// non-nil a trailing "err(<message>)" element is appended after the entries.
// Nil entries are skipped so that a caller passing a bare nil does not trigger
// a method call on a nil interface. With no entries and a nil err the result
// is the empty string.
func idstring(err error, entries ...fmt.Stringer) string {
	retval := ""
	filler := ""
	for _, entry := range entries {
		if entry == nil {
			continue
		}
		retval += filler + entry.String()
		// NOTE(review): the separator assignment is not visible in this
		// excerpt; a single space between elements is assumed — confirm.
		filler = " "
	}
	if err != nil {
		retval += filler + "err(" + err.Error() + ")"
	}
	return retval
}
54 //-----------------------------------------------------------------------------
56 //-----------------------------------------------------------------------------
// Timing and retry settings used when exchanging messages with E2T.
var (
	// e2tSubReqTimeout is passed to WaitEvent after each subscription request
	// sent to E2T.
	e2tSubReqTimeout = 5 * time.Second
	// e2tSubDelReqTime is passed to WaitEvent after each subscription delete
	// request sent to E2T.
	e2tSubDelReqTime = 5 * time.Second
	// e2tMaxSubReqTryCount bounds the subscription request send loop.
	e2tMaxSubReqTryCount uint64 = 2 // Initial try + retry
	// e2tMaxSubDelReqTryCount bounds the subscription delete request send loop.
	e2tMaxSubDelReqTryCount uint64 = 2 // Initial try + retry

	// e2tRecvMsgTimeout is passed to SendEvent when a message received from
	// E2T is handed to the ongoing transaction.
	e2tRecvMsgTimeout = 5 * time.Second
)
66 xapptweaks.XappWrapper
70 //subscriber *xapp.Subscriber
80 xapp.Logger.Info("SUBMGR")
82 viper.SetEnvPrefix("submgr")
83 viper.AllowEmptyEnv(true)
86 func NewControl() *Control {
88 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
89 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
91 registry := new(Registry)
93 registry.rtmgrClient = &rtmgrClient
95 tracker := new(Tracker)
98 //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
100 c := &Control{e2ap: new(E2ap),
103 //subscriber: subscriber,
105 c.XappWrapper.Init("")
106 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler)
107 //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
111 func (c *Control) ReadyCB(data interface{}) {
117 func (c *Control) Run() {
118 xapp.SetReadyCB(c.ReadyCB, nil)
122 //-------------------------------------------------------------------
124 //-------------------------------------------------------------------
125 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (models.SubscriptionResult, error) {
127 switch p := params.(type) {
128 case *models.ReportParams:
129 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
131 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
134 defer trans.Release()
135 case *models.ControlParams:
136 case *models.PolicyParams:
139 return models.SubscriptionResult{}, fmt.Errorf("Subscription rest interface not implemented")
142 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
143 return c.registry.QueryHandler()
146 //-------------------------------------------------------------------
148 //-------------------------------------------------------------------
150 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
151 params := xapptweaks.NewParams(nil)
152 params.Mtype = trans.GetMtype()
153 params.SubId = int(subs.GetReqId().Seq)
155 params.Meid = subs.GetMeid()
157 params.PayloadLen = len(trans.Payload.Buf)
158 params.Payload = trans.Payload.Buf
160 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
161 return c.RmrSend(params)
164 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
166 params := xapptweaks.NewParams(nil)
167 params.Mtype = trans.GetMtype()
168 params.SubId = int(subs.GetReqId().Seq)
169 params.Xid = trans.GetXid()
170 params.Meid = trans.GetMeid()
172 params.PayloadLen = len(trans.Payload.Buf)
173 params.Payload = trans.Payload.Buf
175 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
176 return c.RmrSend(params)
179 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
180 msg := xapptweaks.NewParams(params)
182 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
183 xapp.Logger.Error("%s", err.Error())
188 defer c.Rmr.Free(msg.Mbuf)
191 case xapp.RIC_SUB_REQ:
192 go c.handleXAPPSubscriptionRequest(msg)
193 case xapp.RIC_SUB_RESP:
194 go c.handleE2TSubscriptionResponse(msg)
195 case xapp.RIC_SUB_FAILURE:
196 go c.handleE2TSubscriptionFailure(msg)
197 case xapp.RIC_SUB_DEL_REQ:
198 go c.handleXAPPSubscriptionDeleteRequest(msg)
199 case xapp.RIC_SUB_DEL_RESP:
200 go c.handleE2TSubscriptionDeleteResponse(msg)
201 case xapp.RIC_SUB_DEL_FAILURE:
202 go c.handleE2TSubscriptionDeleteFailure(msg)
204 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
209 //-------------------------------------------------------------------
210 // handle from XAPP Subscription Request
211 //------------------------------------------------------------------
212 func (c *Control) handleXAPPSubscriptionRequest(params *xapptweaks.RMRParams) {
213 xapp.Logger.Info("MSG from XAPP: %s", params.String())
215 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
217 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
221 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.Seq, params.Meid)
223 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
226 defer trans.Release()
228 err = c.tracker.Track(trans)
230 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
234 subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
236 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
243 go c.handleSubscriptionCreate(subs, trans)
244 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
248 switch themsg := event.(type) {
249 case *e2ap.E2APSubscriptionResponse:
250 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
253 c.rmrSendToXapp("", subs, trans)
256 case *e2ap.E2APSubscriptionFailure:
257 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
259 c.rmrSendToXapp("", subs, trans)
265 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
266 c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
269 //-------------------------------------------------------------------
270 // handle from XAPP Subscription Delete Request
271 //------------------------------------------------------------------
272 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapptweaks.RMRParams) {
273 xapp.Logger.Info("MSG from XAPP: %s", params.String())
275 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
277 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
281 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.Seq, params.Meid)
283 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
286 defer trans.Release()
288 err = c.tracker.Track(trans)
290 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
294 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
296 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
303 go c.handleSubscriptionDelete(subs, trans)
304 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
306 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
308 // Whatever is received send ok delete response
309 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
310 subDelRespMsg.RequestId = subs.GetReqId().RequestId
311 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
312 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
314 c.rmrSendToXapp("", subs, trans)
317 c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
320 //-------------------------------------------------------------------
321 // SUBS CREATE Handling
322 //-------------------------------------------------------------------
323 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
325 trans := c.tracker.NewSubsTransaction(subs)
326 subs.WaitTransactionTurn(trans)
327 defer subs.ReleaseTransactionTurn(trans)
328 defer trans.Release()
330 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
332 subRfMsg, valid := subs.GetCachedResponse()
333 if subRfMsg == nil && valid == true {
334 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
335 switch event.(type) {
336 case *e2ap.E2APSubscriptionResponse:
337 subRfMsg, valid = subs.SetCachedResponse(event, true)
338 case *e2ap.E2APSubscriptionFailure:
339 subRfMsg, valid = subs.SetCachedResponse(event, false)
341 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
342 subRfMsg, valid = subs.SetCachedResponse(nil, false)
343 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
345 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
347 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
350 parentTrans.SendEvent(subRfMsg, 0)
353 //-------------------------------------------------------------------
354 // SUBS DELETE Handling
355 //-------------------------------------------------------------------
357 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
359 trans := c.tracker.NewSubsTransaction(subs)
360 subs.WaitTransactionTurn(trans)
361 defer subs.ReleaseTransactionTurn(trans)
362 defer trans.Release()
364 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
367 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
370 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
375 parentTrans.SendEvent(nil, 0)
378 //-------------------------------------------------------------------
379 // send to E2T Subscription Request
380 //-------------------------------------------------------------------
381 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
383 var event interface{} = nil
384 var timedOut bool = false
386 subReqMsg := subs.SubReqMsg
387 subReqMsg.RequestId = subs.GetReqId().RequestId
388 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
390 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
394 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
395 desc := fmt.Sprintf("(retry %d)", retries)
396 c.rmrSendToE2T(desc, subs, trans)
397 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
403 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
407 //-------------------------------------------------------------------
408 // send to E2T Subscription Delete Request
409 //-------------------------------------------------------------------
411 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
413 var event interface{}
416 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
417 subDelReqMsg.RequestId = subs.GetReqId().RequestId
418 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
419 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
421 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
425 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
426 desc := fmt.Sprintf("(retry %d)", retries)
427 c.rmrSendToE2T(desc, subs, trans)
428 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
434 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
438 //-------------------------------------------------------------------
// handle from E2T Subscription Response
440 //-------------------------------------------------------------------
441 func (c *Control) handleE2TSubscriptionResponse(params *xapptweaks.RMRParams) {
442 xapp.Logger.Info("MSG from E2T: %s", params.String())
443 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
445 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
448 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.Seq})
450 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
453 trans := subs.GetTransaction()
455 err = fmt.Errorf("Ongoing transaction not found")
456 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
459 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
461 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
462 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
467 //-------------------------------------------------------------------
468 // handle from E2T Subscription Failure
469 //-------------------------------------------------------------------
470 func (c *Control) handleE2TSubscriptionFailure(params *xapptweaks.RMRParams) {
471 xapp.Logger.Info("MSG from E2T: %s", params.String())
472 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
474 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
477 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.Seq})
479 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
482 trans := subs.GetTransaction()
484 err = fmt.Errorf("Ongoing transaction not found")
485 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
488 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
490 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
491 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
496 //-------------------------------------------------------------------
497 // handle from E2T Subscription Delete Response
498 //-------------------------------------------------------------------
499 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapptweaks.RMRParams) (err error) {
500 xapp.Logger.Info("MSG from E2T: %s", params.String())
501 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
503 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
506 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.Seq})
508 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
511 trans := subs.GetTransaction()
513 err = fmt.Errorf("Ongoing transaction not found")
514 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
517 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
519 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
520 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
525 //-------------------------------------------------------------------
526 // handle from E2T Subscription Delete Failure
527 //-------------------------------------------------------------------
528 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapptweaks.RMRParams) {
529 xapp.Logger.Info("MSG from E2T: %s", params.String())
530 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
532 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
535 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.Seq})
537 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
540 trans := subs.GetTransaction()
542 err = fmt.Errorf("Ongoing transaction not found")
543 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
546 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
548 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
549 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
554 //-------------------------------------------------------------------
556 //-------------------------------------------------------------------
557 func typeofSubsMessage(v interface{}) string {
562 case *e2ap.E2APSubscriptionRequest:
564 case *e2ap.E2APSubscriptionResponse:
566 case *e2ap.E2APSubscriptionFailure:
568 case *e2ap.E2APSubscriptionDeleteRequest:
570 case *e2ap.E2APSubscriptionDeleteResponse:
572 case *e2ap.E2APSubscriptionDeleteFailure: