2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
28 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
29 httptransport "github.com/go-openapi/runtime/client"
30 "github.com/go-openapi/strfmt"
31 "github.com/spf13/viper"
35 //-----------------------------------------------------------------------------
37 //-----------------------------------------------------------------------------
39 func idstring(err error, entries ...fmt.Stringer) string {
40 var retval string = ""
41 var filler string = ""
42 for _, entry := range entries {
43 retval += filler + entry.String()
47 retval += filler + "err(" + err.Error() + ")"
54 //-----------------------------------------------------------------------------
56 //-----------------------------------------------------------------------------
// Timing and retry settings for the message exchange with E2 termination (E2T).
var (
	// e2tSubReqTimeout is how long a subscription request waits for an event
	// from E2T after each send attempt.
	e2tSubReqTimeout = 5 * time.Second

	// e2tSubDelReqTime is how long a subscription delete request waits for an
	// event from E2T after each send attempt.
	e2tSubDelReqTime = 5 * time.Second

	// e2tMaxSubReqTryCount bounds the send attempts of a subscription request:
	// the initial try plus one retry.
	e2tMaxSubReqTryCount uint64 = 2

	// e2tMaxSubDelReqTryCount bounds the send attempts of a subscription delete
	// request: the initial try plus one retry.
	e2tMaxSubDelReqTryCount uint64 = 2

	// e2tRecvMsgTimeout is the timeout given to SendEvent when a message
	// received from E2T is handed over to the ongoing transaction.
	e2tRecvMsgTimeout = 5 * time.Second
)
66 xapptweaks.XappWrapper
70 //subscriber *xapp.Subscriber
80 xapp.Logger.Info("SUBMGR")
82 viper.SetEnvPrefix("submgr")
83 viper.AllowEmptyEnv(true)
86 func NewControl() *Control {
88 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
89 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
91 registry := new(Registry)
93 registry.rtmgrClient = &rtmgrClient
95 tracker := new(Tracker)
98 //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
100 c := &Control{e2ap: new(E2ap),
103 //subscriber: subscriber,
105 c.XappWrapper.Init("")
106 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
107 //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
111 func (c *Control) ReadyCB(data interface{}) {
117 func (c *Control) Run() {
118 xapp.SetReadyCB(c.ReadyCB, nil)
122 //-------------------------------------------------------------------
124 //-------------------------------------------------------------------
125 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
127 switch p := params.(type) {
128 case *models.ReportParams:
129 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
131 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
134 defer trans.Release()
135 case *models.ControlParams:
136 case *models.PolicyParams:
139 return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
142 func (c *Control) SubscriptionDeleteHandler(string) error {
143 return fmt.Errorf("Subscription rest interface not implemented")
146 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
147 return c.registry.QueryHandler()
150 //-------------------------------------------------------------------
152 //-------------------------------------------------------------------
154 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
155 params := xapptweaks.NewParams(nil)
156 params.Mtype = trans.GetMtype()
157 params.SubId = int(subs.GetReqId().InstanceId)
159 params.Meid = subs.GetMeid()
161 params.PayloadLen = len(trans.Payload.Buf)
162 params.Payload = trans.Payload.Buf
164 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
165 return c.RmrSend(params, 5)
168 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
170 params := xapptweaks.NewParams(nil)
171 params.Mtype = trans.GetMtype()
172 params.SubId = int(subs.GetReqId().InstanceId)
173 params.Xid = trans.GetXid()
174 params.Meid = trans.GetMeid()
176 params.PayloadLen = len(trans.Payload.Buf)
177 params.Payload = trans.Payload.Buf
179 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
180 return c.RmrSend(params, 5)
183 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
184 msg := xapptweaks.NewParams(params)
186 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
187 xapp.Logger.Error("%s", err.Error())
192 defer c.Rmr.Free(msg.Mbuf)
195 case xapp.RIC_SUB_REQ:
196 go c.handleXAPPSubscriptionRequest(msg)
197 case xapp.RIC_SUB_RESP:
198 go c.handleE2TSubscriptionResponse(msg)
199 case xapp.RIC_SUB_FAILURE:
200 go c.handleE2TSubscriptionFailure(msg)
201 case xapp.RIC_SUB_DEL_REQ:
202 go c.handleXAPPSubscriptionDeleteRequest(msg)
203 case xapp.RIC_SUB_DEL_RESP:
204 go c.handleE2TSubscriptionDeleteResponse(msg)
205 case xapp.RIC_SUB_DEL_FAILURE:
206 go c.handleE2TSubscriptionDeleteFailure(msg)
208 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
213 //-------------------------------------------------------------------
214 // handle from XAPP Subscription Request
215 //------------------------------------------------------------------
216 func (c *Control) handleXAPPSubscriptionRequest(params *xapptweaks.RMRParams) {
217 xapp.Logger.Info("MSG from XAPP: %s", params.String())
219 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
221 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
225 trans := c.tracker.NewXappTransaction(xapptweaks.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.InstanceId, params.Meid)
227 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
230 defer trans.Release()
232 err = c.tracker.Track(trans)
234 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
238 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
239 subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
241 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
248 go c.handleSubscriptionCreate(subs, trans)
249 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
253 switch themsg := event.(type) {
254 case *e2ap.E2APSubscriptionResponse:
255 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
258 c.rmrSendToXapp("", subs, trans)
261 case *e2ap.E2APSubscriptionFailure:
262 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
264 c.rmrSendToXapp("", subs, trans)
270 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
271 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
274 //-------------------------------------------------------------------
275 // handle from XAPP Subscription Delete Request
276 //------------------------------------------------------------------
277 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapptweaks.RMRParams) {
278 xapp.Logger.Info("MSG from XAPP: %s", params.String())
280 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
282 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
286 trans := c.tracker.NewXappTransaction(xapptweaks.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.InstanceId, params.Meid)
288 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
291 defer trans.Release()
293 err = c.tracker.Track(trans)
295 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
299 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
301 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
308 go c.handleSubscriptionDelete(subs, trans)
309 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
311 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
313 // Whatever is received send ok delete response
314 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
315 subDelRespMsg.RequestId = subs.GetReqId().RequestId
316 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
317 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
319 c.rmrSendToXapp("", subs, trans)
322 //TODO handle subscription toward e2term inside RemoveFromSubscription / hide handleSubscriptionDelete in it?
323 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
326 //-------------------------------------------------------------------
327 // SUBS CREATE Handling
328 //-------------------------------------------------------------------
329 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
331 trans := c.tracker.NewSubsTransaction(subs)
332 subs.WaitTransactionTurn(trans)
333 defer subs.ReleaseTransactionTurn(trans)
334 defer trans.Release()
336 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
338 subRfMsg, valid := subs.GetCachedResponse()
339 if subRfMsg == nil && valid == true {
342 // In case of failure
343 // - make internal delete
344 // - in case duplicate cause, retry (currently max 1 retry)
346 maxRetries := uint64(1)
348 for retries := uint64(0); retries <= maxRetries && doRetry; retries++ {
351 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
352 switch themsg := event.(type) {
353 case *e2ap.E2APSubscriptionResponse:
354 subRfMsg, valid = subs.SetCachedResponse(event, true)
355 case *e2ap.E2APSubscriptionFailure:
356 subRfMsg, valid = subs.SetCachedResponse(event, false)
358 for _, item := range themsg.ActionNotAdmittedList.Items {
359 if item.Cause.Content != e2ap.E2AP_CauseContent_Ric || (item.Cause.Value != e2ap.E2AP_CauseValue_Ric_duplicate_action && item.Cause.Value != e2ap.E2AP_CauseValue_Ric_duplicate_event) {
364 xapp.Logger.Info("SUBS-SubReq: internal delete and possible retry due event(%s) retry(%t,%d/%d) %s", typeofSubsMessage(event), doRetry, retries, maxRetries, idstring(nil, trans, subs, parentTrans))
365 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
367 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
368 subRfMsg, valid = subs.SetCachedResponse(nil, false)
369 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
373 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
375 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
378 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
380 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
382 parentTrans.SendEvent(subRfMsg, 0)
385 //-------------------------------------------------------------------
386 // SUBS DELETE Handling
387 //-------------------------------------------------------------------
389 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
391 trans := c.tracker.NewSubsTransaction(subs)
392 subs.WaitTransactionTurn(trans)
393 defer subs.ReleaseTransactionTurn(trans)
394 defer trans.Release()
396 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
399 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
402 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
406 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
408 // If parallel deletes are ongoing, both might pass the earlier sendE2TSubscriptionDeleteRequest(...) if
409 // RemoveFromSubscription were located on the caller side (the caller is now handleXAPPSubscriptionDeleteRequest(...))
409 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
410 parentTrans.SendEvent(nil, 0)
413 //-------------------------------------------------------------------
414 // send to E2T Subscription Request
415 //-------------------------------------------------------------------
416 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
418 var event interface{} = nil
419 var timedOut bool = false
421 subReqMsg := subs.SubReqMsg
422 subReqMsg.RequestId = subs.GetReqId().RequestId
423 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
425 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
429 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
430 desc := fmt.Sprintf("(retry %d)", retries)
431 c.rmrSendToE2T(desc, subs, trans)
432 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
438 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
442 //-------------------------------------------------------------------
443 // send to E2T Subscription Delete Request
444 //-------------------------------------------------------------------
446 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
448 var event interface{}
451 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
452 subDelReqMsg.RequestId = subs.GetReqId().RequestId
453 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
454 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
456 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
460 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
461 desc := fmt.Sprintf("(retry %d)", retries)
462 c.rmrSendToE2T(desc, subs, trans)
463 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
469 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
473 //-------------------------------------------------------------------
474 // handle from E2T Subscription Response
475 //-------------------------------------------------------------------
476 func (c *Control) handleE2TSubscriptionResponse(params *xapptweaks.RMRParams) {
477 xapp.Logger.Info("MSG from E2T: %s", params.String())
478 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
480 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
483 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
485 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
488 trans := subs.GetTransaction()
490 err = fmt.Errorf("Ongoing transaction not found")
491 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
494 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
496 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
497 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
502 //-------------------------------------------------------------------
503 // handle from E2T Subscription Failure
504 //-------------------------------------------------------------------
505 func (c *Control) handleE2TSubscriptionFailure(params *xapptweaks.RMRParams) {
506 xapp.Logger.Info("MSG from E2T: %s", params.String())
507 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
509 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
512 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
514 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
517 trans := subs.GetTransaction()
519 err = fmt.Errorf("Ongoing transaction not found")
520 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
523 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
525 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
526 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
531 //-------------------------------------------------------------------
532 // handle from E2T Subscription Delete Response
533 //-------------------------------------------------------------------
534 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapptweaks.RMRParams) (err error) {
535 xapp.Logger.Info("MSG from E2T: %s", params.String())
536 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
538 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
541 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
543 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
546 trans := subs.GetTransaction()
548 err = fmt.Errorf("Ongoing transaction not found")
549 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
552 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
554 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
555 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
560 //-------------------------------------------------------------------
561 // handle from E2T Subscription Delete Failure
562 //-------------------------------------------------------------------
563 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapptweaks.RMRParams) {
564 xapp.Logger.Info("MSG from E2T: %s", params.String())
565 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
567 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
570 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
572 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
575 trans := subs.GetTransaction()
577 err = fmt.Errorf("Ongoing transaction not found")
578 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
581 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
583 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
584 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
589 //-------------------------------------------------------------------
591 //-------------------------------------------------------------------
592 func typeofSubsMessage(v interface{}) string {
597 case *e2ap.E2APSubscriptionRequest:
599 case *e2ap.E2APSubscriptionResponse:
601 case *e2ap.E2APSubscriptionFailure:
603 case *e2ap.E2APSubscriptionDeleteRequest:
605 case *e2ap.E2APSubscriptionDeleteResponse:
607 case *e2ap.E2APSubscriptionDeleteFailure: