2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
27 httptransport "github.com/go-openapi/runtime/client"
28 "github.com/go-openapi/strfmt"
29 "github.com/spf13/viper"
34 //-----------------------------------------------------------------------------
36 //-----------------------------------------------------------------------------
// Timeouts and attempt limits used when talking to E2T.
// e2tSubReqTimeout is the wait passed to trans.WaitEvent after each
// subscription request is sent (see sendE2TSubscriptionRequest).
38 var e2tSubReqTimeout time.Duration = 5 * time.Second
// e2tSubDelReqTime is the wait passed to trans.WaitEvent after each
// subscription delete request is sent (see sendE2TSubscriptionDeleteRequest).
39 var e2tSubDelReqTime time.Duration = 5 * time.Second
// e2tMaxSubReqTryCount bounds the send loop in sendE2TSubscriptionRequest.
40 var e2tMaxSubReqTryCount uint64 = 2 // Initial try + retry
// e2tMaxSubDelReqTryCount bounds the send loop in sendE2TSubscriptionDeleteRequest.
41 var e2tMaxSubDelReqTryCount uint64 = 2 // Initial try + retry
// e2tRecvMsgTimeout is the timeout given to trans.SendEvent when a message
// received from E2T is handed over to its ongoing transaction.
43 var e2tRecvMsgTimeout time.Duration = 5 * time.Second
// rmrSendMutex is held around each xapp.Rmr.Send call made in rmrSendRaw.
// NOTE(review): the enclosing struct declaration is not visible in this view.
50 rmrSendMutex sync.Mutex
// NOTE(review): the enclosing function header is not visible in this view;
// presumably these statements run at package initialisation — confirm.
// Emit a marker line to the log.
68 xapp.Logger.Info("SUBMGR")
// Use "submgr" as the prefix viper applies to environment variable names.
70 viper.SetEnvPrefix("submgr")
// Per the viper documentation, this makes set-but-empty environment variables
// count as valid values — TODO confirm against the viper version in use.
71 viper.AllowEmptyEnv(true)
// NewControl constructs a Control together with the helper objects created
// below (routing manager client, Registry, Tracker, TimerMap).
// The routing manager address is assembled from the viper keys rtmgr.HostAddr,
// rtmgr.port and rtmgr.baseUrl.
// NOTE(review): several lines of this function are not visible in this view,
// including the remaining fields of the returned Control literal.
74 func NewControl() *Control {
// HTTP transport: host is "<rtmgr.HostAddr>:<rtmgr.port>", base path is
// rtmgr.baseUrl, and the only scheme offered is plain "http".
76 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
// Wrap the rtmgrclient instance in the local RtmgrClient type.
77 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
79 registry := new(Registry)
// The registry keeps a pointer to the routing manager client.
81 registry.rtmgrClient = &rtmgrClient
83 tracker := new(Tracker)
86 timerMap := new(TimerMap)
// Only the e2ap field of the returned literal is visible here.
89 return &Control{e2ap: new(E2ap),
// Run takes no arguments and returns nothing.
// NOTE(review): the body is not visible in this view; presumably it starts
// message processing for this Control — TODO confirm.
97 func (c *Control) Run() {
// rmrSendRaw sends params through xapp.Rmr.Send, repeating the attempt while
// the send status is false, and reports the outcome through the named result err.
//
// desc is a caller-supplied prefix used in the log lines.
// NOTE(review): the declarations of i and status, and the conditions guarding
// the two failure branches, are not visible in this view.
101 func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) {
103 xapp.Logger.Info("%s: %s", desc, params.String())
// Keep trying while i <= 10 and the previous send did not succeed.
106 for ; i <= 10 && status == false; i++ {
// The send itself is serialised with rmrSendMutex.
107 c.rmrSendMutex.Lock()
108 status = xapp.Rmr.Send(params.RMRParams, false)
109 c.rmrSendMutex.Unlock()
// Presumably executed only when the send failed: log and pause 500 ms
// before the next attempt.
111 xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String())
112 time.Sleep(500 * time.Millisecond)
// Presumably executed only when every attempt failed: build the error, log
// it and free the message buffer.
// NOTE(review): i has already been incremented by the loop when it is
// printed here — confirm the reported retry count is the intended value.
116 err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String())
117 xapp.Logger.Error("%s: %s", desc, err.Error())
118 xapp.Rmr.Free(params.Mbuf)
// rmrSend builds a fresh RMRParams from subs and trans and hands it to
// rmrSendRaw, returning that call's error.
//
// Message type and payload come from trans; subscription id and Meid come
// from subs (unlike rmrReplyToSender, which takes Xid and Meid from trans).
// NOTE(review): some field assignments are not visible in this view.
123 func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction) (err error) {
124 params := &RMRParams{&xapp.RMRParams{}}
125 params.Mtype = trans.GetMtype()
126 params.SubId = int(subs.GetSubId())
128 params.Meid = subs.GetMeid()
// The payload is the buffer held by the transaction.
130 params.PayloadLen = len(trans.Payload.Buf)
131 params.Payload = trans.Payload.Buf
134 return c.rmrSendRaw(desc, params)
// rmrReplyToSender builds a fresh RMRParams for answering the originator of
// trans and hands it to rmrSendRaw, returning that call's error.
//
// Message type, Xid, Meid and payload come from trans; the subscription id
// comes from subs.
// NOTE(review): some field assignments are not visible in this view.
137 func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction) (err error) {
138 params := &RMRParams{&xapp.RMRParams{}}
139 params.Mtype = trans.GetMtype()
140 params.SubId = int(subs.GetSubId())
// Xid and Meid are copied from the transaction being answered.
141 params.Xid = trans.GetXid()
142 params.Meid = trans.GetMeid()
144 params.PayloadLen = len(trans.Payload.Buf)
145 params.Payload = trans.Payload.Buf
148 return c.rmrSendRaw(desc, params)
// Consume receives one RMR message and dispatches it by message type.
// Each recognised type is handled in its own goroutine; any other type is
// logged and discarded.
// NOTE(review): the switch header and the value assigned to err are not
// visible in this view.
151 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
// The message buffer is freed before dispatching.
// NOTE(review): presumably the payload no longer depends on Mbuf at this
// point — confirm against the xapp framework.
152 xapp.Rmr.Free(params.Mbuf)
// Wrap in the local RMRParams type expected by the handlers.
154 msg := &RMRParams{params}
// Requests originating from xApps and responses/failures originating from E2T.
157 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
158 go c.handleXAPPSubscriptionRequest(msg)
159 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
160 go c.handleE2TSubscriptionResponse(msg)
161 case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
162 go c.handleE2TSubscriptionFailure(msg)
163 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
164 go c.handleXAPPSubscriptionDeleteRequest(msg)
165 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
166 go c.handleE2TSubscriptionDeleteResponse(msg)
167 case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
168 go c.handleE2TSubscriptionDeleteFailure(msg)
// Presumably the default branch: unrecognised message types are only logged.
170 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
// idstring builds a single identification string for log lines out of up to
// three parts: trans.String(), subs.String() and "err(<message>)".
// Each part is prefixed with filler when appended.
// NOTE(review): the guards around each part and the assignments to filler are
// not visible in this view. Callers in this file pass nil for subs and/or err,
// so presumably nil parts are skipped — confirm.
175 func idstring(trans fmt.Stringer, subs fmt.Stringer, err error) string {
176 var retval string = ""
177 var filler string = ""
179 retval += filler + trans.String()
183 retval += filler + subs.String()
187 retval += filler + "err(" + err.Error() + ")"
194 //-------------------------------------------------------------------
195 // handle from XAPP Subscription Request
196 //------------------------------------------------------------------
// handleXAPPSubscriptionRequest processes a subscription request from an xApp:
// it decodes the payload, opens a tracked transaction for the sender, attaches
// the request to a subscription, runs the subscription-side handling in a
// goroutine and replies to the xApp according to the event it gets back.
// NOTE(review): the error guards and early returns after each step are not
// visible in this view.
197 func (c *Control) handleXAPPSubscriptionRequest(params *RMRParams) {
198 xapp.Logger.Info("XAPP-SubReq from xapp: %s", params.String())
// Decode the E2AP subscription request carried in the payload.
200 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
202 xapp.Logger.Error("XAPP-SubReq: %s", idstring(params, nil, err))
// Track a transaction keyed by the sender endpoint, Xid and Meid.
206 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid)
208 xapp.Logger.Error("XAPP-SubReq: %s", idstring(params, nil, err))
211 defer trans.Release()
// Attach this transaction and request to a subscription via the registry.
213 subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
215 xapp.Logger.Error("XAPP-SubReq: %s", idstring(trans, nil, err))
// Subscription-side handling runs concurrently and reports back with an event.
222 go c.handleSubscriptionCreate(subs, trans)
223 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
// Reply to the xApp with a packed response or failure, depending on the event type.
227 switch themsg := event.(type) {
228 case *e2ap.E2APSubscriptionResponse:
229 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
231 c.rmrReplyToSender("XAPP-SubReq: SubResp to xapp", subs, trans)
234 case *e2ap.E2APSubscriptionFailure:
235 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
237 c.rmrReplyToSender("XAPP-SubReq: SubFail to xapp", subs, trans)
// Presumably reached when no reply was sent above.
244 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(trans, subs, err))
247 //-------------------------------------------------------------------
248 // handle from XAPP Subscription Delete Request
249 //------------------------------------------------------------------
// handleXAPPSubscriptionDeleteRequest processes a subscription delete request
// from an xApp: it decodes the payload, opens a tracked transaction, looks up
// the subscription, runs the subscription-side delete in a goroutine and then
// answers the xApp with a delete response.
// NOTE(review): the error guards and early returns after each step are not
// visible in this view.
250 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *RMRParams) {
251 xapp.Logger.Info("XAPP-SubDelReq from xapp: %s", params.String())
// Decode the E2AP subscription delete request carried in the payload.
253 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
255 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(params, nil, err))
// Track a transaction keyed by the sender endpoint, Xid and Meid.
259 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid)
261 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(params, nil, err))
264 defer trans.Release()
// Candidate subscription ids: the sequence number from the decoded message
// and the SubId from the RMR parameters, both converted to uint16.
266 subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelReqMsg.RequestId.Seq), uint16(params.SubId)})
268 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(trans, nil, err))
// Subscription-side delete runs concurrently; the event itself is not inspected.
275 go c.handleSubscriptionDelete(subs, trans)
276 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
278 // Whatever is received send ok delete response
279 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
280 subDelRespMsg.RequestId.Id = subs.SubReqMsg.RequestId.Id
281 subDelRespMsg.RequestId.Seq = uint32(subs.GetSubId())
282 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
283 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
285 c.rmrReplyToSender("XAPP-SubDelReq: SubDelResp to xapp", subs, trans)
289 //-------------------------------------------------------------------
290 // SUBS CREATE Handling
291 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the subscription-side part of a create
// request and reports the outcome to parentTrans through SendEvent.
//
// It opens its own transaction, waits for its turn on the subscription, and
// then either answers immediately from state already stored on subs or sends
// a subscription request to E2T and forwards the resulting event.
// NOTE(review): the returns that end the "immediate" branches and the closing
// of the switch are not visible in this view.
292 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Transaction) {
// Work is done under a new transaction that must obtain its turn on subs.
294 trans := c.tracker.NewTransaction(subs.GetMeid())
295 subs.WaitTransactionTurn(trans)
296 defer subs.ReleaseTransactionTurn(trans)
297 defer trans.Release()
299 xapp.Logger.Debug("SUBS-SubReq: Handling %s parent %s", idstring(trans, subs, nil), parentTrans.String())
// A response is already stored on the subscription: pass it to the parent.
302 if subs.SubRespMsg != nil {
303 xapp.Logger.Debug("SUBS-SubReq: Handling (immediate resp response) %s parent %s", idstring(nil, subs, nil), parentTrans.String())
304 parentTrans.SendEvent(subs.SubRespMsg, 0)
// A failure is already stored: pass it to the parent and detach the parent
// from the subscription in the background.
308 if subs.SubFailMsg != nil {
309 xapp.Logger.Debug("SUBS-SubReq: Handling (immediate fail response) %s parent %s", idstring(nil, subs, nil), parentTrans.String())
310 parentTrans.SendEvent(subs.SubFailMsg, 0)
312 go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
// The subscription is marked not valid: send a nil event and detach the parent.
315 if subs.valid == false {
316 xapp.Logger.Debug("SUBS-SubReq: Handling (immediate nil response) %s parent %s", idstring(nil, subs, nil), parentTrans.String())
317 parentTrans.SendEvent(nil, 0)
319 go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
// Nothing stored yet: ask E2T and act on the event that comes back.
324 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
325 switch themsg := event.(type) {
326 case *e2ap.E2APSubscriptionResponse:
// Remember the response on the subscription and forward it.
328 subs.SubRespMsg = themsg
330 parentTrans.SendEvent(event, 0)
332 case *e2ap.E2APSubscriptionFailure:
// Remember the failure on the subscription and forward it.
334 subs.SubFailMsg = themsg
336 parentTrans.SendEvent(event, 0)
// Presumably the default branch: any other event triggers a delete request
// towards E2T and a nil event to the parent.
338 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(trans, subs, nil))
342 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
343 parentTrans.SendEvent(nil, 0)
// NOTE(review): it is not visible which of the branches above reach this removal.
346 go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
349 //-------------------------------------------------------------------
350 // SUBS DELETE Handling
351 //-------------------------------------------------------------------
// handleSubscriptionDelete performs the subscription-side part of a delete
// request and reports a delete response to parentTrans through SendEvent.
//
// It opens its own transaction and waits for its turn on the subscription.
// The delete request is forwarded to E2T only when the subscription is valid
// and the parent's endpoint is the single endpoint in subs.EpList.
// NOTE(review): the closing of the if block is not visible in this view.
353 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *Transaction) {
355 trans := c.tracker.NewTransaction(subs.GetMeid())
356 subs.WaitTransactionTurn(trans)
357 defer subs.ReleaseTransactionTurn(trans)
358 defer trans.Release()
360 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s parent %s", idstring(trans, subs, nil), parentTrans.String())
// Only the last remaining endpoint causes a delete towards E2T.
363 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
366 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
// Build a delete response from the stored request ids and pass it to the parent.
371 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
372 subDelRespMsg.RequestId.Id = subs.SubReqMsg.RequestId.Id
373 subDelRespMsg.RequestId.Seq = uint32(subs.GetSubId())
374 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
375 parentTrans.SendEvent(subDelRespMsg, 0)
// Detach the parent from the subscription in the background.
377 go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
380 //-------------------------------------------------------------------
381 // send to E2T Subscription Request
382 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription's stored request, sends it
// towards E2T and waits for an event on trans, making at most
// e2tMaxSubReqTryCount attempts. It returns an interface{} value.
// NOTE(review): the declaration of err, the pack-error guard, the condition
// that ends the loop early and the return statement are not visible in this
// view; presumably the returned value is the received event — confirm.
383 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *Transaction, parentTrans *Transaction) interface{} {
385 var event interface{} = nil
386 var timedOut bool = false
// The request id is rewritten before sending: Id is fixed to 123 and Seq
// carries the subscription id.
388 subReqMsg := subs.SubReqMsg
389 subReqMsg.RequestId.Id = 123
390 subReqMsg.RequestId.Seq = uint32(subs.GetSubId())
391 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
// Presumably logged only when packing failed.
393 xapp.Logger.Error("SUBS-SubReq: %s parent %s", idstring(trans, subs, err), parentTrans.String())
// Send, then wait up to e2tSubReqTimeout for an event on each attempt.
397 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
398 desc := fmt.Sprintf("SUBS-SubReq: SubReq to E2T (retry %d)", retries)
399 c.rmrSend(desc, subs, trans)
400 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
406 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s parent %s", typeofSubsMessage(event), idstring(trans, subs, nil), parentTrans.String())
410 //-------------------------------------------------------------------
411 // send to E2T Subscription Delete Request
412 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest builds and packs a subscription delete
// request, sends it towards E2T and waits for an event on trans, making at
// most e2tMaxSubDelReqTryCount attempts. It returns an interface{} value.
// NOTE(review): the declarations of err and timedOut, the pack-error guard,
// the condition that ends the loop early and the return statement are not
// visible in this view; presumably the returned value is the received event —
// confirm.
414 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *Transaction, parentTrans *Transaction) interface{} {
416 var event interface{}
// Request id: Id is fixed to 123 and Seq carries the subscription id; the
// function id is copied from the stored subscription request.
419 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
420 subDelReqMsg.RequestId.Id = 123
421 subDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
422 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
423 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
// Presumably logged only when packing failed.
425 xapp.Logger.Error("SUBS-SubDelReq: %s parent %s", idstring(trans, subs, err), parentTrans.String())
// Send, then wait up to e2tSubDelReqTime for an event on each attempt.
429 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
430 desc := fmt.Sprintf("SUBS-SubDelReq: SubDelReq to E2T (retry %d)", retries)
431 c.rmrSend(desc, subs, trans)
432 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
438 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s parent %s", typeofSubsMessage(event), idstring(trans, subs, nil), parentTrans.String())
442 //-------------------------------------------------------------------
443 // handle from E2T Subscription Response
444 //-------------------------------------------------------------------
// handleE2TSubscriptionResponse processes a subscription response from E2T:
// it decodes the payload, finds the matching subscription and hands the
// decoded message to that subscription's ongoing transaction.
// NOTE(review): the guards and early returns around the error logs are not
// visible in this view.
445 func (c *Control) handleE2TSubscriptionResponse(params *RMRParams) {
446 xapp.Logger.Info("MSG-SubResp from E2T: %s", params.String())
447 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
449 xapp.Logger.Error("MSG-SubResp %s", idstring(params, nil, err))
// Candidate subscription ids: the sequence number from the decoded message
// and the SubId from the RMR parameters, both converted to uint16.
452 subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subRespMsg.RequestId.Seq), uint16(params.SubId)})
454 xapp.Logger.Error("MSG-SubResp: %s", idstring(params, nil, err))
// The response is delivered to whichever transaction the subscription reports.
457 trans := subs.GetTransaction()
// Presumably reached only when no transaction was returned.
459 err = fmt.Errorf("Ongoing transaction not found")
460 xapp.Logger.Error("MSG-SubResp: %s", idstring(params, subs, err))
// Hand over the message, waiting at most e2tRecvMsgTimeout.
463 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
// Presumably reached only when the hand-over did not succeed.
465 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
466 xapp.Logger.Error("MSG-SubResp: %s", idstring(trans, subs, err))
471 //-------------------------------------------------------------------
472 // handle from E2T Subscription Failure
473 //-------------------------------------------------------------------
// handleE2TSubscriptionFailure processes a subscription failure from E2T:
// it decodes the payload, finds the matching subscription and hands the
// decoded message to that subscription's ongoing transaction.
// NOTE(review): the guards and early returns around the error logs are not
// visible in this view.
474 func (c *Control) handleE2TSubscriptionFailure(params *RMRParams) {
475 xapp.Logger.Info("MSG-SubFail from E2T: %s", params.String())
476 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
478 xapp.Logger.Error("MSG-SubFail %s", idstring(params, nil, err))
// Candidate subscription ids: the sequence number from the decoded message
// and the SubId from the RMR parameters, both converted to uint16.
481 subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subFailMsg.RequestId.Seq), uint16(params.SubId)})
483 xapp.Logger.Error("MSG-SubFail: %s", idstring(params, nil, err))
// The failure is delivered to whichever transaction the subscription reports.
486 trans := subs.GetTransaction()
// Presumably reached only when no transaction was returned.
488 err = fmt.Errorf("Ongoing transaction not found")
489 xapp.Logger.Error("MSG-SubFail: %s", idstring(params, subs, err))
// Hand over the message, waiting at most e2tRecvMsgTimeout.
492 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
// Presumably reached only when the hand-over did not succeed.
494 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
495 xapp.Logger.Error("MSG-SubFail: %s", idstring(trans, subs, err))
500 //-------------------------------------------------------------------
501 // handle from E2T Subscription Delete Response
502 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse processes a subscription delete response
// from E2T: it decodes the payload, finds the matching subscription and hands
// the decoded message to that subscription's ongoing transaction.
// Unlike the sibling E2T handlers it declares a named error result.
// NOTE(review): the guards and return statements are not visible in this view.
// NOTE(review): log lines here use the "SUBS-SubDelResp" prefix except the
// last one, which uses "MSG-SubDelResp"; the sibling E2T handlers use "MSG-"
// throughout — confirm which prefix is intended.
503 func (c *Control) handleE2TSubscriptionDeleteResponse(params *RMRParams) (err error) {
504 xapp.Logger.Info("SUBS-SubDelResp from E2T:%s", params.String())
505 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
507 xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, nil, err))
// Candidate subscription ids: the sequence number from the decoded message
// and the SubId from the RMR parameters, both converted to uint16.
510 subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelRespMsg.RequestId.Seq), uint16(params.SubId)})
512 xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, nil, err))
// The response is delivered to whichever transaction the subscription reports.
515 trans := subs.GetTransaction()
// Presumably reached only when no transaction was returned.
517 err = fmt.Errorf("Ongoing transaction not found")
518 xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, subs, err))
// Hand over the message, waiting at most e2tRecvMsgTimeout.
521 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
// Presumably reached only when the hand-over did not succeed.
523 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
524 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(trans, subs, err))
529 //-------------------------------------------------------------------
530 // handle from E2T Subscription Delete Failure
531 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteFailure processes a subscription delete failure
// from E2T: it decodes the payload, finds the matching subscription and hands
// the decoded message to that subscription's ongoing transaction.
// NOTE(review): the guards and early returns around the error logs are not
// visible in this view.
532 func (c *Control) handleE2TSubscriptionDeleteFailure(params *RMRParams) {
533 xapp.Logger.Info("MSG-SubDelFail from E2T:%s", params.String())
534 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
536 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, nil, err))
// Candidate subscription ids: the sequence number from the decoded message
// and the SubId from the RMR parameters, both converted to uint16.
539 subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelFailMsg.RequestId.Seq), uint16(params.SubId)})
541 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, nil, err))
// The failure is delivered to whichever transaction the subscription reports.
544 trans := subs.GetTransaction()
// Presumably reached only when no transaction was returned.
546 err = fmt.Errorf("Ongoing transaction not found")
547 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, subs, err))
// Hand over the message, waiting at most e2tRecvMsgTimeout.
550 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
// Presumably reached only when the hand-over did not succeed.
552 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
553 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(trans, subs, err))
558 //-------------------------------------------------------------------
560 //-------------------------------------------------------------------
// typeofSubsMessage returns a string for v; it is used in this file to
// describe events in log lines.
// The visible cases cover the six e2ap subscription message pointer types.
// NOTE(review): the switch header, the string produced for each case, any
// default/nil handling and the end of the function are not visible in this view.
561 func typeofSubsMessage(v interface{}) string {
566 case *e2ap.E2APSubscriptionRequest:
568 case *e2ap.E2APSubscriptionResponse:
570 case *e2ap.E2APSubscriptionFailure:
572 case *e2ap.E2APSubscriptionDeleteRequest:
574 case *e2ap.E2APSubscriptionDeleteResponse:
576 case *e2ap.E2APSubscriptionDeleteFailure: