if err != nil {
retval += filler + "err(" + err.Error() + ")"
filler = " "
-
}
return retval
}
//subscriber *xapp.Subscriber
CntRecvMsg uint64
ResetTestFlag bool
+ Counters map[string]xapp.Counter
}
type RMRMeid struct {
func NewControl() *Control {
- ReadConfigParameters()
transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
tracker := new(Tracker)
tracker.Init()
- //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
-
c := &Control{e2ap: new(E2ap),
registry: registry,
tracker: tracker,
db: CreateSdl(),
//subscriber: subscriber,
+ Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
}
+ c.ReadConfigParameters("")
// Register REST handler for testing support
xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
+ xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
- //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
if readSubsFromDb == "false" {
return c
return c
}
+func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
+ subscriptions, _ := c.registry.QueryHandler()
+ xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
+}
+
//-------------------------------------------------------------------
//
//-------------------------------------------------------------------
-func ReadConfigParameters() {
+func (c *Control) ReadConfigParameters(f string) {
// viper.GetDuration returns nanoseconds
e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
func (c *Control) Run() {
xapp.SetReadyCB(c.ReadyCB, nil)
+ xapp.AddConfigChangeListener(c.ReadConfigParameters)
xapp.Run(c)
}
//-------------------------------------------------------------------
//
//-------------------------------------------------------------------
-func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
+func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
/*
switch p := params.(type) {
case *models.ReportParams:
}
func (c *Control) QueryHandler() (models.SubscriptionList, error) {
+ xapp.Logger.Info("QueryHandler() called")
+
return c.registry.QueryHandler()
}
func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
-
xapp.Logger.Info("TestRestHandler() called")
pathParams := mux.Vars(r)
params.Payload = trans.Payload.Buf
params.Mbuf = nil
xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
- return c.SendWithRetry(params, false, 5)
+ err = c.SendWithRetry(params, false, 5)
+ if err != nil {
+ xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
+ }
+ return err
}
func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
params.Payload = trans.Payload.Buf
params.Mbuf = nil
xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
- return c.SendWithRetry(params, false, 5)
+ err = c.SendWithRetry(params, false, 5)
+ if err != nil {
+ xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
+ }
+ return err
}
func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
//------------------------------------------------------------------
func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
xapp.Logger.Info("MSG from XAPP: %s", params.String())
+ c.UpdateCounter(cSubReqFromXapp)
subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
if err != nil {
return
}
- trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.InstanceId, params.Meid)
+ trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
if trans == nil {
xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
return
}
defer trans.Release()
- err = c.tracker.Track(trans)
- if err != nil {
+ if err = c.tracker.Track(trans); err != nil {
xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
return
}
//TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
- subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag)
+ subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
if err != nil {
xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
return
}
- //
- // Wake subs request
- //
+ c.wakeSubscriptionRequest(subs, trans)
+}
+
+//-------------------------------------------------------------------
+// Wake Subscription Request to E2node
+//------------------------------------------------------------------
+func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
+
go c.handleSubscriptionCreate(subs, trans)
event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
- err = nil
+ var err error
if event != nil {
switch themsg := event.(type) {
case *e2ap.E2APSubscriptionResponse:
+ themsg.RequestId.Id = trans.RequestId.Id
trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
if err == nil {
trans.Release()
+ c.UpdateCounter(cSubRespToXapp)
c.rmrSendToXapp("", subs, trans)
return
}
case *e2ap.E2APSubscriptionFailure:
+ themsg.RequestId.Id = trans.RequestId.Id
trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
if err == nil {
+ c.UpdateCounter(cSubFailToXapp)
c.rmrSendToXapp("", subs, trans)
}
default:
//------------------------------------------------------------------
func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
xapp.Logger.Info("MSG from XAPP: %s", params.String())
+ c.UpdateCounter(cSubDelReqFromXapp)
subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
if err != nil {
return
}
- trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.InstanceId, params.Meid)
+ trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
if trans == nil {
xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
return
// Whatever is received success, fail or timeout, send successful delete response
subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
- subDelRespMsg.RequestId = subs.GetReqId().RequestId
+ subDelRespMsg.RequestId.Id = trans.RequestId.Id
+ subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
if err == nil {
+ c.UpdateCounter(cSubDelRespToXapp)
c.rmrSendToXapp("", subs, trans)
}
case *e2ap.E2APSubscriptionFailure:
removeSubscriptionFromDb = true
subRfMsg, valid = subs.SetCachedResponse(event, false)
- xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
- c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
+ xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
case *SubmgrRestartTestEvent:
// This simulates that no response has been received and after restart subscriptions are restored from db
xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
subs.mutex.Lock()
+
if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
subs.valid = false
subs.mutex.Unlock()
c.WriteSubscriptionToDb(subs)
for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
desc := fmt.Sprintf("(retry %d)", retries)
+ if retries == 0 {
+ c.UpdateCounter(cSubReqToE2)
+ } else {
+ c.UpdateCounter(cSubReReqToE2)
+ }
c.rmrSendToE2T(desc, subs, trans)
if subs.DoNotWaitSubResp == false {
event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
if timedOut {
+ c.UpdateCounter(cSubReqTimerExpiry)
continue
}
} else {
for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
desc := fmt.Sprintf("(retry %d)", retries)
+ if retries == 0 {
+ c.UpdateCounter(cSubDelReqToE2)
+ } else {
+ c.UpdateCounter(cSubDelReReqToE2)
+ }
c.rmrSendToE2T(desc, subs, trans)
event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
if timedOut {
+ c.UpdateCounter(cSubDelReqTimerExpiry)
continue
}
break
}
//-------------------------------------------------------------------
-// handle from E2T Subscription Reponse
+// handle from E2T Subscription Response
//-------------------------------------------------------------------
func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
xapp.Logger.Info("MSG from E2T: %s", params.String())
+ c.UpdateCounter(cSubRespFromE2)
subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
if err != nil {
xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
//-------------------------------------------------------------------
func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
xapp.Logger.Info("MSG from E2T: %s", params.String())
+ c.UpdateCounter(cSubFailFromE2)
subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
if err != nil {
xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
//-------------------------------------------------------------------
func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
xapp.Logger.Info("MSG from E2T: %s", params.String())
+ c.UpdateCounter(cSubDelRespFromE2)
subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
if err != nil {
xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
//-------------------------------------------------------------------
func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
xapp.Logger.Info("MSG from E2T: %s", params.String())
+ c.UpdateCounter(cSubDelFailFromE2)
subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
if err != nil {
xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))