2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
27 httptransport "github.com/go-openapi/runtime/client"
28 "github.com/go-openapi/strfmt"
29 "github.com/spf13/viper"
34 //-----------------------------------------------------------------------------
36 //-----------------------------------------------------------------------------
// Package-level timing and retry settings used by the E2T exchanges below.
// NOTE(review): the embedded line numbering shows gaps between these
// declarations, so further settings may exist that are not visible here.

// e2tSubReqTimeout is the wait passed to trans.WaitEvent after each
// subscription request has been sent towards E2T.
38 var e2tSubReqTimeout time.Duration = 5 * time.Second
// e2tSubDelReqTime is the wait passed to trans.WaitEvent after each
// subscription delete request has been sent towards E2T.
// NOTE(review): the name lacks the "out" suffix its sibling above has.
39 var e2tSubDelReqTime time.Duration = 5 * time.Second
// Upper bound of the send loop in sendE2TSubscriptionRequest.
40 var e2tMaxSubReqTryCount uint64 = 2 // Initial try + retry
// Upper bound of the send loop in sendE2TSubscriptionDeleteRequest.
41 var e2tMaxSubDelReqTryCount uint64 = 2 // Initial try + retry
// e2tRecvMsgTimeout is the timeout passed to trans.SendEvent when a message
// received from E2T is handed over to the transaction of its subscription.
43 var e2tRecvMsgTimeout time.Duration = 5 * time.Second
50 rmrSendMutex sync.Mutex
68 xapp.Logger.Info("SUBMGR")
70 viper.SetEnvPrefix("submgr")
71 viper.AllowEmptyEnv(true)
// NewControl builds and returns a new *Control.
//
// It creates the routing manager REST client from the viper settings
// "rtmgr.HostAddr", "rtmgr.port" and "rtmgr.baseUrl", and allocates the
// Registry, Tracker and TimerMap instances.
//
// NOTE(review): the lines between the allocations below, and the remainder of
// the returned struct literal, are not visible in this excerpt; any
// initialisation calls on registry/tracker/timerMap happen there.
74 func NewControl() *Control {
// Host is "<HostAddr>:<port>", base path is rtmgr.baseUrl, and the only
// scheme offered to the transport is plain "http".
76 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
77 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
79 registry := new(Registry)
// The registry keeps a pointer to the routing manager client created above.
81 registry.rtmgrClient = &rtmgrClient
83 tracker := new(Tracker)
86 timerMap := new(TimerMap)
89 return &Control{e2ap: new(E2ap),
// Run is an exported method on *Control that takes no arguments and returns
// nothing.
// NOTE(review): its body is not visible in this excerpt; presumably it starts
// the xapp framework with c as the message consumer — confirm against the
// full file.
97 func (c *Control) Run() {
// rmrSendRaw logs the outgoing message and passes params to xapp.Rmr.Send,
// repeating the call while it reports failure.
//
// desc is a free-text prefix used only in log and error messages. The named
// result err is assigned when the failure path at the end of the function is
// taken.
//
// NOTE(review): the initialisation of i and status, the if-conditions and the
// return statement are not visible in this excerpt; comments marked
// "presumably" describe the apparent intent and need confirming.
101 func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) {
103 xapp.Logger.Info("%s: %s", desc, params.String())
// Keep trying while no send has succeeded and the counter is at most 10.
106 for ; i <= 10 && status == false; i++ {
// The mutex is held only around the Send call, which serialises the calls
// to xapp.Rmr.Send made through this method.
107 c.rmrSendMutex.Lock()
108 status = xapp.Rmr.Send(params.RMRParams, false)
109 c.rmrSendMutex.Unlock()
// Presumably only after a failed attempt: log it and wait 500 ms before
// the next try.
111 xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String())
112 time.Sleep(500 * time.Millisecond)
// Presumably only when no attempt succeeded: build the returned error, log
// it and hand params.Mbuf to xapp.Rmr.Free. By this point i has already been
// advanced by the loop's post statement.
116 err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String())
117 xapp.Logger.Error("%s: %s", desc, err.Error())
118 xapp.Rmr.Free(params.Mbuf)
// rmrSend builds a fresh RMRParams for the given subscription and transaction
// and sends it with rmrSendRaw, whose error it returns.
//
// The message type and payload come from trans; the subscription id and the
// Meid come from subs.
// NOTE(review): a few lines between the assignments are not visible in this
// excerpt, so additional fields may be set there.
123 func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction) (err error) {
124 params := &RMRParams{&xapp.RMRParams{}}
125 params.Mtype = trans.GetMtype()
126 params.SubId = int(subs.GetSubId())
// Unlike rmrReplyToSender, the Meid is read from the subscription here.
128 params.Meid = subs.GetMeid()
130 params.PayloadLen = len(trans.Payload.Buf)
131 params.Payload = trans.Payload.Buf
134 return c.rmrSendRaw(desc, params)
// rmrReplyToSender builds a fresh RMRParams for a reply and sends it with
// rmrSendRaw, whose error it returns.
//
// The message type, Xid, Meid and payload come from trans; only the
// subscription id comes from subs.
// NOTE(review): a few lines between the assignments are not visible in this
// excerpt, so additional fields may be set there.
137 func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction) (err error) {
138 params := &RMRParams{&xapp.RMRParams{}}
139 params.Mtype = trans.GetMtype()
140 params.SubId = int(subs.GetSubId())
// The reply carries the Xid and Meid stored in the transaction.
141 params.Xid = trans.GetXid()
142 params.Meid = trans.GetMeid()
144 params.PayloadLen = len(trans.Payload.Buf)
145 params.Payload = trans.Payload.Buf
148 return c.rmrSendRaw(desc, params)
// Consume receives one RMR message and dispatches it by message type to the
// matching handler, each of which is started in its own goroutine. Messages
// of any other type are logged and discarded.
//
// NOTE(review): the switch expression and the return statement are not
// visible in this excerpt; presumably the switch is on msg.Mtype, which is
// the value logged for unknown messages.
151 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
// NOTE(review): params.Mbuf is handed to Free before the handlers read
// params.Payload; presumably the payload no longer depends on Mbuf at this
// point — confirm against the xapp framework.
152 xapp.Rmr.Free(params.Mbuf)
// Wrap the framework type in the local RMRParams type used by the handlers.
154 msg := &RMRParams{params}
// Messages coming from an xApp.
157 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
158 go c.handleXAPPSubscriptionRequest(msg)
// Messages coming from E2T.
159 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
160 go c.handleE2TSubscriptionResponse(msg)
161 case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
162 go c.handleE2TSubscriptionFailure(msg)
163 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
164 go c.handleXAPPSubscriptionDeleteRequest(msg)
// This handler declares an error result; started with "go", that result is
// discarded.
165 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
166 go c.handleE2TSubscriptionDeleteResponse(msg)
167 case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
168 go c.handleE2TSubscriptionDeleteFailure(msg)
170 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
// idstring builds one log-friendly string out of a transaction, a
// subscription and an error. Each part is appended to the result preceded by
// the current filler; the error is rendered as "err(<message>)".
//
// NOTE(review): the conditions around the three appends and the updates of
// filler are not visible in this excerpt. Callers in this file pass nil for
// some arguments, so presumably each part is added only when it is non-nil —
// confirm.
175 func idstring(trans fmt.Stringer, subs fmt.Stringer, err error) string {
176 var retval string = ""
177 var filler string = ""
179 retval += filler + trans.String()
183 retval += filler + subs.String()
187 retval += filler + "err(" + err.Error() + ")"
194 //-------------------------------------------------------------------
195 // handle from XAPP Subscription Request
196 //------------------------------------------------------------------
// handleXAPPSubscriptionRequest processes a subscription request received
// from an xApp:
//  1. unpack the request payload,
//  2. start tracking a transaction for the sender endpoint, Xid and Meid,
//  3. have the registry assign the transaction to a subscription,
//  4. start handleSubscriptionCreate in a goroutine and block until it sends
//     an event to this transaction,
//  5. pack the resulting response or failure and reply to the sender.
//
// NOTE(review): the "if err != nil" guards, returns and closing braces are
// not visible in this excerpt; each Error log below presumably sits in the
// error branch of the call directly above it.
197 func (c *Control) handleXAPPSubscriptionRequest(params *RMRParams) {
198 xapp.Logger.Info("XAPP-SubReq from xapp: %s", params.String())
200 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
202 xapp.Logger.Error("XAPP-SubReq: %s", idstring(params, nil, err))
206 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid)
208 xapp.Logger.Error("XAPP-SubReq: %s", idstring(params, nil, err))
// The tracked transaction is released when this handler returns.
211 defer trans.Release()
213 subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
215 xapp.Logger.Error("XAPP-SubReq: %s", idstring(trans, nil, err))
// A subscription whose transaction slot is already reserved is rejected.
219 if subs.IsTransactionReserved() {
220 err := fmt.Errorf("Currently parallel or queued transactions are not allowed")
221 xapp.Logger.Error("XAPP-SubReq: %s", idstring(trans, subs, err))
// The create logic runs in its own goroutine and reports back by sending an
// event to trans. The second result of WaitEvent is discarded here.
228 go c.handleSubscriptionCreate(subs, trans)
229 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
// Translate the event into the reply for the xApp. The results of
// rmrReplyToSender are not used.
233 switch themsg := event.(type) {
234 case *e2ap.E2APSubscriptionResponse:
235 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
237 c.rmrReplyToSender("XAPP-SubReq: SubResp to xapp", subs, trans)
240 case *e2ap.E2APSubscriptionFailure:
241 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
243 c.rmrReplyToSender("XAPP-SubReq: SubFail to xapp", subs, trans)
// Presumably reached when no reply was sent above — confirm.
250 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(trans, subs, err))
253 //-------------------------------------------------------------------
254 // handle from XAPP Subscription Delete Request
255 //------------------------------------------------------------------
// handleXAPPSubscriptionDeleteRequest processes a subscription delete request
// received from an xApp: it unpacks the payload, tracks a transaction for the
// sender, looks up the subscription, runs handleSubscriptionDelete in a
// goroutine, waits for its event and then replies with a delete response.
//
// NOTE(review): the "if err != nil" guards, returns and closing braces are
// not visible in this excerpt; each Error log below presumably sits in the
// error branch of the call directly above it.
256 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *RMRParams) {
257 xapp.Logger.Info("XAPP-SubDelReq from xapp: %s", params.String())
259 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
261 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(params, nil, err))
265 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid)
267 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(params, nil, err))
// The tracked transaction is released when this handler returns.
270 defer trans.Release()
// Two candidate ids are offered to the registry: the sequence number from
// the decoded message and the SubId of the RMR message, both converted to
// uint16.
272 subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelReqMsg.RequestId.Seq), uint16(params.SubId)})
274 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(trans, nil, err))
// A subscription whose transaction slot is already reserved is rejected.
278 if subs.IsTransactionReserved() {
279 err := fmt.Errorf("Currently parallel or queued transactions are not allowed")
280 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(trans, subs, err))
// The delete logic runs in its own goroutine; both results of WaitEvent are
// discarded because the reply below does not depend on them.
287 go c.handleSubscriptionDelete(subs, trans)
288 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
290 // Whatever is received send ok delete response
291 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
// Request id and function id are copied from the stored subscription
// request; the sequence number is the subscription id.
292 subDelRespMsg.RequestId.Id = subs.SubReqMsg.RequestId.Id
293 subDelRespMsg.RequestId.Seq = uint32(subs.GetSubId())
294 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
295 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
// The results of rmrReplyToSender are not used.
297 c.rmrReplyToSender("XAPP-SubDelReq: SubDelResp to xapp", subs, trans)
301 //-------------------------------------------------------------------
302 // SUBS CREATE Handling
303 //-------------------------------------------------------------------
// handleSubscriptionCreate runs the create procedure for subs on behalf of
// parentTrans and reports the outcome to parentTrans with SendEvent.
//
// It creates its own transaction, waits for its turn on the subscription,
// and then either answers immediately from the response stored in
// subs.SubRespMsg or sends a subscription request to E2T.
//
// NOTE(review): returns, the default branch label and closing braces are not
// visible in this excerpt; comments marked "presumably" need confirming.
304 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Transaction) {
306 trans := c.tracker.NewTransaction(subs.GetMeid())
307 subs.WaitTransactionTurn(trans)
// Deferred calls run in reverse order: trans.Release() first, then the
// transaction turn on the subscription is released.
308 defer subs.ReleaseTransactionTurn(trans)
309 defer trans.Release()
311 xapp.Logger.Debug("SUBS-SubReq: Handling %s parent %s", idstring(trans, subs, nil), parentTrans.String())
// A response is already stored on the subscription: pass it to the parent
// without contacting E2T (presumably followed by a return).
313 if subs.SubRespMsg != nil {
314 xapp.Logger.Debug("SUBS-SubReq: Handling (immediate response) %s parent %s", idstring(nil, subs, nil), parentTrans.String())
315 parentTrans.SendEvent(subs.SubRespMsg, 0)
319 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
320 switch themsg := event.(type) {
321 case *e2ap.E2APSubscriptionResponse:
// Store the response on the subscription, then pass it to the parent.
322 subs.SubRespMsg = themsg
323 parentTrans.SendEvent(event, 0)
325 case *e2ap.E2APSubscriptionFailure:
326 //TODO: Possible delete and one retry for subs req
327 parentTrans.SendEvent(event, 0)
// Presumably the default branch (any other event, including none): send a
// delete request to E2T and report nil to the parent.
329 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(trans, subs, nil))
330 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
331 parentTrans.SendEvent(nil, 0)
// NOTE(review): which outcomes reach this removal, and what the 5 second
// argument controls, cannot be determined from this excerpt.
334 go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
337 //-------------------------------------------------------------------
338 // SUBS DELETE Handling
339 //-------------------------------------------------------------------
// handleSubscriptionDelete runs the delete procedure for subs on behalf of
// parentTrans: it creates its own transaction, waits for its turn on the
// subscription, sends the delete request to E2T, passes the resulting event
// to parentTrans and starts the removal of parentTrans from the subscription.
//
// NOTE(review): some lines of this function are not visible in this excerpt.
340 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *Transaction) {
342 trans := c.tracker.NewTransaction(subs.GetMeid())
343 subs.WaitTransactionTurn(trans)
// Deferred calls run in reverse order: trans.Release() first, then the
// transaction turn on the subscription is released.
344 defer subs.ReleaseTransactionTurn(trans)
345 defer trans.Release()
347 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s parent %s", idstring(trans, subs, nil), parentTrans.String())
349 event := c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
// The parent receives whatever event the E2T exchange produced.
351 parentTrans.SendEvent(event, 0)
// NOTE(review): what the 5 second argument controls cannot be determined
// from this excerpt.
352 go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
355 //-------------------------------------------------------------------
356 // send to E2T Subscription Request
357 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription request stored on subs
// into trans, sends it towards E2T and waits on trans for an event, trying
// up to e2tMaxSubReqTryCount times. parentTrans is used for logging only in
// the visible lines. The result is an untyped event value.
//
// NOTE(review): the declaration of err, the error guard, the loop exit
// condition and the return are not visible in this excerpt; presumably the
// loop stops as soon as WaitEvent does not time out and event is returned —
// confirm.
358 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *Transaction, parentTrans *Transaction) interface{} {
360 var event interface{} = nil
361 var timedOut bool = false
// NOTE(review): subReqMsg is the value held in subs.SubReqMsg; if that field
// is a pointer, the two assignments below modify the stored request. The
// request id is the hard-coded constant 123.
363 subReqMsg := subs.SubReqMsg
364 subReqMsg.RequestId.Id = 123
365 subReqMsg.RequestId.Seq = uint32(subs.GetSubId())
366 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
368 xapp.Logger.Error("SUBS-SubReq: %s parent %s", idstring(trans, subs, err), parentTrans.String())
// The counter starts at 0, so the first attempt is labelled "retry 0".
372 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
373 desc := fmt.Sprintf("SUBS-SubReq: SubReq to E2T (retry %d)", retries)
// The error returned by rmrSend is not used; a failed send is followed by
// the same wait as a successful one.
374 c.rmrSend(desc, subs, trans)
375 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
381 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s parent %s", typeofSubsMessage(event), idstring(trans, subs, nil), parentTrans.String())
385 //-------------------------------------------------------------------
386 // send to E2T Subscription Delete Request
387 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest builds a subscription delete request for
// subs, packs it into trans, sends it towards E2T and waits on trans for an
// event, trying up to e2tMaxSubDelReqTryCount times. parentTrans is used for
// logging only in the visible lines. The result is an untyped event value.
//
// NOTE(review): the declarations of err and timedOut, the error guard, the
// loop exit condition and the return are not visible in this excerpt;
// presumably the loop stops as soon as WaitEvent does not time out and event
// is returned — confirm.
389 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *Transaction, parentTrans *Transaction) interface{} {
391 var event interface{}
394 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
// Request id is the hard-coded constant 123; the sequence number is the
// subscription id and the function id is copied from the stored request.
395 subDelReqMsg.RequestId.Id = 123
396 subDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
397 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
398 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
400 xapp.Logger.Error("SUBS-SubDelReq: %s parent %s", idstring(trans, subs, err), parentTrans.String())
// The counter starts at 0, so the first attempt is labelled "retry 0".
404 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
405 desc := fmt.Sprintf("SUBS-SubDelReq: SubDelReq to E2T (retry %d)", retries)
// The error returned by rmrSend is not used; a failed send is followed by
// the same wait as a successful one.
406 c.rmrSend(desc, subs, trans)
407 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
413 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s parent %s", typeofSubsMessage(event), idstring(trans, subs, nil), parentTrans.String())
417 //-------------------------------------------------------------------
418 // handle from E2T Subscription Response
419 //-------------------------------------------------------------------
// handleE2TSubscriptionResponse processes a subscription response received
// from E2T: it unpacks the payload, finds the subscription it belongs to and
// passes the decoded message as an event to that subscription's current
// transaction.
//
// NOTE(review): the if-guards, returns and closing braces are not visible in
// this excerpt; each Error log below presumably sits in the failure branch of
// the step directly above it.
420 func (c *Control) handleE2TSubscriptionResponse(params *RMRParams) {
421 xapp.Logger.Info("MSG-SubResp from E2T: %s", params.String())
422 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
424 xapp.Logger.Error("MSG-SubResp %s", idstring(params, nil, err))
// Candidate ids: the sequence number from the decoded message and the SubId
// of the RMR message, both converted to uint16.
427 subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subRespMsg.RequestId.Seq), uint16(params.SubId)})
429 xapp.Logger.Error("MSG-SubResp: %s", idstring(params, nil, err))
// Presumably a nil transaction means nobody is waiting for this message.
432 trans := subs.GetTransaction()
434 err = fmt.Errorf("Ongoing transaction not found")
435 xapp.Logger.Error("MSG-SubResp: %s", idstring(params, subs, err))
// Hand the message over, giving up after e2tRecvMsgTimeout.
438 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
440 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
441 xapp.Logger.Error("MSG-SubResp: %s", idstring(trans, subs, err))
446 //-------------------------------------------------------------------
447 // handle from E2T Subscription Failure
448 //-------------------------------------------------------------------
// handleE2TSubscriptionFailure processes a subscription failure received from
// E2T: it unpacks the payload, finds the subscription it belongs to and
// passes the decoded message as an event to that subscription's current
// transaction.
//
// NOTE(review): the if-guards, returns and closing braces are not visible in
// this excerpt; each Error log below presumably sits in the failure branch of
// the step directly above it.
449 func (c *Control) handleE2TSubscriptionFailure(params *RMRParams) {
450 xapp.Logger.Info("MSG-SubFail from E2T: %s", params.String())
451 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
453 xapp.Logger.Error("MSG-SubFail %s", idstring(params, nil, err))
// Candidate ids: the sequence number from the decoded message and the SubId
// of the RMR message, both converted to uint16.
456 subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subFailMsg.RequestId.Seq), uint16(params.SubId)})
458 xapp.Logger.Error("MSG-SubFail: %s", idstring(params, nil, err))
// Presumably a nil transaction means nobody is waiting for this message.
461 trans := subs.GetTransaction()
463 err = fmt.Errorf("Ongoing transaction not found")
464 xapp.Logger.Error("MSG-SubFail: %s", idstring(params, subs, err))
// Hand the message over, giving up after e2tRecvMsgTimeout.
467 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
469 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
470 xapp.Logger.Error("MSG-SubFail: %s", idstring(trans, subs, err))
475 //-------------------------------------------------------------------
476 // handle from E2T Subscription Delete Response
477 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse processes a subscription delete
// response received from E2T: it unpacks the payload, finds the subscription
// it belongs to and passes the decoded message as an event to that
// subscription's current transaction.
//
// NOTE(review): unlike the other E2T handlers this one declares an error
// result, which Consume discards by starting the handler with "go". Its log
// prefix is "SUBS-SubDelResp" in all but the last message, which uses
// "MSG-SubDelResp"; the sibling handlers use "MSG-" throughout.
// NOTE(review): the if-guards, returns and closing braces are not visible in
// this excerpt; each Error log below presumably sits in the failure branch of
// the step directly above it.
478 func (c *Control) handleE2TSubscriptionDeleteResponse(params *RMRParams) (err error) {
479 xapp.Logger.Info("SUBS-SubDelResp from E2T:%s", params.String())
480 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
482 xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, nil, err))
// Candidate ids: the sequence number from the decoded message and the SubId
// of the RMR message, both converted to uint16.
485 subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelRespMsg.RequestId.Seq), uint16(params.SubId)})
487 xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, nil, err))
// Presumably a nil transaction means nobody is waiting for this message.
490 trans := subs.GetTransaction()
492 err = fmt.Errorf("Ongoing transaction not found")
493 xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, subs, err))
// Hand the message over, giving up after e2tRecvMsgTimeout.
496 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
498 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
499 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(trans, subs, err))
504 //-------------------------------------------------------------------
505 // handle from E2T Subscription Delete Failure
506 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteFailure processes a subscription delete failure
// received from E2T: it unpacks the payload, finds the subscription it
// belongs to and passes the decoded message as an event to that
// subscription's current transaction.
//
// NOTE(review): the if-guards, returns and closing braces are not visible in
// this excerpt; each Error log below presumably sits in the failure branch of
// the step directly above it.
507 func (c *Control) handleE2TSubscriptionDeleteFailure(params *RMRParams) {
508 xapp.Logger.Info("MSG-SubDelFail from E2T:%s", params.String())
509 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
511 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, nil, err))
// Candidate ids: the sequence number from the decoded message and the SubId
// of the RMR message, both converted to uint16.
514 subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelFailMsg.RequestId.Seq), uint16(params.SubId)})
516 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, nil, err))
// Presumably a nil transaction means nobody is waiting for this message.
519 trans := subs.GetTransaction()
521 err = fmt.Errorf("Ongoing transaction not found")
522 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, subs, err))
// Hand the message over, giving up after e2tRecvMsgTimeout.
525 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
527 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
528 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(trans, subs, err))
533 //-------------------------------------------------------------------
535 //-------------------------------------------------------------------
// typeofSubsMessage maps an event value to a string; in this file the result
// is used only inside log messages. It distinguishes the six e2ap
// subscription message pointer types listed below.
//
// NOTE(review): the switch header, the value returned by each case, any
// nil/default handling and the end of the function are not visible in this
// excerpt.
536 func typeofSubsMessage(v interface{}) string {
541 case *e2ap.E2APSubscriptionRequest:
543 case *e2ap.E2APSubscriptionResponse:
545 case *e2ap.E2APSubscriptionFailure:
547 case *e2ap.E2APSubscriptionDeleteRequest:
549 case *e2ap.E2APSubscriptionDeleteResponse:
551 case *e2ap.E2APSubscriptionDeleteFailure: