2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
28 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
29 httptransport "github.com/go-openapi/runtime/client"
30 "github.com/go-openapi/strfmt"
31 "github.com/spf13/viper"
35 //-----------------------------------------------------------------------------
37 //-----------------------------------------------------------------------------
39 func idstring(err error, entries ...fmt.Stringer) string {
40 var retval string = ""
41 var filler string = ""
42 for _, entry := range entries {
43 retval += filler + entry.String()
47 retval += filler + "err(" + err.Error() + ")"
54 //-----------------------------------------------------------------------------
56 //-----------------------------------------------------------------------------
58 var e2tSubReqTimeout time.Duration = 5 * time.Second
59 var e2tSubDelReqTime time.Duration = 5 * time.Second
60 var e2tMaxSubReqTryCount uint64 = 2 // Initial try + retry
61 var e2tMaxSubDelReqTryCount uint64 = 2 // Initial try + retry
63 var e2tRecvMsgTimeout time.Duration = 5 * time.Second
66 xapptweaks.XappWrapper
70 //subscriber *xapp.Subscriber
80 xapp.Logger.Info("SUBMGR")
82 viper.SetEnvPrefix("submgr")
83 viper.AllowEmptyEnv(true)
86 func NewControl() *Control {
88 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
89 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
91 registry := new(Registry)
93 registry.rtmgrClient = &rtmgrClient
95 tracker := new(Tracker)
98 //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
100 c := &Control{e2ap: new(E2ap),
103 //subscriber: subscriber,
105 c.XappWrapper.Init("")
106 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler)
107 //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
111 func (c *Control) ReadyCB(data interface{}) {
117 func (c *Control) Run() {
118 xapp.SetReadyCB(c.ReadyCB, nil)
122 //-------------------------------------------------------------------
124 //-------------------------------------------------------------------
125 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (models.SubscriptionResult, error) {
127 switch p := params.(type) {
128 case *models.ReportParams:
129 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
131 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
134 defer trans.Release()
135 case *models.ControlParams:
136 case *models.PolicyParams:
139 return models.SubscriptionResult{}, fmt.Errorf("Subscription rest interface not implemented")
142 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
143 return c.registry.QueryHandler()
146 //-------------------------------------------------------------------
148 //-------------------------------------------------------------------
150 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
151 params := xapptweaks.NewParams(nil)
152 params.Mtype = trans.GetMtype()
153 params.SubId = int(subs.GetReqId().Seq)
155 params.Meid = subs.GetMeid()
157 params.PayloadLen = len(trans.Payload.Buf)
158 params.Payload = trans.Payload.Buf
160 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
161 return c.RmrSend(params, 5)
164 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
166 params := xapptweaks.NewParams(nil)
167 params.Mtype = trans.GetMtype()
168 params.SubId = int(subs.GetReqId().Seq)
169 params.Xid = trans.GetXid()
170 params.Meid = trans.GetMeid()
172 params.PayloadLen = len(trans.Payload.Buf)
173 params.Payload = trans.Payload.Buf
175 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
176 return c.RmrSend(params, 5)
179 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
180 msg := xapptweaks.NewParams(params)
182 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
183 xapp.Logger.Error("%s", err.Error())
188 defer c.Rmr.Free(msg.Mbuf)
191 case xapp.RIC_SUB_REQ:
192 go c.handleXAPPSubscriptionRequest(msg)
193 case xapp.RIC_SUB_RESP:
194 go c.handleE2TSubscriptionResponse(msg)
195 case xapp.RIC_SUB_FAILURE:
196 go c.handleE2TSubscriptionFailure(msg)
197 case xapp.RIC_SUB_DEL_REQ:
198 go c.handleXAPPSubscriptionDeleteRequest(msg)
199 case xapp.RIC_SUB_DEL_RESP:
200 go c.handleE2TSubscriptionDeleteResponse(msg)
201 case xapp.RIC_SUB_DEL_FAILURE:
202 go c.handleE2TSubscriptionDeleteFailure(msg)
204 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
209 //-------------------------------------------------------------------
210 // handle from XAPP Subscription Request
211 //------------------------------------------------------------------
212 func (c *Control) handleXAPPSubscriptionRequest(params *xapptweaks.RMRParams) {
213 xapp.Logger.Info("MSG from XAPP: %s", params.String())
215 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
217 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
221 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.Seq, params.Meid)
223 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
226 defer trans.Release()
228 err = c.tracker.Track(trans)
230 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
234 subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
236 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
243 go c.handleSubscriptionCreate(subs, trans)
244 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
248 switch themsg := event.(type) {
249 case *e2ap.E2APSubscriptionResponse:
250 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
253 c.rmrSendToXapp("", subs, trans)
256 case *e2ap.E2APSubscriptionFailure:
257 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
259 c.rmrSendToXapp("", subs, trans)
265 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
266 c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
269 //-------------------------------------------------------------------
270 // handle from XAPP Subscription Delete Request
271 //------------------------------------------------------------------
272 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapptweaks.RMRParams) {
273 xapp.Logger.Info("MSG from XAPP: %s", params.String())
275 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
277 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
281 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.Seq, params.Meid)
283 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
286 defer trans.Release()
288 err = c.tracker.Track(trans)
290 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
294 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
296 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
303 go c.handleSubscriptionDelete(subs, trans)
304 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
306 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
308 // Whatever is received send ok delete response
309 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
310 subDelRespMsg.RequestId = subs.GetReqId().RequestId
311 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
312 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
314 c.rmrSendToXapp("", subs, trans)
317 c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
320 //-------------------------------------------------------------------
321 // SUBS CREATE Handling
322 //-------------------------------------------------------------------
323 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
325 trans := c.tracker.NewSubsTransaction(subs)
326 subs.WaitTransactionTurn(trans)
327 defer subs.ReleaseTransactionTurn(trans)
328 defer trans.Release()
330 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
332 subRfMsg, valid := subs.GetCachedResponse()
333 if subRfMsg == nil && valid == true {
336 // In case of failure
337 // - make internal delete
338 // - in case duplicate cause, retry (currently max 1 retry)
340 maxRetries := uint64(1)
342 for retries := uint64(0); retries <= maxRetries && doRetry; retries++ {
345 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
346 switch themsg := event.(type) {
347 case *e2ap.E2APSubscriptionResponse:
348 subRfMsg, valid = subs.SetCachedResponse(event, true)
349 case *e2ap.E2APSubscriptionFailure:
350 subRfMsg, valid = subs.SetCachedResponse(event, false)
352 for _, item := range themsg.ActionNotAdmittedList.Items {
353 if item.Cause.Content != e2ap.E2AP_CauseContent_Ric || (item.Cause.Value != e2ap.E2AP_CauseValue_Ric_duplicate_action && item.Cause.Value != e2ap.E2AP_CauseValue_Ric_duplicate_event) {
358 xapp.Logger.Info("SUBS-SubReq: internal delete and possible retry due event(%s) retry(%t,%d/%d) %s", typeofSubsMessage(event), doRetry, retries, maxRetries, idstring(nil, trans, subs, parentTrans))
359 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
361 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
362 subRfMsg, valid = subs.SetCachedResponse(nil, false)
363 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
367 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
369 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
372 parentTrans.SendEvent(subRfMsg, 0)
375 //-------------------------------------------------------------------
376 // SUBS DELETE Handling
377 //-------------------------------------------------------------------
379 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
381 trans := c.tracker.NewSubsTransaction(subs)
382 subs.WaitTransactionTurn(trans)
383 defer subs.ReleaseTransactionTurn(trans)
384 defer trans.Release()
386 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
389 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
392 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
397 parentTrans.SendEvent(nil, 0)
400 //-------------------------------------------------------------------
401 // send to E2T Subscription Request
402 //-------------------------------------------------------------------
403 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
405 var event interface{} = nil
406 var timedOut bool = false
408 subReqMsg := subs.SubReqMsg
409 subReqMsg.RequestId = subs.GetReqId().RequestId
410 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
412 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
416 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
417 desc := fmt.Sprintf("(retry %d)", retries)
418 c.rmrSendToE2T(desc, subs, trans)
419 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
425 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
429 //-------------------------------------------------------------------
430 // send to E2T Subscription Delete Request
431 //-------------------------------------------------------------------
433 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
435 var event interface{}
438 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
439 subDelReqMsg.RequestId = subs.GetReqId().RequestId
440 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
441 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
443 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
447 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
448 desc := fmt.Sprintf("(retry %d)", retries)
449 c.rmrSendToE2T(desc, subs, trans)
450 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
456 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
460 //-------------------------------------------------------------------
461 // handle from E2T Subscription Reponse
462 //-------------------------------------------------------------------
463 func (c *Control) handleE2TSubscriptionResponse(params *xapptweaks.RMRParams) {
464 xapp.Logger.Info("MSG from E2T: %s", params.String())
465 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
467 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
470 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.Seq})
472 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
475 trans := subs.GetTransaction()
477 err = fmt.Errorf("Ongoing transaction not found")
478 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
481 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
483 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
484 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
489 //-------------------------------------------------------------------
490 // handle from E2T Subscription Failure
491 //-------------------------------------------------------------------
492 func (c *Control) handleE2TSubscriptionFailure(params *xapptweaks.RMRParams) {
493 xapp.Logger.Info("MSG from E2T: %s", params.String())
494 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
496 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
499 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.Seq})
501 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
504 trans := subs.GetTransaction()
506 err = fmt.Errorf("Ongoing transaction not found")
507 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
510 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
512 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
513 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
518 //-------------------------------------------------------------------
519 // handle from E2T Subscription Delete Response
520 //-------------------------------------------------------------------
521 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapptweaks.RMRParams) (err error) {
522 xapp.Logger.Info("MSG from E2T: %s", params.String())
523 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
525 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
528 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.Seq})
530 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
533 trans := subs.GetTransaction()
535 err = fmt.Errorf("Ongoing transaction not found")
536 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
539 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
541 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
542 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
547 //-------------------------------------------------------------------
548 // handle from E2T Subscription Delete Failure
549 //-------------------------------------------------------------------
550 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapptweaks.RMRParams) {
551 xapp.Logger.Info("MSG from E2T: %s", params.String())
552 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
554 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
557 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.Seq})
559 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
562 trans := subs.GetTransaction()
564 err = fmt.Errorf("Ongoing transaction not found")
565 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
568 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
570 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
571 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
576 //-------------------------------------------------------------------
578 //-------------------------------------------------------------------
579 func typeofSubsMessage(v interface{}) string {
584 case *e2ap.E2APSubscriptionRequest:
586 case *e2ap.E2APSubscriptionResponse:
588 case *e2ap.E2APSubscriptionFailure:
590 case *e2ap.E2APSubscriptionDeleteRequest:
592 case *e2ap.E2APSubscriptionDeleteResponse:
594 case *e2ap.E2APSubscriptionDeleteFailure: