2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28 httptransport "github.com/go-openapi/runtime/client"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
34 //-----------------------------------------------------------------------------
36 //-----------------------------------------------------------------------------
38 func idstring(err error, entries ...fmt.Stringer) string {
39 var retval string = ""
40 var filler string = ""
41 for _, entry := range entries {
42 retval += filler + entry.String()
46 retval += filler + "err(" + err.Error() + ")"
53 //-----------------------------------------------------------------------------
55 //-----------------------------------------------------------------------------
57 var e2tSubReqTimeout time.Duration = 5 * time.Second
58 var e2tSubDelReqTime time.Duration = 5 * time.Second
59 var e2tMaxSubReqTryCount uint64 = 2 // Initial try + retry
60 var e2tMaxSubDelReqTryCount uint64 = 2 // Initial try + retry
62 var e2tRecvMsgTimeout time.Duration = 5 * time.Second
69 //subscriber *xapp.Subscriber
80 xapp.Logger.Info("SUBMGR")
82 viper.SetEnvPrefix("submgr")
83 viper.AllowEmptyEnv(true)
86 func NewControl() *Control {
88 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
89 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
91 registry := new(Registry)
93 registry.rtmgrClient = &rtmgrClient
95 tracker := new(Tracker)
98 //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
100 c := &Control{e2ap: new(E2ap),
103 //subscriber: subscriber,
105 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
106 //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
110 func (c *Control) ReadyCB(data interface{}) {
111 if c.RMRClient == nil {
112 c.RMRClient = xapp.Rmr
116 func (c *Control) Run() {
117 xapp.SetReadyCB(c.ReadyCB, nil)
121 //-------------------------------------------------------------------
123 //-------------------------------------------------------------------
124 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
126 switch p := params.(type) {
127 case *models.ReportParams:
128 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
130 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
133 defer trans.Release()
134 case *models.ControlParams:
135 case *models.PolicyParams:
138 return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
141 func (c *Control) SubscriptionDeleteHandler(string) error {
142 return fmt.Errorf("Subscription rest interface not implemented")
145 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
146 return c.registry.QueryHandler()
149 //-------------------------------------------------------------------
151 //-------------------------------------------------------------------
153 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
154 params := &xapp.RMRParams{}
155 params.Mtype = trans.GetMtype()
156 params.SubId = int(subs.GetReqId().InstanceId)
158 params.Meid = subs.GetMeid()
160 params.PayloadLen = len(trans.Payload.Buf)
161 params.Payload = trans.Payload.Buf
163 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
164 return c.SendWithRetry(params, false, 5)
167 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
169 params := &xapp.RMRParams{}
170 params.Mtype = trans.GetMtype()
171 params.SubId = int(subs.GetReqId().InstanceId)
172 params.Xid = trans.GetXid()
173 params.Meid = trans.GetMeid()
175 params.PayloadLen = len(trans.Payload.Buf)
176 params.Payload = trans.Payload.Buf
178 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
179 return c.SendWithRetry(params, false, 5)
182 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
183 if c.RMRClient == nil {
184 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
185 xapp.Logger.Error("%s", err.Error())
190 defer c.RMRClient.Free(msg.Mbuf)
192 // xapp-frame might use direct access to c buffer and
193 // when msg.Mbuf is freed, someone might take it into use
194 // and payload data might be invalid inside message handle function
196 // subscriptions won't load system a lot so there is no
197 // real performance hit by cloning buffer into new go byte slice
198 cPay := append(msg.Payload[:0:0], msg.Payload...)
200 msg.PayloadLen = len(cPay)
203 case xapp.RIC_SUB_REQ:
204 go c.handleXAPPSubscriptionRequest(msg)
205 case xapp.RIC_SUB_RESP:
206 go c.handleE2TSubscriptionResponse(msg)
207 case xapp.RIC_SUB_FAILURE:
208 go c.handleE2TSubscriptionFailure(msg)
209 case xapp.RIC_SUB_DEL_REQ:
210 go c.handleXAPPSubscriptionDeleteRequest(msg)
211 case xapp.RIC_SUB_DEL_RESP:
212 go c.handleE2TSubscriptionDeleteResponse(msg)
213 case xapp.RIC_SUB_DEL_FAILURE:
214 go c.handleE2TSubscriptionDeleteFailure(msg)
216 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
221 //-------------------------------------------------------------------
222 // handle from XAPP Subscription Request
223 //------------------------------------------------------------------
224 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
225 xapp.Logger.Info("MSG from XAPP: %s", params.String())
227 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
229 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
233 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.InstanceId, params.Meid)
235 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
238 defer trans.Release()
240 err = c.tracker.Track(trans)
242 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
246 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
247 subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
249 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
256 go c.handleSubscriptionCreate(subs, trans)
257 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
261 switch themsg := event.(type) {
262 case *e2ap.E2APSubscriptionResponse:
263 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
266 c.rmrSendToXapp("", subs, trans)
269 case *e2ap.E2APSubscriptionFailure:
270 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
272 c.rmrSendToXapp("", subs, trans)
278 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
279 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
282 //-------------------------------------------------------------------
283 // handle from XAPP Subscription Delete Request
284 //------------------------------------------------------------------
285 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
286 xapp.Logger.Info("MSG from XAPP: %s", params.String())
288 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
290 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
294 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.InstanceId, params.Meid)
296 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
299 defer trans.Release()
301 err = c.tracker.Track(trans)
303 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
307 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
309 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
316 go c.handleSubscriptionDelete(subs, trans)
317 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
319 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
321 // Whatever is received send ok delete response
322 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
323 subDelRespMsg.RequestId = subs.GetReqId().RequestId
324 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
325 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
327 c.rmrSendToXapp("", subs, trans)
330 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
331 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
334 //-------------------------------------------------------------------
335 // SUBS CREATE Handling
336 //-------------------------------------------------------------------
337 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
339 trans := c.tracker.NewSubsTransaction(subs)
340 subs.WaitTransactionTurn(trans)
341 defer subs.ReleaseTransactionTurn(trans)
342 defer trans.Release()
344 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
346 subRfMsg, valid := subs.GetCachedResponse()
347 if subRfMsg == nil && valid == true {
350 // In case of failure
351 // - make internal delete
352 // - in case duplicate cause, retry (currently max 1 retry)
354 maxRetries := uint64(1)
356 for retries := uint64(0); retries <= maxRetries && doRetry; retries++ {
359 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
360 switch themsg := event.(type) {
361 case *e2ap.E2APSubscriptionResponse:
362 subRfMsg, valid = subs.SetCachedResponse(event, true)
363 case *e2ap.E2APSubscriptionFailure:
364 subRfMsg, valid = subs.SetCachedResponse(event, false)
366 for _, item := range themsg.ActionNotAdmittedList.Items {
367 if item.Cause.Content != e2ap.E2AP_CauseContent_Ric || (item.Cause.Value != e2ap.E2AP_CauseValue_Ric_duplicate_action && item.Cause.Value != e2ap.E2AP_CauseValue_Ric_duplicate_event) {
372 xapp.Logger.Info("SUBS-SubReq: internal delete and possible retry due event(%s) retry(%t,%d/%d) %s", typeofSubsMessage(event), doRetry, retries, maxRetries, idstring(nil, trans, subs, parentTrans))
373 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
375 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
376 subRfMsg, valid = subs.SetCachedResponse(nil, false)
377 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
381 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
383 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
386 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
388 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
390 parentTrans.SendEvent(subRfMsg, 0)
393 //-------------------------------------------------------------------
394 // SUBS DELETE Handling
395 //-------------------------------------------------------------------
397 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
399 trans := c.tracker.NewSubsTransaction(subs)
400 subs.WaitTransactionTurn(trans)
401 defer subs.ReleaseTransactionTurn(trans)
402 defer trans.Release()
404 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
407 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
410 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
414 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
415 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
416 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
417 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
418 parentTrans.SendEvent(nil, 0)
421 //-------------------------------------------------------------------
422 // send to E2T Subscription Request
423 //-------------------------------------------------------------------
424 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
426 var event interface{} = nil
427 var timedOut bool = false
429 subReqMsg := subs.SubReqMsg
430 subReqMsg.RequestId = subs.GetReqId().RequestId
431 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
433 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
437 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
438 desc := fmt.Sprintf("(retry %d)", retries)
439 c.rmrSendToE2T(desc, subs, trans)
440 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
446 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
450 //-------------------------------------------------------------------
451 // send to E2T Subscription Delete Request
452 //-------------------------------------------------------------------
454 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
456 var event interface{}
459 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
460 subDelReqMsg.RequestId = subs.GetReqId().RequestId
461 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
462 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
464 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
468 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
469 desc := fmt.Sprintf("(retry %d)", retries)
470 c.rmrSendToE2T(desc, subs, trans)
471 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
477 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
481 //-------------------------------------------------------------------
482 // handle from E2T Subscription Reponse
483 //-------------------------------------------------------------------
484 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
485 xapp.Logger.Info("MSG from E2T: %s", params.String())
486 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
488 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
491 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
493 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
496 trans := subs.GetTransaction()
498 err = fmt.Errorf("Ongoing transaction not found")
499 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
502 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
504 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
505 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
510 //-------------------------------------------------------------------
511 // handle from E2T Subscription Failure
512 //-------------------------------------------------------------------
513 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
514 xapp.Logger.Info("MSG from E2T: %s", params.String())
515 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
517 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
520 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
522 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
525 trans := subs.GetTransaction()
527 err = fmt.Errorf("Ongoing transaction not found")
528 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
531 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
533 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
534 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
539 //-------------------------------------------------------------------
540 // handle from E2T Subscription Delete Response
541 //-------------------------------------------------------------------
542 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
543 xapp.Logger.Info("MSG from E2T: %s", params.String())
544 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
546 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
549 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
551 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
554 trans := subs.GetTransaction()
556 err = fmt.Errorf("Ongoing transaction not found")
557 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
560 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
562 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
563 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
568 //-------------------------------------------------------------------
569 // handle from E2T Subscription Delete Failure
570 //-------------------------------------------------------------------
571 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
572 xapp.Logger.Info("MSG from E2T: %s", params.String())
573 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
575 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
578 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
580 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
583 trans := subs.GetTransaction()
585 err = fmt.Errorf("Ongoing transaction not found")
586 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
589 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
591 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
592 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
597 //-------------------------------------------------------------------
599 //-------------------------------------------------------------------
600 func typeofSubsMessage(v interface{}) string {
605 case *e2ap.E2APSubscriptionRequest:
607 case *e2ap.E2APSubscriptionResponse:
609 case *e2ap.E2APSubscriptionFailure:
611 case *e2ap.E2APSubscriptionDeleteRequest:
613 case *e2ap.E2APSubscriptionDeleteResponse:
615 case *e2ap.E2APSubscriptionDeleteFailure: