2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
27 httptransport "github.com/go-openapi/runtime/client"
28 "github.com/go-openapi/strfmt"
29 "github.com/spf13/viper"
34 //-----------------------------------------------------------------------------
36 //-----------------------------------------------------------------------------
// Timing and retry settings for the message exchange with E2T.
// NOTE(review): these are mutable package-level vars rather than consts; presumably so they can be overridden elsewhere (e.g. tests) - confirm.
// e2tSubReqTimeout is the wait passed to trans.WaitEvent after each subscription request send.
38 var e2tSubReqTimeout time.Duration = 5 * time.Second
// e2tSubDelReqTime is the wait passed to trans.WaitEvent after each subscription delete request send.
// NOTE(review): the name lacks the "out" suffix used by e2tSubReqTimeout.
39 var e2tSubDelReqTime time.Duration = 5 * time.Second
// e2tMaxSubReqTryCount bounds the send loop in sendE2TSubscriptionRequest.
40 var e2tMaxSubReqTryCount uint64 = 2 // Initial try + retry
// e2tMaxSubDelReqTryCount bounds the send loop in sendE2TSubscriptionDeleteRequest.
41 var e2tMaxSubDelReqTryCount uint64 = 2 // Initial try + retry
// e2tRecvMsgTimeout is the timeout passed to trans.SendEvent when a message
// received from E2T is handed to the subscription's ongoing transaction.
43 var e2tRecvMsgTimeout time.Duration = 5 * time.Second
// rmrSendMutex is held around each xapp.Rmr.Send call made by rmrSendRaw.
50 rmrSendMutex sync.Mutex
// NOTE(review): the header of the function enclosing these statements is not visible in this listing.
// Start-up log line.
61 xapp.Logger.Info("SUBMGR")
// Environment variables looked up through viper use the "submgr" prefix.
63 viper.SetEnvPrefix("submgr")
// An environment variable that is set but empty is treated by viper as a valid value.
64 viper.AllowEmptyEnv(true)
// NewControl constructs a Control. From the visible statements it creates a
// routing manager REST client configured from viper settings, allocates a
// Registry, a Tracker and a TimerMap, and returns a Control whose e2ap field
// is a new E2ap.
// NOTE(review): only part of the body is visible in this listing; further
// initialisation and the remaining fields of the returned literal are not shown.
67 func NewControl() *Control {
// Routing manager address is "<rtmgr.HostAddr>:<rtmgr.port>", base path is
// rtmgr.baseUrl, and only the "http" scheme is offered.
// NOTE(review): key casing is mixed ("HostAddr" vs "port" vs "baseUrl").
69 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
70 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
72 registry := new(Registry)
// The registry keeps a pointer to the routing manager client created above.
74 registry.rtmgrClient = &rtmgrClient
76 tracker := new(Tracker)
79 timerMap := new(TimerMap)
82 return &Control{e2ap: new(E2ap),
// Run takes no arguments and returns nothing.
// NOTE(review): the body is not visible in this listing; presumably it starts
// the xapp framework with c as the message consumer - confirm.
90 func (c *Control) Run() {
// rmrSendRaw sends params through xapp.Rmr.Send, retrying while the send fails.
//
// desc is a caller-supplied label used as the log prefix. The named result
// err is set to an error describing the failed send (retry count and params).
// NOTE(review): the declarations of status and i, and the guards around the
// retry log and the failure handling, are not visible in this listing.
94 func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) {
96 xapp.Logger.Info("%s: %s", desc, params.String())
// Loop continues while i <= 10 and the last send did not succeed.
99 for ; i <= 10 && status == false; i++ {
// Only the Send call itself is made under rmrSendMutex; logging and sleeping happen unlocked.
100 c.rmrSendMutex.Lock()
101 status = xapp.Rmr.Send(params.RMRParams, false)
102 c.rmrSendMutex.Unlock()
// Log the failed attempt and pause 500 ms before the next one.
104 xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String())
105 time.Sleep(500 * time.Millisecond)
// Failure handling: build the error, log it under desc and free the RMR message buffer.
109 err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String())
110 xapp.Logger.Error("%s: %s", desc, err.Error())
111 xapp.Rmr.Free(params.Mbuf)
// rmrSendToE2T fills a fresh RMRParams from the subscription and the
// subscription-side transaction and sends it with rmrSendRaw, using the log
// description "MSG to E2T:<desc>:<trans>". It returns rmrSendRaw's error.
116 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
117 params := &RMRParams{&xapp.RMRParams{}}
118 params.Mtype = trans.GetMtype()
// SubId is the sequence number of the subscription's request id.
119 params.SubId = int(subs.GetReqId().Seq)
// Meid is taken from the subscription here (rmrSendToXapp takes it from the transaction).
121 params.Meid = subs.GetMeid()
// Payload is the buffer already packed into the transaction.
123 params.PayloadLen = len(trans.Payload.Buf)
124 params.Payload = trans.Payload.Buf
127 return c.rmrSendRaw("MSG to E2T:"+desc+":"+trans.String(), params)
// rmrSendToXapp fills a fresh RMRParams from the subscription and the
// xApp-side transaction and sends it with rmrSendRaw, using the log
// description "MSG to XAPP:<desc>:<trans>". It returns rmrSendRaw's error.
130 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
132 params := &RMRParams{&xapp.RMRParams{}}
133 params.Mtype = trans.GetMtype()
// SubId is the sequence number of the subscription's request id.
134 params.SubId = int(subs.GetReqId().Seq)
// Xid and Meid are taken from the xApp transaction.
135 params.Xid = trans.GetXid()
136 params.Meid = trans.GetMeid()
// Payload is the buffer already packed into the transaction.
138 params.PayloadLen = len(trans.Payload.Buf)
139 params.Payload = trans.Payload.Buf
142 return c.rmrSendRaw("MSG to XAPP:"+desc+":"+trans.String(), params)
// Consume dispatches a received RMR message by message type. Each known
// subscription message type is passed to its handler in a new goroutine;
// for other types an "Unknown Message Type" line is logged.
// NOTE(review): the switch header and the statement that sets the returned
// err are not visible in this listing.
145 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
// NOTE(review): the RMR buffer is freed before the message is handed to the
// handler goroutines, which read params.Payload; presumably Payload does not
// alias Mbuf - confirm against the xapp framework.
146 xapp.Rmr.Free(params.Mbuf)
// Wrap the framework parameters in the local RMRParams type used by the handlers.
148 msg := &RMRParams{params}
// Subscription request, handled by the XAPP-side handler.
152 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
153 go c.handleXAPPSubscriptionRequest(msg)
// Subscription response and failure, handled by the E2T-side handlers.
154 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
155 go c.handleE2TSubscriptionResponse(msg)
156 case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
157 go c.handleE2TSubscriptionFailure(msg)
// Subscription delete request, handled by the XAPP-side handler.
158 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
159 go c.handleXAPPSubscriptionDeleteRequest(msg)
// Subscription delete response and failure, handled by the E2T-side handlers.
// NOTE(review): handleE2TSubscriptionDeleteResponse declares an error result, which is dropped by the go statement.
160 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
161 go c.handleE2TSubscriptionDeleteResponse(msg)
162 case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
163 go c.handleE2TSubscriptionDeleteFailure(msg)
165 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
// idstring builds an identification string for log lines: the String() form
// of every entry, each preceded by filler, and an "err(<message>)" element
// built from err.
// NOTE(review): callers in this file pass a nil err (e.g. idstring(nil, trans, subs));
// the nil guard and the value assigned to filler between elements are not
// visible in this listing.
171 func idstring(err error, entries ...fmt.Stringer) string {
172 var retval string = ""
// filler is empty for the first element appended.
173 var filler string = ""
174 for _, entry := range entries {
175 retval += filler + entry.String()
179 retval += filler + "err(" + err.Error() + ")"
186 //-------------------------------------------------------------------
187 // handle from XAPP Subscription Request
188 //------------------------------------------------------------------
// handleXAPPSubscriptionRequest processes a RIC_SUB_REQ message (it is started
// by Consume for that message type). It unpacks the request, creates and
// tracks an xApp transaction, assigns the transaction to a subscription,
// starts handleSubscriptionCreate in a goroutine and waits for the event that
// goroutine posts. A subscription response or failure event is packed and
// sent to the xApp with rmrSendToXapp.
// NOTE(review): the guards and returns around the error logs and sends are
// not visible in this listing.
189 func (c *Control) handleXAPPSubscriptionRequest(params *RMRParams) {
190 xapp.Logger.Info("MSG from XAPP: %s", params.String())
192 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
// Logs the unpack error together with the received message parameters.
194 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
// The transaction is created from the sender endpoint, the RMR Xid, the
// request id carried in the payload and the Meid.
198 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subReqMsg.RequestId}, params.Meid)
200 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
// The transaction is released when this handler returns.
203 defer trans.Release()
205 err = c.tracker.Track(trans)
207 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
211 subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
213 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
// The subscription-side work runs in its own goroutine, which posts its
// result to trans with SendEvent; this handler waits for that event.
// The second result of WaitEvent is discarded.
220 go c.handleSubscriptionCreate(subs, trans)
221 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
// Pack the event according to its concrete type and send it to the xApp.
// NOTE(review): the error returned by rmrSendToXapp is not checked on either branch.
225 switch themsg := event.(type) {
226 case *e2ap.E2APSubscriptionResponse:
227 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
229 c.rmrSendToXapp("", subs, trans)
232 case *e2ap.E2APSubscriptionFailure:
233 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
235 c.rmrSendToXapp("", subs, trans)
// Failure handling: log and detach the transaction from the subscription in
// a separate goroutine. The meaning of the 5*time.Second argument is defined
// by RemoveFromSubscription, which is not part of this listing.
241 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
242 go c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
245 //-------------------------------------------------------------------
246 // handle from XAPP Subscription Delete Request
247 //------------------------------------------------------------------
// handleXAPPSubscriptionDeleteRequest processes a RIC_SUB_DEL_REQ message (it
// is started by Consume for that message type). It unpacks the request,
// creates and tracks an xApp transaction, looks up the subscription by the
// request id sequence number, starts handleSubscriptionDelete in a goroutine
// and waits for it to post an event. It then builds a delete response from
// the subscription's stored request, sends it to the xApp and detaches the
// transaction from the subscription.
// NOTE(review): the guards and returns around the error logs are not visible
// in this listing.
248 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *RMRParams) {
249 xapp.Logger.Info("MSG from XAPP: %s", params.String())
251 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
// NOTE(review): this tag has no ':' after "XAPP-SubDelReq", unlike the other log lines of this handler.
253 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
257 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subDelReqMsg.RequestId}, params.Meid)
259 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
// The transaction is released when this handler returns.
262 defer trans.Release()
264 err = c.tracker.Track(trans)
// NOTE(review): the tag below reads "XAPP-SubReq" although this is the delete
// request handler; it looks copied from handleXAPPSubscriptionRequest - confirm and fix.
266 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
270 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelReqMsg.RequestId.Seq})
272 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
// The subscription-side work runs in its own goroutine; this handler waits
// for the event it posts but discards both results of WaitEvent.
279 go c.handleSubscriptionDelete(subs, trans)
280 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
282 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
284 // Whatever is received send ok delete response
// The response reuses the request id and function id of the subscription's stored request.
285 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
286 subDelRespMsg.RequestId = subs.SubReqMsg.RequestId
287 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
288 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
// NOTE(review): the error returned by rmrSendToXapp is not checked.
290 c.rmrSendToXapp("", subs, trans)
// Detach the transaction from the subscription in a separate goroutine.
293 go c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
296 //-------------------------------------------------------------------
297 // SUBS CREATE Handling
298 //-------------------------------------------------------------------
// handleSubscriptionCreate does the subscription-side part of a create
// request on behalf of parentTrans. It creates a subscription transaction,
// waits for its turn on the subscription, and then either reuses the
// subscription's cached response or sends a subscription request to E2T and
// caches the outcome. The resulting message is posted to parentTrans.
// NOTE(review): the else/default labels and closing braces are not visible in
// this listing, so branch boundaries below are inferred from the log texts.
299 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
301 trans := c.tracker.NewSubsTransaction(subs)
302 subs.WaitTransactionTurn(trans)
// Deferred calls run in reverse order: trans.Release() first, then the turn is released.
303 defer subs.ReleaseTransactionTurn(trans)
304 defer trans.Release()
306 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
// No cached message while the subscription is still flagged valid: ask E2T.
308 subRfMsg, valid := subs.GetCachedResponse()
309 if subRfMsg == nil && valid == true {
310 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
311 switch event.(type) {
// A response is cached with the flag true, a failure with the flag false.
312 case *e2ap.E2APSubscriptionResponse:
313 subRfMsg, valid = subs.SetCachedResponse(event, true)
314 case *e2ap.E2APSubscriptionFailure:
315 subRfMsg, valid = subs.SetCachedResponse(event, false)
// Remaining case (label not visible): cache nil with the flag false and send
// a subscription delete request to E2T. The delete result is not used.
317 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
318 subRfMsg, valid = subs.SetCachedResponse(nil, false)
319 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
321 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
323 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
// Hand the result to the xApp-side handler waiting in trans.WaitEvent.
326 parentTrans.SendEvent(subRfMsg, 0)
329 //-------------------------------------------------------------------
330 // SUBS DELETE Handling
331 //-------------------------------------------------------------------
// handleSubscriptionDelete does the subscription-side part of a delete
// request on behalf of parentTrans. It creates a subscription transaction,
// waits for its turn on the subscription, sends a delete request to E2T when
// the condition below holds, and posts a delete response event to parentTrans.
// NOTE(review): closing braces are not visible in this listing, so the extent
// of the if-body is inferred.
333 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
335 trans := c.tracker.NewSubsTransaction(subs)
336 subs.WaitTransactionTurn(trans)
// Deferred calls run in reverse order: trans.Release() first, then the turn is released.
337 defer subs.ReleaseTransactionTurn(trans)
338 defer trans.Release()
340 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
// E2T is contacted only when the subscription is valid and the requesting
// endpoint is in the endpoint list and that list has exactly one entry.
343 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
// The event returned by the delete request is not used.
346 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
// The event posted to the parent is a delete response built from the
// subscription's stored request ids.
351 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
352 subDelRespMsg.RequestId = subs.SubReqMsg.RequestId
353 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
354 parentTrans.SendEvent(subDelRespMsg, 0)
357 //-------------------------------------------------------------------
358 // send to E2T Subscription Request
359 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription's stored request, sends
// it to E2T and waits for an event on trans, repeating while the loop counter
// is below e2tMaxSubReqTryCount. The event is returned as interface{}.
// NOTE(review): the declaration of err, the loop exit taken once an event has
// arrived and the return statement are not visible in this listing.
360 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
362 var event interface{} = nil
363 var timedOut bool = false
// NOTE(review): if SubReqMsg is a pointer, the RequestId assignment below
// modifies the request stored in the subscription - confirm the field type.
365 subReqMsg := subs.SubReqMsg
366 subReqMsg.RequestId = subs.GetReqId().RequestId
367 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
369 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
// The counter starts at 0, so the first send is described as "(retry 0)".
373 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
374 desc := fmt.Sprintf("(retry %d)", retries)
// NOTE(review): the error returned by rmrSendToE2T is not checked; the wait below runs regardless.
375 c.rmrSendToE2T(desc, subs, trans)
376 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
382 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
386 //-------------------------------------------------------------------
387 // send to E2T Subscription Delete Request
388 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest builds and packs a subscription delete
// request for subs, sends it to E2T and waits for an event on trans,
// repeating while the loop counter is below e2tMaxSubDelReqTryCount. The
// event is returned as interface{}.
// NOTE(review): the declarations of err and timedOut, the loop exit taken
// once an event has arrived and the return statement are not visible in this listing.
390 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
392 var event interface{}
// The request id comes from the subscription, the function id from its stored request.
395 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
396 subDelReqMsg.RequestId = subs.GetReqId().RequestId
397 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
398 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
400 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
// The counter starts at 0, so the first send is described as "(retry 0)".
404 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
405 desc := fmt.Sprintf("(retry %d)", retries)
// NOTE(review): the error returned by rmrSendToE2T is not checked; the wait below runs regardless.
406 c.rmrSendToE2T(desc, subs, trans)
407 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
413 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
417 //-------------------------------------------------------------------
418 // handle from E2T Subscription Response
419 //-------------------------------------------------------------------
// handleE2TSubscriptionResponse processes a RIC_SUB_RESP message (it is
// started by Consume for that message type). It unpacks the response, looks
// up the subscription by the request id sequence number and passes the
// message as an event to the subscription's ongoing transaction.
// NOTE(review): the guards and returns around the error logs are not visible
// in this listing.
420 func (c *Control) handleE2TSubscriptionResponse(params *RMRParams) {
421 xapp.Logger.Info("MSG from E2T: %s", params.String())
422 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
424 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
427 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.Seq})
429 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
// The transaction currently associated with the subscription receives the event.
432 trans := subs.GetTransaction()
434 err = fmt.Errorf("Ongoing transaction not found")
435 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
// Delivery to the transaction is bounded by e2tRecvMsgTimeout.
438 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
440 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
441 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
446 //-------------------------------------------------------------------
447 // handle from E2T Subscription Failure
448 //-------------------------------------------------------------------
// handleE2TSubscriptionFailure processes a RIC_SUB_FAILURE message (it is
// started by Consume for that message type). It unpacks the failure, looks up
// the subscription by the request id sequence number and passes the message
// as an event to the subscription's ongoing transaction.
// NOTE(review): the guards and returns around the error logs are not visible
// in this listing.
449 func (c *Control) handleE2TSubscriptionFailure(params *RMRParams) {
450 xapp.Logger.Info("MSG from E2T: %s", params.String())
451 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
453 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
456 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.Seq})
458 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
// The transaction currently associated with the subscription receives the event.
461 trans := subs.GetTransaction()
463 err = fmt.Errorf("Ongoing transaction not found")
464 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
// Delivery to the transaction is bounded by e2tRecvMsgTimeout.
467 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
469 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
470 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
475 //-------------------------------------------------------------------
476 // handle from E2T Subscription Delete Response
477 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse processes a RIC_SUB_DEL_RESP message
// (it is started by Consume for that message type). It unpacks the response,
// looks up the subscription by the request id sequence number and passes the
// message as an event to the subscription's ongoing transaction.
// NOTE(review): unlike the other E2T handlers this one declares an error
// result; Consume starts it with a go statement, so that result is dropped.
// NOTE(review): the guards and returns around the error logs are not visible
// in this listing.
478 func (c *Control) handleE2TSubscriptionDeleteResponse(params *RMRParams) (err error) {
479 xapp.Logger.Info("MSG from E2T: %s", params.String())
480 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
482 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
485 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.Seq})
487 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
// The transaction currently associated with the subscription receives the event.
490 trans := subs.GetTransaction()
492 err = fmt.Errorf("Ongoing transaction not found")
493 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
// Delivery to the transaction is bounded by e2tRecvMsgTimeout.
496 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
498 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
499 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
504 //-------------------------------------------------------------------
505 // handle from E2T Subscription Delete Failure
506 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteFailure processes a RIC_SUB_DEL_FAILURE message
// (it is started by Consume for that message type). It unpacks the failure,
// looks up the subscription by the request id sequence number and passes the
// message as an event to the subscription's ongoing transaction.
// NOTE(review): the guards and returns around the error logs are not visible
// in this listing.
507 func (c *Control) handleE2TSubscriptionDeleteFailure(params *RMRParams) {
508 xapp.Logger.Info("MSG from E2T: %s", params.String())
509 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
511 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
514 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.Seq})
516 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
// The transaction currently associated with the subscription receives the event.
519 trans := subs.GetTransaction()
521 err = fmt.Errorf("Ongoing transaction not found")
522 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
// Delivery to the transaction is bounded by e2tRecvMsgTimeout.
525 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
527 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
528 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
533 //-------------------------------------------------------------------
535 //-------------------------------------------------------------------
536 func typeofSubsMessage(v interface{}) string {
541 case *e2ap.E2APSubscriptionRequest:
543 case *e2ap.E2APSubscriptionResponse:
545 case *e2ap.E2APSubscriptionFailure:
547 case *e2ap.E2APSubscriptionDeleteRequest:
549 case *e2ap.E2APSubscriptionDeleteResponse:
551 case *e2ap.E2APSubscriptionDeleteFailure: