2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28 httptransport "github.com/go-openapi/runtime/client"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
34 //-----------------------------------------------------------------------------
36 //-----------------------------------------------------------------------------
38 func idstring(err error, entries ...fmt.Stringer) string {
39 var retval string = ""
40 var filler string = ""
41 for _, entry := range entries {
42 retval += filler + entry.String()
46 retval += filler + "err(" + err.Error() + ")"
53 //-----------------------------------------------------------------------------
55 //-----------------------------------------------------------------------------
57 var e2tSubReqTimeout time.Duration
58 var e2tSubDelReqTime time.Duration
59 var e2tRecvMsgTimeout time.Duration
60 var e2tMaxSubReqTryCount uint64 // Initial try + retry
61 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
68 //subscriber *xapp.Subscriber
79 xapp.Logger.Info("SUBMGR")
81 viper.SetEnvPrefix("submgr")
82 viper.AllowEmptyEnv(true)
85 func NewControl() *Control {
87 // viper.GetDuration returns nanoseconds
88 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
89 if e2tSubReqTimeout == 0 {
90 e2tSubReqTimeout = 2000 * 1000000
92 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
93 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
94 if e2tSubDelReqTime == 0 {
95 e2tSubDelReqTime = 2000 * 1000000
97 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
98 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
99 if e2tRecvMsgTimeout == 0 {
100 e2tRecvMsgTimeout = 2000 * 1000000
102 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
103 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
104 if e2tMaxSubReqTryCount == 0 {
105 e2tMaxSubReqTryCount = 1
107 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
108 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
109 if e2tMaxSubDelReqTryCount == 0 {
110 e2tMaxSubDelReqTryCount = 1
112 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
114 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
115 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
117 registry := new(Registry)
118 registry.Initialize()
119 registry.rtmgrClient = &rtmgrClient
121 tracker := new(Tracker)
124 //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
126 c := &Control{e2ap: new(E2ap),
129 //subscriber: subscriber,
131 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
132 //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
136 func (c *Control) ReadyCB(data interface{}) {
137 if c.RMRClient == nil {
138 c.RMRClient = xapp.Rmr
142 func (c *Control) Run() {
143 xapp.SetReadyCB(c.ReadyCB, nil)
147 //-------------------------------------------------------------------
149 //-------------------------------------------------------------------
150 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
152 switch p := params.(type) {
153 case *models.ReportParams:
154 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
156 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
159 defer trans.Release()
160 case *models.ControlParams:
161 case *models.PolicyParams:
164 return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
167 func (c *Control) SubscriptionDeleteHandler(string) error {
168 return fmt.Errorf("Subscription rest interface not implemented")
171 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
172 return c.registry.QueryHandler()
175 //-------------------------------------------------------------------
177 //-------------------------------------------------------------------
179 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
180 params := &xapp.RMRParams{}
181 params.Mtype = trans.GetMtype()
182 params.SubId = int(subs.GetReqId().InstanceId)
184 params.Meid = subs.GetMeid()
186 params.PayloadLen = len(trans.Payload.Buf)
187 params.Payload = trans.Payload.Buf
189 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
190 return c.SendWithRetry(params, false, 5)
193 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
195 params := &xapp.RMRParams{}
196 params.Mtype = trans.GetMtype()
197 params.SubId = int(subs.GetReqId().InstanceId)
198 params.Xid = trans.GetXid()
199 params.Meid = trans.GetMeid()
201 params.PayloadLen = len(trans.Payload.Buf)
202 params.Payload = trans.Payload.Buf
204 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
205 return c.SendWithRetry(params, false, 5)
208 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
209 if c.RMRClient == nil {
210 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
211 xapp.Logger.Error("%s", err.Error())
216 defer c.RMRClient.Free(msg.Mbuf)
218 // xapp-frame might use direct access to c buffer and
219 // when msg.Mbuf is freed, someone might take it into use
220 // and payload data might be invalid inside message handle function
222 // subscriptions won't load system a lot so there is no
223 // real performance hit by cloning buffer into new go byte slice
224 cPay := append(msg.Payload[:0:0], msg.Payload...)
226 msg.PayloadLen = len(cPay)
229 case xapp.RIC_SUB_REQ:
230 go c.handleXAPPSubscriptionRequest(msg)
231 case xapp.RIC_SUB_RESP:
232 go c.handleE2TSubscriptionResponse(msg)
233 case xapp.RIC_SUB_FAILURE:
234 go c.handleE2TSubscriptionFailure(msg)
235 case xapp.RIC_SUB_DEL_REQ:
236 go c.handleXAPPSubscriptionDeleteRequest(msg)
237 case xapp.RIC_SUB_DEL_RESP:
238 go c.handleE2TSubscriptionDeleteResponse(msg)
239 case xapp.RIC_SUB_DEL_FAILURE:
240 go c.handleE2TSubscriptionDeleteFailure(msg)
242 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
247 //-------------------------------------------------------------------
248 // handle from XAPP Subscription Request
249 //------------------------------------------------------------------
250 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
251 xapp.Logger.Info("MSG from XAPP: %s", params.String())
253 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
255 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
259 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.InstanceId, params.Meid)
261 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
264 defer trans.Release()
266 err = c.tracker.Track(trans)
268 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
272 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
273 subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
275 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
282 go c.handleSubscriptionCreate(subs, trans)
283 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
287 switch themsg := event.(type) {
288 case *e2ap.E2APSubscriptionResponse:
289 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
292 c.rmrSendToXapp("", subs, trans)
295 case *e2ap.E2APSubscriptionFailure:
296 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
298 c.rmrSendToXapp("", subs, trans)
304 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
305 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
308 //-------------------------------------------------------------------
309 // handle from XAPP Subscription Delete Request
310 //------------------------------------------------------------------
311 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
312 xapp.Logger.Info("MSG from XAPP: %s", params.String())
314 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
316 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
320 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.InstanceId, params.Meid)
322 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
325 defer trans.Release()
327 err = c.tracker.Track(trans)
329 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
333 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
335 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
342 go c.handleSubscriptionDelete(subs, trans)
343 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
345 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
347 // Whatever is received send ok delete response
348 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
349 subDelRespMsg.RequestId = subs.GetReqId().RequestId
350 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
351 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
353 c.rmrSendToXapp("", subs, trans)
356 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
357 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
360 //-------------------------------------------------------------------
361 // SUBS CREATE Handling
362 //-------------------------------------------------------------------
363 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
365 trans := c.tracker.NewSubsTransaction(subs)
366 subs.WaitTransactionTurn(trans)
367 defer subs.ReleaseTransactionTurn(trans)
368 defer trans.Release()
370 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
372 subRfMsg, valid := subs.GetCachedResponse()
373 if subRfMsg == nil && valid == true {
376 // In case of failure
377 // - make internal delete
378 // - in case duplicate cause, retry (currently max 1 retry)
380 maxRetries := uint64(1)
382 for retries := uint64(0); retries <= maxRetries && doRetry; retries++ {
385 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
386 switch themsg := event.(type) {
387 case *e2ap.E2APSubscriptionResponse:
388 subRfMsg, valid = subs.SetCachedResponse(event, true)
389 case *e2ap.E2APSubscriptionFailure:
390 subRfMsg, valid = subs.SetCachedResponse(event, false)
392 for _, item := range themsg.ActionNotAdmittedList.Items {
393 if item.Cause.Content != e2ap.E2AP_CauseContent_Ric || (item.Cause.Value != e2ap.E2AP_CauseValue_Ric_duplicate_action && item.Cause.Value != e2ap.E2AP_CauseValue_Ric_duplicate_event) {
398 xapp.Logger.Info("SUBS-SubReq: internal delete and possible retry due event(%s) retry(%t,%d/%d) %s", typeofSubsMessage(event), doRetry, retries, maxRetries, idstring(nil, trans, subs, parentTrans))
399 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
401 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
402 subRfMsg, valid = subs.SetCachedResponse(nil, false)
403 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
407 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
409 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
412 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
414 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
416 parentTrans.SendEvent(subRfMsg, 0)
419 //-------------------------------------------------------------------
420 // SUBS DELETE Handling
421 //-------------------------------------------------------------------
423 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
425 trans := c.tracker.NewSubsTransaction(subs)
426 subs.WaitTransactionTurn(trans)
427 defer subs.ReleaseTransactionTurn(trans)
428 defer trans.Release()
430 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
433 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
436 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
440 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
441 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
442 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
443 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
444 parentTrans.SendEvent(nil, 0)
447 //-------------------------------------------------------------------
448 // send to E2T Subscription Request
449 //-------------------------------------------------------------------
450 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
452 var event interface{} = nil
453 var timedOut bool = false
455 subReqMsg := subs.SubReqMsg
456 subReqMsg.RequestId = subs.GetReqId().RequestId
457 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
459 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
463 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
464 desc := fmt.Sprintf("(retry %d)", retries)
465 c.rmrSendToE2T(desc, subs, trans)
466 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
472 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
476 //-------------------------------------------------------------------
477 // send to E2T Subscription Delete Request
478 //-------------------------------------------------------------------
480 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
482 var event interface{}
485 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
486 subDelReqMsg.RequestId = subs.GetReqId().RequestId
487 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
488 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
490 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
494 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
495 desc := fmt.Sprintf("(retry %d)", retries)
496 c.rmrSendToE2T(desc, subs, trans)
497 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
503 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
507 //-------------------------------------------------------------------
508 // handle from E2T Subscription Reponse
509 //-------------------------------------------------------------------
510 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
511 xapp.Logger.Info("MSG from E2T: %s", params.String())
512 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
514 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
517 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
519 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
522 trans := subs.GetTransaction()
524 err = fmt.Errorf("Ongoing transaction not found")
525 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
528 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
530 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
531 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
536 //-------------------------------------------------------------------
537 // handle from E2T Subscription Failure
538 //-------------------------------------------------------------------
539 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
540 xapp.Logger.Info("MSG from E2T: %s", params.String())
541 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
543 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
546 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
548 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
551 trans := subs.GetTransaction()
553 err = fmt.Errorf("Ongoing transaction not found")
554 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
557 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
559 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
560 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
565 //-------------------------------------------------------------------
566 // handle from E2T Subscription Delete Response
567 //-------------------------------------------------------------------
568 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
569 xapp.Logger.Info("MSG from E2T: %s", params.String())
570 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
572 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
575 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
577 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
580 trans := subs.GetTransaction()
582 err = fmt.Errorf("Ongoing transaction not found")
583 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
586 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
588 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
589 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
594 //-------------------------------------------------------------------
595 // handle from E2T Subscription Delete Failure
596 //-------------------------------------------------------------------
597 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
598 xapp.Logger.Info("MSG from E2T: %s", params.String())
599 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
601 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
604 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
606 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
609 trans := subs.GetTransaction()
611 err = fmt.Errorf("Ongoing transaction not found")
612 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
615 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
617 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
618 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
623 //-------------------------------------------------------------------
625 //-------------------------------------------------------------------
626 func typeofSubsMessage(v interface{}) string {
631 case *e2ap.E2APSubscriptionRequest:
633 case *e2ap.E2APSubscriptionResponse:
635 case *e2ap.E2APSubscriptionFailure:
637 case *e2ap.E2APSubscriptionDeleteRequest:
639 case *e2ap.E2APSubscriptionDeleteResponse:
641 case *e2ap.E2APSubscriptionDeleteFailure: