2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28 httptransport "github.com/go-openapi/runtime/client"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
35 //-----------------------------------------------------------------------------
37 //-----------------------------------------------------------------------------
// e2tSubReqTimeout is the timeout handed to trans.WaitEvent after a
// subscription request has been sent in sendE2TSubscriptionRequest.
39 var e2tSubReqTimeout time.Duration = 5 * time.Second
// e2tSubDelReqTime is the timeout handed to trans.WaitEvent after a
// subscription delete request has been sent in sendE2TSubscriptionDeleteRequest.
40 var e2tSubDelReqTime time.Duration = 5 * time.Second
// e2tMaxSubReqTryCount is the upper bound of the send loop in
// sendE2TSubscriptionRequest.
41 var e2tMaxSubReqTryCount uint64 = 2 // Initial try + retry
// e2tMaxSubDelReqTryCount is the upper bound of the send loop in
// sendE2TSubscriptionDeleteRequest.
42 var e2tMaxSubDelReqTryCount uint64 = 2 // Initial try + retry
// e2tRecvMsgTimeout is the timeout handed to trans.SendEvent by the
// handleE2TSubscription* handlers when they pass a received E2T message to
// the ongoing transaction.
44 var e2tRecvMsgTimeout time.Duration = 5 * time.Second
51 rmrSendMutex sync.Mutex
69 xapp.Logger.Info("SUBMGR")
71 viper.SetEnvPrefix("submgr")
72 viper.AllowEmptyEnv(true)
// NewControl allocates a Control and the helper objects it is wired to.
//
// A go-openapi HTTP transport towards the routing manager is built from the
// viper settings "rtmgr.HostAddr", "rtmgr.port" and "rtmgr.baseUrl" (scheme
// "http" only). The rtmgr client created on top of it, plus the provide and
// delete subscription-handle parameter objects (each created with a 10 second
// timeout), are bundled into an RtmgrClient whose address is stored in a new
// Registry. A Tracker and a TimerMap are allocated as well.
//
// Returns a pointer to the new Control; its e2ap field is a fresh E2ap.
75 func NewControl() *Control {
// Host part is "<rtmgr.HostAddr>:<rtmgr.port>"; the base path is rtmgr.baseUrl.
77 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
78 client := rtmgrclient.New(transport, strfmt.Default)
// Parameter objects for the provide/delete subscription-handle calls, both
// created with a 10 second timeout.
79 handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
80 deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
81 rtmgrClient := RtmgrClient{client, handle, deleteHandle}
83 registry := new(Registry)
// The registry keeps a pointer to the routing manager client built above.
85 registry.rtmgrClient = &rtmgrClient
87 tracker := new(Tracker)
90 timerMap := new(TimerMap)
// NOTE(review): registry, tracker and timerMap are presumably stored in the
// returned Control too - confirm against the full composite literal.
93 return &Control{e2ap: new(E2ap),
// Run is the exported start method of Control; it takes no arguments and
// returns nothing.
// NOTE(review): presumably hands c over to the xapp framework's run loop -
// confirm against the function body.
101 func (c *Control) Run() {
// rmrSendRaw sends params through xapp.Rmr.Send, retrying on failure.
//
// desc is a free-form prefix used in the log lines. The message is logged at
// Info level first. Every Send call is made while holding c.rmrSendMutex. A
// failed attempt is logged together with the loop counter and followed by a
// 500 ms sleep. The loop ends as soon as Send reports success or the counter
// exceeds 10.
//
// On the failure path the named result err is set to a descriptive error,
// that error is logged at Error level, and params.Mbuf is released with
// xapp.Rmr.Free.
105 func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) {
107 xapp.Logger.Info("%s: %s", desc, params.String())
// Keep trying while Send reports failure and the counter is still <= 10.
110 for ; i <= 10 && status == false; i++ {
// The mutex serialises the Send calls issued through this method.
111 c.rmrSendMutex.Lock()
112 status = xapp.Rmr.Send(params.RMRParams, false)
113 c.rmrSendMutex.Unlock()
115 xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String())
// Back off before the next attempt.
116 time.Sleep(500 * time.Millisecond)
// NOTE(review): when this runs after the loop, i has already been advanced by
// the loop's post statement, so the count reported here is one higher than
// the number logged for the last failed attempt - confirm this is intended.
120 err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String())
121 xapp.Logger.Error("%s: %s", desc, err.Error())
// Release the message buffer on the failure path.
122 xapp.Rmr.Free(params.Mbuf)
// rmrSend builds an RMR message from subs and trans and passes it to
// rmrSendRaw.
//
// The message type and the payload (buffer and length) are taken from trans;
// the subscription id and the Meid are taken from subs. desc is forwarded
// unchanged as the log prefix.
//
// Returns the error reported by rmrSendRaw.
127 func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction) (err error) {
128 params := &RMRParams{&xapp.RMRParams{}}
129 params.Mtype = trans.GetMtype()
130 params.SubId = int(subs.GetSubId())
// Meid comes from the subscription here (rmrReplyToSender uses the
// transaction's Meid instead).
132 params.Meid = subs.GetMeid()
134 params.PayloadLen = len(trans.Payload.Buf)
135 params.Payload = trans.Payload.Buf
138 return c.rmrSendRaw(desc, params)
// rmrReplyToSender builds an RMR message from subs and trans and passes it to
// rmrSendRaw. The visible callers use it for the responses sent back to the
// xApp.
//
// The message type, Xid, Meid and payload (buffer and length) are taken from
// trans; only the subscription id is taken from subs. desc is forwarded
// unchanged as the log prefix.
//
// Returns the error reported by rmrSendRaw.
141 func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction) (err error) {
142 params := &RMRParams{&xapp.RMRParams{}}
143 params.Mtype = trans.GetMtype()
144 params.SubId = int(subs.GetSubId())
// Xid and Meid are copied from the transaction being answered.
145 params.Xid = trans.GetXid()
146 params.Meid = trans.GetMeid()
148 params.PayloadLen = len(trans.Payload.Buf)
149 params.Payload = trans.Payload.Buf
152 return c.rmrSendRaw(desc, params)
// Consume receives one RMR message and dispatches it by message type.
//
// params.Mbuf is released first, then params is wrapped into an RMRParams.
// Each of the six known subscription message types (looked up in
// xapp.RICMessageTypes) is handed to its handler in a new goroutine, so
// Consume does not wait for the handler to finish. Any other message type is
// logged at Info level and discarded.
155 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
// NOTE(review): the buffer is freed before the handlers run, and the handlers
// read params.Payload - confirm that Payload does not alias Mbuf memory.
156 xapp.Rmr.Free(params.Mbuf)
158 msg := &RMRParams{params}
// Requests coming from xApps.
161 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
162 go c.handleXAPPSubscriptionRequest(msg)
// Responses and failures coming from E2T.
163 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
164 go c.handleE2TSubscriptionResponse(msg)
165 case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
166 go c.handleE2TSubscriptionFailure(msg)
167 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
168 go c.handleXAPPSubscriptionDeleteRequest(msg)
// The error result of this handler is dropped because it runs via "go".
169 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
170 go c.handleE2TSubscriptionDeleteResponse(msg)
171 case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
172 go c.handleE2TSubscriptionDeleteFailure(msg)
174 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
// idstring builds the identification text used in this file's log lines.
//
// The result is assembled, in this order, from trans.String(), subs.String()
// and "err(" + err.Error() + ")", each one prefixed with the current filler.
// Callers in this file pass nil for the parts they do not have.
// NOTE(review): each part is presumably appended only when its argument is
// non-nil, with filler switched to a separator afterwards - confirm.
//
// Returns the assembled string.
179 func idstring(trans fmt.Stringer, subs fmt.Stringer, err error) string {
180 var retval string = ""
// filler starts empty so that the first appended part gets no prefix.
181 var filler string = ""
183 retval += filler + trans.String()
187 retval += filler + subs.String()
191 retval += filler + "err(" + err.Error() + ")"
198 //-------------------------------------------------------------------
199 // handle from XAPP Subscription Request
200 //------------------------------------------------------------------
// handleXAPPSubscriptionRequest processes a subscription request received
// from an xApp.
//
// Steps visible in the code:
//   - unpack the payload as an E2AP subscription request,
//   - start tracking a transaction identified by the sender endpoint
//     (params.Src), params.Xid and params.Meid; it is released on return,
//   - let the registry assign the transaction to a subscription,
//   - report an error if the subscription's transaction slot is reserved,
//   - start handleSubscriptionCreate in a goroutine and block in WaitEvent
//     until it delivers an event,
//   - pack a subscription response or failure, depending on the event type,
//     and send it back with rmrReplyToSender.
//
// Each error is logged with the "XAPP-SubReq" prefix. The function has no
// return value.
201 func (c *Control) handleXAPPSubscriptionRequest(params *RMRParams) {
202 xapp.Logger.Info("XAPP-SubReq from xapp: %s", params.String())
204 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
206 xapp.Logger.Error("XAPP-SubReq: %s", idstring(params, nil, err))
210 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid)
212 xapp.Logger.Error("XAPP-SubReq: %s", idstring(params, nil, err))
215 defer trans.Release()
217 subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
219 xapp.Logger.Error("XAPP-SubReq: %s", idstring(trans, nil, err))
223 if subs.IsTransactionReserved() {
224 err := fmt.Errorf("Currently parallel or queued transactions are not allowed")
225 xapp.Logger.Error("XAPP-SubReq: %s", idstring(trans, subs, err))
// The subscription-side work runs in its own goroutine; the result comes back
// as an event on this transaction.
232 go c.handleSubscriptionCreate(subs, trans)
233 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
// The concrete type of the event decides which message goes back to the xApp.
237 switch themsg := event.(type) {
238 case *e2ap.E2APSubscriptionResponse:
239 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
241 c.rmrReplyToSender("XAPP-SubReq: SubResp to xapp", subs, trans)
244 case *e2ap.E2APSubscriptionFailure:
245 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
247 c.rmrReplyToSender("XAPP-SubReq: SubFail to xapp", subs, trans)
254 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(trans, subs, err))
257 //-------------------------------------------------------------------
258 // handle from XAPP Subscription Delete Request
259 //------------------------------------------------------------------
// handleXAPPSubscriptionDeleteRequest processes a subscription delete request
// received from an xApp.
//
// Steps visible in the code:
//   - unpack the payload as an E2AP subscription delete request,
//   - start tracking a transaction identified by the sender endpoint
//     (params.Src), params.Xid and params.Meid; it is released on return,
//   - look the subscription up with two candidate ids: the request's
//     RequestId.Seq and params.SubId, both converted to uint16,
//   - report an error if the subscription's transaction slot is reserved,
//   - start handleSubscriptionDelete in a goroutine and block in WaitEvent
//     until it delivers an event; the event itself is not inspected,
//   - build a subscription delete response and send it back with
//     rmrReplyToSender.
//
// Each error is logged with the "XAPP-SubDelReq" prefix. The function has no
// return value.
260 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *RMRParams) {
261 xapp.Logger.Info("XAPP-SubDelReq from xapp: %s", params.String())
263 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
265 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(params, nil, err))
269 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid)
271 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(params, nil, err))
274 defer trans.Release()
276 subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelReqMsg.RequestId.Seq), uint16(params.SubId)})
278 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(trans, nil, err))
282 if subs.IsTransactionReserved() {
283 err := fmt.Errorf("Currently parallel or queued transactions are not allowed")
284 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(trans, subs, err))
// The subscription-side work runs in its own goroutine; this call only waits
// for it to signal completion and discards both results of WaitEvent.
291 go c.handleSubscriptionDelete(subs, trans)
292 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
294 // Whatever is received send ok delete response
295 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
// RequestId.Id and FunctionId are copied from the stored subscription
// request; Seq is the subscription id.
296 subDelRespMsg.RequestId.Id = subs.SubReqMsg.RequestId.Id
297 subDelRespMsg.RequestId.Seq = uint32(subs.GetSubId())
298 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
299 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
301 c.rmrReplyToSender("XAPP-SubDelReq: SubDelResp to xapp", subs, trans)
305 //-------------------------------------------------------------------
306 // SUBS CREATE Handling
307 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the subscription-side part of a create
// request and reports the outcome to parentTrans through SendEvent.
//
// A new transaction is created for the subscription's Meid and waits for its
// turn on subs. Both the turn and the transaction are released by deferred
// calls, so trans.Release runs before subs.ReleaseTransactionTurn.
//
// If subs already holds a subscription response (subs.SubRespMsg != nil),
// that stored response is sent to parentTrans. Otherwise a subscription
// request is sent to E2T and the resulting event is handled by type:
//   - response: stored in subs.SubRespMsg and forwarded to parentTrans,
//   - failure: forwarded to parentTrans,
//   - remaining code path: an internal delete request is sent to E2T and nil
//     is forwarded to parentTrans.
308 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Transaction) {
310 trans := c.tracker.NewTransaction(subs.GetMeid())
311 subs.WaitTransactionTurn(trans)
// Deferred calls run in reverse order: trans.Release first, then the turn is
// given back.
312 defer subs.ReleaseTransactionTurn(trans)
313 defer trans.Release()
315 xapp.Logger.Debug("SUBS-SubReq: Handling %s parent %s", idstring(trans, subs, nil), parentTrans.String())
// A response stored earlier on the subscription is answered immediately.
317 if subs.SubRespMsg != nil {
318 xapp.Logger.Debug("SUBS-SubReq: Handling (immediate response) %s parent %s", idstring(nil, subs, nil), parentTrans.String())
319 parentTrans.SendEvent(subs.SubRespMsg, 0)
323 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
324 switch themsg := event.(type) {
325 case *e2ap.E2APSubscriptionResponse:
// Keep the response on the subscription for the immediate-response path above.
326 subs.SubRespMsg = themsg
327 parentTrans.SendEvent(event, 0)
329 case *e2ap.E2APSubscriptionFailure:
330 //TODO: Possible delete and one retry for subs req
331 parentTrans.SendEvent(event, 0)
333 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(trans, subs, nil))
334 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
335 parentTrans.SendEvent(nil, 0)
// NOTE(review): presumably reached only on the non-success paths - confirm
// which branches fall through to this endpoint removal.
338 subs.DelEndpoint(parentTrans.GetEndpoint())
341 //-------------------------------------------------------------------
342 // SUBS DELETE Handling
343 //-------------------------------------------------------------------
// handleSubscriptionDelete performs the subscription-side part of a delete
// request and reports the outcome to parentTrans through SendEvent.
//
// A new transaction is created for the subscription's Meid and waits for its
// turn on subs. Both the turn and the transaction are released by deferred
// calls, so trans.Release runs before subs.ReleaseTransactionTurn.
//
// A subscription delete request is sent to E2T, and the resulting event is
// forwarded to parentTrans. The endpoint of parentTrans is then removed from
// the subscription.
344 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *Transaction) {
346 trans := c.tracker.NewTransaction(subs.GetMeid())
347 subs.WaitTransactionTurn(trans)
// Deferred calls run in reverse order: trans.Release first, then the turn is
// given back.
348 defer subs.ReleaseTransactionTurn(trans)
349 defer trans.Release()
351 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s parent %s", idstring(trans, subs, nil), parentTrans.String())
353 event := c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
// The event is forwarded as-is, whatever its type.
355 parentTrans.SendEvent(event, 0)
356 subs.DelEndpoint(parentTrans.GetEndpoint())
359 //-------------------------------------------------------------------
360 // send to E2T Subscription Request
361 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription request stored on subs
// and sends it to E2T, waiting for an answer on trans.
//
// Before packing, RequestId.Id is set to 123 and RequestId.Seq to the
// subscription id. The packed message is sent with rmrSend up to
// e2tMaxSubReqTryCount times. After each send the function waits on
// trans.WaitEvent for at most e2tSubReqTimeout. parentTrans is used for
// logging only.
//
// Returns an interface{} value.
// NOTE(review): presumably the received event, or nil when packing fails or
// every try times out - confirm.
362 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *Transaction, parentTrans *Transaction) interface{} {
364 var event interface{} = nil
365 var timedOut bool = false
// NOTE(review): if SubReqMsg is a pointer, the two assignments below modify
// the request stored on the subscription - confirm this is intended.
367 subReqMsg := subs.SubReqMsg
// NOTE(review): 123 is a hard-coded request id; its meaning is not visible here.
368 subReqMsg.RequestId.Id = 123
369 subReqMsg.RequestId.Seq = uint32(subs.GetSubId())
370 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
372 xapp.Logger.Error("SUBS-SubReq: %s parent %s", idstring(trans, subs, err), parentTrans.String())
376 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
377 desc := fmt.Sprintf("SUBS-SubReq: SubReq to E2T (retry %d)", retries)
// The error returned by rmrSend is not used; a failed send is followed by
// the wait below like any other.
378 c.rmrSend(desc, subs, trans)
379 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
385 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s parent %s", typeofSubsMessage(event), idstring(trans, subs, nil), parentTrans.String())
389 //-------------------------------------------------------------------
390 // send to E2T Subscription Delete Request
391 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest builds a subscription delete request for
// subs and sends it to E2T, waiting for an answer on trans.
//
// The request carries RequestId.Id 123, RequestId.Seq set to the subscription
// id, and FunctionId 0. The packed message is sent with rmrSend up to
// e2tMaxSubDelReqTryCount times. After each send the function waits on
// trans.WaitEvent for at most e2tSubDelReqTime. parentTrans is used for
// logging only.
//
// Returns an interface{} value.
// NOTE(review): presumably the received event, or nil when packing fails or
// every try times out - confirm.
393 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *Transaction, parentTrans *Transaction) interface{} {
395 var event interface{}
398 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
// NOTE(review): 123 is a hard-coded request id; its meaning is not visible here.
399 subDelReqMsg.RequestId.Id = 123
400 subDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
// NOTE(review): FunctionId is fixed to 0 here, whereas the delete response
// built for the xApp copies it from subs.SubReqMsg - confirm this is intended.
401 subDelReqMsg.FunctionId = 0
402 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
404 xapp.Logger.Error("SUBS-SubDelReq: %s parent %s", idstring(trans, subs, err), parentTrans.String())
408 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
409 desc := fmt.Sprintf("SUBS-SubDelReq: SubDelReq to E2T (retry %d)", retries)
// The error returned by rmrSend is not used; a failed send is followed by
// the wait below like any other.
410 c.rmrSend(desc, subs, trans)
411 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
417 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s parent %s", typeofSubsMessage(event), idstring(trans, subs, nil), parentTrans.String())
421 //-------------------------------------------------------------------
422 // handle from E2T Subscription Response
423 //-------------------------------------------------------------------
// handleE2TSubscriptionResponse passes a subscription response received from
// E2T to the transaction currently ongoing on the matching subscription.
//
// The payload is unpacked. The subscription is looked up with two candidate
// ids (the message's RequestId.Seq and params.SubId, both converted to
// uint16). The subscription's current transaction is fetched, and the
// unpacked message is delivered with trans.SendEvent, using e2tRecvMsgTimeout
// as the timeout. Each error is logged with the "MSG-SubResp" prefix. The
// function has no return value.
424 func (c *Control) handleE2TSubscriptionResponse(params *RMRParams) {
425 xapp.Logger.Info("MSG-SubResp from E2T: %s", params.String())
426 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
428 xapp.Logger.Error("MSG-SubResp %s", idstring(params, nil, err))
431 subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subRespMsg.RequestId.Seq), uint16(params.SubId)})
433 xapp.Logger.Error("MSG-SubResp: %s", idstring(params, nil, err))
436 trans := subs.GetTransaction()
438 err = fmt.Errorf("Ongoing transaction not found")
439 xapp.Logger.Error("MSG-SubResp: %s", idstring(params, subs, err))
// Hand the message to whoever is waiting on the transaction.
442 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
444 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
445 xapp.Logger.Error("MSG-SubResp: %s", idstring(trans, subs, err))
450 //-------------------------------------------------------------------
451 // handle from E2T Subscription Failure
452 //-------------------------------------------------------------------
// handleE2TSubscriptionFailure passes a subscription failure received from
// E2T to the transaction currently ongoing on the matching subscription.
//
// The payload is unpacked. The subscription is looked up with two candidate
// ids (the message's RequestId.Seq and params.SubId, both converted to
// uint16). The subscription's current transaction is fetched, and the
// unpacked message is delivered with trans.SendEvent, using e2tRecvMsgTimeout
// as the timeout. Each error is logged with the "MSG-SubFail" prefix. The
// function has no return value.
453 func (c *Control) handleE2TSubscriptionFailure(params *RMRParams) {
454 xapp.Logger.Info("MSG-SubFail from E2T: %s", params.String())
455 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
457 xapp.Logger.Error("MSG-SubFail %s", idstring(params, nil, err))
460 subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subFailMsg.RequestId.Seq), uint16(params.SubId)})
462 xapp.Logger.Error("MSG-SubFail: %s", idstring(params, nil, err))
465 trans := subs.GetTransaction()
467 err = fmt.Errorf("Ongoing transaction not found")
468 xapp.Logger.Error("MSG-SubFail: %s", idstring(params, subs, err))
// Hand the message to whoever is waiting on the transaction.
471 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
473 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
474 xapp.Logger.Error("MSG-SubFail: %s", idstring(trans, subs, err))
479 //-------------------------------------------------------------------
480 // handle from E2T Subscription Delete Response
481 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse passes a subscription delete response
// received from E2T to the transaction currently ongoing on the matching
// subscription.
//
// The payload is unpacked. The subscription is looked up with two candidate
// ids (the message's RequestId.Seq and params.SubId, both converted to
// uint16). The subscription's current transaction is fetched, and the
// unpacked message is delivered with trans.SendEvent, using e2tRecvMsgTimeout
// as the timeout.
//
// This is the only E2T handler in this file that declares an error result.
// Consume starts it with "go", so that result is discarded there.
// NOTE(review): log prefixes are mixed in this function ("SUBS-SubDelResp"
// and "MSG-SubDelResp"), while the sibling E2T handlers use "MSG-" throughout
// - consider aligning.
482 func (c *Control) handleE2TSubscriptionDeleteResponse(params *RMRParams) (err error) {
483 xapp.Logger.Info("SUBS-SubDelResp from E2T:%s", params.String())
484 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
486 xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, nil, err))
489 subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelRespMsg.RequestId.Seq), uint16(params.SubId)})
491 xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, nil, err))
494 trans := subs.GetTransaction()
496 err = fmt.Errorf("Ongoing transaction not found")
497 xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, subs, err))
// Hand the message to whoever is waiting on the transaction.
500 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
502 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
503 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(trans, subs, err))
508 //-------------------------------------------------------------------
509 // handle from E2T Subscription Delete Failure
510 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteFailure passes a subscription delete failure
// received from E2T to the transaction currently ongoing on the matching
// subscription.
//
// The payload is unpacked. The subscription is looked up with two candidate
// ids (the message's RequestId.Seq and params.SubId, both converted to
// uint16). The subscription's current transaction is fetched, and the
// unpacked message is delivered with trans.SendEvent, using e2tRecvMsgTimeout
// as the timeout. Each error is logged with the "MSG-SubDelFail" prefix. The
// function has no return value.
511 func (c *Control) handleE2TSubscriptionDeleteFailure(params *RMRParams) {
512 xapp.Logger.Info("MSG-SubDelFail from E2T:%s", params.String())
513 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
515 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, nil, err))
518 subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelFailMsg.RequestId.Seq), uint16(params.SubId)})
520 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, nil, err))
523 trans := subs.GetTransaction()
525 err = fmt.Errorf("Ongoing transaction not found")
526 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, subs, err))
// Hand the message to whoever is waiting on the transaction.
529 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
531 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
532 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(trans, subs, err))
537 //-------------------------------------------------------------------
539 //-------------------------------------------------------------------
540 func typeofSubsMessage(v interface{}) string {
545 case *e2ap.E2APSubscriptionRequest:
547 case *e2ap.E2APSubscriptionResponse:
549 case *e2ap.E2APSubscriptionFailure:
551 case *e2ap.E2APSubscriptionDeleteRequest:
553 case *e2ap.E2APSubscriptionDeleteResponse:
555 case *e2ap.E2APSubscriptionDeleteFailure: