2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28 httptransport "github.com/go-openapi/runtime/client"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
35 //-----------------------------------------------------------------------------
37 //-----------------------------------------------------------------------------
// Timer durations and attempt limits for requests sent toward E2T.
// subReqTime is the duration used when arming "RIC_SUB_REQ" timers.
39 var subReqTime time.Duration = 5 * time.Second
// subDelReqTime is the duration used when arming "RIC_SUB_DEL_REQ" timers.
40 var subDelReqTime time.Duration = 5 * time.Second
// maxSubReqTryCount is the limit tryCount is compared against in
// handleSubscriptionRequestTimer before a SubReq is resent.
41 var maxSubReqTryCount uint64 = 2 // Initial try + retry
// maxSubDelReqTryCount is the limit tryCount is compared against in
// handleSubscriptionDeleteRequestTimer before a SubDelReq is resent.
42 var maxSubDelReqTryCount uint64 = 2 // Initial try + retry
// rmrSendMutex is locked around each xapp.Rmr.Send call made by rmrSendRaw.
49 rmrSendMutex sync.Mutex
// Startup banner in the log.
67 xapp.Logger.Info("SUBMGR")
// Environment variables looked up by viper use the "submgr" prefix.
69 viper.SetEnvPrefix("submgr")
// Environment variables that are set but empty count as set values.
70 viper.AllowEmptyEnv(true)
// NewControl creates the routing-manager client, the registry, the tracker and
// the timer map, and returns a new Control.
73 func NewControl() *Control {
// Routing-manager endpoint comes from configuration: rtmgr.HostAddr and
// rtmgr.port form the host, rtmgr.baseUrl is the base path, and "http" is the
// only scheme offered.
75 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
76 client := rtmgrclient.New(transport, strfmt.Default)
// Parameter objects for the provide and delete xapp-subscription-handle
// calls, each created with a 10 second timeout.
77 handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
78 deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
79 rtmgrClient := RtmgrClient{client, handle, deleteHandle}
// The registry keeps a pointer to the routing-manager client built above.
81 registry := new(Registry)
83 registry.rtmgrClient = &rtmgrClient
85 tracker := new(Tracker)
88 timerMap := new(TimerMap)
// NOTE(review): only the first field of the returned Control literal is
// visible here; confirm the remaining fields against the full file.
91 return &Control{e2ap: new(E2ap),
// Run is an exported method on *Control that takes no arguments and returns
// nothing.
// NOTE(review): the body is not visible here; presumably it hands control to
// the xapp framework — confirm.
99 func (c *Control) Run() {
// rmrSendRaw logs the outgoing message under the desc prefix and passes
// params.RMRParams to xapp.Rmr.Send, trying again after a failed attempt.
// The named result err carries the failure description built below.
// NOTE(review): the initialisation of i and status, and the conditions around
// the retry log and the error path, are not visible here — confirm the exact
// attempt count against the full file.
103 func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) {
105 xapp.Logger.Info("%s: %s", desc, params.String())
// Loop again only while the last Send reported false and i has not passed 10.
108 for ; i <= 10 && status == false; i++ {
// rmrSendMutex is held only for the duration of the Send call itself.
109 c.rmrSendMutex.Lock()
110 status = xapp.Rmr.Send(params.RMRParams, false)
111 c.rmrSendMutex.Unlock()
113 xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String())
// Wait half a second before the next attempt.
114 time.Sleep(500 * time.Millisecond)
// Failure path: build the error, log it at error level and free the message
// buffer.
118 err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String())
119 xapp.Logger.Error("%s: %s", desc, err.Error())
120 xapp.Rmr.Free(params.Mbuf)
// rmrSend fills a fresh RMRParams from the subscription and the transaction
// and sends it through rmrSendRaw, returning that call's error.
// Message type and payload come from trans; subscription id and MEID come
// from subs.
// NOTE(review): some field assignments between the visible lines are missing
// from this view.
125 func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction) (err error) {
126 params := &RMRParams{&xapp.RMRParams{}}
127 params.Mtype = trans.GetMtype()
128 params.SubId = int(subs.GetSubId())
130 params.Meid = subs.GetMeid()
// Payload length is taken from the packed buffer carried by the transaction.
132 params.PayloadLen = len(trans.Payload.Buf)
133 params.Payload = trans.Payload.Buf
136 return c.rmrSendRaw(desc, params)
// rmrReplyToSender fills a fresh RMRParams for a reply and sends it through
// rmrSendRaw, returning that call's error.
// Unlike rmrSend, the transaction id (Xid) is set from trans and the MEID is
// taken from trans rather than from subs; the subscription id still comes
// from subs.
139 func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction) (err error) {
140 params := &RMRParams{&xapp.RMRParams{}}
141 params.Mtype = trans.GetMtype()
142 params.SubId = int(subs.GetSubId())
143 params.Xid = trans.GetXid()
144 params.Meid = trans.GetMeid()
// Payload length is taken from the packed buffer carried by the transaction.
146 params.PayloadLen = len(trans.Payload.Buf)
147 params.Payload = trans.Payload.Buf
150 return c.rmrSendRaw(desc, params)
// Consume takes a received RMR message and dispatches it by message type to
// the matching subscription handler. Each handler is started in its own
// goroutine, so Consume does not wait for handling to finish.
// NOTE(review): handleSubscriptionDeleteResponse declares an error result,
// which is discarded because it is launched with a go statement.
153 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
// The RMR message buffer is released before dispatching.
// NOTE(review): handlers still read params.Payload afterwards — confirm the
// payload does not alias the freed buffer.
154 xapp.Rmr.Free(params.Mbuf)
// Wrap the framework parameters in the package's own RMRParams type.
156 msg := &RMRParams{params}
159 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
160 go c.handleSubscriptionRequest(msg)
161 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
162 go c.handleSubscriptionResponse(msg)
163 case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
164 go c.handleSubscriptionFailure(msg)
165 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
166 go c.handleSubscriptionDeleteRequest(msg)
167 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
168 go c.handleSubscriptionDeleteResponse(msg)
169 case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
170 go c.handleSubscriptionDeleteFailure(msg)
// Any other message type is only logged.
172 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
// idstring builds one string for log lines out of up to three parts: the
// String() of trans, the String() of subs, and the error text wrapped as
// "err(<message>)". Each part is preceded by filler.
// Callers in this file pass nil for parts they do not have.
// NOTE(review): the conditions guarding each append and any updates to filler
// are not visible here.
177 func idstring(trans fmt.Stringer, subs fmt.Stringer, err error) string {
178 var retval string = ""
179 var filler string = ""
181 retval += filler + trans.String()
185 retval += filler + subs.String()
189 retval += filler + "err(" + err.Error() + ")"
// findSubs looks a subscription up in the registry using the candidate ids,
// in the order given. When it gives up it returns nil and an error that
// lists the ids.
// NOTE(review): the per-id checks and the success return are not visible here.
195 func (c *Control) findSubs(ids []int) (*Subscription, error) {
196 var subs *Subscription = nil
197 for _, id := range ids {
// The registry is keyed by uint16, so the int id is narrowed for the lookup.
199 subs = c.registry.GetSubscription(uint16(id))
206 return nil, fmt.Errorf("No valid subscription found with ids %v", ids)
// findSubsAndTrans resolves a subscription through findSubs and then fetches
// the transaction currently attached to it.
// It returns the subscription with a nil transaction and an error when no
// ongoing transaction is found, and both values with a nil error otherwise.
// NOTE(review): the handling of a findSubs error is not visible here.
211 func (c *Control) findSubsAndTrans(ids []int) (*Subscription, *Transaction, error) {
212 subs, err := c.findSubs(ids)
216 trans := subs.GetTransaction()
218 return subs, nil, fmt.Errorf("No ongoing transaction found from %s", idstring(nil, subs, nil))
220 return subs, trans, nil
// handleSubscriptionRequest processes a subscription request received from an
// xapp: it unpacks the payload, tracks a transaction for the sender, reserves
// a subscription, re-packs the request with the subscription id as sequence
// number, sends it toward E2T and arms the "RIC_SUB_REQ" timer.
// Failures along the way are logged with a "SubReq Drop" prefix.
// NOTE(review): the error checks and early returns between the visible lines
// are missing from this view.
223 func (c *Control) handleSubscriptionRequest(params *RMRParams) {
224 xapp.Logger.Info("SubReq from xapp: %s", params.String())
226 SubReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
228 xapp.Logger.Error("SubReq Drop: %s", idstring(params, nil, err))
// Track a transaction keyed by the sender endpoint, Xid and MEID.
// NOTE(review): the final "true" presumably enables forwarding the response
// to the xapp (compare the false used for the internal delete transaction in
// handleSubscriptionRequestTimer) — confirm against TrackTransaction.
232 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid, false, true)
234 xapp.Logger.Error("SubReq Drop: %s", idstring(params, nil, err))
237 trans.SubReqMsg = SubReqMsg
239 subs, err := c.registry.ReserveSubscription(trans.Meid)
241 xapp.Logger.Error("SubReq Drop: %s", idstring(trans, nil, err))
246 err = subs.SetTransaction(trans)
248 xapp.Logger.Error("SubReq Drop: %s", idstring(trans, subs, err))
// The request's sequence number is overwritten with the reserved
// subscription id before the message is re-packed.
253 trans.SubReqMsg.RequestId.Seq = uint32(subs.GetSubId())
255 xapp.Logger.Debug("SubReq: Handling %s", idstring(trans, subs, nil))
258 // TODO: subscription create is in fact owned by the subscription and not the transaction.
259 // The transaction is toward the xapp while the subscription is toward the RAN.
260 // In a merge several xapps may wake transactions, while only one subscription
261 // toward the RAN occurs -> the subscription owns subscription creation toward the RAN.
263 // This is an intermediate solution while message handling is being improved.
265 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(trans.SubReqMsg)
// Log label fixed: this path drops a SubReq, not a SubResp.
267 xapp.Logger.Error("SubReq Drop: %s", idstring(trans, subs, err))
// NOTE(review): the error returned by rmrSend is not checked on this line.
273 c.rmrSend("SubReq: SubReq to E2T", subs, trans)
// Arm the response timer for this subscription id; on expiry
// handleSubscriptionRequestTimer is called with FirstTry as the try count.
274 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
// handleSubscriptionResponse processes a subscription response received from
// E2T: it unpacks the payload, finds the subscription and its ongoing
// transaction, stops the "RIC_SUB_REQ" timer, re-packs the response and
// replies to the xapp.
// NOTE(review): the error checks and early returns between the visible lines
// are missing from this view.
278 func (c *Control) handleSubscriptionResponse(params *RMRParams) {
279 xapp.Logger.Info("SubResp from E2T: %s", params.String())
281 SubRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
283 xapp.Logger.Error("SubResp Drop %s", idstring(params, nil, err))
// Candidate ids, in lookup order: the sequence number carried in the payload,
// then the SubId from the RMR parameters.
287 subs, trans, err := c.findSubsAndTrans([]int{int(SubRespMsg.RequestId.Seq), params.SubId})
289 xapp.Logger.Error("SubResp: %s", idstring(params, nil, err))
292 trans.SubRespMsg = SubRespMsg
293 xapp.Logger.Debug("SubResp: Handling %s", idstring(trans, subs, nil))
// A response has arrived, so the pending request timer is stopped.
295 c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId()))
// CheckResponseReceived reports whether the transaction was already answered.
// NOTE(review): the action taken in the true branch is not visible here.
297 responseReceived := trans.CheckResponseReceived()
298 if responseReceived == true {
299 // Subscription timer already received
303 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(trans.SubRespMsg)
305 xapp.Logger.Error("SubResp: %s", idstring(trans, subs, err))
// NOTE(review): the error returned by rmrReplyToSender is not checked on this line.
312 c.rmrReplyToSender("SubResp: SubResp to xapp", subs, trans)
// handleSubscriptionFailure processes a subscription failure received from
// E2T: it unpacks the payload, finds the subscription and its ongoing
// transaction, stops the "RIC_SUB_REQ" timer, re-packs the failure and replies
// to the xapp.
// NOTE(review): the error checks, early returns and cleanup between the
// visible lines are missing from this view.
316 func (c *Control) handleSubscriptionFailure(params *RMRParams) {
317 xapp.Logger.Info("SubFail from E2T: %s", params.String())
319 SubFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
321 xapp.Logger.Error("SubFail Drop %s", idstring(params, nil, err))
// Candidate ids, in lookup order: the sequence number carried in the payload,
// then the SubId from the RMR parameters.
325 subs, trans, err := c.findSubsAndTrans([]int{int(SubFailMsg.RequestId.Seq), params.SubId})
327 xapp.Logger.Error("SubFail: %s", idstring(params, nil, err))
330 trans.SubFailMsg = SubFailMsg
331 xapp.Logger.Debug("SubFail: Handling %s", idstring(trans, subs, nil))
333 c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId()))
// NOTE(review): the action taken in the true branch is not visible here.
334 responseReceived := trans.CheckResponseReceived()
335 if responseReceived == true {
336 // Subscription timer already received
340 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(trans.SubFailMsg)
342 c.rmrReplyToSender("SubFail: SubFail to xapp", subs, trans)
// NOTE(review): this blocks the handler goroutine for three seconds after the
// reply; the reason is not stated in the code — confirm it is still needed.
343 time.Sleep(3 * time.Second)
345 //TODO error handling improvement
346 xapp.Logger.Error("SubFail: (continue cleaning) %s", idstring(trans, subs, err))
// handleSubscriptionRequestTimer is the callback registered for the
// "RIC_SUB_REQ" timer. nbrId is the subscription id the timer was armed with
// and tryCount is the try count handed to StartTimer.
// While tryCount is below maxSubReqTryCount the SubReq is resent and the timer
// re-armed. The remaining visible code builds an internal delete transaction,
// sends a SubDelReq toward E2T and arms the "RIC_SUB_DEL_REQ" timer.
// NOTE(review): the error checks, early returns and any tryCount update
// between the visible lines are missing from this view.
354 func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCount uint64) {
355 xapp.Logger.Info("SubReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
357 subs, trans, err := c.findSubsAndTrans(([]int{nbrId}))
359 xapp.Logger.Error("SubReq timeout: %s", idstring(nil, nil, err))
362 xapp.Logger.Debug("SubReq timeout: Handling %s", idstring(trans, subs, nil))
// NOTE(review): the action taken in the true branch is not visible here.
364 responseReceived := trans.CheckResponseReceived()
365 if responseReceived == true {
366 // Subscription Response or Failure already received
// Retry path: resend the same SubReq and arm the timer again.
370 if tryCount < maxSubReqTryCount {
371 xapp.Logger.Info("SubReq timeout: %s", idstring(trans, subs, nil))
373 trans.RetryTransaction()
375 c.rmrSend("SubReq timeout: SubReq to E2T", subs, trans)
378 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer)
382 // Release CREATE transaction
385 // Create DELETE transaction (internal and no messages toward xapp)
// The delete transaction reuses the endpoint, Xid and MEID of the create
// transaction.
386 deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint, trans.GetXid(), trans.GetMeid(), false, false)
388 xapp.Logger.Error("SubReq timeout: %s", idstring(trans, subs, err))
389 //TODO improve error handling. Important at least in merge
// Build the SubDelReq from the ids of the original SubReq; the sequence
// number is the subscription id.
394 deltrans.SubDelReqMsg = &e2ap.E2APSubscriptionDeleteRequest{}
395 deltrans.SubDelReqMsg.RequestId.Id = trans.SubReqMsg.RequestId.Id
396 deltrans.SubDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
397 deltrans.SubDelReqMsg.FunctionId = trans.SubReqMsg.FunctionId
398 deltrans.Mtype, deltrans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg)
400 xapp.Logger.Error("SubReq timeout: %s", idstring(trans, subs, err))
401 //TODO improve error handling. Important at least in merge
// Attach the delete transaction to the subscription.
407 err = subs.SetTransaction(deltrans)
409 xapp.Logger.Error("SubReq timeout: %s", idstring(trans, subs, err))
410 //TODO improve error handling. Important at least in merge
415 c.rmrSend("SubReq timer: SubDelReq to E2T", subs, deltrans)
416 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
// handleSubscriptionDeleteRequest processes a subscription delete request
// received from an xapp: it unpacks the payload, tracks a transaction for the
// sender, finds the subscription, attaches the transaction to it, re-packs
// the request, sends it toward E2T and arms the "RIC_SUB_DEL_REQ" timer.
// NOTE(review): the error checks and early returns between the visible lines
// are missing from this view.
420 func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) {
421 xapp.Logger.Info("SubDelReq from xapp: %s", params.String())
423 SubDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
425 xapp.Logger.Error("SubDelReq Drop %s", idstring(params, nil, err))
429 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid, false, true)
431 xapp.Logger.Error("SubDelReq Drop %s", idstring(params, nil, err))
434 trans.SubDelReqMsg = SubDelReqMsg
// Candidate ids, in lookup order: the sequence number carried in the payload,
// then the SubId from the RMR parameters.
436 subs, err := c.findSubs([]int{int(trans.SubDelReqMsg.RequestId.Seq), params.SubId})
438 xapp.Logger.Error("SubDelReq: %s", idstring(params, nil, err))
443 err = subs.SetTransaction(trans)
445 xapp.Logger.Error("SubDelReq: %s", idstring(trans, subs, err))
450 xapp.Logger.Debug("SubDelReq: Handling %s", idstring(trans, subs, nil))
453 // TODO: subscription delete is in fact owned by the subscription and not the transaction.
454 // The transaction is toward the xapp while the subscription is toward the RAN.
455 // In a merge several xapps may wake transactions, while only one subscription
456 // toward the RAN occurs -> the subscription owns subscription deletion toward the RAN.
458 // This is an intermediate solution while message handling is being improved.
460 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(trans.SubDelReqMsg)
462 xapp.Logger.Error("SubDelReq: %s", idstring(trans, subs, err))
// NOTE(review): the error returned by rmrSend is not checked on this line.
469 c.rmrSend("SubDelReq: SubDelReq to E2T", subs, trans)
// Arm the response timer for this subscription id; on expiry
// handleSubscriptionDeleteRequestTimer is called with FirstTry as the try count.
471 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
// handleSubscriptionDeleteResponse processes a subscription delete response
// received from E2T: it unpacks the payload, finds the subscription and its
// ongoing transaction, stops the "RIC_SUB_DEL_REQ" timer and finishes through
// sendSubscriptionDeleteResponse.
// It declares an error result, unlike the other handlers in this file.
// NOTE(review): Consume starts this handler with a go statement, so the
// returned error is never observed there.
// NOTE(review): the error checks and returns between the visible lines are
// missing from this view.
475 func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error) {
476 xapp.Logger.Info("SubDelResp from E2T:%s", params.String())
478 SubDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
480 xapp.Logger.Error("SubDelResp: Dropping this msg. %s", idstring(params, nil, err))
// Candidate ids, in lookup order: the sequence number carried in the payload,
// then the SubId from the RMR parameters.
484 subs, trans, err := c.findSubsAndTrans([]int{int(SubDelRespMsg.RequestId.Seq), params.SubId})
486 xapp.Logger.Error("SubDelResp: %s", idstring(params, nil, err))
489 trans.SubDelRespMsg = SubDelRespMsg
490 xapp.Logger.Debug("SubDelResp: Handling %s", idstring(trans, subs, nil))
492 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
// NOTE(review): the action taken in the true branch is not visible here.
494 responseReceived := trans.CheckResponseReceived()
495 if responseReceived == true {
496 // Subscription Delete timer already received
500 c.sendSubscriptionDeleteResponse("SubDelResp", trans, subs)
// handleSubscriptionDeleteFailure processes a subscription delete failure
// received from E2T: it unpacks the payload, finds the subscription and its
// ongoing transaction, stops the "RIC_SUB_DEL_REQ" timer and finishes through
// sendSubscriptionDeleteResponse, the same helper the delete response path uses.
// NOTE(review): the error checks and early returns between the visible lines
// are missing from this view.
504 func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) {
505 xapp.Logger.Info("SubDelFail from E2T:%s", params.String())
507 SubDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
509 xapp.Logger.Error("SubDelFail: Dropping this msg. %s", idstring(params, nil, err))
// Candidate ids, in lookup order: the sequence number carried in the payload,
// then the SubId from the RMR parameters.
513 subs, trans, err := c.findSubsAndTrans([]int{int(SubDelFailMsg.RequestId.Seq), params.SubId})
515 xapp.Logger.Error("SubDelFail: %s", idstring(params, nil, err))
518 trans.SubDelFailMsg = SubDelFailMsg
519 xapp.Logger.Debug("SubDelFail: Handling %s", idstring(trans, subs, nil))
521 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
// NOTE(review): the action taken in the true branch is not visible here.
523 responseReceived := trans.CheckResponseReceived()
524 if responseReceived == true {
525 // Subscription Delete timer already received
529 c.sendSubscriptionDeleteResponse("SubDelFail", trans, subs)
// handleSubscriptionDeleteRequestTimer is the callback registered for the
// "RIC_SUB_DEL_REQ" timer. nbrId is the subscription id the timer was armed
// with and tryCount is the try count handed to StartTimer.
// While tryCount is below maxSubDelReqTryCount the SubDelReq is resent and the
// timer re-armed; the last visible line finishes through
// sendSubscriptionDeleteResponse.
// NOTE(review): the error checks, early returns and any tryCount update
// between the visible lines are missing from this view.
533 func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, tryCount uint64) {
534 xapp.Logger.Info("SubDelReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
536 subs, trans, err := c.findSubsAndTrans([]int{nbrId})
538 xapp.Logger.Error("SubDelReq timeout: %s", idstring(nil, nil, err))
541 xapp.Logger.Debug("SubDelReq timeout: Handling %s", idstring(trans, subs, nil))
// NOTE(review): the action taken in the true branch is not visible here.
543 responseReceived := trans.CheckResponseReceived()
544 if responseReceived == true {
545 // Subscription Delete Response or Failure already received
549 if tryCount < maxSubDelReqTryCount {
550 // Set possible to handle new response for the subId
551 trans.RetryTransaction()
552 c.rmrSend("SubDelReq timeout: SubDelReq to E2T", subs, trans)
// Re-arm with the delete timeout (subDelReqTime), matching the other
// "RIC_SUB_DEL_REQ" timers; the create timeout subReqTime was used here before.
554 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer)
558 c.sendSubscriptionDeleteResponse("SubDelReq(timer)", trans, subs)
// sendSubscriptionDeleteResponse is the shared final step of the delete
// response, delete failure and delete timeout paths. desc is used as a log
// and message-description prefix.
// When the transaction has ForwardRespToXapp set, a SubDelResp is built from
// the ids of the stored SubDelReq, packed and sent back to the xapp.
// NOTE(review): the declaration of err, the error check around the reply and
// any cleanup after it are missing from this view.
562 func (c *Control) sendSubscriptionDeleteResponse(desc string, trans *Transaction, subs *Subscription) {
564 if trans.ForwardRespToXapp == true {
565 //Always generate SubDelResp
// A fresh response is built here, replacing any SubDelRespMsg already stored
// on the transaction; the sequence number is the subscription id.
566 trans.SubDelRespMsg = &e2ap.E2APSubscriptionDeleteResponse{}
567 trans.SubDelRespMsg.RequestId.Id = trans.SubDelReqMsg.RequestId.Id
568 trans.SubDelRespMsg.RequestId.Seq = uint32(subs.GetSubId())
569 trans.SubDelRespMsg.FunctionId = trans.SubDelReqMsg.FunctionId
572 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(trans.SubDelRespMsg)
574 c.rmrReplyToSender(desc+": SubDelResp to xapp", subs, trans)
// NOTE(review): this blocks the calling goroutine for three seconds after the
// reply; the reason is not stated in the code — confirm it is still needed.
575 time.Sleep(3 * time.Second)
577 //TODO error handling improvement
578 xapp.Logger.Error("%s: (continue cleaning) %s", desc, idstring(trans, subs, err))