2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
28 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
29 httptransport "github.com/go-openapi/runtime/client"
30 "github.com/go-openapi/strfmt"
31 "github.com/spf13/viper"
35 //-----------------------------------------------------------------------------
37 //-----------------------------------------------------------------------------
39 func idstring(err error, entries ...fmt.Stringer) string {
40 var retval string = ""
41 var filler string = ""
42 for _, entry := range entries {
43 retval += filler + entry.String()
47 retval += filler + "err(" + err.Error() + ")"
54 //-----------------------------------------------------------------------------
56 //-----------------------------------------------------------------------------
58 var e2tSubReqTimeout time.Duration = 5 * time.Second
59 var e2tSubDelReqTime time.Duration = 5 * time.Second
60 var e2tMaxSubReqTryCount uint64 = 2 // Initial try + retry
61 var e2tMaxSubDelReqTryCount uint64 = 2 // Initial try + retry
63 var e2tRecvMsgTimeout time.Duration = 5 * time.Second
66 xapptweaks.XappWrapper
70 //subscriber *xapp.Subscriber
80 xapp.Logger.Info("SUBMGR")
82 viper.SetEnvPrefix("submgr")
83 viper.AllowEmptyEnv(true)
86 func NewControl() *Control {
88 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
89 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
91 registry := new(Registry)
93 registry.rtmgrClient = &rtmgrClient
95 tracker := new(Tracker)
98 //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
100 c := &Control{e2ap: new(E2ap),
103 //subscriber: subscriber,
105 c.XappWrapper.Init("")
106 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
107 //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
111 func (c *Control) ReadyCB(data interface{}) {
117 func (c *Control) Run() {
118 xapp.SetReadyCB(c.ReadyCB, nil)
122 //-------------------------------------------------------------------
124 //-------------------------------------------------------------------
125 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
127 switch p := params.(type) {
128 case *models.ReportParams:
129 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
131 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
134 defer trans.Release()
135 case *models.ControlParams:
136 case *models.PolicyParams:
139 return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
142 func (c *Control) SubscriptionDeleteHandler(string) error {
143 return fmt.Errorf("Subscription rest interface not implemented")
146 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
147 return c.registry.QueryHandler()
150 //-------------------------------------------------------------------
152 //-------------------------------------------------------------------
154 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
155 params := xapptweaks.NewParams(nil)
156 params.Mtype = trans.GetMtype()
157 params.SubId = int(subs.GetReqId().InstanceId)
159 params.Meid = subs.GetMeid()
161 params.PayloadLen = len(trans.Payload.Buf)
162 params.Payload = trans.Payload.Buf
164 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
165 return c.RmrSend(params, 5)
168 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
170 params := xapptweaks.NewParams(nil)
171 params.Mtype = trans.GetMtype()
172 params.SubId = int(subs.GetReqId().InstanceId)
173 params.Xid = trans.GetXid()
174 params.Meid = trans.GetMeid()
176 params.PayloadLen = len(trans.Payload.Buf)
177 params.Payload = trans.Payload.Buf
179 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
180 return c.RmrSend(params, 5)
183 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
184 msg := xapptweaks.NewParams(params)
186 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
187 xapp.Logger.Error("%s", err.Error())
192 defer c.Rmr.Free(msg.Mbuf)
194 // xapp-frame might use direct access to c buffer and
195 // when msg.Mbuf is freed, someone might take it into use
196 // and payload data might be invalid inside message handle function
198 // subscriptions won't load system a lot so there is no
199 // real performance hit by cloning buffer into new go byte slice
200 cPay := append(msg.Payload[:0:0], msg.Payload...)
202 msg.PayloadLen = len(cPay)
205 case xapp.RIC_SUB_REQ:
206 go c.handleXAPPSubscriptionRequest(msg)
207 case xapp.RIC_SUB_RESP:
208 go c.handleE2TSubscriptionResponse(msg)
209 case xapp.RIC_SUB_FAILURE:
210 go c.handleE2TSubscriptionFailure(msg)
211 case xapp.RIC_SUB_DEL_REQ:
212 go c.handleXAPPSubscriptionDeleteRequest(msg)
213 case xapp.RIC_SUB_DEL_RESP:
214 go c.handleE2TSubscriptionDeleteResponse(msg)
215 case xapp.RIC_SUB_DEL_FAILURE:
216 go c.handleE2TSubscriptionDeleteFailure(msg)
218 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
223 //-------------------------------------------------------------------
224 // handle from XAPP Subscription Request
225 //------------------------------------------------------------------
226 func (c *Control) handleXAPPSubscriptionRequest(params *xapptweaks.RMRParams) {
227 xapp.Logger.Info("MSG from XAPP: %s", params.String())
229 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
231 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
235 trans := c.tracker.NewXappTransaction(xapptweaks.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.InstanceId, params.Meid)
237 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
240 defer trans.Release()
242 err = c.tracker.Track(trans)
244 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
248 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
249 subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
251 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
258 go c.handleSubscriptionCreate(subs, trans)
259 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
263 switch themsg := event.(type) {
264 case *e2ap.E2APSubscriptionResponse:
265 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
268 c.rmrSendToXapp("", subs, trans)
271 case *e2ap.E2APSubscriptionFailure:
272 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
274 c.rmrSendToXapp("", subs, trans)
280 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
281 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
284 //-------------------------------------------------------------------
285 // handle from XAPP Subscription Delete Request
286 //------------------------------------------------------------------
287 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapptweaks.RMRParams) {
288 xapp.Logger.Info("MSG from XAPP: %s", params.String())
290 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
292 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
296 trans := c.tracker.NewXappTransaction(xapptweaks.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.InstanceId, params.Meid)
298 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
301 defer trans.Release()
303 err = c.tracker.Track(trans)
305 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
309 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
311 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
318 go c.handleSubscriptionDelete(subs, trans)
319 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
321 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
323 // Whatever is received send ok delete response
324 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
325 subDelRespMsg.RequestId = subs.GetReqId().RequestId
326 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
327 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
329 c.rmrSendToXapp("", subs, trans)
332 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
333 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
336 //-------------------------------------------------------------------
337 // SUBS CREATE Handling
338 //-------------------------------------------------------------------
339 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
341 trans := c.tracker.NewSubsTransaction(subs)
342 subs.WaitTransactionTurn(trans)
343 defer subs.ReleaseTransactionTurn(trans)
344 defer trans.Release()
346 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
348 subRfMsg, valid := subs.GetCachedResponse()
349 if subRfMsg == nil && valid == true {
352 // In case of failure
353 // - make internal delete
354 // - in case duplicate cause, retry (currently max 1 retry)
356 maxRetries := uint64(1)
358 for retries := uint64(0); retries <= maxRetries && doRetry; retries++ {
361 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
362 switch themsg := event.(type) {
363 case *e2ap.E2APSubscriptionResponse:
364 subRfMsg, valid = subs.SetCachedResponse(event, true)
365 case *e2ap.E2APSubscriptionFailure:
366 subRfMsg, valid = subs.SetCachedResponse(event, false)
368 for _, item := range themsg.ActionNotAdmittedList.Items {
369 if item.Cause.Content != e2ap.E2AP_CauseContent_Ric || (item.Cause.Value != e2ap.E2AP_CauseValue_Ric_duplicate_action && item.Cause.Value != e2ap.E2AP_CauseValue_Ric_duplicate_event) {
374 xapp.Logger.Info("SUBS-SubReq: internal delete and possible retry due event(%s) retry(%t,%d/%d) %s", typeofSubsMessage(event), doRetry, retries, maxRetries, idstring(nil, trans, subs, parentTrans))
375 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
377 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
378 subRfMsg, valid = subs.SetCachedResponse(nil, false)
379 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
383 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
385 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
388 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
390 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
392 parentTrans.SendEvent(subRfMsg, 0)
395 //-------------------------------------------------------------------
396 // SUBS DELETE Handling
397 //-------------------------------------------------------------------
399 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
401 trans := c.tracker.NewSubsTransaction(subs)
402 subs.WaitTransactionTurn(trans)
403 defer subs.ReleaseTransactionTurn(trans)
404 defer trans.Release()
406 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
409 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
412 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
416 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
417 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
418 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
419 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
420 parentTrans.SendEvent(nil, 0)
423 //-------------------------------------------------------------------
424 // send to E2T Subscription Request
425 //-------------------------------------------------------------------
426 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
428 var event interface{} = nil
429 var timedOut bool = false
431 subReqMsg := subs.SubReqMsg
432 subReqMsg.RequestId = subs.GetReqId().RequestId
433 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
435 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
439 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
440 desc := fmt.Sprintf("(retry %d)", retries)
441 c.rmrSendToE2T(desc, subs, trans)
442 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
448 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
452 //-------------------------------------------------------------------
453 // send to E2T Subscription Delete Request
454 //-------------------------------------------------------------------
456 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
458 var event interface{}
461 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
462 subDelReqMsg.RequestId = subs.GetReqId().RequestId
463 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
464 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
466 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
470 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
471 desc := fmt.Sprintf("(retry %d)", retries)
472 c.rmrSendToE2T(desc, subs, trans)
473 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
479 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
483 //-------------------------------------------------------------------
484 // handle from E2T Subscription Reponse
485 //-------------------------------------------------------------------
486 func (c *Control) handleE2TSubscriptionResponse(params *xapptweaks.RMRParams) {
487 xapp.Logger.Info("MSG from E2T: %s", params.String())
488 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
490 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
493 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
495 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
498 trans := subs.GetTransaction()
500 err = fmt.Errorf("Ongoing transaction not found")
501 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
504 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
506 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
507 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
512 //-------------------------------------------------------------------
513 // handle from E2T Subscription Failure
514 //-------------------------------------------------------------------
515 func (c *Control) handleE2TSubscriptionFailure(params *xapptweaks.RMRParams) {
516 xapp.Logger.Info("MSG from E2T: %s", params.String())
517 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
519 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
522 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
524 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
527 trans := subs.GetTransaction()
529 err = fmt.Errorf("Ongoing transaction not found")
530 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
533 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
535 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
536 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
541 //-------------------------------------------------------------------
542 // handle from E2T Subscription Delete Response
543 //-------------------------------------------------------------------
544 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapptweaks.RMRParams) (err error) {
545 xapp.Logger.Info("MSG from E2T: %s", params.String())
546 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
548 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
551 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
553 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
556 trans := subs.GetTransaction()
558 err = fmt.Errorf("Ongoing transaction not found")
559 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
562 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
564 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
565 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
570 //-------------------------------------------------------------------
571 // handle from E2T Subscription Delete Failure
572 //-------------------------------------------------------------------
573 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapptweaks.RMRParams) {
574 xapp.Logger.Info("MSG from E2T: %s", params.String())
575 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
577 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
580 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
582 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
585 trans := subs.GetTransaction()
587 err = fmt.Errorf("Ongoing transaction not found")
588 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
591 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
593 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
594 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
599 //-------------------------------------------------------------------
601 //-------------------------------------------------------------------
602 func typeofSubsMessage(v interface{}) string {
607 case *e2ap.E2APSubscriptionRequest:
609 case *e2ap.E2APSubscriptionResponse:
611 case *e2ap.E2APSubscriptionFailure:
613 case *e2ap.E2APSubscriptionDeleteRequest:
615 case *e2ap.E2APSubscriptionDeleteResponse:
617 case *e2ap.E2APSubscriptionDeleteFailure: