"gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
"gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks"
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
httptransport "github.com/go-openapi/runtime/client"
"github.com/go-openapi/strfmt"
e2ap *E2ap
registry *Registry
tracker *Tracker
- timerMap *TimerMap
+ //subscriber *xapp.Subscriber
}
type RMRMeid struct {
tracker := new(Tracker)
tracker.Init()
- timerMap := new(TimerMap)
- timerMap.Init()
+ //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
c := &Control{e2ap: new(E2ap),
registry: registry,
tracker: tracker,
- timerMap: timerMap,
+ //subscriber: subscriber,
}
c.XappWrapper.Init("")
+ go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
+ //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
return c
}
xapp.Run(c)
}
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
+ /*
+ switch p := params.(type) {
+ case *models.ReportParams:
+ trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
+ if trans == nil {
+ xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
+ return
+ }
+ defer trans.Release()
+ case *models.ControlParams:
+ case *models.PolicyParams:
+ }
+ */
+ return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
+}
+
+func (c *Control) SubscriptionDeleteHandler(string) error {
+ return fmt.Errorf("Subscription rest interface not implemented")
+}
+
+func (c *Control) QueryHandler() (models.SubscriptionList, error) {
+ return c.registry.QueryHandler()
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+
func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
params := xapptweaks.NewParams(nil)
params.Mtype = trans.GetMtype()
params.Payload = trans.Payload.Buf
params.Mbuf = nil
xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
- return c.RmrSend(params)
+ return c.RmrSend(params, 5)
}
func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
params.Payload = trans.Payload.Buf
params.Mbuf = nil
xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
- return c.RmrSend(params)
+ return c.RmrSend(params, 5)
}
func (c *Control) Consume(params *xapp.RMRParams) (err error) {
defer c.Rmr.Free(msg.Mbuf)
switch msg.Mtype {
- case xapp.RICMessageTypes["RIC_SUB_REQ"]:
+ case xapp.RIC_SUB_REQ:
go c.handleXAPPSubscriptionRequest(msg)
- case xapp.RICMessageTypes["RIC_SUB_RESP"]:
+ case xapp.RIC_SUB_RESP:
go c.handleE2TSubscriptionResponse(msg)
- case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
+ case xapp.RIC_SUB_FAILURE:
go c.handleE2TSubscriptionFailure(msg)
- case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
+ case xapp.RIC_SUB_DEL_REQ:
go c.handleXAPPSubscriptionDeleteRequest(msg)
- case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
+ case xapp.RIC_SUB_DEL_RESP:
go c.handleE2TSubscriptionDeleteResponse(msg)
- case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
+ case xapp.RIC_SUB_DEL_FAILURE:
go c.handleE2TSubscriptionDeleteFailure(msg)
default:
xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
return
}
- trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.Seq, params.Meid)
+ trans := c.tracker.NewXappTransaction(xapptweaks.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.Seq, params.Meid)
if trans == nil {
xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
return
return
}
+ //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
if err != nil {
xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
}
}
xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
- c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
+ //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
}
//-------------------------------------------------------------------
return
}
- trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.Seq, params.Meid)
+ trans := c.tracker.NewXappTransaction(xapptweaks.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.Seq, params.Meid)
if trans == nil {
xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
return
c.rmrSendToXapp("", subs, trans)
}
- c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
+ //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
+ //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
}
//-------------------------------------------------------------------
subRfMsg, valid := subs.GetCachedResponse()
if subRfMsg == nil && valid == true {
- event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
- switch event.(type) {
- case *e2ap.E2APSubscriptionResponse:
- subRfMsg, valid = subs.SetCachedResponse(event, true)
- case *e2ap.E2APSubscriptionFailure:
- subRfMsg, valid = subs.SetCachedResponse(event, false)
- default:
- xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
- subRfMsg, valid = subs.SetCachedResponse(nil, false)
- c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
+
+ //
+ // In case of failure
+ // - make internal delete
+ // - in case duplicate cause, retry (currently max 1 retry)
+ //
+ maxRetries := uint64(1)
+ doRetry := true
+ for retries := uint64(0); retries <= maxRetries && doRetry; retries++ {
+ doRetry = false
+
+ event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
+ switch themsg := event.(type) {
+ case *e2ap.E2APSubscriptionResponse:
+ subRfMsg, valid = subs.SetCachedResponse(event, true)
+ case *e2ap.E2APSubscriptionFailure:
+ subRfMsg, valid = subs.SetCachedResponse(event, false)
+ doRetry = true
+ for _, item := range themsg.ActionNotAdmittedList.Items {
+ if item.Cause.Content != e2ap.E2AP_CauseContent_Ric || (item.Cause.Value != e2ap.E2AP_CauseValue_Ric_duplicate_action && item.Cause.Value != e2ap.E2AP_CauseValue_Ric_duplicate_event) {
+ doRetry = false
+ break
+ }
+ }
+ xapp.Logger.Info("SUBS-SubReq: internal delete and possible retry due event(%s) retry(%t,%d/%d) %s", typeofSubsMessage(event), doRetry, retries, maxRetries, idstring(nil, trans, subs, parentTrans))
+ c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
+ default:
+ xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
+ subRfMsg, valid = subs.SetCachedResponse(nil, false)
+ c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
+ }
}
+
xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
} else {
xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
}
+ //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
+ if valid == false {
+ c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
+ }
parentTrans.SendEvent(subRfMsg, 0)
}
} else {
subs.mutex.Unlock()
}
-
+ //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
+ // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
+ // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
+ c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
parentTrans.SendEvent(nil, 0)
}