httptransport "github.com/go-openapi/runtime/client"
"github.com/go-openapi/strfmt"
"github.com/gorilla/mux"
+ "github.com/segmentio/ksuid"
"github.com/spf13/viper"
)
var e2tSubReqTimeout time.Duration
var e2tSubDelReqTime time.Duration
var e2tRecvMsgTimeout time.Duration
+var waitRouteCleanup_ms time.Duration
var e2tMaxSubReqTryCount uint64 // Initial try + retry
var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
var readSubsFromDb string
xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
- go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
+ go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
if readSubsFromDb == "false" {
return c
e2tRecvMsgTimeout = 2000 * 1000000
}
xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
+
+ // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
+ // value 100ms used currently only in unittests.
+ waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
+ if waitRouteCleanup_ms == 0 {
+ waitRouteCleanup_ms = 5000 * 1000000
+ }
+ xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
+
e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
if e2tMaxSubReqTryCount == 0 {
e2tMaxSubReqTryCount = 1
}
xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
+
e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
if e2tMaxSubDelReqTryCount == 0 {
e2tMaxSubDelReqTryCount = 1
//-------------------------------------------------------------------
//
//-------------------------------------------------------------------
-func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
- /*
- switch p := params.(type) {
- case *models.ReportParams:
- trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
- if trans == nil {
- xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
- return
- }
- defer trans.Release()
- case *models.ControlParams:
- case *models.PolicyParams:
- }
- */
- return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
-}
-
-func (c *Control) SubscriptionDeleteHandler(s string) error {
+func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
+
+ restSubId := ksuid.New().String()
+ subResp := models.SubscriptionResponse{}
+ subResp.SubscriptionID = &restSubId
+ p := params.(*models.SubscriptionParams)
+
+ c.CntRecvMsg++
+
+ c.UpdateCounter(cRestSubReqFromXapp)
+
+ if p.ClientEndpoint == nil {
+ xapp.Logger.Error("ClientEndpoint == nil")
+ return nil, fmt.Errorf("")
+ }
+
+ _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
+ if err != nil {
+ xapp.Logger.Error("%s", err.Error())
+ return nil, err
+ }
+
+ restSubscription, err := c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
+ if err != nil {
+ xapp.Logger.Error("%s", err.Error())
+ return nil, err
+ }
+
+ subReqList := e2ap.SubscriptionRequestList{}
+ err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
+ if err != nil {
+ xapp.Logger.Error("%s", err.Error())
+ c.registry.DeleteRESTSubscription(&restSubId)
+ return nil, err
+ }
+
+ go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId)
+
+ return &subResp, nil
+
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+
+func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
+ clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string) {
+
+ xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
+
+ _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*clientEndpoint)
+ if err != nil {
+ xapp.Logger.Error("%s", err.Error())
+ return
+ }
+
+ var requestorID int64
+ var instanceId int64
+ for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
+ subReqMsg := subReqList.E2APSubscriptionRequests[index]
+
+ trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
+ if trans == nil {
+ c.registry.DeleteRESTSubscription(restSubId)
+ xapp.Logger.Error("XAPP-SubReq transaction not created. RESTSubId=%s, EndPoint=%s, Meid=%s", *restSubId, xAppRmrEndpoint, *meid)
+ return
+ }
+
+ defer trans.Release()
+ xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
+ subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
+ if err != nil {
+ // Send notification to xApp that prosessing of a Subscription Request has failed. Currently it is not possible
+ // to indicate error. Such possibility should be added. As a workaround requestorID and instanceId are set to zero value
+ requestorID = (int64)(0)
+ instanceId = (int64)(0)
+ resp := &models.SubscriptionResponse{
+ SubscriptionID: restSubId,
+ SubscriptionInstances: []*models.SubscriptionInstance{
+ &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
+ },
+ }
+ // Mark REST subscription request processed.
+ restSubscription.SetProcessed()
+ xapp.Logger.Info("Sending unsuccessful REST notification to endpoint=%v:%v, InstanceId=%v, %s", clientEndpoint.Host, clientEndpoint.HTTPPort, instanceId, idstring(nil, trans))
+ xapp.Subscription.Notify(resp, *clientEndpoint)
+ } else {
+ xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v, InstanceId=%v, %s", index, *clientEndpoint, instanceId, idstring(nil, trans))
+
+ // Store successfully processed InstanceId for deletion
+ restSubscription.AddInstanceId(subRespMsg.RequestId.InstanceId)
+
+ // Send notification to xApp that a Subscription Request has been processed.
+ requestorID = (int64)(subRespMsg.RequestId.Id)
+ instanceId = (int64)(subRespMsg.RequestId.InstanceId)
+ resp := &models.SubscriptionResponse{
+ SubscriptionID: restSubId,
+ SubscriptionInstances: []*models.SubscriptionInstance{
+ &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
+ },
+ }
+ // Mark REST subscription request processesd.
+ restSubscription.SetProcessed()
+ xapp.Logger.Info("Sending successful REST notification to endpoint=%v, InstanceId=%v, %s", *clientEndpoint, instanceId, idstring(nil, trans))
+ xapp.Subscription.Notify(resp, *clientEndpoint)
+
+ }
+ c.UpdateCounter(cRestSubRespToXapp)
+ }
+}
+
+//-------------------------------------------------------------------
+//
+//------------------------------------------------------------------
+func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
+ restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
+
+ err := c.tracker.Track(trans)
+ if err != nil {
+ err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
+ xapp.Logger.Error("%s", err.Error())
+ return nil, err
+ }
+
+ subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
+ if err != nil {
+ err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
+ xapp.Logger.Error("%s", err.Error())
+ return nil, err
+ }
+
+ //
+ // Wake subs request
+ //
+ go c.handleSubscriptionCreate(subs, trans)
+ event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
+
+ err = nil
+ if event != nil {
+ switch themsg := event.(type) {
+ case *e2ap.E2APSubscriptionResponse:
+ trans.Release()
+ return themsg, nil
+ case *e2ap.E2APSubscriptionFailure:
+ err = fmt.Errorf("SubscriptionFailure received")
+ return nil, err
+ default:
+ break
+ }
+ }
+ err = fmt.Errorf("XAPP-SubReq: failed %s", idstring(err, trans, subs))
+ xapp.Logger.Error("%s", err.Error())
+ c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
+ return nil, err
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
+
+ c.CntRecvMsg++
+ c.UpdateCounter(cRestSubDelReqFromXapp)
+
+ xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
+
+ restSubscription, err := c.registry.GetRESTSubscription(restSubId)
+ if err != nil {
+ xapp.Logger.Error("%s", err.Error())
+ if restSubscription == nil {
+ // Subscription was not found
+ return nil
+ } else {
+ if restSubscription.SubReqOngoing == true {
+ err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
+ xapp.Logger.Error("%s", err.Error())
+ return err
+ } else if restSubscription.SubDelReqOngoing == true {
+ // Previous request for same restSubId still ongoing
+ return nil
+ }
+ }
+ }
+
+ xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
+ go func() {
+ for _, instanceId := range restSubscription.InstanceIds {
+ err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
+ if err != nil {
+ xapp.Logger.Error("%s", err.Error())
+ //return err
+ }
+ xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
+ restSubscription.DeleteInstanceId(instanceId)
+ }
+ c.registry.DeleteRESTSubscription(&restSubId)
+ }()
+
+ c.UpdateCounter(cRestSubDelRespToXapp)
+
+ return nil
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) error {
+
+ trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{0, 0}, &xapp.RMRMeid{RanName: *meid})
+ if trans == nil {
+ err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
+ xapp.Logger.Error("%s", err.Error())
+ }
+ defer trans.Release()
+
+ err := c.tracker.Track(trans)
+ if err != nil {
+ err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
+ xapp.Logger.Error("%s", err.Error())
+ return &time.ParseError{}
+ }
+
+ subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
+ if err != nil {
+ err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
+ xapp.Logger.Error("%s", err.Error())
+ return err
+ }
+ //
+ // Wake subs delete
+ //
+ go c.handleSubscriptionDelete(subs, trans)
+ trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
+
+ xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
+
+ c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
+
return nil
}
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
func (c *Control) QueryHandler() (models.SubscriptionList, error) {
xapp.Logger.Info("QueryHandler() called")
+ c.CntRecvMsg++
+
return c.registry.QueryHandler()
}
removeSubscriptionFromDb = true
subRfMsg, valid = subs.SetCachedResponse(event, false)
xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
+ c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
case *SubmgrRestartTestEvent:
// This simulates that no response has been received and after restart subscriptions are restored from db
xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
//Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
if valid == false {
- c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
+ c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
}
c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
//Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
// If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
// RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
- c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
+ c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
c.registry.UpdateSubscriptionToDb(subs, c)
parentTrans.SendEvent(nil, 0)
}
// Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
c.WriteSubscriptionToDb(subs)
+
for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
desc := fmt.Sprintf("(retry %d)", retries)
if retries == 0 {
func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
xapp.Logger.Info("MSG from E2T: %s", params.String())
c.UpdateCounter(cSubRespFromE2)
+
subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
if err != nil {
xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
return "NIL"
}
switch v.(type) {
- case *e2ap.E2APSubscriptionRequest:
- return "SubReq"
+ //case *e2ap.E2APSubscriptionRequest:
+ // return "SubReq"
case *e2ap.E2APSubscriptionResponse:
return "SubResp"
case *e2ap.E2APSubscriptionFailure:
return "SubFail"
- case *e2ap.E2APSubscriptionDeleteRequest:
- return "SubDelReq"
+ //case *e2ap.E2APSubscriptionDeleteRequest:
+ // return "SubDelReq"
case *e2ap.E2APSubscriptionDeleteResponse:
return "SubDelResp"
case *e2ap.E2APSubscriptionDeleteFailure:
params.PayloadLen = len(payload.Buf)
params.Payload = payload.Buf
params.Mbuf = nil
-
- if params == nil {
- xapp.Logger.Error("SendSubscriptionDeleteReq() params == nil")
- return
- }
-
subs.DeleteFromDb = true
c.handleXAPPSubscriptionDeleteRequest(params)
}