X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=pkg%2Fcontrol%2Fcontrol.go;h=1bc2ee86de8588e186c0f19009a3ddc559268de8;hb=refs%2Fchanges%2F52%2F2252%2F3;hp=cc176c2b711c1c58370a648da6a39f721b50a6ea;hpb=114fa2c930c5a24078b6ceba68254b2f209c5916;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/control.go b/pkg/control/control.go index cc176c2..1bc2ee8 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -174,54 +174,86 @@ func (c *Control) Consume(params *xapp.RMRParams) (err error) { return nil } +func idstring(trans fmt.Stringer, subs fmt.Stringer, err error) string { + var retval string = "" + var filler string = "" + if trans != nil { + retval += filler + trans.String() + filler = " " + } + if subs != nil { + retval += filler + subs.String() + filler = " " + } + if err != nil { + retval += filler + "err(" + err.Error() + ")" + filler = " " + } + return retval +} + +func (c *Control) findSubs(ids []int) (*Subscription, error) { + var subs *Subscription = nil + for _, id := range ids { + if id >= 0 { + subs = c.registry.GetSubscription(uint16(id)) + } + if subs != nil { + break + } + } + if subs == nil { + return nil, fmt.Errorf("No valid subscription found with ids %v", ids) + } + return subs, nil +} + +func (c *Control) findSubsAndTrans(ids []int) (*Subscription, *Transaction, error) { + subs, err := c.findSubs(ids) + if err != nil { + return nil, nil, err + } + trans := subs.GetTransaction() + if trans == nil { + return subs, nil, fmt.Errorf("No ongoing transaction found from %s", idstring(nil, subs, nil)) + } + return subs, trans, nil +} func (c *Control) handleSubscriptionRequest(params *RMRParams) { xapp.Logger.Info("SubReq from xapp: %s", params.String()) - // - // - // - trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), - params.Xid, - params.Meid, - false, - true) - + SubReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload) if err != nil { - xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), params.String()) + xapp.Logger.Error("SubReq Drop: %s", idstring(params, nil, err)) return } - // - // - // - trans.SubReqMsg, err = c.e2ap.UnpackSubscriptionRequest(params.Payload) + trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid, false, true) if err != nil { - xapp.Logger.Error("SubReq: %s Dropping this msg. %s", err.Error(), trans) - trans.Release() + xapp.Logger.Error("SubReq Drop: %s", idstring(params, nil, err)) return } + trans.SubReqMsg = SubReqMsg - // - // - // - subs, err := c.registry.ReserveSubscription(&trans.RmrEndpoint, trans.Meid) + subs, err := c.registry.ReserveSubscription(trans.Meid) if err != nil { - xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans) + xapp.Logger.Error("SubReq Drop: %s", idstring(trans, nil, err)) trans.Release() return } err = subs.SetTransaction(trans) if err != nil { - xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans) + xapp.Logger.Error("SubReq Drop: %s", idstring(trans, subs, err)) subs.Release() trans.Release() return } - trans.SubReqMsg.RequestId.Seq = uint32(subs.GetSubId()) + xapp.Logger.Debug("SubReq: Handling %s", idstring(trans, subs, nil)) + // // TODO: subscription create is in fact owned by subscription and not transaction. // Transaction is toward xapp while Subscription is toward ran. @@ -232,59 +264,34 @@ func (c *Control) handleSubscriptionRequest(params *RMRParams) { // trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(trans.SubReqMsg) if err != nil { - xapp.Logger.Error("SubReq: %s for trans %s", err.Error(), trans) + xapp.Logger.Error("SubResp Drop: %s", idstring(trans, subs, err)) subs.Release() trans.Release() return } c.rmrSend("SubReq: SubReq to E2T", subs, trans) - c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, FirstTry, c.handleSubscriptionRequestTimer) - xapp.Logger.Debug("SubReq: Debugging trans table = %v", c.tracker.transactionXappTable) return } func (c *Control) handleSubscriptionResponse(params *RMRParams) { xapp.Logger.Info("SubResp from E2T: %s", params.String()) - // - // - // SubRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload) if err != nil { - xapp.Logger.Error("SubResp: %s Dropping this msg. %s", err.Error(), params.String()) - return - } - - // - // - // - subs := c.registry.GetSubscription(uint16(SubRespMsg.RequestId.Seq)) - if subs == nil && params.SubId > 0 { - subs = c.registry.GetSubscription(uint16(params.SubId)) - } - - if subs == nil { - xapp.Logger.Error("SubResp: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubRespMsg.RequestId.Seq, params.SubId, params.String()) + xapp.Logger.Error("SubResp Drop %s", idstring(params, nil, err)) return } - xapp.Logger.Info("SubResp: subscription found payloadSeqNum: %d, SubId: %d", SubRespMsg.RequestId.Seq, subs.GetSubId()) - // - // - // - trans := subs.GetTransaction() - if trans == nil { - xapp.Logger.Error("SubResp: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId()) + subs, trans, err := c.findSubsAndTrans([]int{int(SubRespMsg.RequestId.Seq), params.SubId}) + if err != nil { + xapp.Logger.Error("SubResp: %s", idstring(params, nil, err)) return } - trans.SubRespMsg = SubRespMsg + xapp.Logger.Debug("SubResp: Handling %s", idstring(trans, subs, nil)) - // - // - // c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId())) responseReceived := trans.CheckResponseReceived() @@ -295,7 +302,7 @@ func (c *Control) handleSubscriptionResponse(params *RMRParams) { trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(trans.SubRespMsg) if err != nil { - xapp.Logger.Error("SubResp: %s for trans %s", err.Error(), trans) + xapp.Logger.Error("SubResp: %s", idstring(trans, subs, err)) trans.Release() return } @@ -309,49 +316,22 @@ func (c *Control) handleSubscriptionResponse(params *RMRParams) { func (c *Control) handleSubscriptionFailure(params *RMRParams) { xapp.Logger.Info("SubFail from E2T: %s", params.String()) - // - // - // SubFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload) if err != nil { - xapp.Logger.Error("SubFail: %s Dropping this msg. %s", err.Error(), params.String()) + xapp.Logger.Error("SubFail Drop %s", idstring(params, nil, err)) return } - // - // - // - subs := c.registry.GetSubscription(uint16(SubFailMsg.RequestId.Seq)) - if subs == nil && params.SubId > 0 { - subs = c.registry.GetSubscription(uint16(params.SubId)) - } - - if subs == nil { - xapp.Logger.Error("SubFail: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubFailMsg.RequestId.Seq, params.SubId, params.String()) - return - } - xapp.Logger.Info("SubFail: subscription found payloadSeqNum: %d, SubId: %d", SubFailMsg.RequestId.Seq, subs.GetSubId()) - - // - // - // - trans := subs.GetTransaction() - if trans == nil { - xapp.Logger.Error("SubFail: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId()) + subs, trans, err := c.findSubsAndTrans([]int{int(SubFailMsg.RequestId.Seq), params.SubId}) + if err != nil { + xapp.Logger.Error("SubFail: %s", idstring(params, nil, err)) return } trans.SubFailMsg = SubFailMsg + xapp.Logger.Debug("SubFail: Handling %s", idstring(trans, subs, nil)) - // - // - // c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId())) - responseReceived := trans.CheckResponseReceived() - if err != nil { - return - } - if responseReceived == true { // Subscription timer already received return @@ -363,7 +343,7 @@ func (c *Control) handleSubscriptionFailure(params *RMRParams) { time.Sleep(3 * time.Second) } else { //TODO error handling improvement - xapp.Logger.Error("SubFail: %s for trans %s (continuing cleaning)", err.Error(), trans) + xapp.Logger.Error("SubFail: (continue cleaning) %s", idstring(trans, subs, err)) } trans.Release() @@ -374,27 +354,21 @@ func (c *Control) handleSubscriptionFailure(params *RMRParams) { func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCount uint64) { xapp.Logger.Info("SubReq timeout: subId: %v, tryCount: %v", nbrId, tryCount) - subs := c.registry.GetSubscription(uint16(nbrId)) - if subs == nil { - xapp.Logger.Error("SubReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId) - return - } - - trans := subs.GetTransaction() - if trans == nil { - xapp.Logger.Error("SubReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId()) + subs, trans, err := c.findSubsAndTrans(([]int{nbrId})) + if err != nil { + xapp.Logger.Error("SubReq timeout: %s", idstring(nil, nil, err)) return } + xapp.Logger.Debug("SubReq timeout: Handling %s", idstring(trans, subs, nil)) responseReceived := trans.CheckResponseReceived() - if responseReceived == true { // Subscription Response or Failure already received return } if tryCount < maxSubReqTryCount { - xapp.Logger.Info("SubReq timeout: subs: %s trans: %s", subs, trans) + xapp.Logger.Info("SubReq timeout: %s", idstring(trans, subs, nil)) trans.RetryTransaction() @@ -409,14 +383,9 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou trans.Release() // Create DELETE transaction (internal and no messages toward xapp) - deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint, - trans.GetXid(), - trans.GetMeid(), - false, - false) - + deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint, trans.GetXid(), trans.GetMeid(), false, false) if err != nil { - xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error()) + xapp.Logger.Error("SubReq timeout: %s", idstring(trans, subs, err)) //TODO improve error handling. Important at least in merge subs.Release() return @@ -428,7 +397,7 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou deltrans.SubDelReqMsg.FunctionId = trans.SubReqMsg.FunctionId deltrans.Mtype, deltrans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg) if err != nil { - xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err) + xapp.Logger.Error("SubReq timeout: %s", idstring(trans, subs, err)) //TODO improve error handling. Important at least in merge deltrans.Release() subs.Release() @@ -437,7 +406,7 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou err = subs.SetTransaction(deltrans) if err != nil { - xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error()) + xapp.Logger.Error("SubReq timeout: %s", idstring(trans, subs, err)) //TODO improve error handling. Important at least in merge deltrans.Release() return @@ -451,52 +420,35 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) { xapp.Logger.Info("SubDelReq from xapp: %s", params.String()) - // - // - // - trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), - params.Xid, - params.Meid, - false, - true) - + SubDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload) if err != nil { - xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), params.String()) + xapp.Logger.Error("SubDelReq Drop %s", idstring(params, nil, err)) return } - // - // - // - trans.SubDelReqMsg, err = c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload) + trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid, false, true) if err != nil { - xapp.Logger.Error("SubDelReq: %s Dropping this msg. %s", err.Error(), trans) - trans.Release() + xapp.Logger.Error("SubDelReq Drop %s", idstring(params, nil, err)) return } + trans.SubDelReqMsg = SubDelReqMsg - // - // - // - subs := c.registry.GetSubscription(uint16(trans.SubDelReqMsg.RequestId.Seq)) - if subs == nil && params.SubId > 0 { - subs = c.registry.GetSubscription(uint16(params.SubId)) - } - - if subs == nil { - xapp.Logger.Error("SubDelReq: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans) + subs, err := c.findSubs([]int{int(trans.SubDelReqMsg.RequestId.Seq), params.SubId}) + if err != nil { + xapp.Logger.Error("SubDelReq: %s", idstring(params, nil, err)) trans.Release() return } - xapp.Logger.Info("SubDelReq: subscription found payloadSeqNum: %d, SubId: %d. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans) err = subs.SetTransaction(trans) if err != nil { - xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), trans) + xapp.Logger.Error("SubDelReq: %s", idstring(trans, subs, err)) trans.Release() return } + xapp.Logger.Debug("SubDelReq: Handling %s", idstring(trans, subs, nil)) + // // TODO: subscription delete is in fact owned by subscription and not transaction. // Transaction is toward xapp while Subscription is toward ran. @@ -507,7 +459,7 @@ func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) { // trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(trans.SubDelReqMsg) if err != nil { - xapp.Logger.Error("SubDelReq: %s for trans %s", err.Error(), trans) + xapp.Logger.Error("SubDelReq: %s", idstring(trans, subs, err)) trans.Release() return } @@ -523,43 +475,20 @@ func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) { func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error) { xapp.Logger.Info("SubDelResp from E2T:%s", params.String()) - // - // - // SubDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload) if err != nil { - xapp.Logger.Error("SubDelResp: %s Dropping this msg. %s", err.Error(), params.String()) - return - } - - // - // - // - subs := c.registry.GetSubscription(uint16(SubDelRespMsg.RequestId.Seq)) - if subs == nil && params.SubId > 0 { - subs = c.registry.GetSubscription(uint16(params.SubId)) - } - - if subs == nil { - xapp.Logger.Error("SubDelResp: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubDelRespMsg.RequestId.Seq, params.SubId, params.String()) + xapp.Logger.Error("SubDelResp: Dropping this msg. %s", idstring(params, nil, err)) return } - xapp.Logger.Info("SubDelResp: subscription found payloadSeqNum: %d, SubId: %d", SubDelRespMsg.RequestId.Seq, subs.GetSubId()) - // - // - // - trans := subs.GetTransaction() - if trans == nil { - xapp.Logger.Error("SubDelResp: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId()) + subs, trans, err := c.findSubsAndTrans([]int{int(SubDelRespMsg.RequestId.Seq), params.SubId}) + if err != nil { + xapp.Logger.Error("SubDelResp: %s", idstring(params, nil, err)) return } - trans.SubDelRespMsg = SubDelRespMsg + xapp.Logger.Debug("SubDelResp: Handling %s", idstring(trans, subs, nil)) - // - // - // c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId())) responseReceived := trans.CheckResponseReceived() @@ -575,42 +504,20 @@ func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) { xapp.Logger.Info("SubDelFail from E2T:%s", params.String()) - // - // - // SubDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload) if err != nil { - xapp.Logger.Error("SubDelFail: %s Dropping this msg. %s", err.Error(), params.String()) + xapp.Logger.Error("SubDelFail: Dropping this msg. %s", idstring(params, nil, err)) return } - // - // - // - subs := c.registry.GetSubscription(uint16(SubDelFailMsg.RequestId.Seq)) - if subs == nil && params.SubId > 0 { - subs = c.registry.GetSubscription(uint16(params.SubId)) - } - - if subs == nil { - xapp.Logger.Error("SubDelFail: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubDelFailMsg.RequestId.Seq, params.SubId, params.String()) - return - } - xapp.Logger.Info("SubDelFail: subscription found payloadSeqNum: %d, SubId: %d", SubDelFailMsg.RequestId.Seq, subs.GetSubId()) - - // - // - // - trans := subs.GetTransaction() - if trans == nil { - xapp.Logger.Error("SubDelFail: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId()) + subs, trans, err := c.findSubsAndTrans([]int{int(SubDelFailMsg.RequestId.Seq), params.SubId}) + if err != nil { + xapp.Logger.Error("SubDelFail: %s", idstring(params, nil, err)) return } trans.SubDelFailMsg = SubDelFailMsg + xapp.Logger.Debug("SubDelFail: Handling %s", idstring(trans, subs, nil)) - // - // - // c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId())) responseReceived := trans.CheckResponseReceived() @@ -626,17 +533,12 @@ func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) { func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, tryCount uint64) { xapp.Logger.Info("SubDelReq timeout: subId: %v, tryCount: %v", nbrId, tryCount) - subs := c.registry.GetSubscription(uint16(nbrId)) - if subs == nil { - xapp.Logger.Error("SubDelReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId) - return - } - - trans := subs.GetTransaction() - if trans == nil { - xapp.Logger.Error("SubDelReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId()) + subs, trans, err := c.findSubsAndTrans([]int{nbrId}) + if err != nil { + xapp.Logger.Error("SubDelReq timeout: %s", idstring(nil, nil, err)) return } + xapp.Logger.Debug("SubDelReq timeout: Handling %s", idstring(trans, subs, nil)) responseReceived := trans.CheckResponseReceived() if responseReceived == true { @@ -673,7 +575,7 @@ func (c *Control) sendSubscriptionDeleteResponse(desc string, trans *Transaction time.Sleep(3 * time.Second) } else { //TODO error handling improvement - xapp.Logger.Error("%s: %s for trans %s (continuing cleaning)", desc, err.Error(), trans) + xapp.Logger.Error("%s: (continue cleaning) %s", desc, idstring(trans, subs, err)) } }