From: Juha Hyttinen Date: Thu, 30 Jan 2020 08:36:33 +0000 (+0200) Subject: Some cleaning and bug fixes X-Git-Tag: 0.4.0~23 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=83ada00338d2c9fa47d48c406b4a46b9d7888aff;p=ric-plt%2Fsubmgr.git Some cleaning and bug fixes Change-Id: Ic7660b9ec386b152262ac8502cc1ebadca8c56b6 Signed-off-by: Juha Hyttinen --- diff --git a/pkg/control/client.go b/pkg/control/client.go index 73e2ee0..fdafcb6 100644 --- a/pkg/control/client.go +++ b/pkg/control/client.go @@ -34,13 +34,12 @@ import ( // //----------------------------------------------------------------------------- type SubRouteInfo struct { - Command Action - EpList RmrEndpointList - SubID uint16 + EpList RmrEndpointList + SubID uint16 } func (sri *SubRouteInfo) String() string { - return "routeinfo(" + strconv.FormatUint(uint64(sri.SubID), 10) + "/" + sri.Command.String() + "/[" + sri.EpList.String() + "])" + return "routeinfo(" + strconv.FormatUint(uint64(sri.SubID), 10) + "/[" + sri.EpList.String() + "])" } //----------------------------------------------------------------------------- @@ -50,39 +49,48 @@ type RtmgrClient struct { rtClient *rtmgrclient.RoutingManager } -func (rc *RtmgrClient) SubscriptionRequestUpdate(subRouteAction SubRouteInfo) error { +func (rc *RtmgrClient) SubscriptionRequestCreate(subRouteAction SubRouteInfo) error { subID := int32(subRouteAction.SubID) - xapp.Logger.Debug("%s ongoing", subRouteAction.String()) - var err error - switch subRouteAction.Command { - case CREATE: - createData := rtmgr_models.XappSubscriptionData{&subRouteAction.EpList.Endpoints[0].Addr, &subRouteAction.EpList.Endpoints[0].Port, &subID} - createHandle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second) - createHandle.WithXappSubscriptionData(&createData) - _, err = rc.rtClient.Handle.ProvideXappSubscriptionHandle(createHandle) - case DELETE: - deleteData := rtmgr_models.XappSubscriptionData{&subRouteAction.EpList.Endpoints[0].Addr, &subRouteAction.EpList.Endpoints[0].Port, &subID} - deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second) - deleteHandle.WithXappSubscriptionData(&deleteData) - _, _, err = rc.rtClient.Handle.DeleteXappSubscriptionHandle(deleteHandle) - case UPDATE: - var updateData rtmgr_models.XappList - for i := range subRouteAction.EpList.Endpoints { - updateData = append(updateData, &rtmgr_models.XappElement{Address: &subRouteAction.EpList.Endpoints[i].Addr, Port: &subRouteAction.EpList.Endpoints[i].Port}) - } - updateHandle := rtmgrhandle.NewUpdateXappSubscriptionHandleParamsWithTimeout(10 * time.Second) - updateHandle.WithSubscriptionID(subRouteAction.SubID) - updateHandle.WithXappList(updateData) - _, err = rc.rtClient.Handle.UpdateXappSubscriptionHandle(updateHandle) - - default: - return fmt.Errorf("%s unknown", subRouteAction.String()) + xapp.Logger.Debug("CREATE %s ongoing", subRouteAction.String()) + createData := rtmgr_models.XappSubscriptionData{&subRouteAction.EpList.Endpoints[0].Addr, &subRouteAction.EpList.Endpoints[0].Port, &subID} + createHandle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second) + createHandle.WithXappSubscriptionData(&createData) + _, err := rc.rtClient.Handle.ProvideXappSubscriptionHandle(createHandle) + if err != nil && !(strings.Contains(err.Error(), "status 200")) { + return fmt.Errorf("CREATE %s failed with error: %s", subRouteAction.String(), err.Error()) } + xapp.Logger.Debug("CREATE %s successful", subRouteAction.String()) + return nil +} +func (rc *RtmgrClient) SubscriptionRequestUpdate(subRouteAction SubRouteInfo) error { + xapp.Logger.Debug("UPDATE %s ongoing", subRouteAction.String()) + var updateData rtmgr_models.XappList + for i := range subRouteAction.EpList.Endpoints { + updateData = append(updateData, &rtmgr_models.XappElement{Address: &subRouteAction.EpList.Endpoints[i].Addr, Port: &subRouteAction.EpList.Endpoints[i].Port}) + } + updateHandle := rtmgrhandle.NewUpdateXappSubscriptionHandleParamsWithTimeout(10 * time.Second) + updateHandle.WithSubscriptionID(subRouteAction.SubID) + updateHandle.WithXappList(updateData) + _, err := rc.rtClient.Handle.UpdateXappSubscriptionHandle(updateHandle) if err != nil && !(strings.Contains(err.Error(), "status 200")) { - return fmt.Errorf("%s failed with error: %s", subRouteAction.String(), err.Error()) + return fmt.Errorf("UPDATE %s failed with error: %s", subRouteAction.String(), err.Error()) } - xapp.Logger.Debug("%s successful", subRouteAction.String()) + xapp.Logger.Debug("UPDATE %s successful", subRouteAction.String()) return nil } + +func (rc *RtmgrClient) SubscriptionRequestDelete(subRouteAction SubRouteInfo) error { + subID := int32(subRouteAction.SubID) + xapp.Logger.Debug("DELETE %s ongoing", subRouteAction.String()) + deleteData := rtmgr_models.XappSubscriptionData{&subRouteAction.EpList.Endpoints[0].Addr, &subRouteAction.EpList.Endpoints[0].Port, &subID} + deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second) + deleteHandle.WithXappSubscriptionData(&deleteData) + _, _, err := rc.rtClient.Handle.DeleteXappSubscriptionHandle(deleteHandle) + if err != nil && !(strings.Contains(err.Error(), "status 200")) { + return fmt.Errorf("DELETE %s failed with error: %s", subRouteAction.String(), err.Error()) + } + xapp.Logger.Debug("DELETE %s successful", subRouteAction.String()) + return nil +} diff --git a/pkg/control/control.go b/pkg/control/control.go index dee7e65..1d64f3c 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -57,13 +57,6 @@ type RMRMeid struct { RanName string } -const ( - CREATE Action = 0 - UPDATE Action = 1 - NONE Action = 2 - DELETE Action = 3 -) - func init() { xapp.Logger.Info("SUBMGR") viper.AutomaticEnv() @@ -120,10 +113,10 @@ func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) { return } -func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction) (err error) { +func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) { params := &RMRParams{&xapp.RMRParams{}} params.Mtype = trans.GetMtype() - params.SubId = int(subs.GetSubId()) + params.SubId = int(subs.GetReqId().Seq) params.Xid = "" params.Meid = subs.GetMeid() params.Src = "" @@ -131,13 +124,14 @@ func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction) ( params.Payload = trans.Payload.Buf params.Mbuf = nil - return c.rmrSendRaw(desc, params) + return c.rmrSendRaw("MSG to E2T:"+desc+":"+trans.String(), params) } -func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction) (err error) { +func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) { + params := &RMRParams{&xapp.RMRParams{}} params.Mtype = trans.GetMtype() - params.SubId = int(subs.GetSubId()) + params.SubId = int(subs.GetReqId().Seq) params.Xid = trans.GetXid() params.Meid = trans.GetMeid() params.Src = "" @@ -145,7 +139,7 @@ func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Trans params.Payload = trans.Payload.Buf params.Mbuf = nil - return c.rmrSendRaw(desc, params) + return c.rmrSendRaw("MSG to XAPP:"+desc+":"+trans.String(), params) } func (c *Control) Consume(params *xapp.RMRParams) (err error) { @@ -153,6 +147,7 @@ func (c *Control) Consume(params *xapp.RMRParams) (err error) { params.Mbuf = nil msg := &RMRParams{params} c.msgCounter++ + switch msg.Mtype { case xapp.RICMessageTypes["RIC_SUB_REQ"]: go c.handleXAPPSubscriptionRequest(msg) @@ -172,15 +167,12 @@ func (c *Control) Consume(params *xapp.RMRParams) (err error) { return nil } -func idstring(trans fmt.Stringer, subs fmt.Stringer, err error) string { + +func idstring(err error, entries ...fmt.Stringer) string { var retval string = "" var filler string = "" - if trans != nil { - retval += filler + trans.String() - filler = " " - } - if subs != nil { - retval += filler + subs.String() + for _, entry := range entries { + retval += filler + entry.String() filler = " " } if err != nil { @@ -195,24 +187,30 @@ func idstring(trans fmt.Stringer, subs fmt.Stringer, err error) string { // handle from XAPP Subscription Request //------------------------------------------------------------------ func (c *Control) handleXAPPSubscriptionRequest(params *RMRParams) { - xapp.Logger.Info("XAPP-SubReq from xapp: %s", params.String()) + xapp.Logger.Info("MSG from XAPP: %s", params.String()) subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload) if err != nil { - xapp.Logger.Error("XAPP-SubReq: %s", idstring(params, nil, err)) + xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params)) return } - trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid) - if err != nil { - xapp.Logger.Error("XAPP-SubReq: %s", idstring(params, nil, err)) + trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subReqMsg.RequestId}, params.Meid) + if trans == nil { + xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params)) return } defer trans.Release() + err = c.tracker.Track(trans) + if err != nil { + xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans)) + return + } + subs, err := c.registry.AssignToSubscription(trans, subReqMsg) if err != nil { - xapp.Logger.Error("XAPP-SubReq: %s", idstring(trans, nil, err)) + xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans)) return } @@ -228,44 +226,50 @@ func (c *Control) handleXAPPSubscriptionRequest(params *RMRParams) { case *e2ap.E2APSubscriptionResponse: trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg) if err == nil { - c.rmrReplyToSender("XAPP-SubReq: SubResp to xapp", subs, trans) + c.rmrSendToXapp("", subs, trans) return } case *e2ap.E2APSubscriptionFailure: trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg) if err == nil { - c.rmrReplyToSender("XAPP-SubReq: SubFail to xapp", subs, trans) + c.rmrSendToXapp("", subs, trans) } - return default: break } } - xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(trans, subs, err)) + xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs)) + go c.registry.RemoveFromSubscription(subs, trans, 5*time.Second) } //------------------------------------------------------------------- // handle from XAPP Subscription Delete Request //------------------------------------------------------------------ func (c *Control) handleXAPPSubscriptionDeleteRequest(params *RMRParams) { - xapp.Logger.Info("XAPP-SubDelReq from xapp: %s", params.String()) + xapp.Logger.Info("MSG from XAPP: %s", params.String()) subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload) if err != nil { - xapp.Logger.Error("XAPP-SubDelReq %s", idstring(params, nil, err)) + xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params)) return } - trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid) - if err != nil { - xapp.Logger.Error("XAPP-SubDelReq %s", idstring(params, nil, err)) + trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subDelReqMsg.RequestId}, params.Meid) + if trans == nil { + xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params)) return } defer trans.Release() - subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelReqMsg.RequestId.Seq), uint16(params.SubId)}) + err = c.tracker.Track(trans) + if err != nil { + xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans)) + return + } + + subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelReqMsg.RequestId.Seq}) if err != nil { - xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(trans, nil, err)) + xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans)) return } @@ -275,89 +279,65 @@ func (c *Control) handleXAPPSubscriptionDeleteRequest(params *RMRParams) { go c.handleSubscriptionDelete(subs, trans) trans.WaitEvent(0) //blocked wait as timeout is handled in subs side + xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs)) + // Whatever is received send ok delete response subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{} - subDelRespMsg.RequestId.Id = subs.SubReqMsg.RequestId.Id - subDelRespMsg.RequestId.Seq = uint32(subs.GetSubId()) + subDelRespMsg.RequestId = subs.SubReqMsg.RequestId subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg) if err == nil { - c.rmrReplyToSender("XAPP-SubDelReq: SubDelResp to xapp", subs, trans) + c.rmrSendToXapp("", subs, trans) } + + go c.registry.RemoveFromSubscription(subs, trans, 5*time.Second) } //------------------------------------------------------------------- // SUBS CREATE Handling //------------------------------------------------------------------- -func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Transaction) { +func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) { - trans := c.tracker.NewTransaction(subs.GetMeid()) + trans := c.tracker.NewSubsTransaction(subs) subs.WaitTransactionTurn(trans) defer subs.ReleaseTransactionTurn(trans) defer trans.Release() - xapp.Logger.Debug("SUBS-SubReq: Handling %s parent %s", idstring(trans, subs, nil), parentTrans.String()) + xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans)) - subs.mutex.Lock() - if subs.SubRespMsg != nil { - xapp.Logger.Debug("SUBS-SubReq: Handling (immediate resp response) %s parent %s", idstring(nil, subs, nil), parentTrans.String()) - parentTrans.SendEvent(subs.SubRespMsg, 0) - subs.mutex.Unlock() - return - } - if subs.SubFailMsg != nil { - xapp.Logger.Debug("SUBS-SubReq: Handling (immediate fail response) %s parent %s", idstring(nil, subs, nil), parentTrans.String()) - parentTrans.SendEvent(subs.SubFailMsg, 0) - subs.mutex.Unlock() - go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second) - return - } - if subs.valid == false { - xapp.Logger.Debug("SUBS-SubReq: Handling (immediate nil response) %s parent %s", idstring(nil, subs, nil), parentTrans.String()) - parentTrans.SendEvent(nil, 0) - subs.mutex.Unlock() - go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second) - return - } - subs.mutex.Unlock() - - event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans) - switch themsg := event.(type) { - case *e2ap.E2APSubscriptionResponse: - subs.mutex.Lock() - subs.SubRespMsg = themsg - subs.mutex.Unlock() - parentTrans.SendEvent(event, 0) - return - case *e2ap.E2APSubscriptionFailure: - subs.mutex.Lock() - subs.SubFailMsg = themsg - subs.mutex.Unlock() - parentTrans.SendEvent(event, 0) - default: - xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(trans, subs, nil)) - subs.mutex.Lock() - subs.valid = false - subs.mutex.Unlock() - c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans) - parentTrans.SendEvent(nil, 0) + subRfMsg, valid := subs.GetCachedResponse() + if subRfMsg == nil && valid == true { + event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans) + switch event.(type) { + case *e2ap.E2APSubscriptionResponse: + subRfMsg, valid = subs.SetCachedResponse(event, true) + case *e2ap.E2APSubscriptionFailure: + subRfMsg, valid = subs.SetCachedResponse(event, false) + default: + xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans)) + subRfMsg, valid = subs.SetCachedResponse(nil, false) + c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans) + } + xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans)) + } else { + xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans)) } - go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second) + parentTrans.SendEvent(subRfMsg, 0) } //------------------------------------------------------------------- // SUBS DELETE Handling //------------------------------------------------------------------- -func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *Transaction) { +func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) { - trans := c.tracker.NewTransaction(subs.GetMeid()) + trans := c.tracker.NewSubsTransaction(subs) subs.WaitTransactionTurn(trans) defer subs.ReleaseTransactionTurn(trans) defer trans.Release() - xapp.Logger.Debug("SUBS-SubDelReq: Handling %s parent %s", idstring(trans, subs, nil), parentTrans.String()) + xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans)) subs.mutex.Lock() if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 { @@ -369,41 +349,37 @@ func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *Tran } subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{} - subDelRespMsg.RequestId.Id = subs.SubReqMsg.RequestId.Id - subDelRespMsg.RequestId.Seq = uint32(subs.GetSubId()) + subDelRespMsg.RequestId = subs.SubReqMsg.RequestId subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId parentTrans.SendEvent(subDelRespMsg, 0) - - go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second) } //------------------------------------------------------------------- // send to E2T Subscription Request //------------------------------------------------------------------- -func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *Transaction, parentTrans *Transaction) interface{} { +func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} { var err error var event interface{} = nil var timedOut bool = false subReqMsg := subs.SubReqMsg - subReqMsg.RequestId.Id = 123 - subReqMsg.RequestId.Seq = uint32(subs.GetSubId()) + subReqMsg.RequestId = subs.GetReqId().RequestId trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg) if err != nil { - xapp.Logger.Error("SUBS-SubReq: %s parent %s", idstring(trans, subs, err), parentTrans.String()) + xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans)) return event } for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ { - desc := fmt.Sprintf("SUBS-SubReq: SubReq to E2T (retry %d)", retries) - c.rmrSend(desc, subs, trans) + desc := fmt.Sprintf("(retry %d)", retries) + c.rmrSendToE2T(desc, subs, trans) event, timedOut = trans.WaitEvent(e2tSubReqTimeout) if timedOut { continue } break } - xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s parent %s", typeofSubsMessage(event), idstring(trans, subs, nil), parentTrans.String()) + xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans)) return event } @@ -411,31 +387,30 @@ func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *Transact // send to E2T Subscription Delete Request //------------------------------------------------------------------- -func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *Transaction, parentTrans *Transaction) interface{} { +func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} { var err error var event interface{} var timedOut bool subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{} - subDelReqMsg.RequestId.Id = 123 - subDelReqMsg.RequestId.Seq = uint32(subs.GetSubId()) + subDelReqMsg.RequestId = subs.GetReqId().RequestId subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg) if err != nil { - xapp.Logger.Error("SUBS-SubDelReq: %s parent %s", idstring(trans, subs, err), parentTrans.String()) + xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans)) return event } for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ { - desc := fmt.Sprintf("SUBS-SubDelReq: SubDelReq to E2T (retry %d)", retries) - c.rmrSend(desc, subs, trans) + desc := fmt.Sprintf("(retry %d)", retries) + c.rmrSendToE2T(desc, subs, trans) event, timedOut = trans.WaitEvent(e2tSubDelReqTime) if timedOut { continue } break } - xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s parent %s", typeofSubsMessage(event), idstring(trans, subs, nil), parentTrans.String()) + xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans)) return event } @@ -443,27 +418,27 @@ func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *Tr // handle from E2T Subscription Reponse //------------------------------------------------------------------- func (c *Control) handleE2TSubscriptionResponse(params *RMRParams) { - xapp.Logger.Info("MSG-SubResp from E2T: %s", params.String()) + xapp.Logger.Info("MSG from E2T: %s", params.String()) subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload) if err != nil { - xapp.Logger.Error("MSG-SubResp %s", idstring(params, nil, err)) + xapp.Logger.Error("MSG-SubResp %s", idstring(err, params)) return } - subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subRespMsg.RequestId.Seq), uint16(params.SubId)}) + subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.Seq}) if err != nil { - xapp.Logger.Error("MSG-SubResp: %s", idstring(params, nil, err)) + xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params)) return } trans := subs.GetTransaction() if trans == nil { err = fmt.Errorf("Ongoing transaction not found") - xapp.Logger.Error("MSG-SubResp: %s", idstring(params, subs, err)) + xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs)) return } sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout) if sendOk == false { err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut) - xapp.Logger.Error("MSG-SubResp: %s", idstring(trans, subs, err)) + xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs)) } return } @@ -472,27 +447,27 @@ func (c *Control) handleE2TSubscriptionResponse(params *RMRParams) { // handle from E2T Subscription Failure //------------------------------------------------------------------- func (c *Control) handleE2TSubscriptionFailure(params *RMRParams) { - xapp.Logger.Info("MSG-SubFail from E2T: %s", params.String()) + xapp.Logger.Info("MSG from E2T: %s", params.String()) subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload) if err != nil { - xapp.Logger.Error("MSG-SubFail %s", idstring(params, nil, err)) + xapp.Logger.Error("MSG-SubFail %s", idstring(err, params)) return } - subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subFailMsg.RequestId.Seq), uint16(params.SubId)}) + subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.Seq}) if err != nil { - xapp.Logger.Error("MSG-SubFail: %s", idstring(params, nil, err)) + xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params)) return } trans := subs.GetTransaction() if trans == nil { err = fmt.Errorf("Ongoing transaction not found") - xapp.Logger.Error("MSG-SubFail: %s", idstring(params, subs, err)) + xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs)) return } sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout) if sendOk == false { err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut) - xapp.Logger.Error("MSG-SubFail: %s", idstring(trans, subs, err)) + xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs)) } return } @@ -501,27 +476,27 @@ func (c *Control) handleE2TSubscriptionFailure(params *RMRParams) { // handle from E2T Subscription Delete Response //------------------------------------------------------------------- func (c *Control) handleE2TSubscriptionDeleteResponse(params *RMRParams) (err error) { - xapp.Logger.Info("SUBS-SubDelResp from E2T:%s", params.String()) + xapp.Logger.Info("MSG from E2T: %s", params.String()) subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload) if err != nil { - xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, nil, err)) + xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params)) return } - subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelRespMsg.RequestId.Seq), uint16(params.SubId)}) + subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.Seq}) if err != nil { - xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, nil, err)) + xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params)) return } trans := subs.GetTransaction() if trans == nil { err = fmt.Errorf("Ongoing transaction not found") - xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, subs, err)) + xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs)) return } sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout) if sendOk == false { err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut) - xapp.Logger.Error("MSG-SubDelResp: %s", idstring(trans, subs, err)) + xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs)) } return } @@ -530,27 +505,27 @@ func (c *Control) handleE2TSubscriptionDeleteResponse(params *RMRParams) (err er // handle from E2T Subscription Delete Failure //------------------------------------------------------------------- func (c *Control) handleE2TSubscriptionDeleteFailure(params *RMRParams) { - xapp.Logger.Info("MSG-SubDelFail from E2T:%s", params.String()) + xapp.Logger.Info("MSG from E2T: %s", params.String()) subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload) if err != nil { - xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, nil, err)) + xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params)) return } - subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelFailMsg.RequestId.Seq), uint16(params.SubId)}) + subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.Seq}) if err != nil { - xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, nil, err)) + xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params)) return } trans := subs.GetTransaction() if trans == nil { err = fmt.Errorf("Ongoing transaction not found") - xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, subs, err)) + xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs)) return } sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout) if sendOk == false { err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut) - xapp.Logger.Error("MSG-SubDelFail: %s", idstring(trans, subs, err)) + xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs)) } return } diff --git a/pkg/control/registry.go b/pkg/control/registry.go index 6abdcdb..e00062b 100644 --- a/pkg/control/registry.go +++ b/pkg/control/registry.go @@ -30,22 +30,23 @@ import ( //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- + type Registry struct { mutex sync.Mutex - register map[uint16]*Subscription - subIds []uint16 + register map[uint32]*Subscription + subIds []uint32 rtmgrClient *RtmgrClient } func (r *Registry) Initialize() { - r.register = make(map[uint16]*Subscription) - var i uint16 + r.register = make(map[uint32]*Subscription) + var i uint32 for i = 0; i < 65535; i++ { r.subIds = append(r.subIds, i+1) } } -func (r *Registry) allocateSubs(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) { +func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) { if len(r.subIds) > 0 { sequenceNumber := r.subIds[0] r.subIds = r.subIds[1:] @@ -55,14 +56,15 @@ func (r *Registry) allocateSubs(trans *Transaction, subReqMsg *e2ap.E2APSubscrip } subs := &Subscription{ registry: r, - Seq: sequenceNumber, Meid: trans.Meid, SubReqMsg: subReqMsg, valid: true, } + subs.ReqId.Id = 123 + subs.ReqId.Seq = sequenceNumber if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false { - r.subIds = append(r.subIds, subs.Seq) + r.subIds = append(r.subIds, subs.ReqId.Seq) return nil, fmt.Errorf("Registry: Endpoint existing already in subscription") } @@ -71,9 +73,10 @@ func (r *Registry) allocateSubs(trans *Transaction, subReqMsg *e2ap.E2APSubscrip return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids") } -func (r *Registry) findExistingSubs(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription { +func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription { + for _, subs := range r.register { - if subs.IsSame(trans, subReqMsg) { + if subs.IsMergeable(trans, subReqMsg) { // // check if there has been race conditions @@ -84,6 +87,11 @@ func (r *Registry) findExistingSubs(trans *Transaction, subReqMsg *e2ap.E2APSubs subs.mutex.Unlock() continue } + // If size is zero, entry is to be deleted + if subs.EpList.Size() == 0 { + subs.mutex.Unlock() + continue + } // try to add to endpointlist. if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false { subs.mutex.Unlock() @@ -91,15 +99,14 @@ func (r *Registry) findExistingSubs(trans *Transaction, subReqMsg *e2ap.E2APSubs } subs.mutex.Unlock() - //Race collision during parallel incoming and deleted - xapp.Logger.Debug("Registry: Identical subs found %s for %s", subs.String(), trans.String()) + xapp.Logger.Debug("Registry: Mergeable subs found %s for %s", subs.String(), trans.String()) return subs } } return nil } -func (r *Registry) AssignToSubscription(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) { +func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) { var err error var newAlloc bool r.mutex.Lock() @@ -128,31 +135,31 @@ func (r *Registry) AssignToSubscription(trans *Transaction, subReqMsg *e2ap.E2AP // Subscription route updates // if epamount == 1 { - subRouteAction := SubRouteInfo{CREATE, subs.EpList, subs.Seq} - err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) + subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)} + err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction) } else { - subRouteAction := SubRouteInfo{UPDATE, subs.EpList, subs.Seq} + subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)} err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) } r.mutex.Lock() if err != nil { if newAlloc { - r.subIds = append(r.subIds, subs.Seq) + r.subIds = append(r.subIds, subs.ReqId.Seq) } return nil, err } if newAlloc { - r.register[subs.Seq] = subs + r.register[subs.ReqId.Seq] = subs } - xapp.Logger.Debug("Registry: Create %s", subs.String()) + xapp.Logger.Debug("CREATE %s", subs.String()) xapp.Logger.Debug("Registry: substable=%v", r.register) return subs, nil } -// TODO: Needs better logic when there is concurrent calls -func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *Transaction, waitRouteClean time.Duration) error { +// TODO: Works with concurrent calls, but check if can be improved +func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error { r.mutex.Lock() defer r.mutex.Unlock() @@ -161,17 +168,12 @@ func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *Transaction delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint()) epamount := subs.EpList.Size() + seqId := subs.ReqId.Seq - // - // If last endpoint remove from register map - // - if epamount == 0 { - if _, ok := r.register[subs.Seq]; ok { - xapp.Logger.Debug("Registry: Delete %s", subs.String()) - delete(r.register, subs.Seq) - xapp.Logger.Debug("Registry: substable=%v", r.register) - } + if delStatus == false { + return nil } + r.mutex.Unlock() // @@ -183,35 +185,38 @@ func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *Transaction subs.mutex.Lock() } - xapp.Logger.Info("Registry: Cleaning %s", subs.String()) + xapp.Logger.Info("CLEAN %s", subs.String()) // // Subscription route updates // - if delStatus { - if epamount == 0 { - tmpList := RmrEndpointList{} - tmpList.AddEndpoint(trans.GetEndpoint()) - subRouteAction := SubRouteInfo{DELETE, tmpList, subs.Seq} - r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) - } else { - subRouteAction := SubRouteInfo{UPDATE, subs.EpList, subs.Seq} - r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) - } + if epamount == 0 { + tmpList := RmrEndpointList{} + tmpList.AddEndpoint(trans.GetEndpoint()) + subRouteAction := SubRouteInfo{tmpList, uint16(seqId)} + r.rtmgrClient.SubscriptionRequestDelete(subRouteAction) + } else if subs.EpList.Size() > 0 { + subRouteAction := SubRouteInfo{subs.EpList, uint16(seqId)} + r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) } r.mutex.Lock() // - // If last endpoint free seq nro + // If last endpoint, release and free seqid // if epamount == 0 { - r.subIds = append(r.subIds, subs.Seq) + if _, ok := r.register[seqId]; ok { + xapp.Logger.Debug("RELEASE %s", subs.String()) + delete(r.register, seqId) + xapp.Logger.Debug("Registry: substable=%v", r.register) + } + r.subIds = append(r.subIds, seqId) } return nil } -func (r *Registry) GetSubscription(sn uint16) *Subscription { +func (r *Registry) GetSubscription(sn uint32) *Subscription { r.mutex.Lock() defer r.mutex.Unlock() if _, ok := r.register[sn]; ok { @@ -220,7 +225,7 @@ func (r *Registry) GetSubscription(sn uint16) *Subscription { return nil } -func (r *Registry) GetSubscriptionFirstMatch(ids []uint16) (*Subscription, error) { +func (r *Registry) GetSubscriptionFirstMatch(ids []uint32) (*Subscription, error) { r.mutex.Lock() defer r.mutex.Unlock() for _, id := range ids { diff --git a/pkg/control/subscription.go b/pkg/control/subscription.go index 5bfe2e1..c2b5283 100644 --- a/pkg/control/subscription.go +++ b/pkg/control/subscription.go @@ -22,7 +22,6 @@ package control import ( "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" - "strconv" "sync" ) @@ -30,62 +29,65 @@ import ( // //----------------------------------------------------------------------------- type Subscription struct { - mutex sync.Mutex // Lock - valid bool // valid - registry *Registry // Registry - Seq uint16 // SubsId - Meid *xapp.RMRMeid // Meid/ RanName - EpList RmrEndpointList // Endpoints - TransLock sync.Mutex // Lock transactions, only one executed per time for subs - TheTrans *Transaction // Ongoing transaction from xapp - SubReqMsg *e2ap.E2APSubscriptionRequest // Subscription information - SubRespMsg *e2ap.E2APSubscriptionResponse // Subscription information - SubFailMsg *e2ap.E2APSubscriptionFailure // Subscription information + mutex sync.Mutex // Lock + valid bool // valid + registry *Registry // Registry + ReqId RequestId // ReqId (Requestor Id + Seq Nro a.k.a subsid) + Meid *xapp.RMRMeid // Meid/ RanName + EpList RmrEndpointList // Endpoints + TransLock sync.Mutex // Lock transactions, only one executed per time for subs + TheTrans TransactionIf // Ongoing transaction + SubReqMsg *e2ap.E2APSubscriptionRequest // Subscription information + SubRFMsg interface{} // Subscription information } func (s *Subscription) String() string { - return "subs(" + strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.Meid.RanName + "/" + s.EpList.String() + ")" + return "subs(" + s.ReqId.String() + "/" + s.Meid.RanName + "/" + s.EpList.String() + ")" } -func (s *Subscription) GetSubId() uint16 { +func (s *Subscription) GetCachedResponse() (interface{}, bool) { s.mutex.Lock() defer s.mutex.Unlock() - return s.Seq + return s.SubRFMsg, s.valid } -func (s *Subscription) GetMeid() *xapp.RMRMeid { +func (s *Subscription) SetCachedResponse(subRFMsg interface{}, valid bool) (interface{}, bool) { s.mutex.Lock() defer s.mutex.Unlock() - if s.Meid != nil { - return s.Meid - } - return nil + s.SubRFMsg = subRFMsg + s.valid = valid + return s.SubRFMsg, s.valid } -func (s *Subscription) IsTransactionReserved() bool { +func (s *Subscription) GetReqId() *RequestId { s.mutex.Lock() defer s.mutex.Unlock() - if s.TheTrans != nil { - return true - } - return false + return &s.ReqId +} +func (s *Subscription) GetMeid() *xapp.RMRMeid { + s.mutex.Lock() + defer s.mutex.Unlock() + if s.Meid != nil { + return s.Meid + } + return nil } -func (s *Subscription) GetTransaction() *Transaction { +func (s *Subscription) GetTransaction() TransactionIf { s.mutex.Lock() defer s.mutex.Unlock() return s.TheTrans } -func (s *Subscription) WaitTransactionTurn(trans *Transaction) { +func (s *Subscription) WaitTransactionTurn(trans TransactionIf) { s.TransLock.Lock() s.mutex.Lock() s.TheTrans = trans s.mutex.Unlock() } -func (s *Subscription) ReleaseTransactionTurn(trans *Transaction) { +func (s *Subscription) ReleaseTransactionTurn(trans TransactionIf) { s.mutex.Lock() if trans != nil && trans == s.TheTrans { s.TheTrans = nil @@ -94,7 +96,7 @@ func (s *Subscription) ReleaseTransactionTurn(trans *Transaction) { s.TransLock.Unlock() } -func (s *Subscription) IsSame(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) bool { +func (s *Subscription) IsMergeable(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) bool { s.mutex.Lock() defer s.mutex.Unlock() @@ -110,15 +112,6 @@ func (s *Subscription) IsSame(trans *Transaction, subReqMsg *e2ap.E2APSubscripti return false } - if s.EpList.Size() == 0 { - return false - } - - //Somehow special case ... ? - if s.EpList.HasEndpoint(trans.GetEndpoint()) == true { - return false - } - // EventTrigger check if s.SubReqMsg.EventTriggerDefinition.InterfaceDirection != subReqMsg.EventTriggerDefinition.InterfaceDirection || s.SubReqMsg.EventTriggerDefinition.ProcedureCode != subReqMsg.EventTriggerDefinition.ProcedureCode || @@ -156,6 +149,10 @@ func (s *Subscription) IsSame(trans *Transaction, subReqMsg *e2ap.E2APSubscripti return false } + if acts.ActionType != e2ap.E2AP_ActionTypeReport { + return false + } + if acts.ActionDefinition.Present != actt.ActionDefinition.Present || acts.ActionDefinition.StyleId != actt.ActionDefinition.StyleId || acts.ActionDefinition.ParamId != actt.ActionDefinition.ParamId { diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go index c16a76a..2f54237 100644 --- a/pkg/control/tracker.go +++ b/pkg/control/tracker.go @@ -30,71 +30,81 @@ import ( //----------------------------------------------------------------------------- type Tracker struct { mutex sync.Mutex - transactionXappTable map[TransactionXappKey]*Transaction + transactionXappTable map[TransactionXappKey]*TransactionXapp transSeq uint64 } func (t *Tracker) Init() { - t.transactionXappTable = make(map[TransactionXappKey]*Transaction) + t.transactionXappTable = make(map[TransactionXappKey]*TransactionXapp) } -func (t *Tracker) NewTransactionFromSkel(transSkel *Transaction) *Transaction { +func (t *Tracker) initTransaction(transBase *Transaction) { t.mutex.Lock() defer t.mutex.Unlock() - trans := transSkel - if trans == nil { - trans = &Transaction{} - } - trans.EventChan = make(chan interface{}) - trans.tracker = t - trans.Seq = t.transSeq + transBase.EventChan = make(chan interface{}) + transBase.tracker = t + transBase.Seq = t.transSeq t.transSeq++ - xapp.Logger.Debug("Transaction: Create %s", trans.String()) - return trans } -func (t *Tracker) NewTransaction(meid *xapp.RMRMeid) *Transaction { - trans := &Transaction{} - trans.Meid = meid - trans = t.NewTransactionFromSkel(trans) +func (t *Tracker) NewSubsTransaction(subs *Subscription) *TransactionSubs { + trans := &TransactionSubs{} + trans.Meid = subs.GetMeid() + rid := subs.GetReqId() + if rid != nil { + trans.ReqId = *rid + } + t.initTransaction(&trans.Transaction) + xapp.Logger.Debug("CREATE %s", trans.String()) return trans } -func (t *Tracker) TrackTransaction( +func (t *Tracker) NewXappTransaction( endpoint *RmrEndpoint, xid string, - meid *xapp.RMRMeid) (*Transaction, error) { - - if endpoint == nil { - err := fmt.Errorf("Tracker: No valid endpoint given") - return nil, err - } + reqId *RequestId, + meid *xapp.RMRMeid) *TransactionXapp { - trans := &Transaction{} + trans := &TransactionXapp{} trans.XappKey = &TransactionXappKey{*endpoint, xid} trans.Meid = meid - trans = t.NewTransactionFromSkel(trans) + if reqId != nil { + trans.ReqId = *reqId + } + t.initTransaction(&trans.Transaction) + xapp.Logger.Debug("CREATE %s", trans.String()) + return trans +} + +func (t *Tracker) Track(trans *TransactionXapp) error { + + if trans.GetEndpoint() == nil { + err := fmt.Errorf("Tracker: No valid endpoint given in %s", trans.String()) + return err + } t.mutex.Lock() defer t.mutex.Unlock() - if othtrans, ok := t.transactionXappTable[*trans.XappKey]; ok { - err := fmt.Errorf("Tracker: %s is ongoing, %s not created ", othtrans, trans) - return nil, err + theKey := *trans.XappKey + + if othtrans, ok := t.transactionXappTable[theKey]; ok { + err := fmt.Errorf("Tracker: %s is ongoing, not tracking %s", othtrans, trans) + return err } trans.tracker = t - t.transactionXappTable[*trans.XappKey] = trans - xapp.Logger.Debug("Tracker: Add %s", trans.String()) + t.transactionXappTable[theKey] = trans + xapp.Logger.Debug("Tracker: Append %s", trans.String()) //xapp.Logger.Debug("Tracker: transtable=%v", t.transactionXappTable) - return trans, nil + return nil } -func (t *Tracker) UnTrackTransaction(xappKey TransactionXappKey) (*Transaction, error) { +func (t *Tracker) UnTrackTransaction(xappKey TransactionXappKey) (*TransactionXapp, error) { t.mutex.Lock() defer t.mutex.Unlock() if trans, ok2 := t.transactionXappTable[xappKey]; ok2 { - xapp.Logger.Debug("Tracker: Delete %s", trans.String()) + xapp.Logger.Debug("Tracker: Remove %s", trans.String()) delete(t.transactionXappTable, xappKey) //xapp.Logger.Debug("Tracker: transtable=%v", t.transactionXappTable) return trans, nil diff --git a/pkg/control/transaction.go b/pkg/control/transaction.go index 735954e..b2b838b 100644 --- a/pkg/control/transaction.go +++ b/pkg/control/transaction.go @@ -30,18 +30,33 @@ import ( //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- +type TransactionIf interface { + String() string + Release() + SendEvent(interface{}, time.Duration) (bool, bool) + WaitEvent(time.Duration) (interface{}, bool) +} -type TransactionBase struct { +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- + +type Transaction struct { mutex sync.Mutex // - Seq uint64 // + Seq uint64 //transaction sequence tracker *Tracker //tracker instance Meid *xapp.RMRMeid //meid transaction related + ReqId RequestId // Mtype int //Encoded message type to be send Payload *packer.PackedData //Encoded message to be send EventChan chan interface{} } -func (t *TransactionBase) SendEvent(event interface{}, waittime time.Duration) (bool, bool) { +func (t *Transaction) String() string { + return "trans(" + strconv.FormatUint(uint64(t.Seq), 10) + "/" + t.Meid.RanName + "/" + t.ReqId.String() + ")" +} + +func (t *Transaction) SendEvent(event interface{}, waittime time.Duration) (bool, bool) { if waittime > 0 { select { case t.EventChan <- event: @@ -55,7 +70,7 @@ func (t *TransactionBase) SendEvent(event interface{}, waittime time.Duration) ( return true, false } -func (t *TransactionBase) WaitEvent(waittime time.Duration) (interface{}, bool) { +func (t *Transaction) WaitEvent(waittime time.Duration) (interface{}, bool) { if waittime > 0 { select { case event := <-t.EventChan: @@ -68,13 +83,19 @@ func (t *TransactionBase) WaitEvent(waittime time.Duration) (interface{}, bool) return event, false } -func (t *TransactionBase) GetMtype() int { +func (t *Transaction) GetReqId() *RequestId { + t.mutex.Lock() + defer t.mutex.Unlock() + return &t.ReqId +} + +func (t *Transaction) GetMtype() int { t.mutex.Lock() defer t.mutex.Unlock() return t.Mtype } -func (t *TransactionBase) GetMeid() *xapp.RMRMeid { +func (t *Transaction) GetMeid() *xapp.RMRMeid { t.mutex.Lock() defer t.mutex.Unlock() if t.Meid != nil { @@ -83,12 +104,30 @@ func (t *TransactionBase) GetMeid() *xapp.RMRMeid { return nil } -func (t *TransactionBase) GetPayload() *packer.PackedData { +func (t *Transaction) GetPayload() *packer.PackedData { t.mutex.Lock() defer t.mutex.Unlock() return t.Payload } +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- +type TransactionSubs struct { + Transaction // +} + +func (t *TransactionSubs) String() string { + return "transsubs(" + t.Transaction.String() + ")" +} + +func (t *TransactionSubs) Release() { + t.mutex.Lock() + xapp.Logger.Debug("RELEASE %s", t.String()) + t.tracker = nil + t.mutex.Unlock() +} + //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- @@ -104,20 +143,20 @@ func (key *TransactionXappKey) String() string { //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- -type Transaction struct { - TransactionBase // - XappKey *TransactionXappKey // +type TransactionXapp struct { + Transaction // + XappKey *TransactionXappKey // } -func (t *Transaction) String() string { +func (t *TransactionXapp) String() string { var transkey string = "transkey(N/A)" if t.XappKey != nil { transkey = t.XappKey.String() } - return "trans(" + strconv.FormatUint(uint64(t.Seq), 10) + "/" + t.Meid.RanName + "/" + transkey + ")" + return "transxapp(" + t.Transaction.String() + "/" + transkey + ")" } -func (t *Transaction) GetEndpoint() *RmrEndpoint { +func (t *TransactionXapp) GetEndpoint() *RmrEndpoint { t.mutex.Lock() defer t.mutex.Unlock() if t.XappKey != nil { @@ -126,7 +165,7 @@ func (t *Transaction) GetEndpoint() *RmrEndpoint { return nil } -func (t *Transaction) GetXid() string { +func (t *TransactionXapp) GetXid() string { t.mutex.Lock() defer t.mutex.Unlock() if t.XappKey != nil { @@ -135,7 +174,7 @@ func (t *Transaction) GetXid() string { return "" } -func (t *Transaction) GetSrc() string { +func (t *TransactionXapp) GetSrc() string { t.mutex.Lock() defer t.mutex.Unlock() if t.XappKey != nil { @@ -144,9 +183,9 @@ func (t *Transaction) GetSrc() string { return "" } -func (t *Transaction) Release() { +func (t *TransactionXapp) Release() { t.mutex.Lock() - xapp.Logger.Debug("Transaction: Release %s", t.String()) + xapp.Logger.Debug("RELEASE %s", t.String()) tracker := t.tracker xappkey := t.XappKey t.tracker = nil diff --git a/pkg/control/types.go b/pkg/control/types.go index 164e801..4d318e0 100644 --- a/pkg/control/types.go +++ b/pkg/control/types.go @@ -22,6 +22,7 @@ package control import ( "bytes" "fmt" + "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "strconv" "strings" @@ -30,10 +31,12 @@ import ( //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- -type RmrDatagram struct { - MessageType int - SubscriptionId uint16 - Payload []byte +type RequestId struct { + e2ap.RequestId +} + +func (rid *RequestId) String() string { + return "reqid(" + rid.RequestId.String() + ")" } //----------------------------------------------------------------------------- @@ -45,7 +48,15 @@ type RmrEndpoint struct { } func (endpoint RmrEndpoint) String() string { - return endpoint.Get() + return endpoint.Addr + ":" + strconv.FormatUint(uint64(endpoint.Port), 10) +} + +func (endpoint *RmrEndpoint) Equal(ep *RmrEndpoint) bool { + if (endpoint.Addr == ep.Addr) && + (endpoint.Port == ep.Port) { + return true + } + return false } func (endpoint *RmrEndpoint) GetAddr() string { @@ -56,10 +67,6 @@ func (endpoint *RmrEndpoint) GetPort() uint16 { return endpoint.Port } -func (endpoint *RmrEndpoint) Get() string { - return endpoint.Addr + ":" + strconv.FormatUint(uint64(endpoint.Port), 10) -} - func (endpoint *RmrEndpoint) Set(src string) bool { elems := strings.Split(src, ":") if len(elems) == 2 { @@ -82,11 +89,10 @@ type RmrEndpointList struct { } func (eplist *RmrEndpointList) String() string { + tmpList := eplist.Endpoints valuesText := []string{} - for i := range eplist.Endpoints { - ep := eplist.Endpoints[i] - text := ep.String() - valuesText = append(valuesText, text) + for i := range tmpList { + valuesText = append(valuesText, tmpList[i].String()) } return strings.Join(valuesText, ",") } @@ -97,7 +103,7 @@ func (eplist *RmrEndpointList) Size() int { func (eplist *RmrEndpointList) AddEndpoint(ep *RmrEndpoint) bool { for i := range eplist.Endpoints { - if (eplist.Endpoints[i].Addr == ep.Addr) && (eplist.Endpoints[i].Port == ep.Port) { + if eplist.Endpoints[i].Equal(ep) { return false } } @@ -107,7 +113,7 @@ func (eplist *RmrEndpointList) AddEndpoint(ep *RmrEndpoint) bool { func (eplist *RmrEndpointList) DelEndpoint(ep *RmrEndpoint) bool { for i := range eplist.Endpoints { - if (eplist.Endpoints[i].Addr == ep.Addr) && (eplist.Endpoints[i].Port == ep.Port) { + if eplist.Endpoints[i].Equal(ep) { eplist.Endpoints[i] = eplist.Endpoints[len(eplist.Endpoints)-1] eplist.Endpoints[len(eplist.Endpoints)-1] = RmrEndpoint{"", 0} eplist.Endpoints = eplist.Endpoints[:len(eplist.Endpoints)-1] @@ -120,7 +126,7 @@ func (eplist *RmrEndpointList) DelEndpoint(ep *RmrEndpoint) bool { func (eplist *RmrEndpointList) DelEndpoints(otheplist *RmrEndpointList) bool { var retval bool = false for i := range otheplist.Endpoints { - if eplist.DelEndpoint(&eplist.Endpoints[i]) { + if eplist.DelEndpoint(&otheplist.Endpoints[i]) { retval = true } } @@ -129,7 +135,7 @@ func (eplist *RmrEndpointList) DelEndpoints(otheplist *RmrEndpointList) bool { func (eplist *RmrEndpointList) HasEndpoint(ep *RmrEndpoint) bool { for i := range eplist.Endpoints { - if (eplist.Endpoints[i].Addr == ep.Addr) && (eplist.Endpoints[i].Port == ep.Port) { + if eplist.Endpoints[i].Equal(ep) { return true } } @@ -144,25 +150,6 @@ func NewRmrEndpoint(src string) *RmrEndpoint { return ep } -//----------------------------------------------------------------------------- -// -//----------------------------------------------------------------------------- -type Action int - -func (act Action) String() string { - actions := [...]string{ - "CREATE", - "UPDATE", - "NONE", - "DELETE", - } - - if act < CREATE || act > DELETE { - return "UNKNOWN" - } - return actions[act] -} - //----------------------------------------------------------------------------- // To add own method for rmrparams //----------------------------------------------------------------------------- diff --git a/pkg/control/types_test.go b/pkg/control/types_test.go index caeae7b..5e1cfac 100644 --- a/pkg/control/types_test.go +++ b/pkg/control/types_test.go @@ -39,7 +39,7 @@ func TestRmrEndpoint(t *testing.T) { testError(t, "Endpoint elems for value %s expected addr %s port %d got addr %s port %d", val, expect.GetAddr(), expect.GetPort(), res.GetAddr(), res.GetPort()) } if expect.String() != res.String() { - testError(t, "Endpoint string for value %s expected %s got %s", val, expect.String(), res.Get()) + testError(t, "Endpoint string for value %s expected %s got %s", val, expect.String(), res.String()) } } @@ -52,20 +52,71 @@ func TestRmrEndpoint(t *testing.T) { testEp(t, "", nil) } -func TestAction(t *testing.T) { +func TestRmrEndpointList(t *testing.T) { + epl := &RmrEndpointList{} - testActionString := func(t *testing.T, val int, str string) { - if Action(val).String() != str { - testError(t, "String for value %d expected %s got %s", val, str, Action(val).String()) - } + // Simple add / has / delete + if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false { + testError(t, "RmrEndpointList: 8080 add failed") + } + if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == true { + testError(t, "RmrEndpointList: 8080 duplicate add success") + } + if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false { + testError(t, "RmrEndpointList: 8081 add failed") + } + if epl.HasEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false { + testError(t, "RmrEndpointList: 8081 has failed") + } + if epl.DelEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false { + testError(t, "RmrEndpointList: 8081 del failed") + } + if epl.HasEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == true { + testError(t, "RmrEndpointList: 8081 has non existing success") + } + if epl.DelEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == true { + testError(t, "RmrEndpointList: 8081 del non existing success") + } + if epl.DelEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false { + testError(t, "RmrEndpointList: 8080 del failed") + } + + // list delete + if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false { + testError(t, "RmrEndpointList: 8080 add failed") + } + if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false { + testError(t, "RmrEndpointList: 8081 add failed") + } + if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8082")) == false { + testError(t, "RmrEndpointList: 8082 add failed") + } + + epl2 := &RmrEndpointList{} + if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:9080")) == false { + testError(t, "RmrEndpointList: othlist add 9080 failed") + } + + if epl.DelEndpoints(epl2) == true { + testError(t, "RmrEndpointList: delete list not existing successs") + } + + if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false { + testError(t, "RmrEndpointList: othlist add 8080 failed") + } + if epl.DelEndpoints(epl2) == false { + testError(t, "RmrEndpointList: delete list 8080,9080 failed") + } + + if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false { + testError(t, "RmrEndpointList: othlist add 8081 failed") + } + if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:8082")) == false { + testError(t, "RmrEndpointList: othlist add 8082 failed") + } + + if epl.DelEndpoints(epl2) == false { + testError(t, "RmrEndpointList: delete list 8080,8081,8082,9080 failed") } - testActionString(t, 0, "CREATE") - testActionString(t, 1, "UPDATE") - testActionString(t, 2, "NONE") - testActionString(t, 3, "DELETE") - testActionString(t, 5, "UNKNOWN") - testActionString(t, 6, "UNKNOWN") - testActionString(t, 7, "UNKNOWN") - testActionString(t, 10, "UNKNOWN") } diff --git a/pkg/control/ut_ctrl_submgr_test.go b/pkg/control/ut_ctrl_submgr_test.go index 8403c93..16da422 100644 --- a/pkg/control/ut_ctrl_submgr_test.go +++ b/pkg/control/ut_ctrl_submgr_test.go @@ -57,11 +57,11 @@ func (mc *testingSubmgrControl) wait_registry_empty(t *testing.T, secs int) bool return false } -func (mc *testingSubmgrControl) wait_subs_clean(t *testing.T, e2SubsId int, secs int) bool { +func (mc *testingSubmgrControl) wait_subs_clean(t *testing.T, e2SubsId uint32, secs int) bool { var subs *Subscription i := 1 for ; i <= secs*2; i++ { - subs = mc.c.registry.GetSubscription(uint16(e2SubsId)) + subs = mc.c.registry.GetSubscription(e2SubsId) if subs == nil { return true } @@ -75,11 +75,11 @@ func (mc *testingSubmgrControl) wait_subs_clean(t *testing.T, e2SubsId int, secs return false } -func (mc *testingSubmgrControl) wait_subs_trans_clean(t *testing.T, e2SubsId int, secs int) bool { - var trans *Transaction +func (mc *testingSubmgrControl) wait_subs_trans_clean(t *testing.T, e2SubsId uint32, secs int) bool { + var trans TransactionIf i := 1 for ; i <= secs*2; i++ { - subs := mc.c.registry.GetSubscription(uint16(e2SubsId)) + subs := mc.c.registry.GetSubscription(e2SubsId) if subs == nil { return true } @@ -97,13 +97,13 @@ func (mc *testingSubmgrControl) wait_subs_trans_clean(t *testing.T, e2SubsId int return false } -func (mc *testingSubmgrControl) get_subid(t *testing.T) uint16 { +func (mc *testingSubmgrControl) get_subid(t *testing.T) uint32 { mc.c.registry.mutex.Lock() defer mc.c.registry.mutex.Unlock() return mc.c.registry.subIds[0] } -func (mc *testingSubmgrControl) wait_subid_change(t *testing.T, origSubId uint16, secs int) (uint16, bool) { +func (mc *testingSubmgrControl) wait_subid_change(t *testing.T, origSubId uint32, secs int) (uint32, bool) { i := 1 for ; i <= secs*2; i++ { mc.c.registry.mutex.Lock() diff --git a/pkg/control/ut_messaging_test.go b/pkg/control/ut_messaging_test.go index 40207b3..2e8bc0e 100644 --- a/pkg/control/ut_messaging_test.go +++ b/pkg/control/ut_messaging_test.go @@ -56,7 +56,7 @@ func TestSubReqAndRouteNok(t *testing.T) { waiter.WaitResult(t) //Wait that subs is cleaned - mainCtrl.wait_subs_clean(t, int(newSubsId), 10) + mainCtrl.wait_subs_clean(t, newSubsId, 10) xappConn1.TestMsgCnt(t) xappConn2.TestMsgCnt(t) @@ -585,7 +585,7 @@ func TestSubReqRetryNoRespSubDelRespInSubmgr(t *testing.T) { e2termConn.handle_e2term_subs_del_resp(t, delreq, delmsg) // Wait that subs is cleaned - mainCtrl.wait_subs_clean(t, int(delreq.RequestId.Seq), 10) + mainCtrl.wait_subs_clean(t, delreq.RequestId.Seq, 10) xappConn1.TestMsgCnt(t) xappConn2.TestMsgCnt(t) @@ -642,7 +642,7 @@ func TestSubReqTwoRetriesNoRespAtAllInSubmgr(t *testing.T) { delreq, _ := e2termConn.handle_e2term_subs_del_req(t) // Wait that subs is cleaned - mainCtrl.wait_subs_clean(t, int(delreq.RequestId.Seq), 15) + mainCtrl.wait_subs_clean(t, delreq.RequestId.Seq, 15) xappConn1.TestMsgCnt(t) xappConn2.TestMsgCnt(t) @@ -1156,7 +1156,7 @@ func TestSubReqAndSubDelNoAnswerSameActionParallel(t *testing.T) { e2termConn.handle_e2term_subs_del_resp(t, delreq1, delmsg1) //Wait that subs is cleaned - mainCtrl.wait_subs_clean(t, int(delreq1.RequestId.Seq), 10) + mainCtrl.wait_subs_clean(t, delreq1.RequestId.Seq, 10) xappConn1.TestMsgCnt(t) xappConn2.TestMsgCnt(t) diff --git a/pkg/control/ut_stub_e2term_test.go b/pkg/control/ut_stub_e2term_test.go index 18a6264..273d159 100644 --- a/pkg/control/ut_stub_e2term_test.go +++ b/pkg/control/ut_stub_e2term_test.go @@ -132,7 +132,7 @@ func (e2termConn *testingE2termStub) handle_e2term_subs_resp(t *testing.T, req * } e2SubsResp.Set(resp) - xapp.Logger.Debug("%s", e2SubsResp.String()) + xapp.Logger.Debug("(%s) %s", e2termConn.desc, e2SubsResp.String()) packerr, packedMsg := e2SubsResp.Pack(nil) if packerr != nil { testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error()) @@ -198,7 +198,7 @@ func (e2termConn *testingE2termStub) handle_e2term_subs_fail(t *testing.T, fpara xapp.Logger.Info("(%s) Send Subs Fail", e2termConn.desc) e2SubsFail.Set(fparams.fail) - xapp.Logger.Debug("%s", e2SubsFail.String()) + xapp.Logger.Debug("(%s) %s", e2termConn.desc, e2SubsFail.String()) packerr, packedMsg := e2SubsFail.Pack(nil) if packerr != nil { testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error()) @@ -276,7 +276,7 @@ func (e2termConn *testingE2termStub) handle_e2term_subs_del_resp(t *testing.T, r resp.FunctionId = req.FunctionId e2SubsDelResp.Set(resp) - xapp.Logger.Debug("%s", e2SubsDelResp.String()) + xapp.Logger.Debug("(%s) %s", e2termConn.desc, e2SubsDelResp.String()) packerr, packedMsg := e2SubsDelResp.Pack(nil) if packerr != nil { testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error()) @@ -316,7 +316,7 @@ func (e2termConn *testingE2termStub) handle_e2term_subs_del_fail(t *testing.T, r resp.Cause.CauseVal = 4 // unspecified e2SubsDelFail.Set(resp) - xapp.Logger.Debug("%s", e2SubsDelFail.String()) + xapp.Logger.Debug("(%s) %s", e2termConn.desc, e2SubsDelFail.String()) packerr, packedMsg := e2SubsDelFail.Pack(nil) if packerr != nil { testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error()) diff --git a/pkg/control/ut_stub_xapp_test.go b/pkg/control/ut_stub_xapp_test.go index 331d14d..ac72b76 100644 --- a/pkg/control/ut_stub_xapp_test.go +++ b/pkg/control/ut_stub_xapp_test.go @@ -194,10 +194,10 @@ func (xappConn *testingXappStub) handle_xapp_subs_req(t *testing.T, rparams *tes //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- -func (xappConn *testingXappStub) handle_xapp_subs_resp(t *testing.T, trans *xappTransaction) int { +func (xappConn *testingXappStub) handle_xapp_subs_resp(t *testing.T, trans *xappTransaction) uint32 { xapp.Logger.Info("(%s) handle_xapp_subs_resp", xappConn.desc) e2SubsResp := xapp_e2asnpacker.NewPackerSubscriptionResponse() - var e2SubsId int + var e2SubsId uint32 //--------------------------------- // xapp activity: Recv Subs Resp @@ -207,14 +207,18 @@ func (xappConn *testingXappStub) handle_xapp_subs_resp(t *testing.T, trans *xapp xappConn.DecMsgCnt() if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_RESP"] { testError(t, "(%s) Received RIC_SUB_RESP wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_RESP", xapp.RicMessageTypeToName[msg.Mtype]) - return -1 + return 0 } else if msg.Xid != trans.xid { testError(t, "(%s) Received RIC_SUB_RESP wrong xid expected %s got %s, error", xappConn.desc, trans.xid, msg.Xid) - return -1 + return 0 } else { packedData := &packer.PackedData{} packedData.Buf = msg.Payload - e2SubsId = msg.SubId + if msg.SubId > 0 { + e2SubsId = uint32(msg.SubId) + } else { + e2SubsId = 0 + } unpackerr := e2SubsResp.UnPack(packedData) if unpackerr != nil { @@ -230,18 +234,18 @@ func (xappConn *testingXappStub) handle_xapp_subs_resp(t *testing.T, trans *xapp } case <-time.After(15 * time.Second): testError(t, "(%s) Not Received RIC_SUB_RESP within 15 secs", xappConn.desc) - return -1 + return 0 } - return -1 + return 0 } //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- -func (xappConn *testingXappStub) handle_xapp_subs_fail(t *testing.T, trans *xappTransaction) int { +func (xappConn *testingXappStub) handle_xapp_subs_fail(t *testing.T, trans *xappTransaction) uint32 { xapp.Logger.Info("(%s) handle_xapp_subs_fail", xappConn.desc) e2SubsFail := xapp_e2asnpacker.NewPackerSubscriptionFailure() - var e2SubsId int + var e2SubsId uint32 //------------------------------- // xapp activity: Recv Subs Fail @@ -251,14 +255,18 @@ func (xappConn *testingXappStub) handle_xapp_subs_fail(t *testing.T, trans *xapp xappConn.DecMsgCnt() if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_FAILURE"] { testError(t, "(%s) Received RIC_SUB_FAILURE wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_FAILURE", xapp.RicMessageTypeToName[msg.Mtype]) - return -1 + return 0 } else if msg.Xid != trans.xid { testError(t, "(%s) Received RIC_SUB_FAILURE wrong xid expected %s got %s, error", xappConn.desc, trans.xid, msg.Xid) - return -1 + return 0 } else { packedData := &packer.PackedData{} packedData.Buf = msg.Payload - e2SubsId = msg.SubId + if msg.SubId > 0 { + e2SubsId = uint32(msg.SubId) + } else { + e2SubsId = 0 + } unpackerr := e2SubsFail.UnPack(packedData) if unpackerr != nil { @@ -274,15 +282,15 @@ func (xappConn *testingXappStub) handle_xapp_subs_fail(t *testing.T, trans *xapp } case <-time.After(15 * time.Second): testError(t, "(%s) Not Received RIC_SUB_FAILURE within 15 secs", xappConn.desc) - return -1 + return 0 } - return -1 + return 0 } //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- -func (xappConn *testingXappStub) handle_xapp_subs_del_req(t *testing.T, oldTrans *xappTransaction, e2SubsId int) *xappTransaction { +func (xappConn *testingXappStub) handle_xapp_subs_del_req(t *testing.T, oldTrans *xappTransaction, e2SubsId uint32) *xappTransaction { xapp.Logger.Info("(%s) handle_xapp_subs_del_req", xappConn.desc) e2SubsDelReq := xapp_e2asnpacker.NewPackerSubscriptionDeleteRequest() @@ -293,7 +301,7 @@ func (xappConn *testingXappStub) handle_xapp_subs_del_req(t *testing.T, oldTrans req := &e2ap.E2APSubscriptionDeleteRequest{} req.RequestId.Id = 1 - req.RequestId.Seq = uint32(e2SubsId) + req.RequestId.Seq = e2SubsId req.FunctionId = 1 e2SubsDelReq.Set(req) @@ -311,7 +319,7 @@ func (xappConn *testingXappStub) handle_xapp_subs_del_req(t *testing.T, oldTrans params := &RMRParams{&xapp.RMRParams{}} params.Mtype = xapp.RIC_SUB_DEL_REQ - params.SubId = e2SubsId + params.SubId = int(e2SubsId) params.Payload = packedMsg.Buf params.Meid = trans.meid params.Xid = trans.xid