2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28 httptransport "github.com/go-openapi/runtime/client"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
34 //-----------------------------------------------------------------------------
36 //-----------------------------------------------------------------------------
// idstring builds the identification text used by the log calls in this file:
// the String() form of every entry, in argument order, followed by the error
// rendered as "err(<message>)". Each piece is prefixed with the current value
// of filler.
//
// NOTE(review): only part of the body is visible in this listing. Callers in
// this file pass a nil err (e.g. idstring(nil, trans, subs)), so the err(...)
// part is presumably guarded by a nil check, and filler is presumably switched
// to a separator after the first piece -- confirm against the full source.
38 func idstring(err error, entries ...fmt.Stringer) string {
39 var retval string = ""
40 var filler string = ""
41 for _, entry := range entries {
42 retval += filler + entry.String()
// The error text comes after all entries.
46 retval += filler + "err(" + err.Error() + ")"
53 //-----------------------------------------------------------------------------
55 //-----------------------------------------------------------------------------
// Timing and retry settings for the message exchange with E2T. They are
// package-level variables, not constants.

// e2tSubReqTimeout is the wait passed to trans.WaitEvent after each
// subscription request sent by sendE2TSubscriptionRequest.
57 var e2tSubReqTimeout time.Duration = 5 * time.Second
// e2tSubDelReqTime is the wait passed to trans.WaitEvent after each
// subscription delete request sent by sendE2TSubscriptionDeleteRequest.
58 var e2tSubDelReqTime time.Duration = 5 * time.Second
// e2tMaxSubReqTryCount bounds the send loop in sendE2TSubscriptionRequest.
59 var e2tMaxSubReqTryCount uint64 = 2 // Initial try + retry
// e2tMaxSubDelReqTryCount bounds the send loop in
// sendE2TSubscriptionDeleteRequest.
60 var e2tMaxSubDelReqTryCount uint64 = 2 // Initial try + retry
// e2tRecvMsgTimeout is the timeout the handleE2T* handlers pass to
// trans.SendEvent when handing a received E2T message to the transaction.
62 var e2tRecvMsgTimeout time.Duration = 5 * time.Second
// Embedded wrapper type from the xapptweaks package; NewControl initialises it
// through c.XappWrapper.Init("").
// NOTE(review): the enclosing struct header and the remaining fields are not
// visible in this listing. c.Rmr and c.RmrSend, used by the methods of
// Control, presumably come from this embedded type -- confirm.
65 xapptweaks.XappWrapper
// Logging and viper environment configuration.
// NOTE(review): the header of the enclosing function is not visible in this
// listing.
79 xapp.Logger.Info("SUBMGR")
// Environment variables looked up by viper use the "submgr" prefix (viper
// upper-cases the prefix and joins it with an underscore).
81 viper.SetEnvPrefix("submgr")
// Environment variables that are set but empty count as set values instead of
// falling back to another configuration source.
82 viper.AllowEmptyEnv(true)
// NewControl assembles a Control: it creates the routing manager client, the
// subscription registry, the transaction tracker and the timer map, stores
// them in a new Control and initialises the embedded XappWrapper.
//
// NOTE(review): several statements between the visible ones are elided in this
// listing (the remaining fields of the composite literal, the return, and
// presumably initialisation calls on the new objects) -- confirm against the
// full source.
85 func NewControl() *Control {
// HTTP transport towards the routing manager: host and port come from the
// viper keys "rtmgr.HostAddr" and "rtmgr.port", the base path from
// "rtmgr.baseUrl"; plain "http" is the only scheme offered.
87 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
88 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
90 registry := new(Registry)
// The registry gets a pointer to the routing manager client created above.
92 registry.rtmgrClient = &rtmgrClient
94 tracker := new(Tracker)
97 timerMap := new(TimerMap)
100 c := &Control{e2ap: new(E2ap),
105 c.XappWrapper.Init("")
// ReadyCB is the callback that Run registers through xapp.SetReadyCB.
// NOTE(review): the body is not visible in this listing; data is presumably
// the value given as the second argument of SetReadyCB (nil in Run) -- confirm.
109 func (c *Control) ReadyCB(data interface{}) {
// Run registers c.ReadyCB as the xapp ready callback, with nil callback data.
// NOTE(review): the remainder of the body is elided in this listing.
115 func (c *Control) Run() {
116 xapp.SetReadyCB(c.ReadyCB, nil)
// rmrSendToE2T sends the packed message held by trans towards E2T.
//
// desc is free text that only appears in the log line. The message type and
// payload come from trans; the subscription id and Meid come from subs.
// The result of c.RmrSend is returned unchanged.
//
// NOTE(review): some statements between the visible assignments are elided in
// this listing.
120 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
121 params := xapptweaks.NewParams(nil)
122 params.Mtype = trans.GetMtype()
// The Seq part of the subscription's request id is used as the RMR SubId.
123 params.SubId = int(subs.GetReqId().Seq)
125 params.Meid = subs.GetMeid()
127 params.PayloadLen = len(trans.Payload.Buf)
128 params.Payload = trans.Payload.Buf
130 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
131 return c.RmrSend(params)
// rmrSendToXapp sends the packed message held by trans back to the xApp.
//
// desc is free text that only appears in the log line. The message type, Xid,
// Meid and payload come from trans; the subscription id comes from subs.
// The result of c.RmrSend is returned unchanged.
//
// NOTE(review): some statements between the visible assignments are elided in
// this listing.
134 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
136 params := xapptweaks.NewParams(nil)
137 params.Mtype = trans.GetMtype()
// The Seq part of the subscription's request id is used as the RMR SubId.
138 params.SubId = int(subs.GetReqId().Seq)
139 params.Xid = trans.GetXid()
140 params.Meid = trans.GetMeid()
142 params.PayloadLen = len(trans.Payload.Buf)
143 params.Payload = trans.Payload.Buf
145 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
146 return c.RmrSend(params)
// Consume takes one received RMR message, wraps it with xapptweaks.NewParams
// and hands it to the handler for its message type. Every handler is started
// in its own goroutine; a message type without a case is logged and discarded.
//
// NOTE(review): the switch header, the default label and the condition
// guarding the error path are elided in this listing.
149 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
150 msg := xapptweaks.NewParams(params)
// Error path, presumably taken when c.Rmr is nil (condition elided).
// NOTE(review): the text reads "can handle" where "cannot handle" looks
// intended.
152 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
153 xapp.Logger.Error("%s", err.Error())
// The message buffer is freed when Consume returns.
// NOTE(review): the handlers below run in goroutines that receive msg and
// read msg.Payload, possibly after this deferred Free has run -- confirm that
// the payload does not alias the freed buffer.
158 defer c.Rmr.Free(msg.Mbuf)
// Requests arriving from xApps and responses/failures arriving from E2T.
161 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
162 go c.handleXAPPSubscriptionRequest(msg)
163 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
164 go c.handleE2TSubscriptionResponse(msg)
165 case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
166 go c.handleE2TSubscriptionFailure(msg)
167 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
168 go c.handleXAPPSubscriptionDeleteRequest(msg)
169 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
170 go c.handleE2TSubscriptionDeleteResponse(msg)
171 case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
172 go c.handleE2TSubscriptionDeleteFailure(msg)
174 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
179 //-------------------------------------------------------------------
180 // handle from XAPP Subscription Request
181 //------------------------------------------------------------------
// handleXAPPSubscriptionRequest processes a subscription request from an xApp.
//
// Visible steps: unpack the request, create and track an xApp transaction,
// assign the transaction to a subscription in the registry, start
// handleSubscriptionCreate in a goroutine, block until that goroutine delivers
// an event, then pack a subscription response or failure and send it to the
// xApp. The last two statements log a failure and schedule removal of the
// transaction from the subscription.
//
// NOTE(review): the conditions and returns around the error log lines, and the
// conditions deciding which paths reach the final failure log, are elided in
// this listing. The return values of rmrSendToXapp are not used on the visible
// lines.
182 func (c *Control) handleXAPPSubscriptionRequest(params *xapptweaks.RMRParams) {
183 xapp.Logger.Info("MSG from XAPP: %s", params.String())
185 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
187 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
// The transaction is keyed by the sender endpoint, the Xid, the request id
// taken from the unpacked message and the Meid.
191 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subReqMsg.RequestId}, params.Meid)
193 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
196 defer trans.Release()
198 err = c.tracker.Track(trans)
200 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
204 subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
206 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
// The subscription-side work runs in its own goroutine and reports back
// through an event on trans; the second result of WaitEvent is ignored.
213 go c.handleSubscriptionCreate(subs, trans)
214 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
// The dynamic type of the event selects which message is packed for the xApp.
218 switch themsg := event.(type) {
219 case *e2ap.E2APSubscriptionResponse:
220 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
222 c.rmrSendToXapp("", subs, trans)
225 case *e2ap.E2APSubscriptionFailure:
226 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
228 c.rmrSendToXapp("", subs, trans)
// Failure path: log and detach this transaction from the subscription in the
// background, passing a 5 second duration to RemoveFromSubscription.
234 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
235 go c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
238 //-------------------------------------------------------------------
239 // handle from XAPP Subscription Delete Request
240 //------------------------------------------------------------------
// handleXAPPSubscriptionDeleteRequest processes a subscription delete request
// from an xApp.
//
// Visible steps: unpack the request, create and track an xApp transaction,
// look up the subscription by the Seq of the request id, start
// handleSubscriptionDelete in a goroutine, block until it delivers an event,
// then build a delete response from the stored subscription request, pack it
// and send it to the xApp. Finally the transaction is scheduled for removal
// from the subscription.
//
// NOTE(review): the conditions and returns around the error log lines are
// elided in this listing. The return value of rmrSendToXapp is not used on the
// visible line.
241 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapptweaks.RMRParams) {
242 xapp.Logger.Info("MSG from XAPP: %s", params.String())
244 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
246 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
250 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subDelReqMsg.RequestId}, params.Meid)
252 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
255 defer trans.Release()
257 err = c.tracker.Track(trans)
// NOTE(review): this log line uses the "XAPP-SubReq:" prefix although the
// handler is for delete requests; the other lines use "XAPP-SubDelReq".
259 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
263 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelReqMsg.RequestId.Seq})
265 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
// The subscription-side work runs in its own goroutine; both results of
// WaitEvent are ignored here.
272 go c.handleSubscriptionDelete(subs, trans)
273 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
275 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
277 // Whatever is received send ok delete response
278 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
279 subDelRespMsg.RequestId = subs.SubReqMsg.RequestId
280 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
281 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
283 c.rmrSendToXapp("", subs, trans)
// Detach this transaction from the subscription in the background, passing a
// 5 second duration to RemoveFromSubscription.
286 go c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
289 //-------------------------------------------------------------------
290 // SUBS CREATE Handling
291 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the subscription-side part of a create
// request and reports the outcome to parentTrans as an event.
//
// It creates a subscription transaction, waits for its turn on subs, and reads
// the cached response of the subscription. When there is no cached response
// and the valid flag is true, a subscription request is sent to E2T and the
// result is stored with SetCachedResponse. The message finally passed to
// parentTrans.SendEvent is the value returned by GetCachedResponse or
// SetCachedResponse.
//
// NOTE(review): the default label of the switch and the else branch are
// elided in this listing.
292 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
294 trans := c.tracker.NewSubsTransaction(subs)
295 subs.WaitTransactionTurn(trans)
// Deferred calls run in reverse order: the transaction is released first,
// then the turn on the subscription is given up.
296 defer subs.ReleaseTransactionTurn(trans)
297 defer trans.Release()
299 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
301 subRfMsg, valid := subs.GetCachedResponse()
302 if subRfMsg == nil && valid == true {
303 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
304 switch event.(type) {
// A response is cached with flag true, a failure with flag false.
305 case *e2ap.E2APSubscriptionResponse:
306 subRfMsg, valid = subs.SetCachedResponse(event, true)
307 case *e2ap.E2APSubscriptionFailure:
308 subRfMsg, valid = subs.SetCachedResponse(event, false)
// Any other event: cache nil with flag false and send a delete request to E2T.
310 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
311 subRfMsg, valid = subs.SetCachedResponse(nil, false)
312 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
314 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
316 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
// Hand the outcome to the xApp-side transaction, which is blocked in WaitEvent.
319 parentTrans.SendEvent(subRfMsg, 0)
322 //-------------------------------------------------------------------
323 // SUBS DELETE Handling
324 //-------------------------------------------------------------------
// handleSubscriptionDelete performs the subscription-side part of a delete
// request and reports a delete response to parentTrans as an event.
//
// A delete request is sent to E2T only when the subscription is valid and the
// requesting endpoint is the single endpoint in the subscription's endpoint
// list. The delete response sent to parentTrans is built from the stored
// subscription request.
//
// NOTE(review): statements inside and after the if block are elided in this
// listing.
326 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
328 trans := c.tracker.NewSubsTransaction(subs)
329 subs.WaitTransactionTurn(trans)
// Deferred calls run in reverse order: the transaction is released first,
// then the turn on the subscription is given up.
330 defer subs.ReleaseTransactionTurn(trans)
331 defer trans.Release()
333 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
336 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
339 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
344 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
345 subDelRespMsg.RequestId = subs.SubReqMsg.RequestId
346 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
// Hand the response to the xApp-side transaction, which is blocked in WaitEvent.
347 parentTrans.SendEvent(subDelRespMsg, 0)
350 //-------------------------------------------------------------------
351 // send to E2T Subscription Request
352 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription request stored in subs,
// sends it to E2T and waits for an event on trans. The send/wait loop runs up
// to e2tMaxSubReqTryCount times and waits e2tSubReqTimeout per attempt.
// parentTrans is used only for logging on the visible lines.
//
// NOTE(review): the declaration of err, the handling of timedOut inside the
// loop and the return statements are elided in this listing; the function
// presumably returns the received event -- confirm. The return value of
// rmrSendToE2T is not used on the visible line.
353 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
355 var event interface{} = nil
356 var timedOut bool = false
// The request id of the outgoing message is taken from the subscription.
// NOTE(review): if subs.SubReqMsg is a pointer this assignment also changes
// the stored request -- confirm the field type.
358 subReqMsg := subs.SubReqMsg
359 subReqMsg.RequestId = subs.GetReqId().RequestId
360 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
362 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
366 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
// desc marks the attempt number in the send log line.
367 desc := fmt.Sprintf("(retry %d)", retries)
368 c.rmrSendToE2T(desc, subs, trans)
369 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
375 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
379 //-------------------------------------------------------------------
380 // send to E2T Subscription Delete Request
381 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest builds a subscription delete request from
// the request id of subs and the function id of the stored subscription
// request, packs it, sends it to E2T and waits for an event on trans. The
// send/wait loop runs up to e2tMaxSubDelReqTryCount times and waits
// e2tSubDelReqTime per attempt. parentTrans is used only for logging on the
// visible lines.
//
// NOTE(review): the declarations of err and timedOut, the handling of timedOut
// inside the loop and the return statements are elided in this listing; the
// function presumably returns the received event -- confirm. The return value
// of rmrSendToE2T is not used on the visible line.
383 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
385 var event interface{}
388 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
389 subDelReqMsg.RequestId = subs.GetReqId().RequestId
390 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
391 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
393 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
397 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
// desc marks the attempt number in the send log line.
398 desc := fmt.Sprintf("(retry %d)", retries)
399 c.rmrSendToE2T(desc, subs, trans)
400 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
406 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
410 //-------------------------------------------------------------------
411 // handle from E2T Subscription Response
412 //-------------------------------------------------------------------
// handleE2TSubscriptionResponse passes a subscription response received from
// E2T to the transaction currently attached to the matching subscription.
//
// Visible steps: unpack the message, look up the subscription by the Seq of
// the request id, fetch the subscription's transaction and deliver the
// unpacked message to it with trans.SendEvent, waiting at most
// e2tRecvMsgTimeout.
//
// NOTE(review): the conditions and returns around the error log lines are
// elided in this listing.
413 func (c *Control) handleE2TSubscriptionResponse(params *xapptweaks.RMRParams) {
414 xapp.Logger.Info("MSG from E2T: %s", params.String())
415 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
417 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
420 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.Seq})
422 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
425 trans := subs.GetTransaction()
427 err = fmt.Errorf("Ongoing transaction not found")
428 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
431 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
// Error reported when the event could not be handed over.
433 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
434 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
439 //-------------------------------------------------------------------
440 // handle from E2T Subscription Failure
441 //-------------------------------------------------------------------
// handleE2TSubscriptionFailure passes a subscription failure received from E2T
// to the transaction currently attached to the matching subscription.
//
// Visible steps: unpack the message, look up the subscription by the Seq of
// the request id, fetch the subscription's transaction and deliver the
// unpacked message to it with trans.SendEvent, waiting at most
// e2tRecvMsgTimeout.
//
// NOTE(review): the conditions and returns around the error log lines are
// elided in this listing.
442 func (c *Control) handleE2TSubscriptionFailure(params *xapptweaks.RMRParams) {
443 xapp.Logger.Info("MSG from E2T: %s", params.String())
444 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
446 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
449 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.Seq})
451 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
454 trans := subs.GetTransaction()
456 err = fmt.Errorf("Ongoing transaction not found")
457 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
460 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
// Error reported when the event could not be handed over.
462 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
463 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
468 //-------------------------------------------------------------------
469 // handle from E2T Subscription Delete Response
470 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse passes a subscription delete response
// received from E2T to the transaction currently attached to the matching
// subscription.
//
// Visible steps: unpack the message, look up the subscription by the Seq of
// the request id, fetch the subscription's transaction and deliver the
// unpacked message to it with trans.SendEvent, waiting at most
// e2tRecvMsgTimeout.
//
// NOTE(review): unlike the other three handleE2T* handlers this one declares a
// named error result; Consume starts it with a go statement, which discards
// the returned value. The conditions and returns around the error log lines
// are elided in this listing.
471 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapptweaks.RMRParams) (err error) {
472 xapp.Logger.Info("MSG from E2T: %s", params.String())
473 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
475 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
478 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.Seq})
480 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
483 trans := subs.GetTransaction()
485 err = fmt.Errorf("Ongoing transaction not found")
486 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
489 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
// Error reported when the event could not be handed over.
491 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
492 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
497 //-------------------------------------------------------------------
498 // handle from E2T Subscription Delete Failure
499 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteFailure passes a subscription delete failure
// received from E2T to the transaction currently attached to the matching
// subscription.
//
// Visible steps: unpack the message, look up the subscription by the Seq of
// the request id, fetch the subscription's transaction and deliver the
// unpacked message to it with trans.SendEvent, waiting at most
// e2tRecvMsgTimeout.
//
// NOTE(review): the conditions and returns around the error log lines are
// elided in this listing.
500 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapptweaks.RMRParams) {
501 xapp.Logger.Info("MSG from E2T: %s", params.String())
502 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
504 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
507 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.Seq})
509 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
512 trans := subs.GetTransaction()
514 err = fmt.Errorf("Ongoing transaction not found")
515 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
518 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
// Error reported when the event could not be handed over.
520 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
521 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
526 //-------------------------------------------------------------------
528 //-------------------------------------------------------------------
529 func typeofSubsMessage(v interface{}) string {
534 case *e2ap.E2APSubscriptionRequest:
536 case *e2ap.E2APSubscriptionResponse:
538 case *e2ap.E2APSubscriptionFailure:
540 case *e2ap.E2APSubscriptionDeleteRequest:
542 case *e2ap.E2APSubscriptionDeleteResponse:
544 case *e2ap.E2APSubscriptionDeleteFailure: