2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28 httptransport "github.com/go-openapi/runtime/client"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
34 //-----------------------------------------------------------------------------
36 //-----------------------------------------------------------------------------
38 func idstring(err error, entries ...fmt.Stringer) string {
39 var retval string = ""
40 var filler string = ""
41 for _, entry := range entries {
42 retval += filler + entry.String()
46 retval += filler + "err(" + err.Error() + ")"
53 //-----------------------------------------------------------------------------
55 //-----------------------------------------------------------------------------
57 var e2tSubReqTimeout time.Duration = 5 * time.Second
58 var e2tSubDelReqTime time.Duration = 5 * time.Second
59 var e2tMaxSubReqTryCount uint64 = 2 // Initial try + retry
60 var e2tMaxSubDelReqTryCount uint64 = 2 // Initial try + retry
62 var e2tRecvMsgTimeout time.Duration = 5 * time.Second
65 xapptweaks.XappWrapper
79 xapp.Logger.Info("SUBMGR")
81 viper.SetEnvPrefix("submgr")
82 viper.AllowEmptyEnv(true)
85 func NewControl() *Control {
87 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
88 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
90 registry := new(Registry)
92 registry.rtmgrClient = &rtmgrClient
94 tracker := new(Tracker)
97 timerMap := new(TimerMap)
100 c := &Control{e2ap: new(E2ap),
105 c.XappWrapper.Init("")
109 func (c *Control) ReadyCB(data interface{}) {
115 func (c *Control) Run() {
116 xapp.SetReadyCB(c.ReadyCB, nil)
120 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
121 params := xapptweaks.NewParams(nil)
122 params.Mtype = trans.GetMtype()
123 params.SubId = int(subs.GetReqId().Seq)
125 params.Meid = subs.GetMeid()
127 params.PayloadLen = len(trans.Payload.Buf)
128 params.Payload = trans.Payload.Buf
130 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
131 return c.RmrSend(params)
134 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
136 params := xapptweaks.NewParams(nil)
137 params.Mtype = trans.GetMtype()
138 params.SubId = int(subs.GetReqId().Seq)
139 params.Xid = trans.GetXid()
140 params.Meid = trans.GetMeid()
142 params.PayloadLen = len(trans.Payload.Buf)
143 params.Payload = trans.Payload.Buf
145 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
146 return c.RmrSend(params)
149 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
150 msg := xapptweaks.NewParams(params)
152 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
153 xapp.Logger.Error("%s", err.Error())
158 defer c.Rmr.Free(msg.Mbuf)
161 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
162 go c.handleXAPPSubscriptionRequest(msg)
163 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
164 go c.handleE2TSubscriptionResponse(msg)
165 case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
166 go c.handleE2TSubscriptionFailure(msg)
167 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
168 go c.handleXAPPSubscriptionDeleteRequest(msg)
169 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
170 go c.handleE2TSubscriptionDeleteResponse(msg)
171 case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
172 go c.handleE2TSubscriptionDeleteFailure(msg)
174 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
179 //-------------------------------------------------------------------
180 // handle from XAPP Subscription Request
181 //------------------------------------------------------------------
182 func (c *Control) handleXAPPSubscriptionRequest(params *xapptweaks.RMRParams) {
183 xapp.Logger.Info("MSG from XAPP: %s", params.String())
185 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
187 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
191 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subReqMsg.RequestId}, params.Meid)
193 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
196 defer trans.Release()
198 err = c.tracker.Track(trans)
200 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
204 subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
206 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
213 go c.handleSubscriptionCreate(subs, trans)
214 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
218 switch themsg := event.(type) {
219 case *e2ap.E2APSubscriptionResponse:
220 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
223 c.rmrSendToXapp("", subs, trans)
226 case *e2ap.E2APSubscriptionFailure:
227 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
229 c.rmrSendToXapp("", subs, trans)
235 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
236 c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
239 //-------------------------------------------------------------------
240 // handle from XAPP Subscription Delete Request
241 //------------------------------------------------------------------
242 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapptweaks.RMRParams) {
243 xapp.Logger.Info("MSG from XAPP: %s", params.String())
245 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
247 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
251 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subDelReqMsg.RequestId}, params.Meid)
253 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
256 defer trans.Release()
258 err = c.tracker.Track(trans)
260 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
264 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelReqMsg.RequestId.Seq})
266 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
273 go c.handleSubscriptionDelete(subs, trans)
274 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
276 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
278 // Whatever is received send ok delete response
279 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
280 subDelRespMsg.RequestId = subs.SubReqMsg.RequestId
281 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
282 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
284 c.rmrSendToXapp("", subs, trans)
287 c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
290 //-------------------------------------------------------------------
291 // SUBS CREATE Handling
292 //-------------------------------------------------------------------
293 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
295 trans := c.tracker.NewSubsTransaction(subs)
296 subs.WaitTransactionTurn(trans)
297 defer subs.ReleaseTransactionTurn(trans)
298 defer trans.Release()
300 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
302 subRfMsg, valid := subs.GetCachedResponse()
303 if subRfMsg == nil && valid == true {
304 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
305 switch event.(type) {
306 case *e2ap.E2APSubscriptionResponse:
307 subRfMsg, valid = subs.SetCachedResponse(event, true)
308 case *e2ap.E2APSubscriptionFailure:
309 subRfMsg, valid = subs.SetCachedResponse(event, false)
311 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
312 subRfMsg, valid = subs.SetCachedResponse(nil, false)
313 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
315 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
317 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
320 parentTrans.SendEvent(subRfMsg, 0)
323 //-------------------------------------------------------------------
324 // SUBS DELETE Handling
325 //-------------------------------------------------------------------
327 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
329 trans := c.tracker.NewSubsTransaction(subs)
330 subs.WaitTransactionTurn(trans)
331 defer subs.ReleaseTransactionTurn(trans)
332 defer trans.Release()
334 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
337 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
340 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
345 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
346 subDelRespMsg.RequestId = subs.SubReqMsg.RequestId
347 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
348 parentTrans.SendEvent(subDelRespMsg, 0)
351 //-------------------------------------------------------------------
352 // send to E2T Subscription Request
353 //-------------------------------------------------------------------
354 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
356 var event interface{} = nil
357 var timedOut bool = false
359 subReqMsg := subs.SubReqMsg
360 subReqMsg.RequestId = subs.GetReqId().RequestId
361 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
363 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
367 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
368 desc := fmt.Sprintf("(retry %d)", retries)
369 c.rmrSendToE2T(desc, subs, trans)
370 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
376 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
380 //-------------------------------------------------------------------
381 // send to E2T Subscription Delete Request
382 //-------------------------------------------------------------------
384 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
386 var event interface{}
389 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
390 subDelReqMsg.RequestId = subs.GetReqId().RequestId
391 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
392 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
394 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
398 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
399 desc := fmt.Sprintf("(retry %d)", retries)
400 c.rmrSendToE2T(desc, subs, trans)
401 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
407 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
411 //-------------------------------------------------------------------
412 // handle from E2T Subscription Reponse
413 //-------------------------------------------------------------------
414 func (c *Control) handleE2TSubscriptionResponse(params *xapptweaks.RMRParams) {
415 xapp.Logger.Info("MSG from E2T: %s", params.String())
416 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
418 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
421 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.Seq})
423 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
426 trans := subs.GetTransaction()
428 err = fmt.Errorf("Ongoing transaction not found")
429 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
432 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
434 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
435 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
440 //-------------------------------------------------------------------
441 // handle from E2T Subscription Failure
442 //-------------------------------------------------------------------
443 func (c *Control) handleE2TSubscriptionFailure(params *xapptweaks.RMRParams) {
444 xapp.Logger.Info("MSG from E2T: %s", params.String())
445 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
447 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
450 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.Seq})
452 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
455 trans := subs.GetTransaction()
457 err = fmt.Errorf("Ongoing transaction not found")
458 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
461 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
463 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
464 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
469 //-------------------------------------------------------------------
470 // handle from E2T Subscription Delete Response
471 //-------------------------------------------------------------------
472 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapptweaks.RMRParams) (err error) {
473 xapp.Logger.Info("MSG from E2T: %s", params.String())
474 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
476 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
479 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.Seq})
481 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
484 trans := subs.GetTransaction()
486 err = fmt.Errorf("Ongoing transaction not found")
487 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
490 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
492 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
493 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
498 //-------------------------------------------------------------------
499 // handle from E2T Subscription Delete Failure
500 //-------------------------------------------------------------------
501 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapptweaks.RMRParams) {
502 xapp.Logger.Info("MSG from E2T: %s", params.String())
503 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
505 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
508 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.Seq})
510 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
513 trans := subs.GetTransaction()
515 err = fmt.Errorf("Ongoing transaction not found")
516 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
519 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
521 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
522 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
527 //-------------------------------------------------------------------
529 //-------------------------------------------------------------------
530 func typeofSubsMessage(v interface{}) string {
535 case *e2ap.E2APSubscriptionRequest:
537 case *e2ap.E2APSubscriptionResponse:
539 case *e2ap.E2APSubscriptionFailure:
541 case *e2ap.E2APSubscriptionDeleteRequest:
543 case *e2ap.E2APSubscriptionDeleteResponse:
545 case *e2ap.E2APSubscriptionDeleteFailure: