2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28 httptransport "github.com/go-openapi/runtime/client"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
34 //-----------------------------------------------------------------------------
36 //-----------------------------------------------------------------------------
// idstring builds a one-line identification string for log messages by
// concatenating the String() form of every entry and then an
// "err(<message>)" part for err. Each piece is prefixed with filler, which
// starts out empty.
// NOTE(review): the statements that update filler, the guard around the err
// part and the final return are not visible in this listing. Callers in this
// file pass a nil err, so the err.Error() call below presumably sits behind a
// nil check — confirm against the complete source.
38 func idstring(err error, entries ...fmt.Stringer) string {
39 var retval string = ""
40 var filler string = ""
41 for _, entry := range entries {
42 retval += filler + entry.String()
46 retval += filler + "err(" + err.Error() + ")"
53 //-----------------------------------------------------------------------------
55 //-----------------------------------------------------------------------------
// Timeout passed to trans.WaitEvent after a subscription request has been
// sent to E2T (see sendE2TSubscriptionRequest).
57 var e2tSubReqTimeout time.Duration = 5 * time.Second
// Timeout passed to trans.WaitEvent after a subscription delete request has
// been sent to E2T (see sendE2TSubscriptionDeleteRequest).
58 var e2tSubDelReqTime time.Duration = 5 * time.Second
// Upper bound of the send loop in sendE2TSubscriptionRequest.
59 var e2tMaxSubReqTryCount uint64 = 2 // Initial try + retry
// Upper bound of the send loop in sendE2TSubscriptionDeleteRequest.
60 var e2tMaxSubDelReqTryCount uint64 = 2 // Initial try + retry
// Timeout passed to trans.SendEvent when a message received from E2T is
// handed over to the subscription's ongoing transaction.
62 var e2tRecvMsgTimeout time.Duration = 5 * time.Second
// Embedded wrapper type from the xapptweaks package; NewControl calls its
// Init method through c.XappWrapper.
// NOTE(review): the enclosing struct header and the remaining fields are not
// visible in this listing.
65 xapptweaks.XappWrapper
// Logs the component name and configures viper's environment handling: the
// "submgr" prefix is used for environment variables and empty values are
// allowed.
// NOTE(review): the enclosing function header is not visible in this listing
// (presumably the package init function) — confirm.
79 xapp.Logger.Info("SUBMGR")
81 viper.SetEnvPrefix("submgr")
82 viper.AllowEmptyEnv(true)
// NewControl creates the Control instance. It builds the routing manager
// client from the rtmgr.* configuration values, allocates the registry,
// tracker and timer map, constructs the Control value and initializes the
// embedded XappWrapper.
// NOTE(review): several statements (initialization calls, the remaining
// fields of the struct literal, the return) are not visible in this listing.
85 func NewControl() *Control {
// Host is "<rtmgr.HostAddr>:<rtmgr.port>", base path is rtmgr.baseUrl and the
// only scheme offered is plain http.
87 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
88 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
90 registry := new(Registry)
// The registry keeps a pointer to the routing manager client.
92 registry.rtmgrClient = &rtmgrClient
94 tracker := new(Tracker)
97 timerMap := new(TimerMap)
100 c := &Control{e2ap: new(E2ap),
105 c.XappWrapper.Init("")
// ReadyCB is the callback that Run registers with xapp.SetReadyCB.
// NOTE(review): the body is not visible in this listing.
109 func (c *Control) ReadyCB(data interface{}) {
// Run registers ReadyCB (with nil callback data) as the xapp ready callback.
// NOTE(review): any statements after the registration are not visible in
// this listing.
115 func (c *Control) Run() {
116 xapp.SetReadyCB(c.ReadyCB, nil)
// rmrSendToE2T fills RMR parameters from the subscription and the
// subscription-side transaction, logs the outgoing message and sends it with
// c.RmrSend. desc is used only in the log line. The error from c.RmrSend is
// returned.
120 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
121 params := xapptweaks.NewParams(nil)
122 params.Mtype = trans.GetMtype()
// The RMR subscription id is the sequence number of the subscription's
// request id.
123 params.SubId = int(subs.GetReqId().Seq)
// Meid is taken from the subscription here, whereas rmrSendToXapp takes it
// from the transaction.
125 params.Meid = subs.GetMeid()
127 params.PayloadLen = len(trans.Payload.Buf)
128 params.Payload = trans.Payload.Buf
130 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
131 return c.RmrSend(params)
// rmrSendToXapp fills RMR parameters from the subscription and the xApp-side
// transaction, logs the outgoing message and sends it with c.RmrSend. desc
// is used only in the log line. The error from c.RmrSend is returned.
134 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
136 params := xapptweaks.NewParams(nil)
137 params.Mtype = trans.GetMtype()
// The RMR subscription id is the sequence number of the subscription's
// request id.
138 params.SubId = int(subs.GetReqId().Seq)
// Xid and Meid both come from the xApp transaction.
139 params.Xid = trans.GetXid()
140 params.Meid = trans.GetMeid()
142 params.PayloadLen = len(trans.Payload.Buf)
143 params.Payload = trans.Payload.Buf
145 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
146 return c.RmrSend(params)
// Consume is the entry point for received RMR messages. It wraps params with
// xapptweaks.NewParams and dispatches on the message type, starting one
// goroutine per message for the matching handler. Messages of any other type
// are logged and discarded.
// NOTE(review): the condition guarding the error path, the switch header,
// the default label and the returns are not visible in this listing.
149 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
150 msg := xapptweaks.NewParams(params)
// NOTE(review): the message reads "can handle"; "can not handle" was
// presumably intended — confirm before changing the runtime text.
152 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
153 xapp.Logger.Error("%s", err.Error())
// NOTE(review): Mbuf is freed when Consume returns, while the handlers below
// keep running in their own goroutines with the same msg — confirm that no
// handler relies on msg.Mbuf (or memory owned by it) after that point.
158 defer c.Rmr.Free(msg.Mbuf)
// Requests originate from xApps; responses and failures originate from E2T.
161 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
162 go c.handleXAPPSubscriptionRequest(msg)
163 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
164 go c.handleE2TSubscriptionResponse(msg)
165 case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
166 go c.handleE2TSubscriptionFailure(msg)
167 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
168 go c.handleXAPPSubscriptionDeleteRequest(msg)
169 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
170 go c.handleE2TSubscriptionDeleteResponse(msg)
171 case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
172 go c.handleE2TSubscriptionDeleteFailure(msg)
174 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
179 //-------------------------------------------------------------------
180 // handle from XAPP Subscription Request
181 //------------------------------------------------------------------
// handleXAPPSubscriptionRequest processes a subscription request received
// from an xApp. It unpacks the payload, creates and tracks an xApp
// transaction, assigns the transaction to a subscription, starts
// handleSubscriptionCreate in a goroutine and waits for the event that
// goroutine delivers. A subscription response or failure event is packed and
// sent back to the xApp with rmrSendToXapp.
// NOTE(review): the error checks, returns and closing braces around the
// calls below are not visible in this listing, so the exact flow between the
// response/failure cases and the final "failed" log plus
// RemoveFromSubscription cannot be confirmed from here.
182 func (c *Control) handleXAPPSubscriptionRequest(params *xapptweaks.RMRParams) {
183 xapp.Logger.Info("MSG from XAPP: %s", params.String())
185 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
187 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
// The transaction is built from the sender endpoint, the RMR Xid, the
// sequence number of the request id and the Meid.
191 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.Seq, params.Meid)
193 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
// The transaction is released when this handler returns.
196 defer trans.Release()
198 err = c.tracker.Track(trans)
200 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
204 subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
206 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
// Subscription creation runs in its own goroutine and reports back through
// parentTrans.SendEvent (see handleSubscriptionCreate).
213 go c.handleSubscriptionCreate(subs, trans)
214 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
// The concrete type of the event selects which message is packed for the
// xApp.
218 switch themsg := event.(type) {
219 case *e2ap.E2APSubscriptionResponse:
220 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
223 c.rmrSendToXapp("", subs, trans)
226 case *e2ap.E2APSubscriptionFailure:
227 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
229 c.rmrSendToXapp("", subs, trans)
235 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
236 c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
239 //-------------------------------------------------------------------
240 // handle from XAPP Subscription Delete Request
241 //------------------------------------------------------------------
// handleXAPPSubscriptionDeleteRequest processes a subscription delete
// request received from an xApp. It unpacks the payload, creates and tracks
// an xApp transaction, looks up the subscription by the transaction's
// subscription id, starts handleSubscriptionDelete in a goroutine and waits
// for its event. Whatever the event is, a delete response is built from the
// subscription's request id and function id, packed and sent to the xApp,
// and the transaction is then removed from the subscription.
// NOTE(review): the error checks, returns and closing braces around the
// calls below are not visible in this listing.
242 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapptweaks.RMRParams) {
243 xapp.Logger.Info("MSG from XAPP: %s", params.String())
245 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
247 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
251 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.Seq, params.Meid)
253 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
// The transaction is released when this handler returns.
256 defer trans.Release()
258 err = c.tracker.Track(trans)
// NOTE(review): this log line uses the "XAPP-SubReq" prefix inside the delete
// handler, while its neighbours use "XAPP-SubDelReq" — looks like a
// copy-paste slip; confirm and align.
260 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
264 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
266 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
// Subscription deletion runs in its own goroutine and reports back through
// parentTrans.SendEvent (see handleSubscriptionDelete). The event itself is
// not inspected here.
273 go c.handleSubscriptionDelete(subs, trans)
274 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
276 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
278 // Whatever is received send ok delete response
279 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
280 subDelRespMsg.RequestId = subs.GetReqId().RequestId
281 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
282 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
284 c.rmrSendToXapp("", subs, trans)
287 c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
290 //-------------------------------------------------------------------
291 // SUBS CREATE Handling
292 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the subscription-side work for a create
// request. It creates a subscription transaction, waits for its turn on the
// subscription and reads the cached response. When no response is cached and
// the subscription is still valid, the request is sent to E2T and the
// outcome is stored with SetCachedResponse. The resulting message is
// delivered to parentTrans with SendEvent.
// NOTE(review): the default label, the else branch and the closing braces
// are not visible in this listing.
293 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
295 trans := c.tracker.NewSubsTransaction(subs)
296 subs.WaitTransactionTurn(trans)
// Deferred calls run in reverse order: the transaction is released first,
// then the turn on the subscription is given up.
297 defer subs.ReleaseTransactionTurn(trans)
298 defer trans.Release()
300 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
302 subRfMsg, valid := subs.GetCachedResponse()
303 if subRfMsg == nil && valid == true {
304 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
305 switch event.(type) {
// A response is cached with the flag set to true, a failure with false.
306 case *e2ap.E2APSubscriptionResponse:
307 subRfMsg, valid = subs.SetCachedResponse(event, true)
308 case *e2ap.E2APSubscriptionFailure:
309 subRfMsg, valid = subs.SetCachedResponse(event, false)
// Any other event (presumably the default branch — confirm): nothing is
// cached and a delete request is sent to E2T.
311 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
312 subRfMsg, valid = subs.SetCachedResponse(nil, false)
313 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
315 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
317 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
// Hand the result to the xApp-side transaction waiting in
// handleXAPPSubscriptionRequest.
320 parentTrans.SendEvent(subRfMsg, 0)
323 //-------------------------------------------------------------------
324 // SUBS DELETE Handling
325 //-------------------------------------------------------------------
// handleSubscriptionDelete performs the subscription-side work for a delete
// request. It creates a subscription transaction, waits for its turn on the
// subscription, conditionally sends a delete request to E2T and finally
// delivers a nil event to parentTrans.
// NOTE(review): statements between the condition and the delete request, and
// the closing braces, are not visible in this listing.
327 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
329 trans := c.tracker.NewSubsTransaction(subs)
330 subs.WaitTransactionTurn(trans)
// Deferred calls run in reverse order: the transaction is released first,
// then the turn on the subscription is given up.
331 defer subs.ReleaseTransactionTurn(trans)
332 defer trans.Release()
334 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
// The condition holds when the subscription is valid and the requesting
// endpoint is the only endpoint in the subscription's endpoint list.
337 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
340 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
// Wake the xApp-side transaction waiting in
// handleXAPPSubscriptionDeleteRequest.
345 parentTrans.SendEvent(nil, 0)
348 //-------------------------------------------------------------------
349 // send to E2T Subscription Request
350 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription's request message and
// sends it to E2T, waiting for an event on trans after each send. The send
// loop runs at most e2tMaxSubReqTryCount times and each wait uses
// e2tSubReqTimeout.
// NOTE(review): the declaration of err, the error return after packing, the
// loop exit condition and the final return are not visible in this listing.
351 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
353 var event interface{} = nil
354 var timedOut bool = false
// NOTE(review): if SubReqMsg is a pointer, the assignment below writes the
// request id into the subscription's stored message — confirm that is
// intended.
356 subReqMsg := subs.SubReqMsg
357 subReqMsg.RequestId = subs.GetReqId().RequestId
358 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
360 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
364 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
// desc only labels the log line; the first attempt is logged as "(retry 0)".
365 desc := fmt.Sprintf("(retry %d)", retries)
// The error returned by rmrSendToE2T is not checked here.
366 c.rmrSendToE2T(desc, subs, trans)
367 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
373 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
377 //-------------------------------------------------------------------
378 // send to E2T Subscription Delete Request
379 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest builds a delete request from the
// subscription's request id and function id, packs it and sends it to E2T,
// waiting for an event on trans after each send. The send loop runs at most
// e2tMaxSubDelReqTryCount times and each wait uses e2tSubDelReqTime.
// NOTE(review): the declarations of err and timedOut, the error return after
// packing, the loop exit condition and the final return are not visible in
// this listing.
381 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
383 var event interface{}
386 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
387 subDelReqMsg.RequestId = subs.GetReqId().RequestId
388 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
389 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
391 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
395 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
// desc only labels the log line; the first attempt is logged as "(retry 0)".
396 desc := fmt.Sprintf("(retry %d)", retries)
// The error returned by rmrSendToE2T is not checked here.
397 c.rmrSendToE2T(desc, subs, trans)
398 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
404 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
408 //-------------------------------------------------------------------
409 // handle from E2T Subscription Response
410 //-------------------------------------------------------------------
// handleE2TSubscriptionResponse handles a subscription response received
// from E2T. It unpacks the payload, looks up the subscription by the
// sequence number of the request id, fetches the subscription's ongoing
// transaction and passes the message to it with SendEvent, using
// e2tRecvMsgTimeout as the timeout argument. Each failing step is logged.
// NOTE(review): the error checks, returns and closing braces are not visible
// in this listing.
411 func (c *Control) handleE2TSubscriptionResponse(params *xapptweaks.RMRParams) {
412 xapp.Logger.Info("MSG from E2T: %s", params.String())
413 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
415 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
418 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.Seq})
420 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
423 trans := subs.GetTransaction()
425 err = fmt.Errorf("Ongoing transaction not found")
426 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
// Hand the unpacked message to the transaction waiting in
// sendE2TSubscriptionRequest.
429 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
431 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
432 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
437 //-------------------------------------------------------------------
438 // handle from E2T Subscription Failure
439 //-------------------------------------------------------------------
// handleE2TSubscriptionFailure handles a subscription failure received from
// E2T. It unpacks the payload, looks up the subscription by the sequence
// number of the request id, fetches the subscription's ongoing transaction
// and passes the message to it with SendEvent, using e2tRecvMsgTimeout as
// the timeout argument. Each failing step is logged.
// NOTE(review): the error checks, returns and closing braces are not visible
// in this listing.
440 func (c *Control) handleE2TSubscriptionFailure(params *xapptweaks.RMRParams) {
441 xapp.Logger.Info("MSG from E2T: %s", params.String())
442 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
444 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
447 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.Seq})
449 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
452 trans := subs.GetTransaction()
454 err = fmt.Errorf("Ongoing transaction not found")
455 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
// Hand the unpacked message to the transaction waiting in
// sendE2TSubscriptionRequest.
458 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
460 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
461 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
466 //-------------------------------------------------------------------
467 // handle from E2T Subscription Delete Response
468 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse handles a subscription delete response
// received from E2T. It unpacks the payload, looks up the subscription by
// the sequence number of the request id, fetches the subscription's ongoing
// transaction and passes the message to it with SendEvent, using
// e2tRecvMsgTimeout as the timeout argument. Each failing step is logged.
// NOTE(review): unlike the sibling E2T handlers this one declares an error
// result, but Consume starts it with a go statement, so the returned value
// is discarded. The error checks, returns and closing braces are not visible
// in this listing.
469 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapptweaks.RMRParams) (err error) {
470 xapp.Logger.Info("MSG from E2T: %s", params.String())
471 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
473 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
476 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.Seq})
478 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
481 trans := subs.GetTransaction()
483 err = fmt.Errorf("Ongoing transaction not found")
484 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
// Hand the unpacked message to the transaction waiting in
// sendE2TSubscriptionDeleteRequest.
487 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
489 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
490 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
495 //-------------------------------------------------------------------
496 // handle from E2T Subscription Delete Failure
497 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteFailure handles a subscription delete failure
// received from E2T. It unpacks the payload, looks up the subscription by
// the sequence number of the request id, fetches the subscription's ongoing
// transaction and passes the message to it with SendEvent, using
// e2tRecvMsgTimeout as the timeout argument. Each failing step is logged.
// NOTE(review): the error checks, returns and closing braces are not visible
// in this listing.
498 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapptweaks.RMRParams) {
499 xapp.Logger.Info("MSG from E2T: %s", params.String())
500 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
502 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
505 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.Seq})
507 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
510 trans := subs.GetTransaction()
512 err = fmt.Errorf("Ongoing transaction not found")
513 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
// Hand the unpacked message to the transaction waiting in
// sendE2TSubscriptionDeleteRequest.
516 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
518 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
519 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
524 //-------------------------------------------------------------------
526 //-------------------------------------------------------------------
527 func typeofSubsMessage(v interface{}) string {
532 case *e2ap.E2APSubscriptionRequest:
534 case *e2ap.E2APSubscriptionResponse:
536 case *e2ap.E2APSubscriptionFailure:
538 case *e2ap.E2APSubscriptionDeleteRequest:
540 case *e2ap.E2APSubscriptionDeleteResponse:
542 case *e2ap.E2APSubscriptionDeleteFailure: