2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28 httptransport "github.com/go-openapi/runtime/client"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
34 //-----------------------------------------------------------------------------
36 //-----------------------------------------------------------------------------
// idstring assembles a compact identification string for log messages from the
// String() output of every entry and, for the error, its message wrapped as
// "err(<message>)". Each piece is prefixed with the current value of filler.
//
// NOTE(review): the embedded numbering jumps 42 -> 46 -> 53, so some lines of
// this function are not visible here; where filler is updated, how the err
// branch is guarded and what is returned (presumably retval) must be confirmed
// against the full file. Callers in this file pass a nil err in several places.
38 func idstring(err error, entries ...fmt.Stringer) string {
39 var retval string = ""
40 var filler string = ""
41 for _, entry := range entries {
// Append this entry's textual form after the current separator.
42 retval += filler + entry.String()
// Append the error text. err.Error() panics on a nil error, so this line is
// presumably inside a nil check — confirm.
46 retval += filler + "err(" + err.Error() + ")"
53 //-----------------------------------------------------------------------------
55 //-----------------------------------------------------------------------------
// Package-level E2 termination (E2T) tuning values. They are assigned in
// NewControl from viper keys under "controls." and read by the send/receive
// helpers in this file.

// e2tSubReqTimeout is the wait passed to trans.WaitEvent after a subscription
// request has been sent towards E2T.
57 var e2tSubReqTimeout time.Duration
// e2tSubDelReqTime is the wait passed to trans.WaitEvent after a subscription
// delete request has been sent towards E2T.
58 var e2tSubDelReqTime time.Duration
// e2tRecvMsgTimeout is the timeout passed to trans.SendEvent when a message
// received from E2T is handed over to the ongoing transaction.
59 var e2tRecvMsgTimeout time.Duration
// e2tMaxSubReqTryCount bounds the send loop in sendE2TSubscriptionRequest.
60 var e2tMaxSubReqTryCount uint64 // Initial try + retry
// e2tMaxSubDelReqTryCount bounds the send loop in sendE2TSubscriptionDeleteRequest.
61 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
68 //subscriber *xapp.Subscriber
// NOTE(review): the header of the function enclosing these statements is not
// visible in this view (presumably the package init function — confirm).
// Logs the "SUBMGR" banner at Info level.
79 xapp.Logger.Info("SUBMGR")
// Use "submgr" as the prefix for environment variables looked up by viper.
81 viper.SetEnvPrefix("submgr")
// Let viper treat environment variables that are set but empty as valid values.
82 viper.AllowEmptyEnv(true)
// NewControl reads the E2T timeout and try-count settings from viper into the
// package-level variables (logging each value), builds the routing manager
// client, the registry and the tracker, creates the Control value and starts
// the xapp subscription listener in its own goroutine.
//
// NOTE(review): several lines of this function are not visible in this view
// (embedded numbers 107-108, 112-113, 115 and 118-119 are absent), so the full
// Control literal, any tracker initialisation and the return statement cannot
// be confirmed from here.
85 func NewControl() *Control {
87 // viper.GetDuration returns nanoseconds
// The "*_ms" keys are multiplied by 1000000 to turn a millisecond count into
// the nanosecond count a time.Duration holds.
// NOTE(review): this presumably relies on the configured values being plain
// integers; a value with a unit suffix would be scaled twice — confirm.
88 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
89 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
90 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
91 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
92 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
93 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
// Try counts are read as unsigned integers and used as loop bounds when sending to E2T.
94 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
95 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
96 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
97 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
// Routing manager REST client: address is "<rtmgr.HostAddr>:<rtmgr.port>", base
// path is rtmgr.baseUrl, and "http" is the only scheme offered.
99 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
100 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
// The registry is given a pointer to the routing manager client created above.
102 registry := new(Registry)
103 registry.Initialize()
104 registry.rtmgrClient = &rtmgrClient
106 tracker := new(Tracker)
109 //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
// Only the e2ap field of the literal is visible here; the other fields are on lines missing from this view.
111 c := &Control{e2ap: new(E2ap),
114 //subscriber: subscriber,
// Serve the subscription interface in the background with this Control's
// subscribe, query and delete handlers.
116 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
117 //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
// ReadyCB is the callback Run registers through xapp.SetReadyCB. When the
// Control has no RMR client yet, it adopts the framework's xapp.Rmr instance.
// The data argument is not used by the visible lines.
121 func (c *Control) ReadyCB(data interface{}) {
// Leave an already assigned client untouched.
122 if c.RMRClient == nil {
123 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the xapp ready callback, with nil callback data.
// NOTE(review): the rest of this function is not visible in this view
// (embedded numbers 129-131 are absent).
127 func (c *Control) Run() {
128 xapp.SetReadyCB(c.ReadyCB, nil)
132 //-------------------------------------------------------------------
134 //-------------------------------------------------------------------
// SubscriptionHandler is the subscribe callback handed to
// xapp.Subscription.Listen in NewControl. The visible return statement yields an
// empty SubscriptionResponse together with a "not implemented" error.
//
// NOTE(review): the type switch below does not look like live code. It calls
// NewRmrEndpoint unqualified (other call sites use xapp.NewRmrEndpoint), passes
// params (an interface{}) to idstring, which takes fmt.Stringer, and is not
// gofmt-formatted. Embedded lines 136 and 148 are missing from this view and may
// be block-comment delimiters — confirm against the full file.
135 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
137 switch p := params.(type) {
138 case *models.ReportParams:
139 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
141 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
144 defer trans.Release()
145 case *models.ControlParams:
146 case *models.PolicyParams:
149 return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
// SubscriptionDeleteHandler is the delete callback handed to
// xapp.Subscription.Listen in NewControl. It ignores its string argument and
// always reports that the REST interface is not implemented.
152 func (c *Control) SubscriptionDeleteHandler(string) error {
153 return fmt.Errorf("Subscription rest interface not implemented")
// QueryHandler is the query callback handed to xapp.Subscription.Listen in
// NewControl. It delegates to the registry and returns its result unchanged.
156 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
157 return c.registry.QueryHandler()
160 //-------------------------------------------------------------------
162 //-------------------------------------------------------------------
// rmrSendToE2T builds an xapp.RMRParams for a message towards E2T, logs it and
// sends it with c.SendWithRetry, returning that call's error.
//
// The message type and payload come from the subscription-side transaction; the
// subscription id and Meid come from the subscription. desc is used only in the
// log line.
//
// NOTE(review): embedded lines 168, 170 and 173 are missing from this view, so
// further params fields may be set there. Both callers in this file ignore the
// returned error.
164 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
165 params := &xapp.RMRParams{}
166 params.Mtype = trans.GetMtype()
// The subscription's instance id is used as the RMR subscription id.
167 params.SubId = int(subs.GetReqId().InstanceId)
169 params.Meid = subs.GetMeid()
171 params.PayloadLen = len(trans.Payload.Buf)
172 params.Payload = trans.Payload.Buf
174 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
// NOTE(review): the meaning of the arguments false and 5 is defined by
// SendWithRetry, which is not visible here — confirm.
175 return c.SendWithRetry(params, false, 5)
// rmrSendToXapp builds an xapp.RMRParams for a message towards an xApp, logs it
// and sends it with c.SendWithRetry, returning that call's error.
//
// The message type, Xid, Meid and payload come from the xApp transaction; only
// the subscription id comes from the subscription. desc is used only in the log
// line.
//
// NOTE(review): embedded lines 179, 185 and 188 are missing from this view, so
// further params fields may be set there. All callers in this file pass an empty
// desc and ignore the returned error.
178 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
180 params := &xapp.RMRParams{}
181 params.Mtype = trans.GetMtype()
// The subscription's instance id is used as the RMR subscription id.
182 params.SubId = int(subs.GetReqId().InstanceId)
183 params.Xid = trans.GetXid()
184 params.Meid = trans.GetMeid()
186 params.PayloadLen = len(trans.Payload.Buf)
187 params.Payload = trans.Payload.Buf
189 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
// NOTE(review): the meaning of the arguments false and 5 is defined by
// SendWithRetry, which is not visible here — confirm.
190 return c.SendWithRetry(params, false, 5)
// Consume receives one RMR message and hands it to the handler matching its
// message type. Each handler is started in its own goroutine; unknown types are
// logged and discarded.
//
// The payload is copied into a fresh Go slice first, because the deferred Free
// of msg.Mbuf runs when Consume returns, while the handlers continue running.
//
// NOTE(review): some lines are missing from this view (e.g. embedded 197-200,
// 210, 212-213), including the switch header (presumably on msg.Mtype) and the
// assignment of the copied payload back to msg.
193 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
// Without an RMR client the message cannot be handled.
194 if c.RMRClient == nil {
// NOTE(review): the message text reads "can handle"; "can not handle" was
// probably intended.
195 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
196 xapp.Logger.Error("%s", err.Error())
// Release the underlying message buffer when Consume returns.
201 defer c.RMRClient.Free(msg.Mbuf)
203 // xapp-frame might use direct access to c buffer and
204 // when msg.Mbuf is freed, someone might take it into use
205 // and payload data might be invalid inside message handle function
207 // subscriptions won't load system a lot so there is no
208 // real performance hit by cloning buffer into new go byte slice
// Appending to a zero-length, zero-capacity slice forces a new backing array,
// so cPay is an independent copy of the payload.
209 cPay := append(msg.Payload[:0:0], msg.Payload...)
211 msg.PayloadLen = len(cPay)
// Requests originate from xApps; responses and failures originate from E2T.
214 case xapp.RIC_SUB_REQ:
215 go c.handleXAPPSubscriptionRequest(msg)
216 case xapp.RIC_SUB_RESP:
217 go c.handleE2TSubscriptionResponse(msg)
218 case xapp.RIC_SUB_FAILURE:
219 go c.handleE2TSubscriptionFailure(msg)
220 case xapp.RIC_SUB_DEL_REQ:
221 go c.handleXAPPSubscriptionDeleteRequest(msg)
222 case xapp.RIC_SUB_DEL_RESP:
223 go c.handleE2TSubscriptionDeleteResponse(msg)
224 case xapp.RIC_SUB_DEL_FAILURE:
225 go c.handleE2TSubscriptionDeleteFailure(msg)
227 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
232 //-------------------------------------------------------------------
233 // handle from XAPP Subscription Request
234 //------------------------------------------------------------------
// handleXAPPSubscriptionRequest processes a subscription request received from
// an xApp. It unpacks the request, creates and tracks an xApp transaction,
// assigns it to a subscription and starts handleSubscriptionCreate in a
// goroutine. It then blocks until that goroutine delivers an event. A response
// or failure event is packed and sent back to the xApp; the last visible line
// logs a failure.
//
// NOTE(review): the guards and early returns after each step (presumably
// "if err != nil" / "if trans == nil") are on lines missing from this view.
235 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
236 xapp.Logger.Info("MSG from XAPP: %s", params.String())
// Decode the E2AP payload into a subscription request message.
238 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
240 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
// The transaction is identified by the sender endpoint, the Xid and the
// instance id carried in the request.
244 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.InstanceId, params.Meid)
246 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
// Release the transaction when this handler returns.
249 defer trans.Release()
251 err = c.tracker.Track(trans)
253 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
257 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
258 subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
260 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
// The subscription-side work runs concurrently and reports back through the
// transaction's event mechanism.
267 go c.handleSubscriptionCreate(subs, trans)
268 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
// Forward whatever the subscription side produced to the requesting xApp.
// The error returned by rmrSendToXapp is not checked on the visible lines.
272 switch themsg := event.(type) {
273 case *e2ap.E2APSubscriptionResponse:
274 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
277 c.rmrSendToXapp("", subs, trans)
280 case *e2ap.E2APSubscriptionFailure:
281 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
283 c.rmrSendToXapp("", subs, trans)
// Logged at Info level although it reports a failure.
289 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
290 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
293 //-------------------------------------------------------------------
294 // handle from XAPP Subscription Delete Request
295 //------------------------------------------------------------------
// handleXAPPSubscriptionDeleteRequest processes a subscription delete request
// received from an xApp. It unpacks the request, creates and tracks an xApp
// transaction, looks up the subscription by the transaction's subscription id
// and starts handleSubscriptionDelete in a goroutine. It then blocks until that
// goroutine signals completion and sends a delete response back to the xApp.
//
// NOTE(review): the guards and early returns after each step (presumably
// "if err != nil" / "if trans == nil") are on lines missing from this view.
296 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
297 xapp.Logger.Info("MSG from XAPP: %s", params.String())
// Decode the E2AP payload into a subscription delete request message.
299 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
// NOTE(review): this tag lacks the ':' used by the other "XAPP-SubDelReq:" log lines.
301 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
305 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.InstanceId, params.Meid)
307 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
// Release the transaction when this handler returns.
310 defer trans.Release()
312 err = c.tracker.Track(trans)
// NOTE(review): this line logs with the "XAPP-SubReq" tag inside the delete
// handler; "XAPP-SubDelReq" was probably intended.
314 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
318 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
320 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
// The subscription-side work runs concurrently; the event it sends back is
// only used as a completion signal.
327 go c.handleSubscriptionDelete(subs, trans)
328 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
330 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
332 // Whatever is received send ok delete response
// The response reuses the subscription's request id and the function id of the
// stored subscription request.
333 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
334 subDelRespMsg.RequestId = subs.GetReqId().RequestId
335 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
336 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
// The error returned by rmrSendToXapp is not checked on the visible line.
338 c.rmrSendToXapp("", subs, trans)
341 //TODO handle subscription toward e2term inside RemoveFromSubscription / hide handleSubscriptionDelete in it?
342 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
345 //-------------------------------------------------------------------
346 // SUBS CREATE Handling
347 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the subscription-side part of a
// subscription request on behalf of parentTrans. It creates a subscription
// transaction and waits for its turn on the subscription. If no cached response
// exists and the subscription is still valid, it sends the request to E2T. A
// failure is followed by an internal delete and, as the comments below
// describe, a retry for duplicate causes. The resulting message is then
// delivered to parentTrans as an event.
//
// NOTE(review): several lines are missing from this view (else branches,
// doRetry handling, the condition guarding RemoveFromSubscription), so the exact
// branch structure must be confirmed against the full file.
348 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
350 trans := c.tracker.NewSubsTransaction(subs)
// Wait for this transaction's turn on the subscription. Deferred calls run in
// reverse order, so trans.Release() executes before the turn is released.
351 subs.WaitTransactionTurn(trans)
352 defer subs.ReleaseTransactionTurn(trans)
353 defer trans.Release()
355 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
// Contact E2T only when nothing is cached and the subscription is still
// marked valid.
357 subRfMsg, valid := subs.GetCachedResponse()
358 if subRfMsg == nil && valid == true {
361 // In case of failure
362 // - make internal delete
363 // - in case duplicate cause, retry (currently max 1 retry)
365 maxRetries := uint64(1)
// With maxRetries == 1 the loop body runs at most twice, and only while doRetry
// holds (doRetry is declared on a line not visible here).
367 for retries := uint64(0); retries <= maxRetries && doRetry; retries++ {
370 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
371 switch themsg := event.(type) {
372 case *e2ap.E2APSubscriptionResponse:
// Cache the response and mark the cached state as valid.
373 subRfMsg, valid = subs.SetCachedResponse(event, true)
374 case *e2ap.E2APSubscriptionFailure:
// Cache the failure and mark the cached state as not valid.
375 subRfMsg, valid = subs.SetCachedResponse(event, false)
// Inspect each not-admitted action: the condition is true for any cause that is
// not a RIC duplicate-action or duplicate-event cause (presumably disabling the
// retry — the body is not visible here).
377 for _, item := range themsg.ActionNotAdmittedList.Items {
378 if item.Cause.Content != e2ap.E2AP_CauseContent_Ric || (item.Cause.Value != e2ap.E2AP_CauseValue_Ric_duplicate_action && item.Cause.Value != e2ap.E2AP_CauseValue_Ric_duplicate_event) {
383 xapp.Logger.Info("SUBS-SubReq: internal delete and possible retry due event(%s) retry(%t,%d/%d) %s", typeofSubsMessage(event), doRetry, retries, maxRetries, idstring(nil, trans, subs, parentTrans))
// Delete the subscription from E2T; the returned event is discarded.
384 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
386 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
// Any other outcome: clear the cached response, mark it invalid and delete
// from E2T.
387 subRfMsg, valid = subs.SetCachedResponse(nil, false)
388 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
392 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
394 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
397 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
// NOTE(review): the condition guarding this removal is on a line missing from
// this view.
399 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
// Hand the result to the xApp-side handler blocked in WaitEvent.
401 parentTrans.SendEvent(subRfMsg, 0)
404 //-------------------------------------------------------------------
405 // SUBS DELETE Handling
406 //-------------------------------------------------------------------
// handleSubscriptionDelete performs the subscription-side part of a delete
// request on behalf of parentTrans. It creates a subscription transaction and
// waits for its turn on the subscription. A delete request goes to E2T only when
// the subscription is valid and the requesting endpoint is its sole endpoint.
// It then removes the parent transaction's endpoint from the subscription and
// signals parentTrans with a nil event.
408 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
410 trans := c.tracker.NewSubsTransaction(subs)
// Wait for this transaction's turn on the subscription. Deferred calls run in
// reverse order, so trans.Release() executes before the turn is released.
411 subs.WaitTransactionTurn(trans)
412 defer subs.ReleaseTransactionTurn(trans)
413 defer trans.Release()
415 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
// Only the last remaining endpoint of a valid subscription triggers the delete
// towards E2T.
418 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
// The returned event is discarded.
421 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
425 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
426 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
427 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
428 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
// Wake the xApp-side handler blocked in WaitEvent; no payload is needed.
429 parentTrans.SendEvent(nil, 0)
432 //-------------------------------------------------------------------
433 // send to E2T Subscription Request
434 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription's stored request, stamped
// with the subscription's request id, and sends it to E2T up to
// e2tMaxSubReqTryCount times. After each send it waits up to e2tSubReqTimeout
// for an event on trans.
//
// NOTE(review): the declaration of err, the error-path return, the loop exit
// condition (presumably "stop when not timed out") and the final return
// (presumably event) are on lines missing from this view. parentTrans is used
// only for logging on the visible lines.
435 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
437 var event interface{} = nil
438 var timedOut bool = false
// NOTE(review): if subs.SubReqMsg is a pointer, the assignment below modifies
// the stored request in place — confirm the field's type.
440 subReqMsg := subs.SubReqMsg
441 subReqMsg.RequestId = subs.GetReqId().RequestId
442 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
444 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
// The counter starts at zero, so the first attempt is logged as "(retry 0)".
448 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
449 desc := fmt.Sprintf("(retry %d)", retries)
// The send error is not checked; a failed send shows up as a timed-out wait.
450 c.rmrSendToE2T(desc, subs, trans)
451 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
457 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
461 //-------------------------------------------------------------------
462 // send to E2T Subscription Delete Request
463 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest builds a delete request from the
// subscription's request id and function id, and sends it to E2T up to
// e2tMaxSubDelReqTryCount times. After each send it waits up to e2tSubDelReqTime
// for an event on trans.
//
// NOTE(review): the declarations of err and timedOut, the error-path return, the
// loop exit condition (presumably "stop when not timed out") and the final
// return (presumably event) are on lines missing from this view. parentTrans is
// used only for logging on the visible lines.
465 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
467 var event interface{}
470 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
471 subDelReqMsg.RequestId = subs.GetReqId().RequestId
472 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
473 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
475 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
// The counter starts at zero, so the first attempt is logged as "(retry 0)".
479 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
480 desc := fmt.Sprintf("(retry %d)", retries)
// The send error is not checked; a failed send shows up as a timed-out wait.
481 c.rmrSendToE2T(desc, subs, trans)
482 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
488 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
492 //-------------------------------------------------------------------
493 // handle from E2T Subscription Response
494 //-------------------------------------------------------------------
// handleE2TSubscriptionResponse processes a subscription response received from
// E2T. It unpacks the message, finds the subscription by the instance id in the
// response and passes the message as an event to that subscription's ongoing
// transaction. Each failing step is logged.
//
// NOTE(review): the guards and early returns around the error logs are on lines
// missing from this view.
495 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
496 xapp.Logger.Info("MSG from E2T: %s", params.String())
497 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
499 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
// The instance id in the response identifies the subscription.
502 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
504 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
// The response is only meaningful while a transaction is waiting for it.
507 trans := subs.GetTransaction()
509 err = fmt.Errorf("Ongoing transaction not found")
510 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
// Deliver the message to the waiting transaction, waiting at most
// e2tRecvMsgTimeout for the hand-over.
513 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
515 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
516 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
521 //-------------------------------------------------------------------
522 // handle from E2T Subscription Failure
523 //-------------------------------------------------------------------
// handleE2TSubscriptionFailure processes a subscription failure received from
// E2T. It unpacks the message, finds the subscription by the instance id in the
// failure and passes the message as an event to that subscription's ongoing
// transaction. Each failing step is logged.
//
// NOTE(review): the guards and early returns around the error logs are on lines
// missing from this view.
524 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
525 xapp.Logger.Info("MSG from E2T: %s", params.String())
526 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
528 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
// The instance id in the failure identifies the subscription.
531 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
533 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
// The failure is only meaningful while a transaction is waiting for it.
536 trans := subs.GetTransaction()
538 err = fmt.Errorf("Ongoing transaction not found")
539 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
// Deliver the message to the waiting transaction, waiting at most
// e2tRecvMsgTimeout for the hand-over.
542 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
544 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
545 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
550 //-------------------------------------------------------------------
551 // handle from E2T Subscription Delete Response
552 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse processes a subscription delete response
// received from E2T. It unpacks the message, finds the subscription by the
// instance id in the response and passes the message as an event to that
// subscription's ongoing transaction. Each failing step is logged.
//
// NOTE(review): unlike the sibling E2T handlers this one declares an error
// result, but its only call site in this file (in Consume) starts it with "go",
// which discards the result. The guards and returns around the error logs are on
// lines missing from this view.
553 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
554 xapp.Logger.Info("MSG from E2T: %s", params.String())
555 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
557 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
// The instance id in the response identifies the subscription.
560 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
562 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
// The response is only meaningful while a transaction is waiting for it.
565 trans := subs.GetTransaction()
567 err = fmt.Errorf("Ongoing transaction not found")
568 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
// Deliver the message to the waiting transaction, waiting at most
// e2tRecvMsgTimeout for the hand-over.
571 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
573 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
574 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
579 //-------------------------------------------------------------------
580 // handle from E2T Subscription Delete Failure
581 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteFailure processes a subscription delete failure
// received from E2T. It unpacks the message, finds the subscription by the
// instance id in the failure and passes the message as an event to that
// subscription's ongoing transaction. Each failing step is logged.
//
// NOTE(review): the guards and early returns around the error logs are on lines
// missing from this view.
582 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
583 xapp.Logger.Info("MSG from E2T: %s", params.String())
584 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
586 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
// The instance id in the failure identifies the subscription.
589 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
591 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
// The failure is only meaningful while a transaction is waiting for it.
594 trans := subs.GetTransaction()
596 err = fmt.Errorf("Ongoing transaction not found")
597 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
// Deliver the message to the waiting transaction, waiting at most
// e2tRecvMsgTimeout for the hand-over.
600 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
602 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
603 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
608 //-------------------------------------------------------------------
610 //-------------------------------------------------------------------
// typeofSubsMessage maps a subscription-related E2AP message to a string; the
// log statements in this file use it to describe an event's type.
//
// NOTE(review): only the case labels are visible in this view. The switch
// header, the string returned for each case and any nil/default handling are on
// lines missing from here — confirm against the full file.
611 func typeofSubsMessage(v interface{}) string {
616 case *e2ap.E2APSubscriptionRequest:
618 case *e2ap.E2APSubscriptionResponse:
620 case *e2ap.E2APSubscriptionFailure:
622 case *e2ap.E2APSubscriptionDeleteRequest:
624 case *e2ap.E2APSubscriptionDeleteResponse:
626 case *e2ap.E2APSubscriptionDeleteFailure: