//subscriber: subscriber,
}
c.XappWrapper.Init("")
- go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler)
+ go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
//go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
return c
}
//-------------------------------------------------------------------
//
//-------------------------------------------------------------------
-func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (models.SubscriptionResult, error) {
+func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
/*
switch p := params.(type) {
case *models.ReportParams:
case *models.PolicyParams:
}
*/
- return models.SubscriptionResult{}, fmt.Errorf("Subscription rest interface not implemented")
+ return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
+}
+
+func (c *Control) SubscriptionDeleteHandler(string) error {
+ return fmt.Errorf("Subscription rest interface not implemented")
}
func (c *Control) QueryHandler() (models.SubscriptionList, error) {
func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
params := xapptweaks.NewParams(nil)
params.Mtype = trans.GetMtype()
- params.SubId = int(subs.GetReqId().Seq)
+ params.SubId = int(subs.GetReqId().InstanceId)
params.Xid = ""
params.Meid = subs.GetMeid()
params.Src = ""
params := xapptweaks.NewParams(nil)
params.Mtype = trans.GetMtype()
- params.SubId = int(subs.GetReqId().Seq)
+ params.SubId = int(subs.GetReqId().InstanceId)
params.Xid = trans.GetXid()
params.Meid = trans.GetMeid()
params.Src = ""
defer c.Rmr.Free(msg.Mbuf)
+ // xapp-frame might use direct access to c buffer and
+ // when msg.Mbuf is freed, someone might take it into use
+ // and payload data might be invalid inside message handle function
+ //
+ // subscriptions won't load system a lot so there is no
+ // real performance hit by cloning buffer into new go byte slice
+ cPay := append(msg.Payload[:0:0], msg.Payload...)
+ msg.Payload = cPay
+ msg.PayloadLen = len(cPay)
+
switch msg.Mtype {
case xapp.RIC_SUB_REQ:
go c.handleXAPPSubscriptionRequest(msg)
return
}
- trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.Seq, params.Meid)
+ trans := c.tracker.NewXappTransaction(xapptweaks.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.InstanceId, params.Meid)
if trans == nil {
xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
return
return
}
+ //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
if err != nil {
xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
}
}
xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
- c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
+ //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
}
//-------------------------------------------------------------------
return
}
- trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.Seq, params.Meid)
+ trans := c.tracker.NewXappTransaction(xapptweaks.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.InstanceId, params.Meid)
if trans == nil {
xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
return
c.rmrSendToXapp("", subs, trans)
}
- c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
+ //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
+ //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
}
//-------------------------------------------------------------------
xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
}
+ //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
+ if valid == false {
+ c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
+ }
parentTrans.SendEvent(subRfMsg, 0)
}
} else {
subs.mutex.Unlock()
}
-
+ //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
+ // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
+ // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
+ c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
parentTrans.SendEvent(nil, 0)
}
xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
return
}
- subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.Seq})
+ subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
if err != nil {
xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
return
xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
return
}
- subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.Seq})
+ subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
if err != nil {
xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
return
xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
return
}
- subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.Seq})
+ subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
if err != nil {
xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
return
xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
return
}
- subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.Seq})
+ subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
if err != nil {
xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
return